Change Data Capture (CDC) & Why It Matters


What is change data capture? Does it have any importance or bearing on the work that you do? We’ll answer the first question shortly, but the answer to the second question is most certainly “yes.”

What Is Change Data Capture?

Change data capture (CDC) solves data integration problems by monitoring and capturing the inserts, updates, and deletes made to records in table data.

The first step in this process is to identify that a change has been made to a data source. To do so, a CDC procedure inspects the relevant table for a delta between the old and new values stored in the source system, then updates the pertinent target record accordingly.

There are several good methods for doing this: high watermark, diff, and database triggers, for example. However, a superior approach to tracking data changes is log-based CDC.
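
For contrast, a minimal high-watermark implementation simply polls the source using a tracked timestamp column. A sketch (the table and column names here are hypothetical):

-- Hypothetical polling query for high-watermark CDC.
-- :last_high_watermark is the largest "changed" value seen in the previous run.
SELECT id, name, changed
FROM orders
WHERE changed > :last_high_watermark
ORDER BY changed;

Note that polling like this misses deletes and intermediate updates, and adds query load to the source, which is exactly what log-based CDC avoids.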

While interpreting the changes in a transaction log is difficult, the biggest benefit of log-based change data capture is its asynchronous nature: changes are captured independently of the source application performing them. With log-based CDC, new database transactions, including inserts, updates, and deletes, are read from the source database's transaction or redo logs. The changes are captured without making application-level changes and without scanning operational tables, both of which would add workload and reduce source system performance.

By allowing you to capture and extract only changed data, change data capture eliminates or reduces the need for batch windows, replacing bulk load updates with continuous streaming or incremental loading of data. Change data capture helps you improve efficiency by reducing redundant data replication and delivery, reducing data warehousing costs and facilitating real-time data integration across your data stores.

Why Change Data Capture Matters

There are many advantages of using transaction logs for CDC:

  • The solution is transparent to the source database and has minimal impact on it.
  • Changes are published to the target system in near real time, so the business can make accurate decisions based on the most current data.
  • There is no impact on transactions in the source application and no additional SQL load on the system.
  • Because CDC transfers only incremental changes, it reduces the cost of transferring data.
  • The approach maintains the order in which the original transactions were committed, which is important when the target application depends on transaction order.
  • Because delivery is asynchronous, the target system can take its time processing the messages.

Together, these advantages enable the building of streaming data pipelines that share application data across a business. Businesses receive up-to-date, accurate insights based on the latest data from many systems, and the decisions made from those insights help them remain competitive in their respective markets.

How Etlworks can help

Etlworks is a high-performance data integration platform with next-generation change data capture technology.

Etlworks supports native log-based change data capture for PostgreSQL, SQL Server, MySQL, Oracle, DB2, and MongoDB databases. Our intuitive visual interface makes it easy to set up, monitor, and manage your data pipelines, eliminating the need for scripting and ensuring quick time-to-value. Unlike other tools that support CDC, Etlworks involves only two moving parts: your database and Etlworks itself. You will be up and running with CDC in a matter of minutes!

Learn More about Log-Based CDC with Etlworks for Real-Time Data Replication

Contact Us! 

Real-time change replication with Kafka and Debezium


In this article, I demonstrate how to implement near real-time, Change Data Capture (CDC)-based change replication for the most popular databases using the following technologies:

  • Native CDC for each source database
  • Apache Kafka
  • Debezium
  • Etlworks Kafka connector with built-in support for Debezium

Overview

Change Data Capture (CDC), as its name suggests, is a design pattern that captures individual data changes instead of dealing with the entire dataset. Rather than dumping your entire database, with CDC you capture just the changes made to the master database and apply them to the BI database to keep the two in sync.

Debezium is a CDC tool that can stream changes from Microsoft SQL Server, MySQL, MongoDB, Oracle and PostgreSQL into Kafka, using Kafka Connect.

Kafka Connect is a tool for streaming data between Apache Kafka and external systems. It is used to define connectors that move large collections of data into and out of Kafka.

Etlworks Integrator parses the CDC events emitted to the Kafka topic, automatically transforms the events into DML SQL statements (INSERT/UPDATE/DELETE), and executes the statements in the target database in the order they were created. It also handles collisions and errors, ensuring that the solution is 100% reliable.
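
As a rough illustration (the table, columns, and values here are hypothetical), a flattened update event keyed on id would be translated into a statement such as:

-- Illustrative only: Etlworks generates and executes such statements automatically.
UPDATE cdc_test
SET name = 'updated-test1-1',
    changed = '2019-03-08 16:58:15'
WHERE id = 1;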

[Diagram: Etlworks CDC with Kafka and Debezium]

Anything can be a destination: SQL and NoSQL data sources, online data warehouses such as Snowflake and Redshift, files, API endpoints, etc. The following databases are supported as a source:

  • Microsoft SQL Server
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL

Prerequisites

The solution requires installing and configuring Apache Kafka, Kafka Connect, and Debezium.

Installing and configuring Kafka and Kafka Connect (the Kafka component required for CDC) is not part of this tutorial. In most cases, installing Kafka is as easy as downloading the latest version of standalone or dockerized Kafka and ZooKeeper. Kafka Connect is typically included in all distribution packages, so there is nothing extra to install. Debezium can be installed as a plugin for Kafka Connect by simply copying the required libraries to the KAFKA_HOME/plugin folder.

For customers on Enterprise plans, Etlworks installs all required components. We assist self-hosted customers with installing them.

Please contact Etlworks support if you want to enable real-time change replication for your account.

Solution

The CDC events are serialized as JSON or Avro documents and can be transformed using any of the transformations available in Etlworks.
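
Before any flattening, a Debezium change event value is an envelope carrying the row state before and after the change. Abridged, an insert event looks roughly like this:

{
  "before": null,
  "after": {"id": 1, "name": "test1", "changed": 1552064244733},
  "source": { ... },
  "op": "c",
  "ts_ms": 1552064244733
}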

The basic setup for real-time CDC-based change replication in Etlworks is very easy:

  1. Set up CDC for the source database.
  2. Configure Debezium to capture CDC events and publish them to the Kafka topic(s).
  3. Create a change replication flow where the source is a Kafka topic and the destination is a target database table.
  4. Schedule the flow.

1. Set up CDC for the source database

Enabling CDC is different for each database; please follow the tutorial for your specific database.
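
As an example, on Microsoft SQL Server (the database used later in this tutorial), CDC is enabled with system stored procedures; the schema and table names below match the example table used in this article:

USE database_name;
-- Enable CDC at the database level (requires sysadmin).
EXEC sys.sp_cdc_enable_db;
-- Enable CDC for each table that should be captured.
EXEC sys.sp_cdc_enable_table
  @source_schema = N'dbo',
  @source_name = N'cdc_test',
  @role_name = NULL;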

2. Configure Debezium to capture CDC events and publish them to the Kafka topic(s)

Assuming that Debezium is already installed as a Kafka Connect plugin and is up and running, we will configure a connector to the source database using the Kafka Connect REST API. In this tutorial, we will be using Microsoft SQL Server, but configuring connectors for other databases is equally simple.

Official tutorials for each connector are available on the Debezium website.

The Debezium connectors are created using the Kafka Connect REST API, so make sure either curl or Postman is installed on your development box. In this tutorial, we will be using Postman.

Step 1. Verify that Kafka Connect is installed and running.

The default port for the Kafka Connect API is 8083. Assuming that it runs on localhost, the URL of the API endpoint that returns the configured connectors is:

http://localhost:8083/connectors
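
With curl, the same check is a single command (the response below assumes the connector created in Step 2 already exists; a fresh install returns an empty array):

curl http://localhost:8083/connectors
["sqlserver-connector"]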


Step 2. Create a new connector for Microsoft SQL Server.

ENDPOINT URL: http://localhost:8083/connectors 

METHOD: POST

PAYLOAD (example):

{
  "name": "sqlserver-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "password",
    "database.dbname": "database_name",
    "database.server.name": "database_server_name",
    "table.whitelist": "comma separated list of fully_qualified_table_names",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.database_server_name.database_name",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.operation.header": "true",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "false"
  }
}
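
If you prefer curl to Postman, the same connector can be created by saving the payload above to a file (the file name here is arbitrary) and posting it:

curl -i -X POST -H "Content-Type: application/json" \
  --data @sqlserver-connector.json \
  http://localhost:8083/connectors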


CDC Event Flattening

The payload contains two sections that are important for integrating the Etlworks Kafka connector with Debezium:

1. The unwrap transformation, which creates a flat version of the CDC event:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.operation.header": "true",

2. The settings which remove the schema information from the CDC event:

"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",

The naming convention for Kafka topics

Debezium stores CDC events in a separate topic for each table. If the connector was configured using the following parameters:

"database.dbname": "database_name",
"database.server.name": "database_server_name",

the CDC events for the table cdc_test will be stored in a Kafka topic database_server_name.database_name.cdc_test.

Examples of the generated CDC events

Etlworks automatically parses the CDC events stored in a Kafka topic, so you don't have to deal with this yourself, but it is still a good idea to learn what the generated events look like for the different DML statements.

Assuming that we are using the JSON format for serializing CDC events (the default), and that the source table was created using the following SQL:

CREATE TABLE dbo.cdc_test(
  id INT,
  name VARCHAR(255),
  changed DATETIME,
  PRIMARY KEY (id))

the generated CDC events will look like the following:

INSERT INTO dbo.cdc_test (id,name,changed) values (1,'test1',CURRENT_TIMESTAMP)

Key: {"id":1}

Value: {"id":1,"name":"test1","changed":1552064244733}

Header: __debezium-operation=c

UPDATE dbo.cdc_test SET name = 'updated-test1-1', changed=CURRENT_TIMESTAMP 
WHERE id = 1

Key: {"id":1}

Value: {"id":1,"name":"updated-test1-1","changed":1552064295845}

Header: __debezium-operation=u

DELETE FROM dbo.cdc_test WHERE id = 1

Key: {"id":1}

Value: none

Header: __debezium-operation=DELETE

Configuring serialization format for DATE/TIME fields

As you probably noticed, the value of the timestamp field changed is generated as Unix epoch time in milliseconds. You can convert it to a human- (and database-) readable format using the TimestampConverter transformation:

"transforms":"unwrap,convert",
 
"transforms.convert.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert.target.type":"string",
"transforms.convert.field":"changed",
"transforms.convert.format":"yyyy-MM-dd HH:mm:ss"

3. Create a change replication flow

Step 1. Create a source Kafka connection. When creating the connection, select Debezium as the CDC provider and enter a wildcard topic name in the format database_server_name.database_name.*.

Step 2. Create a JSON format with all default settings.

Step 3. Create a connection to the destination database and disable auto-commit.

Step 4. Create a new flow by selecting Stream data from queue to database from the gallery.

Step 5. Add a new source-to-destination transformation where:

  • the FROM connection is the connection to Kafka created in step 1.
  • the FROM format is the JSON format created in step 2.
  • the FROM is a topic name for the specific table.
  • the TO is a destination table name.
  • the TO connection is the destination database connection created in step 3.


Step 6. Click the MAPPING button, select the Parameters tab, and configure the following parameters:

  • Stream Data – enabled
  • Use Bind Variables – enabled
  • Create Tables and Adjust Fields – enabled
  • Action – Record
  • Lookup Fields – the unique field(s) for the record. In our example the unique field is ID.


Step 7. If foreign key constraints are disabled (or do not exist) in the destination database, you can enable processing of each transformation in a parallel thread.


Step 8. Add source-to-destination transformations for all tables with CDC enabled. You can duplicate the existing transformation and change the topic name (the format is database_server_name.database_name.table_name), the destination table name, and the lookup field(s).

4. Schedule the flow

Schedule the flow to be executed in real-time.