Real-World Change Data Capture Example

Change data capture (CDC) is a necessity. With CDC, changes are synced instantly or near-instantly. In practice, CDC is often used to replicate data between databases in real time. Essentially, CDC is a non-intrusive, low-impact way to integrate data in real time.

I will illustrate how the Etlworks change data capture solution works for an eLearning company that needs to load data from 1600+ MySQL databases into a Snowflake data warehouse. Thanks to Etlworks, the company can now make everyday operations data available to non-technical business users. The data warehouse in Snowflake is updated every 5 minutes, so users can gain quick, current business insights.

The company requirements

  • Set up a data pipeline to load incremental updates from 1600+ MySQL databases into the Snowflake data warehouse.
  • The data pipeline must support INSERTs, UPDATEs, and DELETEs.
  • The data pipeline must be able to automatically pick up new databases and adjust the destination schema if new columns are added to the source tables.
  • The expected volume of data: hundreds of gigabytes, billions of records on initial load, tens of millions of updates every day.
  • The expected number of tables across all MySQL databases: 55000.
  • The number of tables in Snowflake: 35
  • The data pipeline must be extremely resilient to extract and load errors.
  • The data pipeline is expected to work in a fully automated mode.

Etlworks Solution

Step 1: Setting up MySQL read replica instance in RDS

The Etlworks team recommended streaming CDC events from a MySQL read replica. Our customer set up the MySQL instance in Amazon RDS and configured native MySQL replication from the production instance to the replica. Creating a read replica is optional, but it significantly reduces the replication load on your MySQL production instance.

The following permissions need to be configured for the MySQL user in the read replica MySQL instance.

  • SELECT: enables the connector to select rows from tables in databases.
  • RELOAD: enables the connector to use the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks.
  • SHOW DATABASES: enables the connector to see database names by issuing the SHOW DATABASES statement.
  • REPLICATION SLAVE: enables the connector to connect to and read the MySQL server binlog.
  • REPLICATION CLIENT: enables the connector to use the following statements: SHOW MASTER STATUS, SHOW SLAVE STATUS, and SHOW BINARY LOGS.
  • LOCK TABLES: needed on Amazon RDS or Amazon Aurora, which do not allow a global read lock; table-level locks are used instead to create a consistent snapshot.
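
As a rough illustration, the privileges listed above could be granted with a single MySQL GRANT statement. The sketch below only builds that statement as a string; the user name cdc_user and the '%' host are placeholders assumed for the example, not values from the project.

```python
# Privileges from the list above. LOCK TABLES matters where a global
# read lock is not available (Amazon RDS, Amazon Aurora).
CDC_PRIVILEGES = [
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
    "LOCK TABLES",
]


def build_grant(user: str, host: str = "%") -> str:
    """Build a GRANT statement for the CDC user.

    Several of these privileges are global, so they are granted ON *.*.
    The user and host values are hypothetical.
    """
    privileges = ", ".join(CDC_PRIVILEGES)
    return f"GRANT {privileges} ON *.* TO '{user}'@'{host}';"


print(build_grant("cdc_user"))
```

The generated statement would be run by an administrator on the read replica's source instance; narrow the host to the address Etlworks connects from if that is known.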

Binlog replication for the MySQL read replica instance needs to be enabled by setting the following Amazon RDS parameters:

binlog_format: ROW
log_bin_use_v1_row_events: 1
net_read_timeout: 3600
net_write_timeout: 3600
wait_timeout: 86400

Step 2: Setting up flows to extract data from MySQL using CDC

In Etlworks, there are two options for extracting data using CDC and loading it into Snowflake:

  1. Create one flow to extract data from the database and create CSV files in local storage, and another flow to load the CSV files into Snowflake.
  2. Create one flow to extract data from the database and ingest it into a message queue, such as Kafka. Create another flow to load data from the queue into Snowflake.

For this project, we decided to use option 1 because option 2 requires a more complicated setup with a separate message queue such as Kafka or Azure Event Hubs.

A typical CDC flow can extract data from multiple tables in multiple databases, but having a single flow pull data from 55000+ tables would be a major bottleneck, because it would be limited to a single blocking queue with limited capacity. It would also create a single point of failure.

A better approach would be to create multiple parallel extract flows, each pulling data from all 35 tables in a single database. However, because data must be extracted from 1600 MySQL databases, this would be impractical and very resource-demanding: we would have to run 1600+ parallel extract flows.

We decided to create 35 extract flows, where each flow pulls data from a single table in 1600 databases.
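
To make the trade-off concrete, here is a back-of-the-envelope comparison of the two topologies. It assumes, for simplicity, that every one of the 1600 databases holds all 35 tables, which gives an upper bound of 56000 source tables (the figure quoted in the requirements is about 55000).

```python
DATABASES = 1600          # MySQL databases, from the requirements
TABLES_PER_DATABASE = 35  # one source table per Snowflake table (simplifying assumption)


def topology(flows: int) -> dict:
    """Return the flow count and the number of source tables each flow handles."""
    total_tables = DATABASES * TABLES_PER_DATABASE
    return {"flows": flows, "tables_per_flow": total_tables // flows}


# One flow per database: many small flows.
per_database = topology(flows=DATABASES)
# One flow per table: few flows, each spanning every database.
per_table = topology(flows=TABLES_PER_DATABASE)

print(per_database)
print(per_table)
```

Both layouts cover the same tables; the per-table layout simply trades 1600 concurrent flows for 35 wider ones.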

There are other possible topologies to consider as well, for example:

  • group tables and databases alphabetically;
  • create separate extract flows for the large and high-traffic tables;
  • for the rest of the tables, split flows into chunks, each extracting data from a significant number of tables (for example, 1000) across multiple databases.

We recommend selecting the topology that works best for you, keeping in mind performance and maintenance overhead.

Given this design choice, we created a table in Snowflake that lists the databases to extract data from. This table is used to automatically populate the list of included tables in the format database1.table_abc,database2.table_abc,...database1500.table_abc and the list of included databases in the format database1,database2,...database1500.

The basic idea is that we can use a single MySQL CDC connection where the Included Databases and Included Tables are set as {tokens}, populated at run-time by JavaScript.
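
The token values can be sketched as follows. In the real setup the values are produced by JavaScript inside Etlworks from the Snowflake table; this Python version only illustrates the string formats, and the function name build_includes is made up for the example.

```python
def build_includes(databases, table):
    """Return (included_databases, included_tables) for one extract flow.

    `databases` is the list of database names read from the lookup table;
    `table` is the single source table handled by this flow.
    """
    included_databases = ",".join(databases)
    included_tables = ",".join(f"{db}.{table}" for db in databases)
    return included_databases, included_tables


dbs = ["database1", "database2", "database3"]
included_databases, included_tables = build_includes(dbs, "table_abc")
print(included_databases)  # database1,database2,database3
print(included_tables)     # database1.table_abc,database2.table_abc,database3.table_abc
```

Because the lists are rebuilt from the lookup table each time a flow starts, adding a row for a new database is enough for the flow to pick it up on its next restart.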

Step 3: Setting up flows to load data in Snowflake

Once again, there are multiple options to consider.

Option 1. Initially, we set up a flow that loads the staged CSV files into Snowflake by using the CDC MERGE action, which applies INSERTs, UPDATEs, and DELETEs in the order in which the CDC events originated in the source database.

For each CSV file the flow does the following:

  1. Creates a temporary table in Snowflake.
  2. Executes the COPY INTO command to load the file “as is” into the temp table.
  3. Uses the Snowflake MERGE command to merge the data in the temp table with the data in the actual table.
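
For illustration, the MERGE in the last step might look like the statement generated below. This is a sketch, not the SQL that Etlworks actually emits: the cdc_op column, the 'd' marker for deletes, and the table and column names are assumptions, and the temp table is assumed to hold at most one row per key.

```python
def build_cdc_merge(target, temp, key, columns, op_column="cdc_op"):
    """Build a Snowflake MERGE that applies inserts, updates, and deletes.

    `columns` lists the data columns (including the key); `op_column` is a
    hypothetical column in the temp table carrying the CDC operation.
    """
    updates = ", ".join(f"t.{c} = s.{c}" for c in columns if c != key)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t USING {temp} s ON t.{key} = s.{key}\n"
        f"WHEN MATCHED AND s.{op_column} = 'd' THEN DELETE\n"
        f"WHEN MATCHED THEN UPDATE SET {updates}\n"
        f"WHEN NOT MATCHED AND s.{op_column} <> 'd' "
        f"THEN INSERT ({insert_cols}) VALUES ({insert_vals});"
    )


print(build_cdc_merge("orders", "orders_tmp", "id", ["id", "status"]))
```

The delete clause comes first so that a matched row flagged as deleted is removed rather than updated.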

This approach guarantees the lowest latency but consumes more resources and requires more Snowflake credits, so it can be more expensive from the Snowflake standpoint.

Option 2. Skip DELETE events and only apply INSERTs and UPDATEs. It can be a bit faster than option 1.

Option 3. Always INSERT data into the staging tables in Snowflake, then periodically execute a SQL script to populate the actual tables by de-duplicating the staging tables and removing ‘d’ (delete) records. 

This option is very popular when customers are OK with longer delays between data being updated in the source and finally available in the data warehouse for consumption by the BI tools. 
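
The de-duplication logic behind option 3 can be sketched in plain Python. In practice this would be a SQL script running in Snowflake; the field names id, op, and seq are hypothetical, as are the 'c' and 'u' operation codes (only 'd' for delete comes from the description above).

```python
def latest_state(staged_rows, key="id", op="op", seq="seq"):
    """Collapse staged CDC events into the current state of a table.

    Keeps only the most recent event per key (highest `seq`) and drops
    keys whose most recent event is a delete ('d').
    """
    latest = {}
    for row in sorted(staged_rows, key=lambda r: r[seq]):
        latest[row[key]] = row  # later events overwrite earlier ones
    return [row for row in latest.values() if row[op] != "d"]


staged = [
    {"id": 1, "op": "c", "seq": 1, "status": "new"},
    {"id": 1, "op": "u", "seq": 2, "status": "paid"},
    {"id": 2, "op": "c", "seq": 3, "status": "new"},
    {"id": 2, "op": "d", "seq": 4, "status": "new"},
    {"id": 3, "op": "c", "seq": 5, "status": "new"},
]
current = latest_state(staged)
print([(row["id"], row["status"]) for row in current])
```

Here row 1 survives with its updated status, row 2 disappears because its last event is a delete, and row 3 is kept as inserted.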

After extensive testing, our customer decided to use option 3. The main reason was that the near real-time approach (option 1) uses Snowflake cloud services and consumes extra Snowflake credits.

Step 4: Scheduling extract and load flows

The Extract Flows

In Etlworks, it is possible to schedule a flow to run continuously until there is nothing to do, then stop for a configurable number of seconds and restart. We recommended this schedule type for the 35 extract flows. The extract flows run until they are automatically stopped so that the system can add new databases. The customer set the delay between restarts to 2 hours.
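
The "run until idle, pause, restart" schedule can be pictured with a small loop. This is only a conceptual sketch of the behavior described above, not Etlworks code; extract_batch, max_restarts, and the injectable sleep function are made up for the example.

```python
import time


def run_continuously(extract_batch, restart_delay_seconds, max_restarts, sleep=time.sleep):
    """Run an extract until it reports no work, pause, then restart.

    `extract_batch` is a hypothetical callable that returns the number of
    records it extracted (0 means there is nothing to do).
    """
    total = 0
    for restart in range(max_restarts):
        while True:
            extracted = extract_batch()
            if extracted == 0:
                break  # nothing left to do: stop this run
            total += extracted
        if restart < max_restarts - 1:
            sleep(restart_delay_seconds)  # pause before the next run
    return total


# Simulated batches: two runs separated by one pause.
batches = iter([3, 2, 0, 4, 0])
delays = []
total = run_continuously(lambda: next(batches), 2 * 60 * 60, 2, sleep=delays.append)
print(total, delays)
```

The pause between runs is the natural point at which the list of included databases and tables can be refreshed.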

The Load Flow

The load flow loads files into Snowflake in batches. We recommended running it every few minutes so it could clear the queue as often as possible. The customer set it to run every 5 minutes.

On average, the pipelines load tens of millions of records into Snowflake daily, but there are days when the number of records jumps to hundreds of millions. As configured, the pipelines can easily handle extracting and loading billions of records a day.

Conclusion

The Etlworks platform enables Snowflake users to leverage low-impact, real-time change data capture from all major databases: MySQL, Postgres, SQL Server, Oracle, DB2, and MongoDB.

Moving data continuously as new database transactions occur makes it possible for Snowflake users to maintain real-time data pipelines that feed Snowflake’s fast and flexible storage and analytics solutions.

If you would like a brief demo of CDC to Snowflake, please schedule a demo.