I like small projects that stay close to real operational work. This one takes flat files that look like the kind of exports every data team inherits, cleans them up, stages them, and turns them into a usable marketing audience.
The pipeline uses CSV inputs, validates them with Python, stages them in SQLite, transforms them with SQL, and publishes an eligible audience. It also quarantines bad rows and writes a reconciliation report so there is a basic control layer around the output.

What the pipeline does
- Loads customer, booking, consent, and suppression data from CSV files.
- Keeps only the latest customer snapshot when duplicates exist.
- Uses the latest email consent event per customer and keeps only customers whose latest status is OPT_IN.
- Aggregates completed bookings from the last 365 days and keeps only customers with at least one.
- Excludes inactive customers, suppressed customers, and records with missing email.
- Quarantines invalid booking amounts into a rejected-record file.
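To make "latest" concrete, here is a small sketch that runs the ROW_NUMBER() pattern from the transform against an in-memory SQLite database. The tables are trimmed to the columns this example needs, and the sample rows are made up for illustration.

```python
import sqlite3

# Made-up sample data: customer 1 has two snapshots and changed consent over time.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE raw_customers (customer_id INTEGER, email TEXT, updated_at TEXT);
    CREATE TABLE raw_consent_events (
        customer_id INTEGER, channel TEXT, consent_status TEXT, event_ts TEXT
    );
    INSERT INTO raw_customers VALUES
        (1, 'old@example.com', '2024-01-01'),
        (1, 'new@example.com', '2024-06-01'),
        (2, 'two@example.com', '2024-03-01');
    INSERT INTO raw_consent_events VALUES
        (1, 'email', 'OPT_IN',  '2024-01-05 09:00:00'),
        (1, 'email', 'OPT_OUT', '2024-07-01 12:00:00'),
        (1, 'sms',   'OPT_IN',  '2024-08-01 08:00:00'),
        (2, 'email', 'OPT_IN',  '2024-03-02 10:00:00');
    """
)

# Latest snapshot per customer: number rows newest-first and keep row 1.
latest_emails = dict(
    conn.execute(
        """
        SELECT customer_id, email
        FROM (
            SELECT customer_id, email,
                   ROW_NUMBER() OVER (
                       PARTITION BY customer_id ORDER BY updated_at DESC
                   ) AS row_num
            FROM raw_customers
        )
        WHERE row_num = 1
        ORDER BY customer_id
        """
    ).fetchall()
)

# Latest email consent per customer: the SMS event is filtered out first.
latest_consent = dict(
    conn.execute(
        """
        SELECT customer_id, consent_status
        FROM (
            SELECT customer_id, consent_status,
                   ROW_NUMBER() OVER (
                       PARTITION BY customer_id ORDER BY event_ts DESC
                   ) AS row_num
            FROM raw_consent_events
            WHERE channel = 'email'
        )
        WHERE row_num = 1
        ORDER BY customer_id
        """
    ).fetchall()
)

print(latest_emails)   # {1: 'new@example.com', 2: 'two@example.com'}
print(latest_consent)  # {1: 'OPT_OUT', 2: 'OPT_IN'}
```

Customer 1 opted in early and opted out later. The latest event wins, so that customer would be excluded. The ordering relies on timestamps stored as ISO-8601 text, which sorts correctly as plain strings. Mixed date formats in the source files would quietly break it.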
Why I built it this way
This structure maps well to a real marketing data operation. In a cloud stack, the raw CSV layer could be an S3 landing zone, the SQLite staging tables could be a warehouse staging area, and the final audience output could feed BI, segmentation, or outbound campaign systems.
I kept the implementation small on purpose. The interesting part here is the sequence of decisions: what gets rejected, what gets deduplicated, what counts as the latest consent, and what gets filtered before an audience is published.
Project files
The project repository lives here: github.com/CodeWrangler55/AudiencePipeline. The links below point to files from the repo root.
- README.md
- run-pipeline.py
- schema.sql
- transform-audience.sql
- diagnostic-queries.sql
- customers.csv
- bookings.csv
- consent-events.csv
- suppressions.csv
- expected audience output
- generated audience output
- reconciliation report
- rejected rows
Pipeline shape
CSV inputs
- customers
- bookings
- consent events
- suppressions
|
v
Python validation and load
- reject invalid booking amounts
- preserve raw row counts
|
v
SQLite staging tables
|
v
SQL transformation
- latest customer snapshot
- latest email consent
- last-365-day booking aggregation
- suppression exclusion
|
v
Published eligible audience + reconciliation artifacts
Python ETL layer
The Python script uses the standard library only. That keeps the attention on the ingestion logic instead of package selection.
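The loader below calls read_csv and write_csv helpers and uses RAW_DIR and OUTPUT_DIR path constants that the excerpt doesn't show. A standard-library version could look like the sketch below. The paths and function bodies are assumptions, not code copied from the repository.

```python
import csv
from pathlib import Path

# Hypothetical directory layout; the real paths in run-pipeline.py may differ.
RAW_DIR = Path("data/raw")
OUTPUT_DIR = Path("data/output")


def read_csv(path: Path) -> list[dict[str, str]]:
    """Return every row of a CSV file as a dict keyed by the header row."""
    with path.open(newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def write_csv(path: Path, rows: list[dict[str, str]]) -> None:
    """Write dict rows to CSV, using the first row's keys as the header."""
    path.parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        # Still create the file, so "zero rejects" is distinguishable from "never ran".
        path.write_text("", encoding="utf-8")
        return
    # Assumes every row has the same keys as the first one.
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
```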

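import sqlite3
from decimal import Decimal, InvalidOperation

# read_csv, write_csv, RAW_DIR, and OUTPUT_DIR are defined elsewhere in the script (not shown).


def load_bookings(connection: sqlite3.Connection) -> tuple[int, int]:
    """Insert valid bookings into raw_bookings and quarantine the rest."""
    accepted_rows = []
    rejected_rows = []
    for row in read_csv(RAW_DIR / "bookings.csv"):
        try:
            # Decimal() raises InvalidOperation for blanks and non-numeric text.
            amount = Decimal(row["booking_amount"])
        except InvalidOperation:
            amount = None
        # "NaN" and "Infinity" parse successfully, so non-finite values are rejected too.
        if amount is None or not amount.is_finite():
            row["rejection_reason"] = "booking_amount is not numeric"
            rejected_rows.append(row)
            continue
        try:
            customer_id = int(row["customer_id"])
        except ValueError:
            # A bad ID gets its own reason instead of being reported as a bad amount.
            row["rejection_reason"] = "customer_id is not an integer"
            rejected_rows.append(row)
            continue
        accepted_rows.append(
            (
                row["booking_id"],
                customer_id,
                row["booking_date"],
                float(amount),  # sqlite3 cannot bind Decimal without a registered adapter
                row["booking_status"],
                row["product_name"],
                row["source_system"],
            )
        )
    connection.executemany(
        """
        INSERT INTO raw_bookings (
            booking_id,
            customer_id,
            booking_date,
            booking_amount,
            booking_status,
            product_name,
            source_system
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        accepted_rows,
    )
    # Rejected rows keep their original columns plus rejection_reason.
    write_csv(OUTPUT_DIR / "rejected-bookings.csv", rejected_rows)
    return len(accepted_rows), len(rejected_rows)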
The useful part is that malformed values are handled explicitly instead of leaking downstream and turning into a quieter problem later.
SQL transformation layer
The core SQL does three things that come up constantly in production data work:
- Uses ROW_NUMBER() to pick the latest customer snapshot.
- Uses ROW_NUMBER() again to pick the latest email consent event.
- Aggregates recent completed bookings and anti-joins suppressions.
-- recent_completed_bookings and active_suppressions are defined outside this excerpt.
WITH latest_customer AS (
    SELECT
        customer_id,
        email,
        home_state,
        loyalty_tier,
        is_active,
        updated_at,
        -- Ties on updated_at are broken arbitrarily; add a tiebreaker column if that matters.
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY updated_at DESC
        ) AS row_num
    FROM raw_customers
),
current_customer AS (
    SELECT
        customer_id,
        email,
        home_state,
        loyalty_tier,
        is_active
    FROM latest_customer
    WHERE row_num = 1
),
latest_email_consent AS (
    SELECT
        customer_id,
        consent_status,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id, channel
            ORDER BY event_ts DESC
        ) AS row_num
    FROM raw_consent_events
    WHERE channel = 'email'
)
SELECT
    c.customer_id,
    c.email,
    c.home_state,
    c.loyalty_tier,
    b.bookings_last_365_days,
    b.total_spend_last_365_days,
    b.last_booking_date
FROM current_customer c
JOIN (
    SELECT customer_id, consent_status
    FROM latest_email_consent
    WHERE row_num = 1
) ec
    ON c.customer_id = ec.customer_id
JOIN recent_completed_bookings b
    ON c.customer_id = b.customer_id
-- Anti-join: the LEFT JOIN plus "s.customer_id IS NULL" drops suppressed customers.
LEFT JOIN active_suppressions s
    ON c.customer_id = s.customer_id
WHERE c.is_active = 1
    AND c.email IS NOT NULL
    AND TRIM(c.email) <> ''
    AND ec.consent_status = 'OPT_IN'
    AND s.customer_id IS NULL
ORDER BY c.customer_id;
This is the kind of SQL I like to keep nearby: enough logic to be useful, but still readable enough that someone else can audit the business rules without reverse engineering the whole query.
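The final SELECT joins recent_completed_bookings and active_suppressions, which the excerpt doesn't define. Here is one way they could be written, run against toy data in SQLite. Several details are assumptions for illustration:

- the 'COMPLETED' status value
- the raw_suppressions table with its nullable expires_at column
- the fixed as-of date

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Tables trimmed to the columns this sketch needs; raw_suppressions is hypothetical.
conn.executescript(
    """
    CREATE TABLE raw_bookings (
        booking_id TEXT, customer_id INTEGER, booking_date TEXT,
        booking_amount REAL, booking_status TEXT
    );
    -- A NULL expires_at means "suppressed indefinitely".
    CREATE TABLE raw_suppressions (customer_id INTEGER, expires_at TEXT);

    INSERT INTO raw_bookings VALUES
        ('B1', 1, '2024-05-10', 200.0, 'COMPLETED'),
        ('B2', 1, '2024-09-01', 150.0, 'COMPLETED'),
        ('B3', 1, '2024-10-01', 999.0, 'CANCELLED'),  -- wrong status
        ('B4', 2, '2023-06-01', 300.0, 'COMPLETED'),  -- older than 365 days
        ('B5', 3, '2024-11-15', 80.0,  'COMPLETED'),
        ('B6', 4, '2024-02-20', 120.0, 'COMPLETED');

    INSERT INTO raw_suppressions VALUES
        (3, NULL),           -- still suppressed
        (4, '2024-06-30');   -- suppression has lapsed
    """
)

query = """
WITH recent_completed_bookings AS (
    SELECT
        customer_id,
        COUNT(*) AS bookings_last_365_days,
        SUM(booking_amount) AS total_spend_last_365_days,
        MAX(booking_date) AS last_booking_date
    FROM raw_bookings
    WHERE booking_status = 'COMPLETED'
      AND booking_date >= date(:as_of, '-365 days')
    GROUP BY customer_id
),
active_suppressions AS (
    SELECT DISTINCT customer_id
    FROM raw_suppressions
    WHERE expires_at IS NULL OR expires_at > :as_of
)
-- Anti-join: keep only customers with no matching active suppression row.
SELECT b.customer_id, b.bookings_last_365_days, b.total_spend_last_365_days
FROM recent_completed_bookings b
LEFT JOIN active_suppressions s ON b.customer_id = s.customer_id
WHERE s.customer_id IS NULL
ORDER BY b.customer_id
"""

rows = conn.execute(query, {"as_of": "2024-12-31"}).fetchall()
print(rows)  # [(1, 2, 350.0), (4, 1, 120.0)]
```

Passing the as-of date as a parameter instead of calling date('now') keeps reruns reproducible, which helps when comparing today's audience with yesterday's. A NOT EXISTS subquery would express the same anti-join as the LEFT JOIN plus IS NULL pattern.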
Troubleshooting and data quality
A pipeline is not finished just because it runs. It also needs to make bad inputs and suspicious output visible.

That is why this project includes:
- rejected-bookings.csv for malformed numeric values
- reconciliation.json for row-count visibility
- diagnostic-queries.sql for audience-drop investigations
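The post doesn't show what reconciliation.json contains, so this is a hypothetical sketch of the kind of check it makes possible. Every raw booking row should end up either accepted or rejected, and the published audience size sits next to those numbers. The function name and key names are assumptions.

```python
import json


def build_reconciliation(raw_rows: int, accepted: int, rejected: int, audience: int) -> dict:
    """Summarize row counts and flag any booking rows that went missing during load."""
    unaccounted = raw_rows - accepted - rejected
    return {
        "raw_booking_rows": raw_rows,
        "accepted_booking_rows": accepted,
        "rejected_booking_rows": rejected,
        "unaccounted_booking_rows": unaccounted,
        "published_audience_rows": audience,
        # The load balances only when no row was dropped silently.
        "booking_load_balanced": unaccounted == 0,
    }


report = build_reconciliation(raw_rows=10, accepted=9, rejected=1, audience=4)
print(json.dumps(report, indent=2))
```

Since load_bookings already returns the accepted and rejected counts, the only extra input is the raw row count taken before validation.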
If the audience suddenly shrank, I would check raw counts, verify the latest consent logic, inspect suppressions, and compare inclusion and exclusion conditions one layer at a time.
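Checking conditions one layer at a time can be as simple as a funnel. It counts how many customers survive each rule, with the rules applied cumulatively in the same order as the transform's WHERE clause. This is a sketch in the spirit of diagnostic-queries.sql, not its actual contents. The tables are hypothetical stand-ins for the transform's intermediate results, filled with toy rows so the example runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical stand-ins for the transform's intermediate results.
conn.executescript(
    """
    CREATE TABLE current_customer (customer_id INTEGER, email TEXT, is_active INTEGER);
    CREATE TABLE current_email_consent (customer_id INTEGER, consent_status TEXT);
    CREATE TABLE recent_completed_bookings (customer_id INTEGER);
    CREATE TABLE active_suppressions (customer_id INTEGER);

    INSERT INTO current_customer VALUES
        (1, 'a@example.com', 1), (2, 'b@example.com', 0), (3, '  ', 1),
        (4, 'd@example.com', 1), (5, 'e@example.com', 1), (6, 'f@example.com', 1);
    INSERT INTO current_email_consent VALUES
        (1, 'OPT_IN'), (3, 'OPT_IN'), (4, 'OPT_OUT'), (5, 'OPT_IN'), (6, 'OPT_IN');
    INSERT INTO recent_completed_bookings VALUES (1), (4), (6);
    INSERT INTO active_suppressions VALUES (6);
    """
)

# Each step adds one more rule on top of all previous ones.
steps = [
    ("all current customers", ""),
    ("is active", "AND c.is_active = 1"),
    ("has usable email", "AND c.email IS NOT NULL AND TRIM(c.email) <> ''"),
    ("latest email consent is OPT_IN",
     "AND EXISTS (SELECT 1 FROM current_email_consent ec "
     "WHERE ec.customer_id = c.customer_id AND ec.consent_status = 'OPT_IN')"),
    ("completed booking in last 365 days",
     "AND EXISTS (SELECT 1 FROM recent_completed_bookings b "
     "WHERE b.customer_id = c.customer_id)"),
    ("not actively suppressed",
     "AND NOT EXISTS (SELECT 1 FROM active_suppressions s "
     "WHERE s.customer_id = c.customer_id)"),
]

where = ""
funnel = []
for label, condition in steps:
    where += " " + condition
    # Conditions are fixed strings from this script, never user input.
    (count,) = conn.execute(
        f"SELECT COUNT(*) FROM current_customer c WHERE 1 = 1{where}"
    ).fetchone()
    funnel.append((label, count))

for label, count in funnel:
    print(f"{count:>4}  {label}")
```

The step with the biggest drop is where to look first. Because the conditions are hard-coded, building the SQL with an f-string is safe here. Anything user-supplied should go through query parameters instead.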
How to run it
python src/run-pipeline.py
python -m unittest discover tests
What I wanted this project to prove
- Flat-file source data can still be handled with reasonable controls.
- SQL is where a lot of the business logic belongs, especially around deduplication, consent state, and eligibility rules.
- Python is useful here as the orchestration and validation layer.
- Troubleshooting gets easier when row counts, rejects, and exclusion logic are visible.
That is the whole shape of the project: ingest, validate, stage, transform, publish, and leave enough evidence behind that the next person can understand what happened.