Change data capture (CDC) is a necessity. With CDC, changes are synced instantly or near-instantly. In practice, CDC is often used to replicate data between databases in real time. Essentially, CDC is a perfect solution for non-intrusive, low-impact real-time data integration.
I will illustrate how the Etlworks change data capture solution works for an eLearning company that needs to load data from 1600+ MySQL databases into a Snowflake data warehouse. Thanks to Etlworks, the company can now make everyday operational data available to non-technical business users. The data warehouse in Snowflake is updated every 5 minutes, so users can gain quick, current business insights.
The company requirements
- Set up a data pipeline to load incremental updates from 1600+ MySQL databases into the Snowflake data warehouse.
- The data pipeline must support INSERTs, UPDATEs, and DELETEs.
- The data pipeline must be able to automatically pick up new databases and adjust the destination schema if new columns are added to the source tables.
- The expected volume of data: hundreds of gigabytes and billions of records on the initial load, then tens of millions of updates every day.
- The expected number of tables across all MySQL databases: 55000.
- The number of tables in Snowflake: 35
- The data pipeline must be extremely resilient to extract and load errors.
- The data pipeline is expected to work in a fully automated mode.
Step 1: Setting up MySQL read replica instance in RDS
The Etlworks team recommended streaming CDC events from a MySQL read replica. Our customer set up the MySQL instance in Amazon RDS and configured native MySQL replication from the production instance to the replica. Creating a read replica is optional, but it significantly lessens the replication load on your MySQL production instance.
The following permissions need to be configured for the MySQL user in the read replica MySQL instance.
|It enables the connector to select rows from tables in databases.|
|It enables the connector the use of the |
|It enables the connector to see database names by issuing the |
|It enables the connector to connect to and read the MySQL server binlog.|
|It enables the connector the use of following statements: |
|On Amazon RDS or Amazon Aurora, which do not allow a global read lock, table-level locks are used to create a consistent snapshot.|
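As a rough illustration, the grants for the CDC user could be generated as shown here. This is only a sketch: it assumes the privileges described above are MySQL's SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, and LOCK TABLES, and the account name `cdc_user` and host `%` are hypothetical.

```python
# Assumption: these privilege names are inferred from the descriptions above.
ASSUMED_CDC_PRIVILEGES = [
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
    "LOCK TABLES",
]


def build_grant_statement(user: str, host: str = "%") -> str:
    """Return a GRANT statement giving the CDC user the assumed privileges.

    Replication privileges are global in MySQL, so the grant targets *.*.
    """
    privileges = ", ".join(ASSUMED_CDC_PRIVILEGES)
    return f"GRANT {privileges} ON *.* TO '{user}'@'{host}';"


if __name__ == "__main__":
    # 'cdc_user' is a hypothetical account name.
    print(build_grant_statement("cdc_user"))
```

The statement would be run on the read replica's source by an administrator; restricting the host to the address of the extract server is usually preferable to `%`.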
Binlog replication for the MySQL read replica instance needs to be enabled by setting the following Amazon RDS parameters:
binlog_format: ROW
log_bin_use_v1_row_events: 1
net_read_timeout: 3600
net_write_timeout: 3600
wait_timeout: 86400
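One way to keep these settings reviewable is to hold them as data and convert them into the parameter list that the RDS ModifyDBParameterGroup API accepts. The sketch below only builds that list; the apply method `pending-reboot` is an assumption (RDS accepts it for both static and dynamic parameters), and the API call itself is left out.

```python
# Parameter values copied from the list above.
BINLOG_PARAMETERS = {
    "binlog_format": "ROW",
    "log_bin_use_v1_row_events": "1",
    "net_read_timeout": "3600",
    "net_write_timeout": "3600",
    "wait_timeout": "86400",
}


def to_rds_parameters(settings: dict, apply_method: str = "pending-reboot") -> list:
    """Convert name/value settings into ModifyDBParameterGroup-style entries."""
    return [
        {
            "ParameterName": name,
            "ParameterValue": value,
            # Assumption: apply on the next reboot; dynamic parameters
            # could use "immediate" instead.
            "ApplyMethod": apply_method,
        }
        for name, value in settings.items()
    ]


if __name__ == "__main__":
    for entry in to_rds_parameters(BINLOG_PARAMETERS):
        print(entry)
```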
Step 2: Setting up flows to extract data from MySQL using CDC
In Etlworks, there are two options for extracting data using CDC and loading it into Snowflake:
- Create one flow to extract data from the database and write CSV files to local storage, and another flow to load the CSV files into Snowflake.
- Create one flow to extract data from the database and ingest it into a message queue, such as Kafka, and another flow to load data from the queue into Snowflake.
For this project, we decided to use option 1. Option 2 requires a complicated setup with a separate message queue such as Kafka or Azure Event Hubs.
A typical CDC flow can extract data from multiple tables in multiple databases, but a single flow pulling data from 55000+ tables would be a major bottleneck, because it would be limited to a single blocking queue with a limited capacity. It would also create a single point of failure.
A better approach would be to create multiple parallel extract flows, each pulling data from all 35 tables in a single database. However, because the pipeline must extract data from 1600 MySQL databases, this would be impractical and very resource demanding: we would have to run 1600+ parallel extract flows.
We decided to create 35 extract flows, where each flow pulls data from a single table in all 1600 databases.
There are other possible topologies to consider as well, for example:
- group tables and databases alphabetically;
- create separate extract flows for the large and high-traffic tables;
- for the rest of the tables, split the flows into chunks, each extracting data from a significant number of tables (for example, 1000) across multiple databases.
We recommend selecting the topology that works best for you, keeping in mind both performance and maintenance overhead.
Given this design choice, we created a table in Snowflake that lists the databases to extract data from. This table is used to automatically populate the list of included tables in the format:
database1.table_abc,database2.table_abc,...database1500.table_abc and included databases in a format
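The included-tables string for each of the per-table flows can be derived mechanically from the list of databases. A minimal sketch, assuming the database names have already been read from the Snowflake table; the function names and the sample database and table names are hypothetical.

```python
def included_tables(databases: list, table: str) -> str:
    """Comma-separated, fully qualified names of one table across all databases."""
    return ",".join(f"{db}.{table}" for db in databases)


def build_flow_configs(databases: list, tables: list) -> dict:
    """One extract flow per table, each covering every database."""
    return {table: included_tables(databases, table) for table in tables}


if __name__ == "__main__":
    # Hypothetical names; in production there would be 1600+ databases.
    dbs = ["database1", "database2", "database3"]
    print(included_tables(dbs, "table_abc"))
    # prints: database1.table_abc,database2.table_abc,database3.table_abc
```

With this layout, adding a new database only means adding a row to the list; every flow picks it up the next time its configuration is regenerated.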
Step 3: Setting up flows to load data in Snowflake
Once again, there are multiple options to consider.
Option 1. Initially, we set up a flow that loads the staged CSV files into Snowflake using the CDC MERGE action, which applies INSERTs, UPDATEs, and DELETEs in the order in which the CDC events originated in the source database.
For each CSV file the flow does the following:
- Creates a temporary table in Snowflake.
- Executes the COPY INTO command to load the file “as is” into the temp table.
- Uses the Snowflake MERGE command to merge the data in the temp table with the data in the actual table.
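The effect of merging events in source order can be modeled in a few lines. This is an in-memory illustration of the semantics only, not the Etlworks implementation or the actual Snowflake SQL; the event fields `op`, `key`, and `row`, and the op codes other than 'd', are assumptions.

```python
def apply_cdc_events(target: dict, events: list) -> dict:
    """Apply CDC events to `target` (key -> row) in the order they arrived."""
    for event in events:
        if event["op"] == "d":
            # Delete: remove the row if it exists.
            target.pop(event["key"], None)
        else:
            # Insert and update are both treated as an upsert.
            target[event["key"]] = event["row"]
    return target


if __name__ == "__main__":
    table = {1: {"name": "a"}}
    events = [
        {"op": "u", "key": 1, "row": {"name": "b"}},
        {"op": "c", "key": 2, "row": {"name": "c"}},
        {"op": "d", "key": 1, "row": None},
    ]
    print(apply_cdc_events(table, events))
```

Order matters: applying the delete for key 1 before its update would leave a stale row in the table.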
This approach gives the lowest latency, but it consumes more resources and requires more Snowflake credits, so it can be more expensive.
Option 2. Skip DELETE events and apply only INSERTs and UPDATEs. This can be a bit faster than option 1.
Option 3. Always INSERT data into the staging tables in Snowflake, then periodically execute a SQL script to populate the actual tables by de-duplicating the staging tables and removing ‘d’ (delete) records.
This option is very popular when customers are OK with longer delays between data being updated in the source and finally available in the data warehouse for consumption by the BI tools.
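The periodic de-duplication step in option 3 boils down to keeping the most recent staged record for each key and discarding keys whose latest record is a delete. A minimal in-memory sketch of that logic; the field names `key`, `seq` (a monotonically increasing event sequence), and `op` are hypothetical, and in practice the same logic would be expressed as a SQL script in Snowflake.

```python
def compact_staging(rows: list) -> list:
    """Keep the latest row per key, then drop keys whose latest op is 'd'."""
    latest = {}
    for row in rows:
        current = latest.get(row["key"])
        # A later (or equal) sequence number replaces the earlier record.
        if current is None or row["seq"] >= current["seq"]:
            latest[row["key"]] = row
    return [row for row in latest.values() if row["op"] != "d"]


if __name__ == "__main__":
    staged = [
        {"key": 1, "seq": 1, "op": "c", "name": "a"},
        {"key": 2, "seq": 2, "op": "c", "name": "b"},
        {"key": 1, "seq": 3, "op": "u", "name": "a2"},
        {"key": 2, "seq": 4, "op": "d", "name": None},
    ]
    print(compact_staging(staged))
```

Because the staging tables only ever receive INSERTs, loading stays cheap; the cost of reconciliation is paid once per run of the script rather than once per file.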
After extensive testing, our customer decided to use option 3. The main reason was that the near real-time approach (option 1) uses Snowflake cloud services and consumes extra Snowflake credits.
Step 4: Scheduling extract and load flows
The extract flows
In Etlworks, it is possible to schedule a flow to run continuously until there is nothing to do, then stop for a configurable number of seconds and restart. We recommended this schedule type for the 35 extract flows. The extract flows run until they are automatically stopped to let the system add new databases. The customer set the delay between restarts to 2 hours.
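The "run until idle, pause, restart" schedule can be pictured as a simple loop. This is a conceptual sketch, not how the Etlworks scheduler is implemented; `run_flow` is a hypothetical callable that returns True while it still finds work, and `max_cycles` exists only so the example terminates.

```python
import time
from typing import Callable


def run_with_restarts(
    run_flow: Callable[[], bool],
    delay_seconds: float,
    max_cycles: int,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run the flow until it is idle, pause, then restart.

    Returns the number of runs that actually processed something.
    """
    runs_with_work = 0
    for cycle in range(max_cycles):
        # Keep running while the flow reports that it processed something.
        while run_flow():
            runs_with_work += 1
        # Nothing left to do: pause before the next restart,
        # except after the final cycle.
        if cycle < max_cycles - 1:
            sleep(delay_seconds)
    return runs_with_work


if __name__ == "__main__":
    outcomes = iter([True, True, False, True, False])
    # A 2-hour delay is 7200 seconds; sleeping is stubbed out for the demo.
    print(run_with_restarts(lambda: next(outcomes), 7200, 2, sleep=lambda s: None))
```

The pause between cycles is the natural point at which configuration, such as the list of included databases, can be refreshed.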
The load flow
The load flow loads files into Snowflake in batches. We recommended running it every few minutes so that it clears the queue as often as possible. The customer set it to run every 5 minutes.
On average, the pipelines load tens of millions of records into Snowflake daily, but there are days when the number of records jumps to hundreds of millions. As configured, the pipelines can easily handle extracting and loading billions of records a day.
The Etlworks platform enables Snowflake users to leverage the low-impact, real-time change data capture from all major databases: MySQL, Postgres, SQL Server, Oracle, DB2, MongoDB.
Moving data continuously, as new database transactions occur, makes it possible for Snowflake users to maintain real-time data pipelines that feed Snowflake’s fast and flexible storage and analytics solutions.
If you would like a brief demo of CDC to Snowflake, please schedule a demo.