GCP Iceberg Lakehouse ELT Pipeline
A fully configurable, orchestrator-agnostic Spark ETL pipeline that ingests structured data from MySQL and other RDBMS databases directly into an Apache Iceberg lakehouse on Google Cloud Storage (GCS).
Deployment Orchestration: This codebase is completely scheduler-agnostic. It is designed to be deployed through any modern orchestration tool (e.g., Apache Airflow, Prefect, Dagster, or AWS Glue Scheduler). In production, you can manage ingestion metadata via table-level YAML/HOCON files, which a DAG generator parses to schedule Spark jobs dynamically via Livy operators or Google Cloud Dataproc.
Data Pipeline Execution Workflow
* Once data is written into GCS in Apache Iceberg format, it can be queried concurrently by BigQuery, Spark SQL, or Trino after setting up IAM permissions, allowing analytics, dashboards, and AI/ML applications to consume it seamlessly.
Modular Pipeline Architecture
The core strength of our ETL framework is its decoupled, modular design. The Extract, Transform, and Load phases function as completely independent stages that can be developed, tested, and maintained separately.
Using a composition DSL (~>), these stages are pluggable and reusable across pipelines. For example, if you swap the source from a JDBC database to GCS files, you only need to change the extraction stage; your downstream transformation logic and Iceberg loading components remain completely unchanged. Furthermore, all transformation rules are fully parameter-driven, allowing you to easily configure filters, column renaming, datatype casting, and data-masking rules directly from configuration profiles.
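To make the composition idea concrete, here is a minimal, Spark-free sketch of how a ~> operator can chain typed stages. FStage below is a simplified stand-in written for illustration, not the framework's actual type. The toy stages operate on lists of maps instead of DataFrames so the example runs without Spark, and the names Load and DslDemo are hypothetical.

```scala
// Simplified stand-in for the framework's stage type (hypothetical, for illustration only).
final case class FStage[A, B](name: String, run: A => B) {
  // Chain this stage with the next; next's input type must equal this stage's output type.
  def ~>[C](next: FStage[B, C]): FStage[A, C] =
    FStage(s"$name ~> ${next.name}", run.andThen(next.run))
}

object DslDemo {
  // Toy rows instead of Spark DataFrames so the sketch runs without Spark.
  type Rows = List[Map[String, String]]

  val Extract: FStage[Unit, Rows] =
    FStage("Extract", (_: Unit) => List(Map("id" -> "1", "email" -> "a@example.com")))

  // Drops a sensitive column, standing in for a real transformation.
  val Transformation: FStage[Rows, Rows] =
    FStage("Transformation", (rows: Rows) => rows.map(row => row - "email"))

  // Stand-in for a loader: returns the number of rows it would write.
  val Load: FStage[Rows, Int] =
    FStage("Load", (rows: Rows) => rows.size)

  // ~> is left-associative: (Extract ~> Transformation) ~> Load
  val etl: FStage[Unit, Int] = Extract ~> Transformation ~> Load
}
```

Because ~> only requires the output type of one stage to match the input type of the next, replacing Extract with a different source that produces the same type leaves the downstream stages untouched.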
```scala
// Extract, transform, and load stages composed into a single pipeline
val etl: ETL = Extract ~> Transformation ~> IcebergLoad
```
Pipeline Configurable Parameters
All aspects of the pipeline execution are highly configurable at the table level using HOCON or YAML profiles.
1. Extraction Parameters (Extract)
- extract-type: Ingestion type. Supports incremental (reads from the last checkpoint), full (full dump and overwrite), and full-update.
- secret: Integrates directly with GCP Secret Manager to load database credentials securely, so no plain-text passwords appear in configs.
- num-partitions: Controls parallelization of read operations by splitting queries across multiple Spark executors.
- query-timeout: Limits query execution time to protect MySQL/RDBMS databases from long-running runaway queries. The timeout must be specified in seconds or a larger unit (e.g. 60s).
- incremental-field: Name of the database column (e.g. a timestamp or ID) used as the checkpoint bookmark for incremental ingestion.
- max-batch-interval: Maximum time period processed in a single pipeline run, which bounds the batch size.
- soft-batch-limit: Target row count per batch, to prevent out-of-memory errors on Spark workers.
- A set of generic options passed directly to the Spark DataFrameReader.
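The interaction between incremental-field and max-batch-interval can be illustrated with a small sketch. It assumes the incremental field is a timestamp and that each run reads the half-open window [checkpoint, checkpoint + max-batch-interval), capped at the current time, and then advances the checkpoint to the end of that window. IncrementalWindow, next, and drain are hypothetical names, not part of the project.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper illustrating how max-batch-interval could bound each incremental run.
object IncrementalWindow {
  // Half-open window [from, until) over the incremental (timestamp) field.
  final case class Window(from: Instant, until: Instant)

  /** The window the next run would read, or None if the checkpoint has caught up with `now`. */
  def next(checkpoint: Instant, now: Instant, maxBatchInterval: Duration): Option[Window] = {
    require(!maxBatchInterval.isNegative && !maxBatchInterval.isZero, "max-batch-interval must be positive")
    if (!checkpoint.isBefore(now)) None
    else {
      val end = checkpoint.plus(maxBatchInterval)
      // Never read past "now"; the last window of a backlog may be shorter.
      Some(Window(checkpoint, if (end.isAfter(now)) now else end))
    }
  }

  /** Simulates successive runs: each reads one window, then the checkpoint moves to its end. */
  def drain(checkpoint: Instant, now: Instant, maxBatchInterval: Duration): List[Window] =
    next(checkpoint, now, maxBatchInterval) match {
      case None    => Nil
      case Some(w) => w :: drain(w.until, now, maxBatchInterval)
    }
}
```

With a 12h interval and a checkpoint 30 hours behind, drain yields two full 12-hour windows and a final 6-hour window. Each window would translate into a predicate of the form field >= from AND field < until, so a record whose timestamp falls exactly on a boundary is read only once.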
2. Transformation Parameters (Transform)
Using the DSL, developers can inject custom transformations of type Transform = FStage[DataFrame, DataFrame]. These can perform projection, column renaming, datatype casting, metadata generation, and hashing of sensitive columns before the data is written.
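As an illustration of parameter-driven transformations, the sketch below applies a rename map and SHA-256 masking to a single row. To keep it runnable without Spark, a row is modeled as a Map[String, String]. ConfigTransforms and Rules are hypothetical names, and casting and filtering are left out for brevity.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical, Spark-free sketch of config-driven renaming and masking.
object ConfigTransforms {
  type Row = Map[String, String]

  // Illustrative rule set; in the pipeline such rules would come from the table's config profile.
  final case class Rules(rename: Map[String, String], mask: Set[String])

  // Lower-case hex SHA-256 digest of the UTF-8 bytes of `s`.
  def sha256Hex(s: String): String =
    MessageDigest
      .getInstance("SHA-256")
      .digest(s.getBytes(StandardCharsets.UTF_8))
      .map(b => f"${b & 0xff}%02x")
      .mkString

  // Masking is matched on the original column name; the column is renamed afterwards.
  def apply(rules: Rules)(row: Row): Row =
    row.map { case (col, value) =>
      val out = if (rules.mask.contains(col)) sha256Hex(value) else value
      rules.rename.getOrElse(col, col) -> out
    }
}
```

In Spark, the same rules would typically map onto Dataset.withColumnRenamed and org.apache.spark.sql.functions.sha2(column, 256).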
Incoming data structures are validated dynamically against a schema defined inline via extract.schema or loaded from the file path given in extract.schema-file.
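A minimal sketch of the kind of check such a schema definition enables follows. It assumes the schema is available as a column-name to type-name map, which is an illustrative simplification of whatever format extract.schema actually uses. SchemaCheck is a hypothetical name.

```scala
// Hypothetical sketch: `expected` plays the role of the configured schema, `observed` the
// schema of the incoming batch (e.g. derived from a DataFrame's schema in a real pipeline).
object SchemaCheck {
  /** Returns human-readable violations; an empty list means the batch conforms. */
  def validate(expected: Map[String, String], observed: Map[String, String]): List[String] = {
    val missing = (expected.keySet -- observed.keySet).toList.sorted.map(c => s"missing column: $c")
    val extra = (observed.keySet -- expected.keySet).toList.sorted.map(c => s"unexpected column: $c")
    val mismatched = expected.keySet.intersect(observed.keySet).toList.sorted.collect {
      case c if expected(c) != observed(c) =>
        s"type mismatch on $c: expected ${expected(c)}, got ${observed(c)}"
    }
    missing ++ extra ++ mismatched
  }
}
```

Whether unexpected columns should fail the run or merely be logged is a policy choice; this sketch simply reports them alongside missing columns and type mismatches.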
3. Target Load Parameters (Load)
- iceberg.table: Target Apache Iceberg table identifier (e.g. catalog.namespace.table).
- save-mode: Write mode, either Append (default for the incremental type) or Overwrite (default for full update).
- primary-key: List of columns that define the table's primary key, enabling upsert behavior during MERGE INTO operations.
- partition-by: Partition fields that enable partition pruning through Iceberg metadata and optimize the physical file layout in GCS.
- iceberg.precombine-field: Enables conditional match resolution: a matched target row is updated only when the incoming record is at least as recent as the existing one (e.g. s.updated_at >= t.updated_at in the MERGE condition).
- Generic writer options, specified in HOCON, passed directly to the Spark DataFrameWriter during loading.
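To show how primary-key and iceberg.precombine-field could combine into an upsert, the sketch below builds a Spark SQL MERGE INTO statement for an Iceberg table. MergeSql is a hypothetical helper rather than the framework's code, and the source view name and the updated_at column used in the usage example are assumptions.

```scala
// Hypothetical helper: builds an Iceberg Spark SQL MERGE INTO statement from load settings.
// UPDATE SET * / INSERT * assume the source view has the same columns as the target table.
object MergeSql {
  def build(table: String, sourceView: String, primaryKey: Seq[String], precombineField: Option[String]): String = {
    require(primaryKey.nonEmpty, "primary-key must list at least one column")
    val on = primaryKey.map(k => s"t.$k = s.$k").mkString(" AND ")
    // With a precombine field, a matched row is only overwritten by an equal-or-newer record.
    val matchedGuard = precombineField.map(f => s" AND s.$f >= t.$f").getOrElse("")
    s"""MERGE INTO $table t
       |USING $sourceView s
       |ON $on
       |WHEN MATCHED$matchedGuard THEN UPDATE SET *
       |WHEN NOT MATCHED THEN INSERT *""".stripMargin
  }
}
```

In Spark, the transformed batch could be registered with df.createOrReplaceTempView("orders_batch") and the statement run with spark.sql(MergeSql.build("prod_catalog.sales.orders", "orders_batch", Seq("order_id"), Some("updated_at"))). Iceberg rejects a MERGE in which several source rows match the same target row, so deduplicating each batch on the primary key (keeping the latest precombine value) before merging is advisable.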
Why GCS with Apache Iceberg format instead of direct BigQuery storage?
- Interoperability & No Vendor Lock-in: By storing data in GCS in open-standard Apache Iceberg format, you are not locked into any single analytical engine. It can be queried concurrently by Spark SQL, Trino, Flink, StarRocks, and BigQuery (via BigLake external tables) after simple permission setup.
- Storage Cost Efficiency: GCS can offer a lower cost for storing large volumes of historical data than BigQuery active storage, particularly when infrequently accessed data is kept in colder storage classes such as Nearline or Coldline.
- Decoupled Compute & Storage: Different business units (Analytics, AI/ML, BI teams) can use their preferred query engines directly over the same data catalog without competing for native BigQuery slots or incurring high query-scanning costs.
HOCON Configuration Example
Below is a complete table configuration showing how easy it is to specify ingestion details:
```hocon
# Table ingestion configuration example in HOCON; the same settings can also be written in YAML.
extract {
  secret = "projects/my-gcp-project/secrets/mysql-db-credentials/versions/latest"
  jdbc {
    database = "sales_db"
    table = "orders"
    extract-type = "incremental"       // incremental, full, or full-update
    num-partitions = 8                 // number of parallel JDBC read partitions
    query-timeout = "60s"              // kill the query if it runs longer than this
    incremental-field = "incremental column of the jdbc table"
    max-batch-interval = "12h"         // process at most 12 hours of data per batch
    soft-batch-limit = 100000          // target at most 100000 rows per batch to avoid OOM in Spark
  }
}

load {
  iceberg.table = "prod_catalog.sales.orders"  // catalog, namespace, and table name
  save-mode = "Append"                         // Append, Update, Delete
  primary-key = ["order_id"]                   // primary key for the table
  partition-by = ["order_date"]                // Iceberg partition column(s)
  iceberg.precombine-field = ${extract.jdbc.incremental-field}  // must be the same column as jdbc.incremental-field
}
```
Want to see how this pipeline performs compared to Google Cloud's serverless ingestion?
Compare with Google Cloud Datastream

