Python CSV Pipelines for Data Engineering — Cleaning $100K Spreadsheets


From Spreadsheet Chaos to BigQuery Gold — 10 Years of Mess, One Night of Cleanup

Unless you've been living under a rock, you already know that "data engineering" looks sexy in job postings: distributed pipelines, Kubernetes, real-time streams. But most real data engineering is unglamorous: clients send you ten years of Excel sheets with inconsistent date formats, duplicate entries, encoding errors, and hand-typed keys that only fuzzy-join. You spend three days reading raw data and two days cleaning it. You spend two hours building the transform. This is the pattern I use to automate that cleanup: pandas for the initial read, duckdb for fast SQL transforms, pyarrow for parquet output, then a BigQuery load. It's opinionated, boring, and turns spreadsheet chaos into queryable datasets in one script.

Why CSV/Spreadsheet Cleanup Is the Boring Half of Data Engineering

Architects talk about Airflow DAGs and Kafka streams. What they don't talk about: client exports are garbage. Dates are text. Amounts are formatted with commas. Phone numbers have parentheses. Dupes exist because someone copy-pasted a row. Columns were renamed three times and now nobody knows which version is authoritative. You could spend a month building a "proper" ELT framework, or you could spend one night writing a pandas script that reads the file, applies 15 transformations, dedupes intelligently, and outputs clean parquet. Choose night-script.

The unlock: duckdb. It's SQL for in-memory data, meaning you write joins, window functions, and aggregations instead of pandas chain calls. You read the messy CSV with pandas, load it into duckdb's in-memory store, write your transforms as SQL, and output clean parquet. No Postgres. No Docker. Ship it as a single Python script. Clients run it on their laptop.

The Pattern: Read → Normalize → Dedupe → Validate → Parquet → BigQuery

Step 1: Read raw CSVs into pandas. Let pandas auto-infer types if the data is clean enough, but always pass dtype hints for columns you know are problematic (dates, IDs, amounts). Immediately drop entirely-empty rows and columns; in messy exports that often removes a large share of the garbage before transforms even start.

import pandas as pd
import duckdb
import pyarrow.parquet as pq

# Read with dtype hints for problem fields
df = pd.read_csv(
  'messy_export.csv',
  dtype={
    'customer_id': str,
    'phone': str,
    'date_created': str,  # parse later
    'amount': str  # remove commas first
  }
)

# Drop fully-empty rows/cols
df = df.dropna(how='all').dropna(axis=1, how='all')

# Trim whitespace on string cells only (DataFrame.applymap is deprecated since pandas 2.1)
df = df.apply(lambda col: col.map(lambda x: x.strip() if isinstance(x, str) else x))

print(f"Loaded {len(df)} rows, {len(df.columns)} cols")

Step 2: Load into duckdb and normalize dates, amounts, and phone numbers. Window functions find dupes: the query below keeps only the latest row per customer_id, and a stricter rule (same customer_id, email, and amount within 7 days = likely dupe) follows the same shape. Fuzzy join on names using Levenshtein distance if you're merging customer lists across multiple exports.

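conn = duckdb.connect(':memory:')
conn.register('raw_data', df)

# Normalize first, then dedupe on the normalized values
normalized = conn.execute("""
  WITH cleaned AS (
    SELECT
      UPPER(TRIM(customer_id)) AS customer_id,
      -- 'g' replaces every match; without it only the first match is replaced
      REGEXP_REPLACE(phone, '[^0-9+]', '', 'g') AS phone,
      STRPTIME(date_created, '%m/%d/%Y')::DATE AS date_created,
      -- TRY_CAST yields NULL instead of an error when nothing numeric is left
      TRY_CAST(REGEXP_REPLACE(amount, '[^0-9.-]', '', 'g') AS DOUBLE) AS amount_usd
    FROM raw_data
    WHERE customer_id IS NOT NULL
  ),
  ranked AS (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY customer_id
        ORDER BY date_created DESC
      ) AS row_rank
    FROM cleaned
    WHERE amount_usd <> 0  -- also drops rows whose amount could not be parsed
  )
  SELECT * EXCLUDE (row_rank)
  FROM ranked
  WHERE row_rank = 1  -- keep only latest per customer
""").df()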

print(f"After dedup: {len(normalized)} rows")

Step 3: Validate output before load. Count nulls per column, check for obvious date range outliers (if you're expecting 2010–2024 and see 1901, something broke), verify phone/email format. Fail loudly if validation fails — don't silently load garbage into BigQuery.
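The checks in Step 3 can be sketched as one small function. This is a minimal sketch, not the script's actual validator: the function names, the 5% null threshold, the date window, and the phone pattern are all assumptions to adjust per client. It expects the `date_created` and `phone` columns produced in Step 2.

```python
import pandas as pd


def validate(df, min_date="2010-01-01", max_date="2024-12-31", max_null_frac=0.05):
    """Return a list of problems; an empty list means the frame passed."""
    problems = []

    # Share of nulls per column, compared against an assumed threshold
    for col, frac in df.isna().mean().items():
        if frac > max_null_frac:
            problems.append(f"{col}: {frac:.0%} nulls")

    # Dates outside the expected window (unparseable values become NaT and count too)
    dates = pd.to_datetime(df["date_created"], errors="coerce")
    in_range = dates.between(pd.Timestamp(min_date), pd.Timestamp(max_date))
    if (~in_range).any():
        problems.append(f"date_created: {(~in_range).sum()} rows outside {min_date}..{max_date}")

    # Phones should be digits with an optional leading + (assumed pattern)
    phones = df["phone"].dropna().astype(str)
    bad = ~phones.str.fullmatch(r"\+?\d{7,15}")
    if bad.any():
        problems.append(f"phone: {bad.sum()} malformed values")

    return problems


def validate_or_raise(df, **kwargs):
    """Fail loudly: raise instead of letting a bad frame reach the load step."""
    problems = validate(df, **kwargs)
    if problems:
        raise ValueError("Validation failed:\n" + "\n".join(problems))
```

Calling `validate_or_raise(normalized)` right before the parquet write stops the run with a readable list of what went wrong.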

Step 4: Write parquet with snappy compression. Parquet is columnar, compressed, and preserves type info. BigQuery reads it natively. Keep one parquet file per export run so you can audit lineage later.

from datetime import datetime

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery

output_path = f'cleaned_customers_{datetime.now().strftime("%Y%m%d_%H%M%S")}.parquet'
table = pa.Table.from_pandas(normalized, preserve_index=False)
pq.write_table(table, output_path, compression='snappy')

# Upload to BigQuery
client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
  source_format=bigquery.SourceFormat.PARQUET,  # the default source format is CSV
  write_disposition=bigquery.WriteDisposition.WRITE_APPEND
)
with open(output_path, 'rb') as parquet_file:
  job = client.load_table_from_file(
    parquet_file,
    'project.dataset.customers',
    job_config=job_config
  )
job.result()  # wait for the load job to finish
print(f"Loaded {job.output_rows} rows to BigQuery")

Handling Dirty Data — 6 Patterns That Catch 95% of Errors

Date chaos. CSVs have dates as "2/14/2023", "Feb 14, 2023", "2023-02-14", "14/02/2023" in the same column. Use duckdb's TRY_STRPTIME to attempt multiple formats and null-out failures, then flag rows where conversion failed so you can audit them.

Encoding disasters. If you see mojibake (Chinese characters appearing as ??? or ü becoming Ã¼), detect the encoding before reading the CSV: `chardet.detect(open('file.csv', 'rb').read())['encoding']`, then pass the result to `pd.read_csv(..., encoding=...)`.
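If adding chardet as a dependency isn't an option, a cruder fallback is to try a short list of likely encodings in order. This is a swapped-in technique, not what chardet does, and the candidate list is an assumption that suits Western European Excel exports. The function name `sniff_encoding` is hypothetical.

```python
def sniff_encoding(raw: bytes, candidates=("utf-8-sig", "cp1252", "latin-1")) -> str:
    """Return the first candidate encoding that decodes the bytes without error."""
    for encoding in candidates:
        try:
            raw.decode(encoding)
        except UnicodeDecodeError:
            continue
        return encoding
    raise ValueError("None of the candidate encodings could decode the data")


# latin-1 accepts any byte sequence, so it only makes sense as the last resort
sample = "Müller,100".encode("cp1252")
print(sniff_encoding(sample))
```

The result can go straight into `pd.read_csv(path, encoding=...)`. A clean decode doesn't prove the guess is right, so spot-check a few rows with accented names.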

Fuzzy dupes. Two rows, customer_id='123' and customer_id='123 ' (trailing space). Upper + trim kills most. For name-based fuzzy match, duckdb supports `jaro_similarity()` or use rapidfuzz's Levenshtein in pandas before SQL.

Silent nulls. Columns contain "N/A", "—", "null", empty strings, and actual NULLs mixed together. Normalize all to NULL in SQL: `CASE WHEN TRIM(amount) IN ('N/A', '—', 'null', '') THEN NULL ELSE amount END`.
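The same cleanup can also happen at read time, before anything reaches SQL. A minimal sketch, assuming the placeholder tokens below are the ones in your export (pandas already treats several of them as missing by default; listing them makes the intent explicit):

```python
import io

import pandas as pd

# Placeholder tokens to treat as missing; "\u2014" is the long dash some exports use
NULL_TOKENS = ["N/A", "\u2014", "null", ""]

raw = "customer_id,amount\nA1,100\nA2,N/A\nA3,\u2014\nA4,null\nA5,\n"

df = pd.read_csv(io.StringIO(raw), dtype=str, na_values=NULL_TOKENS)
print(df["amount"].isna().sum(), "of", len(df), "amounts are missing")
```

Doing it here means every later step sees one kind of missing value, at the cost of losing which placeholder the client originally typed.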

Duplicated metadata. Some clients export each customer once but include multiple subsidiary rows or line items. If you need aggregate-level rollup, use GROUP BY + SUM early, then left-join the details back if needed.

Type leakage. A "customer_id" column contains both numeric IDs and alphanumeric codes. Keep it as text, filter strictly on pattern in duckdb, and document the split in your output schema.
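One way to make the split explicit is to classify each ID by pattern and carry the label as an extra column. The two patterns here are assumptions for illustration; the real ones depend on how the client's system issues IDs. The same regular expressions could be used with duckdb's `regexp_full_match` instead.

```python
import re

NUMERIC_ID = re.compile(r"\d+")
CODE_ID = re.compile(r"[A-Z]{2}-\d{3,}")  # hypothetical format, e.g. AC-0091


def classify_id(value: str) -> str:
    """Label an ID as 'numeric', 'code', or 'invalid' without changing its text."""
    value = value.strip().upper()
    if NUMERIC_ID.fullmatch(value):
        return "numeric"
    if CODE_ID.fullmatch(value):
        return "code"
    return "invalid"


for raw_id in ["00017", " ac-0091 ", "n/a", "ZX-1"]:
    print(repr(raw_id), "->", classify_id(raw_id))
```

Because the ID stays a string, leading zeros like "00017" survive, which they would not if the column were cast to an integer.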

Frequently Asked Questions

Why duckdb and not just pandas?

Pandas is great for column ops but joins and window functions are verbose. DuckDB is SQL, meaning it's declarative, debuggable, and fast. Write SQL, not `.merge(..., how='left').groupby().agg()`.

Do I need Airflow to run this?

No. Schedule the Python script with a cron job or GitHub Actions. If you're loading multiple exports daily, wire it into a serverless function (Netlify Functions, AWS Lambda, Google Cloud Functions) so clients can trigger it on upload.

How do I handle schema changes?

Version your script in git. Use duckdb's schema inspection (`DESCRIBE table`) to alert if new columns appear. For major schema changes, create a new parquet file with a version suffix and document the migration.
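A schema check can be as small as a set comparison between the columns you expect and the columns that arrived. In this sketch the function name and the expected column list are assumptions; the actual column names could come from `df.columns` or from the first field of a `DESCRIBE` result.

```python
EXPECTED_COLUMNS = ["customer_id", "phone", "date_created", "amount"]  # assumed baseline


def diff_schema(expected_columns, actual_columns):
    """Report columns that appeared or disappeared relative to the baseline."""
    expected, actual = set(expected_columns), set(actual_columns)
    return {
        "added": sorted(actual - expected),
        "missing": sorted(expected - actual),
    }


drift = diff_schema(EXPECTED_COLUMNS, ["customer_id", "phone", "date_created", "amt", "region"])
if drift["added"] or drift["missing"]:
    print(f"Schema drift detected: {drift}")
```

A missing column is usually a rename (here `amount` became `amt`), which is worth stopping the run for. A purely added column can often just be logged.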

Can I automate this for multiple clients?

Yes. Parameterize the script: pass in file path, column names to keep, dedup key, and output destination. Loop over a client config JSON. One script, many clients.
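A minimal sketch of the config-driven loop follows. The JSON field names, the `ClientConfig` class, and the `run_pipeline` callable are all hypothetical; `run_pipeline` stands in for the read, normalize, dedupe, validate, and write steps described above.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ClientConfig:
    name: str
    input_path: str
    keep_columns: list
    dedup_key: list
    destination: str


def load_configs(text: str) -> list:
    """Parse a JSON array of client entries into ClientConfig objects."""
    return [ClientConfig(**entry) for entry in json.loads(text)]


def run_all(configs, run_pipeline) -> dict:
    """Run every client; one client's failure should not block the others."""
    results = {}
    for cfg in configs:
        try:
            results[cfg.name] = run_pipeline(cfg)
        except Exception as exc:  # record the error and keep going
            results[cfg.name] = exc
    return results


CONFIG_JSON = """[
  {"name": "acme", "input_path": "exports/acme.csv",
   "keep_columns": ["customer_id", "amount"], "dedup_key": ["customer_id"],
   "destination": "project.dataset.acme_customers"}
]"""

configs = load_configs(CONFIG_JSON)
print(run_all(configs, lambda cfg: f"would process {cfg.input_path}"))
```

Passing the pipeline in as a function keeps the loop testable without touching real files or BigQuery.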

What about large files (1GB+)?

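Pandas loads everything into RAM, and registering that DataFrame with duckdb doesn't change that. For streaming, read the CSV in chunks with `pd.read_csv(..., chunksize=100000)`, apply transforms per chunk, and append each chunk to a parquet file. Keep in mind that dedupe inside a chunk can't see duplicates in other chunks, so cross-chunk dedupe needs a second pass over the output. But honestly: if you're regularly dealing with 1GB CSVs, the client should be exporting to a database, not Excel.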

How do I know when the script broke?

Add validation checkpoints: row count before/after each step, null counts per column, date range bounds. If any check fails, raise an exception and email the result. Don't silently load bad data.
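A row-count checkpoint can be a few lines. In this sketch the exception class, the function name, and the 50% drop limit are assumptions; the right limit depends on how much duplication you expect in a given export.

```python
class PipelineCheckError(RuntimeError):
    """Raised when a step changes the row count more than expected."""


def checkpoint(step: str, rows_before: int, rows_after: int, max_drop_frac: float = 0.5) -> float:
    """Return the fraction of rows dropped by a step, or raise if it looks wrong."""
    if rows_before == 0:
        raise PipelineCheckError(f"{step}: no input rows")
    dropped = (rows_before - rows_after) / rows_before
    if rows_after == 0 or dropped > max_drop_frac:
        raise PipelineCheckError(
            f"{step}: {rows_before} -> {rows_after} rows ({dropped:.0%} dropped)"
        )
    return dropped


print(f"dedupe dropped {checkpoint('dedupe', 1000, 900):.0%} of rows")
```

Calling it after each step turns a silent data loss into an exception that cron or GitHub Actions will surface as a failed run.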

The Bottom Line

Real data engineering isn't Kafka. It's a three-hour Python script that turns ten years of spreadsheet chaos into clean parquet. Pandas reads the mess, duckdb transforms it with SQL, pyarrow outputs parquet, BigQuery ingests it. One night of scripting buys you weeks of clean data. See how we approach data pipelines or read our deep dive on attribution models in BigQuery.
