
Global Shock Part 4: Build a Daily Oil Price Pipeline on AWS with CloudFormation and Lambda

Tue Mar 03
#aws #lambda #cloudformation #s3 #eventbridge #python #data engineering #energy #oil

Sometimes the most useful data engineering projects are not glamorous.

They are small, deliberate systems that wake up, do one job properly, and go back to sleep.

This is one of those.

In this guide, I’ll walk through how I built the first stage of the Global Shock pipeline: a simple AWS-based process that fetches Brent and WTI crude oil spot prices every morning, stores raw snapshots in S3, transforms them into a merged time series, and publishes a clean JSON file for the website to use.

The target outcome is straightforward:

  • one CloudFormation stack
  • two S3 buckets
  • three Lambda functions
  • one EventBridge schedule
  • one processed latest.json file your frontend can read

And the whole thing runs for roughly pocket money each month.


What this pipeline does

Every morning at 08:00 UTC, AWS fetches Brent and WTI crude prices from the US Energy Information Administration (EIA), stores the raw data in S3, merges the series into a historical dataset going back to 1986, and republishes the processed output for the Data & Grit website graph.

That gives you three useful properties immediately:

  • a repeatable daily ingest
  • raw-data traceability
  • a clean published data product for charts and later analysis

This is stage one, not the whole Global Shock story. The job here is to build a reliable backbone.


Why use the EIA API instead of yfinance

The original prototype used yfinance, but it failed once the code moved into Lambda.

The problem was not just dependency packaging. The bigger production issue was that Yahoo Finance was returning empty responses from AWS-hosted Lambda environments, which made it unreliable in the exact runtime where the pipeline needed to live.

That is why I switched the pipeline to the EIA API.

It solved the actual production problem:

  • Lambda could call it reliably
  • the data source was official
  • the data was spot-price based rather than a rough proxy from elsewhere
  • the ingestor no longer needed a heavy Python layer, because urllib and boto3 were enough for the current setup

The two daily series are:

  • PET.RBRTE.D for Brent crude
  • PET.RWTC.D for WTI crude
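
If you want to sanity-check either series before building anything, a one-off request against the EIA v2 API looks roughly like this (run it locally with an EIA_API_KEY environment variable from the EIA Open Data site; the response shape, rows of period and value, is the same one the ingestor below parses):

# One-off check of an EIA v2 daily series (run locally, not in Lambda).
import json
import os
import urllib.parse
import urllib.request

params = urllib.parse.urlencode({
    "api_key": os.environ["EIA_API_KEY"],
    "data[]": "value",
    "sort[0][column]": "period",
    "sort[0][direction]": "desc",
    "length": 5,  # just the five most recent trading days
})
url = f"https://api.eia.gov/v2/seriesid/PET.RBRTE.D?{params}"

with urllib.request.urlopen(url, timeout=30) as resp:
    payload = json.loads(resp.read().decode())

# Each row carries a "period" (date) and a "value" (USD/barrel spot price).
for row in payload["response"]["data"]:
    print(row["period"], row["value"])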

What you are going to build

The stack is intentionally small.

CloudFormation creates:

  • a raw S3 bucket
  • a processed S3 bucket
  • an ingestor Lambda
  • a transformer Lambda
  • an optional API Lambda
  • an EventBridge rule that fires daily at cron(0 8 * * ? *)

The runtime flow is:

  1. EventBridge triggers the ingestor at 08:00 UTC
  2. the ingestor fetches Brent and WTI from EIA
  3. it writes raw JSON files into the raw bucket
  4. S3 triggers the transformer
  5. the transformer merges the series and computes the Brent/WTI spread
  6. it writes a dated output file and overwrites processed/latest.json
  7. your website reads the latest processed file

That’s the pipeline end to end.


Step 1: Create the repository structure first

Before I touched AWS, I laid out the repository structure cleanly.

Use this structure:

GlobalShock/
├── lambdas/
│   ├── ingestor/
│   │   └── handler.py
│   ├── transformer/
│   │   └── handler.py
│   └── api/
│       └── handler.py
├── infra/
│   └── cloudformation.yaml
├── deploy.sh
├── README.md
└── .gitignore

This layout keeps the infrastructure and the Lambda code separate, which makes the stack much easier to work with.

Step 2: Build the CloudFormation stack

I wanted the buckets, IAM, Lambdas, and schedule in place first, so the rest of the pipeline had a proper frame.

Your CloudFormation stack in infra/cloudformation.yaml should be responsible for:

  • creating the two S3 buckets
  • setting the raw bucket lifecycle to expire raw snapshots after 90 days
  • creating one IAM role shared by the Lambdas
  • creating the three Lambda functions
  • wiring the EventBridge rule to the ingestor
  • granting Lambda permission where needed

What the buckets are for

The raw bucket stores one snapshot per ticker per day.

The processed bucket stores the merged output your site reads from.

I parameterised the bucket names so the stack stays reusable:

  • ${ProjectName}-raw-${AccountId}
  • ${ProjectName}-processed-${AccountId}

What the role needs

The Lambdas share one IAM role with S3 read/write access. For this stage, the role mainly needs:

  • CloudWatch Logs write permissions
  • read/write permissions on both pipeline buckets

What the schedule should be

Your EventBridge rule is:

ScheduleExpression: "cron(0 8 * * ? *)"

That means the ingestor runs at 08:00 UTC daily, which is late enough that the previous day’s close is available.

Important: avoid the S3/Lambda circular dependency

One real issue I hit was a circular dependency between the raw bucket and the transformer Lambda: the bucket references the Lambda for notifications, so anything the Lambda depends on that references the bucket back (the invoke permission, the IAM policy, the transformer's environment variable) closes a loop. The fix is to keep the permission in its own AWS::Lambda::Permission resource and to build the bucket name and ARN from stack parameters on that side, so nothing on the Lambda's path points back at the bucket resource.

CloudFormation template

AWSTemplateFormatVersion: "2010-09-09"
Description: >
  UK Heating Oil / Global Shock pipeline.
  Ingests Brent and WTI spot prices from the EIA API daily,
  transforms them into a merged time series, stores in S3.

Parameters:
  ProjectName:
    Type: String
    Default: oil-pipeline

  Environment:
    Type: String
    Default: prod
    AllowedValues: [prod, dev]

  AllowedCorsOrigin:
    Type: String
    Default: https://yourdomain.com
    Description: Frontend origin allowed to read processed files via browser

  EiaApiKey:
    Type: String
    NoEcho: true
    Description: EIA API key, injected into the ingestor as EIA_API_KEY

Resources:

  RawBucket:
    Type: AWS::S3::Bucket
    # The notification below references the transformer Lambda, so the invoke
    # permission must already exist when the bucket is created.
    DependsOn: TransformerS3Permission
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub "${ProjectName}-raw-${AWS::AccountId}"
      VersioningConfiguration:
        Status: Enabled
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      LifecycleConfiguration:
        Rules:
          - Id: expire-raw-after-90-days
            Status: Enabled
            ExpirationInDays: 90
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: raw/
                  - Name: suffix
                    Value: snapshot.json
            Function: !GetAtt TransformerFunction.Arn

  ProcessedBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub "${ProjectName}-processed-${AWS::AccountId}"
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      CorsConfiguration:
        CorsRules:
          - AllowedHeaders: ["*"]
            AllowedMethods: [GET]
            AllowedOrigins:
              - !Ref AllowedCorsOrigin
            MaxAge: 3600

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: s3-access
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                Resource:
                  # Raw-bucket ARNs are built from parameters rather than
                  # !GetAtt RawBucket.Arn so the role does not depend on the
                  # bucket (which already depends on the transformer Lambda).
                  - !Sub "arn:aws:s3:::${ProjectName}-raw-${AWS::AccountId}"
                  - !Sub "arn:aws:s3:::${ProjectName}-raw-${AWS::AccountId}/*"
                  - !GetAtt ProcessedBucket.Arn
                  - !Sub "${ProcessedBucket.Arn}/*"

  IngestorFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: handler.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 120
      MemorySize: 256
      Environment:
        Variables:
          S3_BUCKET_RAW: !Ref RawBucket
          # handler.py reads this at import time
          EIA_API_KEY: !Ref EiaApiKey
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {"statusCode": 200, "body": "placeholder - deploy real code"}

  TransformerFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: handler.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 60
      MemorySize: 512
      Environment:
        Variables:
          # Built from parameters rather than !Ref RawBucket, because the raw
          # bucket's notification already references this function; a !Ref here
          # would create a circular dependency.
          S3_BUCKET_RAW: !Sub "${ProjectName}-raw-${AWS::AccountId}"
          S3_BUCKET_PROCESSED: !Ref ProcessedBucket
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {"statusCode": 200, "body": "placeholder - deploy real code"}

  ApiFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: handler.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 30
      MemorySize: 256
      Environment:
        Variables:
          S3_BUCKET_PROCESSED: !Ref ProcessedBucket
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {"statusCode": 200, "body": "placeholder - deploy real code"}

  TransformerS3Permission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt TransformerFunction.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      # Built from parameters rather than !GetAtt RawBucket.Arn so the
      # permission can be created before the bucket that needs it.
      SourceArn: !Sub "arn:aws:s3:::${ProjectName}-raw-${AWS::AccountId}"

  DailyIngestorRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub "${ProjectName}-${Environment}-daily-ingest"
      ScheduleExpression: "cron(0 8 * * ? *)"
      State: ENABLED
      Targets:
        - Id: IngestorTarget
          Arn: !GetAtt IngestorFunction.Arn

  IngestorEventPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt IngestorFunction.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt DailyIngestorRule.Arn

Outputs:
  RawBucketName:
    Value: !Ref RawBucket
    Export:
      Name: !Sub "${ProjectName}-${Environment}-raw-bucket"

  ProcessedBucketName:
    Value: !Ref ProcessedBucket
    Export:
      Name: !Sub "${ProjectName}-${Environment}-processed-bucket"

  IngestorFunctionArn:
    Value: !GetAtt IngestorFunction.Arn

  TransformerFunctionArn:
    Value: !GetAtt TransformerFunction.Arn

  ApiFunctionArn:
    Value: !GetAtt ApiFunction.Arn

Step 3: Build the ingestor Lambda

Once the stack exists, the first Lambda to implement is the ingestor.

This function is triggered by EventBridge every morning and fetches the two EIA series:

  • brent_crude → PET.RBRTE.D
  • wti_crude → PET.RWTC.D

It then writes raw JSON files into paths like:

raw/brent_crude/YYYY/MM/DD/snapshot.json
raw/wti_crude/YYYY/MM/DD/snapshot.json

That gives me a simple daily partition layout that is easy to inspect and easy to backfill.

I also added a backfill flag so the same Lambda can handle both the daily run and the historical load. I used that once to backfill the series to 2 January 1986.

The ingestor is straightforward. The EIA API key lives in Lambda environment variables, the series map is hardcoded in the function, and the backfill flag decides whether the run is fetching the latest records or the full history. Each run writes a raw snapshot into a dated S3 path, which gives me traceability as well as a clean handoff into the transformer.

Ingestor code

"""
Lambda: oil_price_ingestor
Triggered by: EventBridge (daily cron) or manual invoke with {"backfill": true}
Purpose: Fetch Brent and WTI spot prices from EIA API, write raw snapshot to S3.

Backfill mode:
  Invoke manually: aws lambda invoke --function-name oil_price_ingestor \
      --payload '{"backfill": true}' --cli-binary-format raw-in-base64-out response.json
  This fetches the full available EIA history (~8,000+ daily records per series)
  and writes to the same raw/ prefix structure as the daily run, so the transformer
  can rebuild processed/latest.json from the full dataset.

EIA series:
  - PET.RBRTE.D  = Brent crude spot price (USD/barrel, daily)
  - PET.RWTC.D   = WTI crude spot price (USD/barrel, daily)
"""

import json
import os
import boto3
import urllib.request
import urllib.parse
from datetime import datetime, timezone
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

S3_BUCKET_RAW = os.environ["S3_BUCKET_RAW"]
EIA_API_KEY = os.environ["EIA_API_KEY"]

SERIES = {
    "brent_crude": "PET.RBRTE.D",
    "wti_crude":   "PET.RWTC.D",
}

# EIA max page size. Their API hard-caps at 5000 per request.
EIA_PAGE_SIZE = 5000
# Daily run: fetch the last 30 days so we never miss a day on weekends/holidays.
DAILY_FETCH_DAYS = 30

s3 = boto3.client("s3")


def fetch_eia_series(series_id: str, is_backfill: bool) -> list[dict]:
    """
    Fetch a single EIA v2 series. Returns list of {date, value} dicts sorted
    ascending by date.

    In backfill mode we paginate through the entire available history.
    In daily mode we fetch only the most recent DAILY_FETCH_DAYS records —
    enough to cover weekends and public holidays without over-fetching.
    """
    base_params = {
        "api_key": EIA_API_KEY,
        "data[]": "value",
        "sort[0][column]": "period",
        "sort[0][direction]": "asc",
        "length": EIA_PAGE_SIZE,
    }

    all_records: list[dict] = []
    offset = 0

    while True:
        params = dict(base_params)
        params["offset"] = offset

        # Daily mode: only fetch recent records by fixing the offset at 0
        # and using a small length. We re-sort descending so "latest N" is natural.
        if not is_backfill:
            params["sort[0][direction]"] = "desc"
            params["length"] = DAILY_FETCH_DAYS

        url = (
            f"https://api.eia.gov/v2/seriesid/{series_id}?"
            + urllib.parse.urlencode(params, doseq=True)
        )
        logger.info(f"EIA request: series={series_id} offset={offset}")

        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read().decode())

        response_obj = data.get("response", {})
        rows = response_obj.get("data", [])
        total = int(response_obj.get("total", 0))

        if not rows:
            logger.info(f"EIA {series_id}: no more rows at offset={offset}")
            break

        for row in rows:
            val = row.get("value")
            if val is None:
                continue
            try:
                all_records.append({
                    "date": row["period"],
                    "value": round(float(val), 4),
                })
            except (ValueError, KeyError):
                pass

        offset += len(rows)

        # Daily mode: single page only
        if not is_backfill:
            break

        # Backfill mode: keep paging until we've consumed everything
        if offset >= total or len(rows) == 0:
            logger.info(f"EIA {series_id}: finished pagination at offset={offset}/{total}")
            break

    # Ensure ascending date order regardless of fetch direction
    all_records.sort(key=lambda r: r["date"])

    logger.info(f"EIA {series_id}: total records fetched = {len(all_records)}")
    return all_records


def lambda_handler(event, context):
    run_ts = datetime.now(timezone.utc)
    is_backfill = bool(event.get("backfill", False))

    logger.info(f"Run mode: {'BACKFILL' if is_backfill else 'daily'}")

    results = []

    for name, series_id in SERIES.items():
        try:
            raw_records = fetch_eia_series(series_id, is_backfill)

            if not raw_records:
                logger.warning(f"{name}: empty response from EIA")
                results.append({"series": name, "error": "empty response"})
                continue

            records = [
                {
                    "date": r["date"],
                    "name": name,
                    "series_id": series_id,
                    "close": r["value"],
                    "open": None,
                    "high": None,
                    "low": None,
                    "volume": None,
                    "currency": "USD",
                    "fetched_at": run_ts.isoformat(),
                }
                for r in raw_records
            ]

            date_prefix = run_ts.strftime("%Y/%m/%d")

            # Backfill writes to a dedicated prefix so the transformer can
            # distinguish a full-history snapshot from a daily incremental.
            if is_backfill:
                s3_key = f"raw/{name}/{date_prefix}/snapshot_backfill.json"
            else:
                s3_key = f"raw/{name}/{date_prefix}/snapshot.json"

            payload = {
                "series_id": series_id,
                "name": name,
                "fetched_at": run_ts.isoformat(),
                "is_backfill": is_backfill,
                "record_count": len(records),
                "records": records,
            }

            s3.put_object(
                Bucket=S3_BUCKET_RAW,
                Key=s3_key,
                Body=json.dumps(payload, indent=2),
                ContentType="application/json",
            )

            logger.info(f"Written {len(records)} records → s3://{S3_BUCKET_RAW}/{s3_key}")
            results.append({"series": name, "records": len(records), "s3_key": s3_key})

        except Exception as e:
            logger.error(f"{name} failed: {e}", exc_info=True)
            results.append({"series": name, "error": str(e)})

    return {
        "statusCode": 200,
        "body": {
            "run_at": run_ts.isoformat(),
            "mode": "backfill" if is_backfill else "daily",
            "results": results,
        },
    }

Backfill example

Here’s the command I used for the one-off backfill:

aws lambda invoke \
  --function-name oil-pipeline-ingestor \
  --payload '{"backfill": true}' \
  --cli-binary-format raw-in-base64-out \
  --region eu-west-2 response.json

That one-off backfill produced:

  • 9,852 Brent records
  • 10,123 WTI records

Step 4: Build the transformer Lambda

This is the second core Lambda, and it is where the raw feed becomes a data product.

Your transformer is triggered by raw files landing in S3. It scans every Brent and WTI snapshot in the raw bucket, merges them by date, computes brent_wti_spread, and writes two outputs:

  • a dated processed snapshot
  • processed/latest.json, which always gets overwritten

For the frontend, that pattern keeps things simple.

It means the site only needs one stable URL and does not need to understand your partitioning logic.

What the merged schema looks like

Each row in the series contains:

  • date
  • brent_close
  • wti_close
  • brent_wti_spread

The top level also includes metadata such as:

  • schema_version
  • processed_at
  • currency
  • summary stats

It’s a simple shape to inspect in S3 and easy to plot on the site.
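
To make that concrete, here is roughly the shape of latest.json. The prices below are placeholder values, the per-row open/high/low fields (always null for the EIA feed) and the per-series min/max stats are trimmed, and the counts and dates echo the backfill figures quoted elsewhere in this post:

{
  "schema_version": "1.0",
  "processed_at": "2026-03-03T08:02:41+00:00",
  "currency": "USD",
  "summary": {
    "record_count": 10291,
    "date_range": { "start": "1986-01-02", "end": "2026-03-02" }
  },
  "series": [
    { "date": "2026-02-27", "brent_close": 74.6, "wti_close": 70.4, "brent_wti_spread": 4.2 },
    { "date": "2026-03-02", "brent_close": 74.2, "wti_close": 70.1, "brent_wti_spread": 4.1 }
  ]
}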

Useful detail to keep

Brent starts later than WTI, so the early rows legitimately contain null Brent values. I left that as-is rather than faking completeness, because the gap is part of the real history.

Transformer code

"""
Lambda: oil_price_transformer
Triggered by: S3 ObjectCreated events on the raw bucket (daily snapshot.json files), or manual invoke after a backfill.
Purpose: Read ALL raw snapshots from S3, merge Brent + WTI on date, write
         processed output.

Key change from v1: instead of reading a single date-partition prefix, this
transformer scans the entire raw/ prefix for each ticker and merges every
record across the full history. This means:
  - A backfill → transformer run rebuilds processed/latest.json with complete history.
  - Daily runs incrementally extend the series by re-merging everything.
  - No data is lost if the transformer was triggered late or missed a day.
"""

import json
import os
import boto3
import logging
from datetime import datetime, timezone
from collections import defaultdict

logger = logging.getLogger()
logger.setLevel(logging.INFO)

S3_BUCKET_RAW = os.environ["S3_BUCKET_RAW"]
S3_BUCKET_PROCESSED = os.environ["S3_BUCKET_PROCESSED"]
TICKER_NAMES = ["brent_crude", "wti_crude"]

s3 = boto3.client("s3")


# ---------------------------------------------------------------------------
# S3 helpers
# ---------------------------------------------------------------------------

def list_all_snapshots(ticker_name: str) -> list[str]:
    """
    Return every snapshot key under raw/<ticker_name>/, both daily and backfill.
    Uses paginator to handle >1000 objects cleanly.
    """
    prefix = f"raw/{ticker_name}/"
    paginator = s3.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=S3_BUCKET_RAW, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith(".json"):
                keys.append(key)
    logger.info(f"Found {len(keys)} snapshot file(s) for {ticker_name}")
    return keys


def read_snapshot(s3_key: str) -> list[dict]:
    """Read a single raw snapshot JSON and return its records list."""
    try:
        response = s3.get_object(Bucket=S3_BUCKET_RAW, Key=s3_key)
        payload = json.loads(response["Body"].read())
        return payload.get("records", [])
    except s3.exceptions.NoSuchKey:
        logger.warning(f"Key not found: {s3_key}")
        return []
    except Exception as e:
        logger.error(f"Failed to read {s3_key}: {e}")
        return []


# ---------------------------------------------------------------------------
# Core transform
# ---------------------------------------------------------------------------

def load_all_records(ticker_name: str) -> list[dict]:
    """Load and deduplicate every raw record for a ticker across all snapshots."""
    keys = list_all_snapshots(ticker_name)
    # date → record: later snapshots win (handles corrections / re-fetches)
    by_date: dict[str, dict] = {}
    for key in sorted(keys):  # sorted = chronological; later files overwrite earlier
        for record in read_snapshot(key):
            date = record.get("date")
            if date:
                by_date[date] = record
    logger.info(f"{ticker_name}: {len(by_date)} unique dates after deduplication")
    return list(by_date.values())


def lambda_handler(event, context):
    run_ts = datetime.now(timezone.utc)
    logger.info("Transformer starting full-history merge")

    # Load all records for both tickers
    all_records: list[dict] = []
    for ticker_name in TICKER_NAMES:
        records = load_all_records(ticker_name)
        if records:
            all_records.extend(records)
        else:
            logger.warning(f"No records found for {ticker_name} — continuing without it")

    if not all_records:
        logger.error("No records to process across any ticker, aborting")
        return {"statusCode": 200, "body": "no data"}

    # Merge by date
    by_date: dict[str, dict] = defaultdict(dict)
    for r in all_records:
        by_date[r["date"]][r["name"]] = {
            "open":   r["open"],
            "high":   r["high"],
            "low":    r["low"],
            "close":  r["close"],
            "volume": r["volume"],
        }

    # Build merged time series sorted ascending by date
    series = []
    for date in sorted(by_date.keys()):
        brent = by_date[date].get("brent_crude", {})
        wti   = by_date[date].get("wti_crude", {})

        brent_close = brent.get("close")
        wti_close   = wti.get("close")

        spread = (
            round(brent_close - wti_close, 4)
            if brent_close is not None and wti_close is not None
            else None
        )

        series.append({
            "date":            date,
            "brent_close":     brent_close,
            "brent_open":      brent.get("open"),
            "brent_high":      brent.get("high"),
            "brent_low":       brent.get("low"),
            "wti_close":       wti_close,
            "wti_open":        wti.get("open"),
            "wti_high":        wti.get("high"),
            "wti_low":         wti.get("low"),
            "brent_wti_spread": spread,
        })

    brent_closes = [r["brent_close"] for r in series if r["brent_close"] is not None]
    wti_closes   = [r["wti_close"]   for r in series if r["wti_close"]   is not None]

    summary = {
        "brent": _stats(brent_closes),
        "wti":   _stats(wti_closes),
        "record_count": len(series),
        "date_range": {
            "start": series[0]["date"]  if series else None,
            "end":   series[-1]["date"] if series else None,
        },
    }

    output = {
        "schema_version": "1.0",
        "processed_at":   run_ts.isoformat(),
        "currency":       "USD",
        "summary":        summary,
        "series":         series,
    }

    date_prefix = run_ts.strftime("%Y/%m/%d")
    dated_key   = f"processed/{date_prefix}/oil_prices.json"
    latest_key  = "processed/latest.json"

    body = json.dumps(output, indent=2)
    for key in [dated_key, latest_key]:
        s3.put_object(
            Bucket=S3_BUCKET_PROCESSED,
            Key=key,
            Body=body,
            ContentType="application/json",
        )
        logger.info(f"Written → s3://{S3_BUCKET_PROCESSED}/{key}")

    logger.info(
        f"Done: {len(series)} records, "
        f"{summary['date_range']['start']}{summary['date_range']['end']}"
    )

    return {
        "statusCode": 200,
        "body": {
            "processed_at": run_ts.isoformat(),
            "records":      len(series),
            "date_range":   summary["date_range"],
        },
    }


def _stats(values: list[float]) -> dict:
    if not values:
        return {}
    return {
        "min":    round(min(values), 2),
        "max":    round(max(values), 2),
        "latest": round(values[-1], 2),
        "count":  len(values),
    }

Step 5: Add the optional API Lambda

The API Lambda is optional.

lambdas/api/handler.py is a lightweight REST endpoint that sits behind API Gateway if you do not want the frontend to fetch the entire processed JSON file every time. It supports date filtering and field projection, for example:

GET /prices?from=2020-01-01&fields=date,brent_close,wti_close

It’s a useful optimisation, but I didn’t need it for stage one.

In practice, the choice is simple:

  • if you want the simplest setup, fetch directly from S3
  • if you want smaller frontend payloads or filtering, add API Gateway + Lambda

That keeps the design flexible without making the whole thing feel heavier than it needs to be.

Placeholder: API Lambda code

# PLACEHOLDER:
# Insert GitHub-linked snippet showing:
# - loading latest.json
# - date filtering
# - field projection
# - JSON response helper
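
As a rough sketch of the shape such a handler could take (assuming an API Gateway proxy integration; the from and fields query parameters match the example request above, and the rest is illustrative rather than the deployed code):

"""
Lambda: oil_price_api (sketch, not the deployed code)
Reads processed/latest.json, applies optional date filtering and field
projection, and returns a JSON response.
"""

import json
import os
import boto3

S3_BUCKET_PROCESSED = os.environ["S3_BUCKET_PROCESSED"]
LATEST_KEY = "processed/latest.json"

s3 = boto3.client("s3")


def _response(status: int, body: dict) -> dict:
    """JSON response helper in the shape API Gateway proxy integration expects."""
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body),
    }


def lambda_handler(event, context):
    params = event.get("queryStringParameters") or {}
    date_from = params.get("from")     # e.g. 2020-01-01
    fields = params.get("fields")      # e.g. date,brent_close,wti_close

    obj = s3.get_object(Bucket=S3_BUCKET_PROCESSED, Key=LATEST_KEY)
    data = json.loads(obj["Body"].read())
    series = data.get("series", [])

    # Date filter: ISO dates compare correctly as plain strings.
    if date_from:
        series = [row for row in series if row["date"] >= date_from]

    # Field projection: return only the requested columns.
    if fields:
        wanted = [f.strip() for f in fields.split(",") if f.strip()]
        series = [{k: row.get(k) for k in wanted} for row in series]

    return _response(200, {
        "processed_at": data.get("processed_at"),
        "record_count": len(series),
        "series": series,
    })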

Step 6: Connect the website

There are two patterns.

Option A: Direct S3 fetch

This is the simplest approach. Make the processed output readable to the frontend (the template blocks public access by default, so that means either relaxing it with a read-only bucket policy or putting CloudFront in front of the bucket) and fetch processed/latest.json directly.

Option B: API Gateway + Lambda

Use the optional API Lambda when you want filtering support or smaller payloads.

I’d start with direct S3 fetch and only add API Gateway if the frontend actually needs filtering.
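
Whichever option you pick, it is worth checking the published file before pointing the site at it. A quick boto3 check; the bucket name below is a placeholder, so substitute the ProcessedBucketName output from your stack:

import json
import boto3

# Placeholder: use the ProcessedBucketName output from your stack.
BUCKET = "oil-pipeline-processed-123456789012"

s3 = boto3.client("s3")
obj = s3.get_object(Bucket=BUCKET, Key="processed/latest.json")
data = json.loads(obj["Body"].read())

latest = data["series"][-1]
print(data["summary"]["date_range"])
print(latest["date"], latest["brent_close"], latest["wti_close"], latest["brent_wti_spread"])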


Step 7: Explain the daily flow clearly

The live flow is:

  • 08:00 UTC: EventBridge triggers oil-pipeline-ingestor
  • around 08:01: ingestor calls EIA and writes two raw files
  • around 08:02: S3 event triggers oil-pipeline-transformer
  • around 08:03: transformer merges data and overwrites processed/latest.json
  • the site sees the updated data

One nice practical note: on weekends the job simply republishes Friday’s close, which is harmless and avoids unnecessary scheduling complexity.

For this pipeline, keeping it simple is the right trade-off.


What the backfill proved

After the backfill, the merged dataset covered 1986 through March 2026, with:

  • 9,852 Brent records
  • 10,123 WTI records
  • 10,291 merged records

It also captured some genuinely important market history, including the negative WTI print in April 2020.

I kept that in because “weird data” is sometimes just real history.


What it costs

One of the best parts of this build is the cost.

At this scale:

  • Lambda is effectively free
  • S3 is in the pennies range
  • EventBridge is effectively free
  • CloudWatch Logs cost pennies

Total: around £0–£2 per month, with a £5 billing alert as a sensible safety net.

For a personal project, that’s a great trade-off: real infrastructure, real automation, almost no running cost.


Final thought

This post is not about showing off architecture.

It’s about building a small, production-shaped data pipeline properly.

That means:

  • define the repo structure
  • create the CloudFormation stack first
  • build the ingestor Lambda
  • build the transformer Lambda
  • optionally add the API Lambda
  • backfill once
  • point the website at latest.json

That’s the point of the project.

It is not trying to look impressive.

It is trying to work every day.

Gareth Winterman