Sometimes the most useful data engineering projects are not glamorous.
They are small, deliberate systems that wake up, do one job properly, and go back to sleep.
This is one of those.
In this guide, I’ll walk through how I built the first stage of the Global Shock pipeline: a simple AWS-based process that fetches Brent and WTI crude oil spot prices every morning, stores raw snapshots in S3, transforms them into a merged time series, and publishes a clean JSON file for the website to use.
The target outcome is straightforward:
- one CloudFormation stack
- two S3 buckets
- three Lambda functions
- one EventBridge schedule
- one processed `latest.json` file your frontend can read
And the whole thing runs for roughly pocket money each month.
What this pipeline does
Every morning at 08:00 UTC, AWS fetches Brent and WTI crude prices from the US Energy Information Administration (EIA), stores the raw data in S3, merges the series into a historical dataset going back to 1986, and republishes the processed output for the Data & Grit website graph.
That gives you three useful properties immediately:
- a repeatable daily ingest
- raw-data traceability
- a clean published data product for charts and later analysis
This is stage one, not the whole Global Shock story. The job here is to build a reliable backbone.
Why use the EIA API instead of yfinance
The original prototype used yfinance, but it failed once the code moved into Lambda.
The problem was not just dependency packaging. The bigger production issue was that Yahoo Finance was returning empty responses from AWS-hosted Lambda environments, which made it unreliable in the exact runtime where the pipeline needed to live.
That is why I switched the pipeline to the EIA API.
It solved the actual production problem:
- Lambda could call it reliably
- the data source was official
- the data was spot-price based rather than a rough proxy from elsewhere
- the ingestor no longer needed a heavy Python layer, because `urllib` and `boto3` were enough for the current setup
The two daily series are:
- `PET.RBRTE.D` for Brent crude
- `PET.RWTC.D` for WTI crude
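The request shape is simple enough to sketch with the standard library alone. This is an illustrative helper (`build_series_url` is my name, not part of the pipeline code), using the same query parameters the ingestor relies on later:

```python
import urllib.parse

EIA_BASE = "https://api.eia.gov/v2/seriesid"

def build_series_url(series_id: str, api_key: str, length: int = 10, offset: int = 0) -> str:
    """Build a v2 seriesid request URL for one EIA series."""
    params = {
        "api_key": api_key,
        "data[]": "value",             # only ask for the price column
        "sort[0][column]": "period",   # oldest first
        "sort[0][direction]": "asc",
        "length": length,
        "offset": offset,
    }
    return f"{EIA_BASE}/{series_id}?" + urllib.parse.urlencode(params)

url = build_series_url("PET.RBRTE.D", api_key="DEMO_KEY")
print(url)
```

Fetching that URL returns JSON with the rows under `response.data`, which is exactly what the ingestor parses.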
What you are going to build
The stack is intentionally small.
CloudFormation creates:
- a raw S3 bucket
- a processed S3 bucket
- an ingestor Lambda
- a transformer Lambda
- an optional API Lambda
- an EventBridge rule that fires daily at `cron(0 8 * * ? *)`
The runtime flow is:
- EventBridge triggers the ingestor at 08:00 UTC
- the ingestor fetches Brent and WTI from EIA
- it writes raw JSON files into the raw bucket
- S3 triggers the transformer
- the transformer merges the series and computes the Brent/WTI spread
- it writes a dated output file and overwrites `processed/latest.json`
- your website reads the latest processed file
That’s the pipeline end to end.
Step 1: Create the repository structure first
Before I touched AWS, I laid out the repo cleanly.
Use this structure:
```
GlobalShock/
├── lambdas/
│   ├── ingestor/
│   │   └── handler.py
│   ├── transformer/
│   │   └── handler.py
│   └── api/
│       └── handler.py
├── infra/
│   └── cloudformation.yaml
├── deploy.sh
├── README.md
└── .gitignore
```
It also keeps the infrastructure and Lambda code separated, which makes the stack much easier to work with.
Step 2: Build the CloudFormation stack first
I wanted the buckets, IAM, Lambdas, and schedule in place first, so the rest of the pipeline had a proper frame.
Your CloudFormation stack in infra/cloudformation.yaml should be responsible for:
- creating the two S3 buckets
- setting the raw bucket lifecycle to expire raw snapshots after 90 days
- creating one IAM role shared by the Lambdas
- creating the three Lambda functions
- wiring the EventBridge rule to the ingestor
- granting Lambda permission where needed
What the buckets are for
The raw bucket stores one snapshot per ticker per day.
The processed bucket stores the merged output your site reads from.
I parameterised the bucket names so the stack stays reusable:
- `${ProjectName}-raw-${AccountId}`
- `${ProjectName}-processed-${AccountId}`
What the role needs
The Lambdas share one IAM role with S3 read/write access. For this stage, the role mainly needs:
- CloudWatch Logs write permissions
- read/write permissions on both pipeline buckets
What the schedule should be
Your EventBridge rule is:
```yaml
ScheduleExpression: "cron(0 8 * * ? *)"
```
That means the ingestor runs at 08:00 UTC daily, which is late enough that the previous day’s close is available.
Important: avoid the S3/Lambda circular dependency
One real issue I hit was a circular dependency between the bucket and the Lambda permission. The bucket wanted to reference the Lambda for notifications, while the Lambda permission wanted to reference the bucket. I fixed that by moving the permission out separately into AWS::Lambda::Permission.
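The decoupled permission looks roughly like this (a sketch, not the full template below; the key detail is that `SourceArn` is built with `!Sub` so this resource never references the bucket directly):

```yaml
TransformerS3Permission:
  Type: AWS::Lambda::Permission
  Properties:
    FunctionName: !GetAtt TransformerFunction.Arn
    Action: lambda:InvokeFunction
    Principal: s3.amazonaws.com
    # Constructed by hand so the permission does not depend on RawBucket
    SourceArn: !Sub "arn:aws:s3:::${ProjectName}-raw-${AWS::AccountId}"
```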
CloudFormation template
```yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: >
  Global Shock oil price pipeline.
  Ingests Brent (PET.RBRTE.D) and WTI (PET.RWTC.D) spot prices
  from the EIA API daily, transforms them into a merged time
  series, and stores the output in S3.

Parameters:
  ProjectName:
    Type: String
    Default: oil-pipeline
  Environment:
    Type: String
    Default: prod
    AllowedValues: [prod, dev]
  EiaApiKey:
    Type: String
    NoEcho: true
    Description: EIA API key, passed to the ingestor as EIA_API_KEY
  AllowedCorsOrigin:
    Type: String
    Default: https://yourdomain.com
    Description: Frontend origin allowed to read processed files via browser

Resources:
  RawBucket:
    Type: AWS::S3::Bucket
    # The invoke permission must exist before S3 validates the
    # notification configuration, hence the explicit DependsOn.
    DependsOn: TransformerS3Permission
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub "${ProjectName}-raw-${AWS::AccountId}"
      VersioningConfiguration:
        Status: Enabled
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      LifecycleConfiguration:
        Rules:
          - Id: expire-raw-after-90-days
            Status: Enabled
            ExpirationInDays: 90
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: raw/
                  - Name: suffix
                    Value: snapshot.json
            Function: !GetAtt TransformerFunction.Arn

  ProcessedBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub "${ProjectName}-processed-${AWS::AccountId}"
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      CorsConfiguration:
        CorsRules:
          - AllowedHeaders: ["*"]
            AllowedMethods: [GET]
            AllowedOrigins:
              - !Ref AllowedCorsOrigin
            MaxAge: 3600

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: s3-access
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                # ARNs are constructed with !Sub rather than !GetAtt so the
                # role does not depend on the buckets (which would close a
                # circular dependency through the transformer function).
                Resource:
                  - !Sub "arn:aws:s3:::${ProjectName}-raw-${AWS::AccountId}"
                  - !Sub "arn:aws:s3:::${ProjectName}-raw-${AWS::AccountId}/*"
                  - !Sub "arn:aws:s3:::${ProjectName}-processed-${AWS::AccountId}"
                  - !Sub "arn:aws:s3:::${ProjectName}-processed-${AWS::AccountId}/*"

  IngestorFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: handler.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 120
      MemorySize: 256
      Environment:
        Variables:
          S3_BUCKET_RAW: !Ref RawBucket
          EIA_API_KEY: !Ref EiaApiKey
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {"statusCode": 200, "body": "placeholder - deploy real code"}

  TransformerFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: handler.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 60
      MemorySize: 512
      Environment:
        Variables:
          # Built with !Sub, not !Ref, because RawBucket already references
          # this function for notifications; !Ref would create a cycle.
          S3_BUCKET_RAW: !Sub "${ProjectName}-raw-${AWS::AccountId}"
          S3_BUCKET_PROCESSED: !Ref ProcessedBucket
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {"statusCode": 200, "body": "placeholder - deploy real code"}

  ApiFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: handler.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 30
      MemorySize: 256
      Environment:
        Variables:
          S3_BUCKET_PROCESSED: !Ref ProcessedBucket
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {"statusCode": 200, "body": "placeholder - deploy real code"}

  TransformerS3Permission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt TransformerFunction.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      # Constructed ARN, so the permission does not reference RawBucket
      SourceArn: !Sub "arn:aws:s3:::${ProjectName}-raw-${AWS::AccountId}"

  DailyIngestorRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub "${ProjectName}-${Environment}-daily-ingest"
      ScheduleExpression: "cron(0 8 * * ? *)"
      State: ENABLED
      Targets:
        - Id: IngestorTarget
          Arn: !GetAtt IngestorFunction.Arn

  IngestorEventPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt IngestorFunction.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt DailyIngestorRule.Arn

Outputs:
  RawBucketName:
    Value: !Ref RawBucket
    Export:
      Name: !Sub "${ProjectName}-${Environment}-raw-bucket"
  ProcessedBucketName:
    Value: !Ref ProcessedBucket
    Export:
      Name: !Sub "${ProjectName}-${Environment}-processed-bucket"
  IngestorFunctionArn:
    Value: !GetAtt IngestorFunction.Arn
  TransformerFunctionArn:
    Value: !GetAtt TransformerFunction.Arn
  ApiFunctionArn:
    Value: !GetAtt ApiFunction.Arn
```
Step 3: Build the ingestor Lambda
Once the stack exists, the first Lambda to implement is the ingestor.
This function is triggered by EventBridge every morning and fetches the two EIA series:
- `brent_crude` → `PET.RBRTE.D`
- `wti_crude` → `PET.RWTC.D`
It then writes raw JSON files into paths like:
```
raw/brent_crude/YYYY/MM/DD/snapshot.json
raw/wti_crude/YYYY/MM/DD/snapshot.json
```
That gives me a simple daily partition layout that is easy to inspect and easy to backfill.
I also added a backfill flag so the same Lambda can handle both the daily run and the historical load. I used that once to backfill the series to 2 January 1986.
The ingestor is straightforward. The EIA API key lives in Lambda environment variables, the series map is hardcoded in the function, and the backfill flag decides whether the run is fetching the latest records or the full history. Each run writes a raw snapshot into a dated S3 path, which gives me traceability as well as a clean handoff into the transformer.
Ingestor code
"""
Lambda: oil_price_ingestor
Triggered by: EventBridge (daily cron)
Purpose: Fetch Brent and WTI spot prices from EIA API, write raw snapshot to S3
EIA series:
- PET.RBRTE.D = Brent crude spot price (USD/barrel, daily)
- PET.RWTC.D = WTI crude spot price (USD/barrel, daily)
"""
import json
import os
import boto3
import urllib.request
import urllib.parse
from datetime import datetime, timezone
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
S3_BUCKET_RAW = os.environ["S3_BUCKET_RAW"]
EIA_API_KEY = os.environ["EIA_API_KEY"]
SERIES = {
"brent_crude": "PET.RBRTE.D",
"wti_crude": "PET.RWTC.D",
}
s3 = boto3.client("s3")
def fetch_eia_series(series_id, is_backfill):
"""Fetch a single EIA series. Returns list of {date, value} dicts."""
base = "https://api.eia.gov/v2/seriesid/{series_id}"
params = {
"api_key": EIA_API_KEY,
"data[]": "value",
"sort[0][column]": "period",
"sort[0][direction]": "asc",
"length": 5000 if is_backfill else 10,
"offset": 0,
}
url = f"https://api.eia.gov/v2/seriesid/{series_id}?" + urllib.parse.urlencode(params)
logger.info(f"Fetching EIA series {series_id}")
all_records = []
offset = 0
while True:
paged_params = dict(params)
paged_params["offset"] = offset
paged_url = f"https://api.eia.gov/v2/seriesid/{series_id}?" + urllib.parse.urlencode(paged_params)
req = urllib.request.Request(paged_url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode())
rows = data.get("response", {}).get("data", [])
if not rows:
break
for row in rows:
val = row.get("value")
if val is not None:
try:
all_records.append({
"date": row["period"],
"value": round(float(val), 4),
})
except (ValueError, KeyError):
pass
# If backfill and there might be more pages
total = data.get("response", {}).get("total", 0)
offset += len(rows)
if not is_backfill or offset >= total or len(rows) == 0:
break
logger.info(f"EIA {series_id}: fetched {len(all_records)} records")
return all_records
def lambda_handler(event, context):
run_ts = datetime.now(timezone.utc)
is_backfill = event.get("backfill", False)
results = []
for name, series_id in SERIES.items():
try:
raw_records = fetch_eia_series(series_id, is_backfill)
if not raw_records:
logger.warning(f"{name}: empty response")
results.append({"series": name, "error": "empty response"})
continue
# Build records in our standard format
records = []
for r in raw_records:
records.append({
"date": r["date"],
"name": name,
"series_id": series_id,
"close": r["value"], # EIA gives daily spot close
"open": None,
"high": None,
"low": None,
"volume": None,
"currency": "USD",
"fetched_at": run_ts.isoformat(),
})
date_prefix = run_ts.strftime("%Y/%m/%d")
s3_key = f"raw/{name}/{date_prefix}/snapshot.json"
payload = {
"series_id": series_id,
"name": name,
"fetched_at": run_ts.isoformat(),
"record_count": len(records),
"records": records,
}
s3.put_object(
Bucket=S3_BUCKET_RAW,
Key=s3_key,
Body=json.dumps(payload, indent=2),
ContentType="application/json",
)
logger.info(f"Written {len(records)} records to s3://{S3_BUCKET_RAW}/{s3_key}")
results.append({"series": name, "records": len(records), "s3_key": s3_key})
except Exception as e:
logger.error(f"{name} failed: {e}", exc_info=True)
results.append({"series": name, "error": str(e)})
return {
"statusCode": 200,
"body": {"run_at": run_ts.isoformat(), "results": results},
}
Backfill example
Here’s the command I used for the one-off backfill:
```bash
aws lambda invoke \
  --function-name oil-pipeline-ingestor \
  --payload '{"backfill": true}' \
  --cli-binary-format raw-in-base64-out \
  --region eu-west-2 response.json
```
That one-off backfill produced:
- 9,852 Brent records
- 10,123 WTI records
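A quick way to verify a snapshot without opening it by hand is a small summary helper. This is an illustrative utility, not part of the deployed code; the boto3 fetch is shown commented out (with an example bucket name) so the logic itself stays self-contained:

```python
import json

def summarize_snapshot(payload: dict) -> dict:
    """Summarise one raw snapshot: series name, record count, date range."""
    records = payload.get("records", [])
    dates = sorted(r["date"] for r in records)
    return {
        "series": payload.get("name"),
        "records": len(records),
        "first": dates[0] if dates else None,
        "last": dates[-1] if dates else None,
    }

# To run against S3 (bucket name and key are examples):
# import boto3
# s3 = boto3.client("s3")
# obj = s3.get_object(Bucket="oil-pipeline-raw-123456789012",
#                     Key="raw/brent_crude/2024/01/15/snapshot.json")
# print(summarize_snapshot(json.loads(obj["Body"].read())))

sample = {
    "name": "brent_crude",
    "records": [
        {"date": "1987-05-20", "close": 18.63},
        {"date": "1987-05-21", "close": 18.45},
    ],
}
print(summarize_snapshot(sample))
```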
Step 4: Build the transformer Lambda
This is the second core Lambda, and it is where the raw feed becomes a data product.
Your transformer is triggered by the raw files landing in S3. It reads the Brent and WTI snapshots for the day, joins them by date, computes brent_wti_spread, and writes two outputs:
- a dated processed snapshot
- `processed/latest.json`, which always gets overwritten
For the frontend, that pattern keeps things simple.
It means the site only needs one stable URL and does not need to understand your partitioning logic.
What the merged schema looks like
Each row in the series contains:
- `date`
- `brent_close`
- `wti_close`
- `brent_wti_spread`
The top level also includes metadata such as:
- `schema_version`
- `processed_at`
- `currency`
- summary stats
It’s a simple shape to inspect in S3 and easy to plot on the site.
Useful detail to keep
Brent starts later than WTI, so the early rows legitimately contain null Brent values. I left that as-is rather than faking completeness, because the gap is part of the real history.
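For anything consuming the series, that means spread calculations have to tolerate missing values. A small illustrative filter (the helper name is mine, not from the pipeline code):

```python
def complete_rows(series: list[dict]) -> list[dict]:
    """Keep only rows where both closes exist, so spread stats are well-defined."""
    return [
        r for r in series
        if r.get("brent_close") is not None and r.get("wti_close") is not None
    ]

series = [
    {"date": "1986-01-02", "brent_close": None, "wti_close": 25.56},   # pre-Brent gap
    {"date": "1987-05-20", "brent_close": 18.63, "wti_close": 19.45},
]
rows = complete_rows(series)
spreads = [round(r["brent_close"] - r["wti_close"], 4) for r in rows]
print(spreads)  # the 1986 row is skipped, one spread value remains
```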
Transformer code
"""
Lambda: oil_price_transformer
Triggered by: S3 event notification on raw bucket (or EventBridge after ingestor)
Purpose: Read latest raw snapshots, merge BZ+CL on date, write processed output
"""
import json
import os
import boto3
import logging
from datetime import datetime, timezone, timedelta
from collections import defaultdict
logger = logging.getLogger()
logger.setLevel(logging.INFO)
S3_BUCKET_RAW = os.environ["S3_BUCKET_RAW"]
S3_BUCKET_PROCESSED = os.environ["S3_BUCKET_PROCESSED"]
TICKER_NAMES = ["brent_crude", "wti_crude"]
s3 = boto3.client("s3")
def lambda_handler(event, context):
run_ts = datetime.now(timezone.utc)
# Determine which date partition to read from
# If triggered by S3 event, parse the key; otherwise use today
date_prefix = _resolve_date_prefix(event, run_ts)
logger.info(f"Processing date prefix: {date_prefix}")
# Read raw files for both tickers
all_records = []
for ticker_name in TICKER_NAMES:
s3_key = f"raw/{ticker_name}/{date_prefix}/snapshot.json"
records = _read_raw(s3_key)
if records:
all_records.extend(records)
logger.info(f"Loaded {len(records)} records from {s3_key}")
else:
logger.warning(f"No records found at {s3_key}")
if not all_records:
logger.error("No records to process, aborting")
return {"statusCode": 200, "body": "no data"}
# Merge records by date
by_date = defaultdict(dict)
for r in all_records:
date = r["date"]
name = r["name"]
by_date[date][name] = {
"open": r["open"],
"high": r["high"],
"low": r["low"],
"close": r["close"],
"volume": r["volume"],
}
# Build merged time series sorted by date
series = []
for date in sorted(by_date.keys()):
entry = {"date": date}
brent = by_date[date].get("brent_crude", {})
wti = by_date[date].get("wti_crude", {})
entry["brent_close"] = brent.get("close")
entry["brent_open"] = brent.get("open")
entry["brent_high"] = brent.get("high")
entry["brent_low"] = brent.get("low")
entry["wti_close"] = wti.get("close")
entry["wti_open"] = wti.get("open")
entry["wti_high"] = wti.get("high")
entry["wti_low"] = wti.get("low")
# Spread (useful for your data science analysis)
if brent.get("close") is not None and wti.get("close") is not None:
entry["brent_wti_spread"] = round(brent["close"] - wti["close"], 4)
else:
entry["brent_wti_spread"] = None
series.append(entry)
# Compute simple summary stats
brent_closes = [r["brent_close"] for r in series if r["brent_close"] is not None]
wti_closes = [r["wti_close"] for r in series if r["wti_close"] is not None]
summary = {
"brent": _stats(brent_closes),
"wti": _stats(wti_closes),
"record_count": len(series),
"date_range": {
"start": series[0]["date"] if series else None,
"end": series[-1]["date"] if series else None,
},
}
# Build final output document
output = {
"schema_version": "1.0",
"processed_at": run_ts.isoformat(),
"currency": "USD",
"summary": summary,
"series": series,
}
# Write to processed bucket
# latest.json: always overwritten — easy for website to fetch
# also write a dated snapshot for reproducibility
dated_key = f"processed/{date_prefix}/oil_prices.json"
latest_key = "processed/latest.json"
for key in [dated_key, latest_key]:
s3.put_object(
Bucket=S3_BUCKET_PROCESSED,
Key=key,
Body=json.dumps(output, indent=2),
ContentType="application/json",
)
logger.info(f"Written s3://{S3_BUCKET_PROCESSED}/{key}")
return {
"statusCode": 200,
"body": {
"processed_at": run_ts.isoformat(),
"records": len(series),
"date_range": summary["date_range"],
},
}
def _read_raw(s3_key):
try:
response = s3.get_object(Bucket=S3_BUCKET_RAW, Key=s3_key)
payload = json.loads(response["Body"].read())
return payload.get("records", [])
except s3.exceptions.NoSuchKey:
return []
except Exception as e:
logger.error(f"Failed to read {s3_key}: {e}")
return []
def _resolve_date_prefix(event, run_ts):
# If triggered by S3 event, extract prefix from the key
try:
key = event["Records"][0]["s3"]["object"]["key"]
# key format: raw/brent_crude/2024/01/15/snapshot.json
parts = key.split("/")
return "/".join(parts[2:5]) # e.g. 2024/01/15
except (KeyError, IndexError):
pass
# Default: today's UTC date
return run_ts.strftime("%Y/%m/%d")
def _stats(values):
if not values:
return {}
return {
"min": round(min(values), 2),
"max": round(max(values), 2),
"latest": round(values[-1], 2),
"count": len(values),
}
Processed output example
```json
{
  "schema_version": "1.0",
  "processed_at": "YYYY-MM-DDTHH:MM:SSZ",
  "currency": "USD",
  "summary": {
    "brent": {
      "min": 0.0,
      "max": 0.0,
      "latest": 0.0,
      "count": 0
    },
    "wti": {
      "min": 0.0,
      "max": 0.0,
      "latest": 0.0,
      "count": 0
    }
  },
  "series": [
    {
      "date": "YYYY-MM-DD",
      "brent_close": 0.0,
      "wti_close": 0.0,
      "brent_wti_spread": 0.0
    }
  ]
}
```
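Since the frontend depends on this shape, a tiny validation check is cheap insurance before trusting a freshly written latest.json. This is an illustrative check, not part of the pipeline code:

```python
REQUIRED_TOP = {"schema_version", "processed_at", "currency", "summary", "series"}
REQUIRED_ROW = {"date", "brent_close", "wti_close", "brent_wti_spread"}

def validate_output(doc: dict) -> list[str]:
    """Return a list of problems; an empty list means the document looks valid."""
    problems = []
    missing = REQUIRED_TOP - doc.keys()
    if missing:
        problems.append(f"missing top-level keys: {sorted(missing)}")
    if doc.get("schema_version") != "1.0":
        problems.append(f"unexpected schema_version: {doc.get('schema_version')}")
    for i, row in enumerate(doc.get("series", [])):
        row_missing = REQUIRED_ROW - row.keys()
        if row_missing:
            problems.append(f"row {i} missing keys: {sorted(row_missing)}")
    return problems

doc = {
    "schema_version": "1.0",
    "processed_at": "2024-01-15T08:03:00+00:00",
    "currency": "USD",
    "summary": {},
    "series": [{"date": "2024-01-12", "brent_close": 78.29,
                "wti_close": 72.68, "brent_wti_spread": 5.61}],
}
print(validate_output(doc))  # [] means valid
```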
Step 5: Add the optional API Lambda
The API Lambda is optional.
lambdas/api/handler.py is a lightweight REST endpoint that sits behind API Gateway if you do not want the frontend to fetch the entire processed JSON file every time. It supports date filtering and field projection, for example:
```
GET /prices?from=2020-01-01&fields=date,brent_close,wti_close
```
It’s a useful optimisation, but I didn’t need it for stage one.
In practice, the choice is simple:
- if you want the simplest setup, fetch directly from S3
- if you want smaller frontend payloads or filtering, add API Gateway + Lambda
That keeps the design flexible without making the whole thing feel heavier than it needs to be.
Placeholder: API Lambda code
```python
# PLACEHOLDER:
# Insert GitHub-linked snippet showing:
# - loading latest.json
# - date filtering
# - field projection
# - JSON response helper
```
Step 6: Connect the website
There are two patterns.
Option A: Direct S3 fetch
This is the simplest approach. Make the processed output readable to the frontend and fetch processed/latest.json directly. Note that the template blocks public access on the processed bucket by default, so in practice this means putting CloudFront in front of it or adding an explicit bucket policy.
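With Option A, the frontend only needs one stable object URL. A tiny helper sketches the virtual-hosted-style form (the bucket name and region here are examples, not the real deployment):

```python
def processed_url(bucket: str, region: str, key: str = "processed/latest.json") -> str:
    """Virtual-hosted-style S3 URL for the processed output."""
    return f"https://{bucket}.s3.{region}.amazonaws.com/{key}"

url = processed_url("oil-pipeline-processed-123456789012", "eu-west-2")
print(url)
```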
Option B: API Gateway + Lambda
Use the optional API Lambda when you want filtering support or smaller payloads.
I’d start with direct S3 fetch and only add API Gateway if the frontend actually needs filtering.
Step 7: Explain the daily flow clearly
The live flow is:
- 08:00 UTC: EventBridge triggers `oil-pipeline-ingestor`
- around 08:01: ingestor calls EIA and writes two raw files
- around 08:02: S3 event triggers `oil-pipeline-transformer`
- around 08:03: transformer merges data and overwrites `processed/latest.json`
- the site sees the updated data
One nice practical note: on weekends the job simply republishes Friday’s close, which is harmless and avoids unnecessary scheduling complexity.
For this pipeline, keeping it simple is the right trade-off.
What the backfill proved
After the backfill, the merged dataset covered 1986 through March 2026, with:
- 9,852 Brent records
- 10,123 WTI records
- 10,291 merged records
It also captured some genuinely important market history, including the negative WTI print in April 2020.
I kept that in because “weird data” is sometimes just real history.
What it costs
One of the best parts of this build is the cost.
At this scale:
- Lambda is effectively free
- S3 is in the pennies range
- EventBridge is effectively free
- CloudWatch Logs cost pennies
Total: around £0–£2 per month, with a £5 billing alert as a sensible safety net.
For a personal project, that’s a great trade-off: real infrastructure, real automation, almost no running cost.
Final thought
This post is not about showing off architecture.
It’s about building a small, production-shaped data pipeline properly.
That means:
- define the repo structure
- create the CloudFormation stack first
- build the ingestor Lambda
- build the transformer Lambda
- optionally add the API Lambda
- backfill once
- point the website at `latest.json`
That’s the point of the project.
It is not trying to look impressive.
It is trying to work every day.
Gareth Winterman