Real-time change replication with Kafka and Debezium

In this article, I demonstrate how to implement (near) real-time Change Data Capture (CDC)-based change replication for the most popular databases using the following technologies:

  • Native CDC for each source database
  • Apache Kafka
  • Debezium
  • Etlworks Kafka connector with built-in support for Debezium

Overview

Change Data Capture (CDC), as its name suggests, is a design pattern that captures individual data changes instead of dealing with the entire dataset. Rather than dumping your entire database, with CDC you capture just the changes made to the master database and apply them to the BI databases to keep both databases in sync.

Debezium is a CDC tool that can stream changes from Microsoft SQL Server, MySQL, MongoDB, Oracle and PostgreSQL into Kafka, using Kafka Connect.

Kafka Connect is a tool for streaming data between Apache Kafka and external systems. It is used to define connectors that move large collections of data into and out of Kafka.

Etlworks Integrator parses the CDC events emitted to the Kafka topic, automatically transforms them into DML SQL statements (INSERT/UPDATE/DELETE), and executes the statements in the target database in the order they were created. It also handles collisions and errors, keeping the solution reliable.

Etlworks - CDC with Kafka and Debezium

Anything can be a destination: SQL and NoSQL data sources, online data warehouses such as Snowflake and Redshift, files, API endpoints, etc. The following databases are supported as a source:

  • Microsoft SQL Server
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL

Prerequisites

The solution requires installing and configuring Apache Kafka, Kafka Connect, and Debezium.

Installing and configuring Kafka and Kafka Connect (the Kafka component required for CDC) is not part of this tutorial. In most cases, installing Kafka is as easy as downloading the latest version of the standalone or dockerized Kafka and Zookeeper. Kafka Connect is included in all distribution packages, so there is nothing extra to install. Debezium can be installed as a Kafka Connect plugin by simply copying the required libraries to the KAFKA_HOME/plugin folder.
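As a rough sketch (the connector version, download URL, and configuration file below are examples; adjust them to your environment and check the Debezium documentation for the exact artifact), installing the Debezium SQL Server connector could look like this:

# download the Debezium SQL Server connector plugin archive (version is an example)
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz

# unpack it into the Kafka Connect plugin folder
mkdir -p $KAFKA_HOME/plugin
tar -xzf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz -C $KAFKA_HOME/plugin

# make sure plugin.path in the Kafka Connect worker configuration points to that folder,
# then restart Kafka Connect
echo "plugin.path=$KAFKA_HOME/plugin" >> $KAFKA_HOME/config/connect-distributed.properties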

For customers on Enterprise plans, Etlworks installs all the required components. We provide assistance with installing the components to self-hosted customers.

Please contact Etlworks support if you want to enable real-time change replication for your account.

Solution

The CDC events are serialized as JSON or Avro documents and can be transformed using any of the transformations available in Etlworks.

The basic setup for real-time CDC-based change replication in Etlworks is very easy:

  1. Set up CDC for the source database.
  2. Configure Debezium to capture CDC events and publish them to the Kafka topic(s).
  3. Create a change replication flow where the source is a Kafka topic and the destination is a target database table.
  4. Schedule the flow.

1. Set up CDC for the source database

Enabling CDC is different for each database. Please use the following tutorials:

2. Configure Debezium to capture CDC events and publish them to the Kafka topic(s)

Assuming that Debezium is already installed as a Kafka Connect plugin and is up and running, we will configure a connector to the source database using the Kafka Connect REST API. In this tutorial, we will be using Microsoft SQL Server, but configuring connectors for other databases is equally simple.

Official tutorials:

The Debezium connectors are created using the Kafka Connect REST API, so make sure that either curl or Postman is installed on your development box. In this tutorial, we will be using Postman.

Step 1. Verify that Kafka Connect is installed and running.

The default port for the Kafka Connect API is 8083. Assuming that it runs on localhost, the URL of the API endpoint that returns the configured connectors is:

http://localhost:8083/connectors
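The same check can be performed from the command line with curl; a freshly installed Kafka Connect returns an empty list ([]):

curl -s http://localhost:8083/connectors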

Step 2. Create a new connector for Microsoft SQL Server.

ENDPOINT URL: http://localhost:8083/connectors 

METHOD: POST

PAYLOAD (example):

{
  "name": "sqlserver-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "password",
    "database.dbname": "database_name",
    "database.server.name": "database_server_name",
    "table.whitelist": "comma separated list of fully_qualified_table_names",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.database_server_name.database_name",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.operation.header": "true",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "false"
  }
}
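If you prefer curl over Postman, the same request can be sent from the command line, assuming the payload above is saved as sqlserver-connector.json (the file name is arbitrary):

curl -s -X POST -H "Content-Type: application/json" --data @sqlserver-connector.json http://localhost:8083/connectors

Once the connector is created, its state can be checked with GET http://localhost:8083/connectors/sqlserver-connector/status.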

CDC Event Flattening

The payload contains two important sections required for integrating the Etlworks Kafka connector with Debezium:

1. The unwrap transformation, which creates a flat version of the CDC event (see the example after these settings):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.operation.header": "true",

2. The settings which remove the schema information from the CDC event:

"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",

The naming convention for Kafka topics

Debezium stores CDC events in a separate topic for each table. If the connector was configured using the following parameters:

"database.dbname": "database_name",
"database.server.name": "database_server_name",

the CDC events for the table cdc_test will be stored in a Kafka topic database_server_name.database_name.cdc_test.
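You can verify that the events are being published by reading the topic with the console consumer that ships with Kafka (the topic name below follows the example above):

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic database_server_name.database_name.cdc_test --from-beginning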

Examples of the generated CDC events

Etlworks automatically parses the CDC events stored in a Kafka topic, so you don't have to deal with this, but it is still a good idea to understand what the generated events look like for the different DML statements.

Assuming that we are using the JSON format for serialization of the CDC events (the default), and that the source table was created using the following SQL:

CREATE TABLE dbo.cdc_test(
  id INT,
  name VARCHAR(255),
  changed DATETIME,
  PRIMARY KEY (id))

the generated CDC events will look like the following:

INSERT INTO dbo.cdc_test (id,name,changed) VALUES (1,'test1',CURRENT_TIMESTAMP)

Key: {"id":1}

Value: {"id":1,"name":"test1","changed":1552064244733}

Header: __debezium-operation=c

UPDATE dbo.cdc_test SET name = 'updated-test1-1', changed=CURRENT_TIMESTAMP 
WHERE id = 1

Key: {"id":1}

Value: {"id":1,"name":"updated-test1-1","changed":1552064295845}

Header: __debezium-operation=u

DELETE FROM dbo.cdc_test WHERE id = 1

Key: {"id":1}

Value: none

Header: __debezium-operation=DELETE

Configuring serialization format for DATE/TIME fields

As you probably noticed, the value of the timestamp field changed is serialized as Unix epoch time in milliseconds. You can convert it to a human- (and database-) readable format using the TimestampConverter transformation:

"transforms":"unwrap,convert",
 
"transforms.convert.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert.target.type":"string",
"transforms.convert.field":"changed",
"transforms.convert.format":"yyyy-MM-dd HH:mm:ss"

3. Create a change replication flow

Step 1. Create the source Kafka connection. When creating the connection, select Debezium as the CDC provider. Enter a wildcard topic name in the format database_server_name.database_name.*.

Step 2. Create a JSON format with all default settings.

Step 3. Create a connection to the destination database and disable auto-commit.

Step 4. Create a new flow by selecting Stream data from queue to database from the gallery.

Step 5. Add a new source-to-destination transformation where:

  • the FROM connection is the connection to Kafka created in step 1.
  • the FROM format is the JSON format created in step 2.
  • the FROM is a topic name for the specific table.
  • the TO is a destination table name.
  • the TO connection is the destination database connection created in step 3.

Step 6. Click the MAPPING button, select the Parameters tab, and configure the following parameters:

  • Stream Data – enabled
  • Use Bind Variables – enabled
  • Create Tables and Adjust Fields – enabled
  • Action – Record
  • Lookup Fields – the unique field(s) for the record. In our example the unique field is ID.

Step 7. If foreign key constraints are disabled (or do not exist) in the destination database, you can enable processing of each transformation in a parallel thread.

Step 8. Add source-to-destination transformations for all tables with CDC enabled. You can duplicate the existing transformation and change the topic name (the format is database_server_name.database_name.table_name), the destination table name, and the lookup field(s).

4. Schedule the flow

Schedule the flow to be executed in real-time.

Etlworks CEO: Efficient data structures and real-time data streaming

Search Business Analytics recently interviewed Etlworks CEO Maksym Sherbinin about the importance of creating efficient data structures and real-time data streaming.

Here are his insights:

Q: To what extent does defaulting to JSON as a data storage mechanism contribute to decreased analytics performance and larger file sizes?

A: There are two factors that contribute to JSON (as well as CSV and XML) not being an optimal format for storing and processing large datasets:

  • The lack of schema enforcement, which results in the same data being injected and stored multiple times in the data warehouse, definitely has a direct impact on performance, quality of service, and cost.
  • JSON-encoded datasets cannot be stored in a parallel manner, which decreases their usefulness for systems built on top of Hadoop and other frameworks used for distributed processing.

Q: What are the existing alternatives for developers to create more efficient data structures for different kinds of analytics workloads in terms of data/file formats?

A: There are two very distinct alternatives:

  • Column-based databases optimized for data warehousing and massively parallel processing, specifically Snowflake and Amazon Redshift.
  • Formats optimized for use in Hadoop clusters and data streaming: Optimized Row Columnar (ORC), Avro, and Parquet.

Q: What is the current state of tools that can automate some of this process (i.e. are there tools that can look at how an app works with data and recommend a better file format or data structure, and perhaps compile or transform it into these more efficient formats like traditional programming language compilers)?

A: When it comes to encoding data in any of the Hadoop-optimized formats, such as Avro and Parquet, there are great open-source libraries available for practically any programming language known to humankind. Converting data stored somewhere else (for example in the database) to the Avro or Parquet datasets and storing them in the distributed file system can be performed using home-grown tools or (preferred) a good ETL tool, which can connect to any source, map, transform, and store data in the Hadoop cluster. Below are some of the any-to-any ETL tools which support various sources, Avro and Parquet as a destination format, and Hadoop Distributed File System (HDFS) as a destination:

  • Etlworks
  • Talend
  • Informatica

The other trend is real-time data streaming that uses distributed platforms such as Apache Kafka. The Alooma enterprise pipeline is a good example of a service that can be used to stream large datasets encoded as Avro or Parquet to analytical platforms.

Additionally, there is a growing list of tools that can be used to inject data into column-based data warehouses built for the cloud, such as Snowflake and Amazon Redshift. Depending on your requirements, you might consider the following tools:

  • Matillion ETL
  • Etlworks Integrator
  • AWS Glue

Finally, selecting the best format for the job is not something any tool should be doing for you automatically, but having multiple options is a must. The ability to experiment and prototype with different formats and storage options, as well as injection techniques (batch load, streaming, SQL, etc.), is why a good ETL tool goes a long way. It saves you time and money and provides peace of mind.


Q: What are your thoughts on the best practices for finding the balance between identifying the most efficient data structures for a given analytics use case, and quickly experimenting with different analytics use cases?

A: This article describes how Uber went from a general-use database, to JSON-encoded datasets, to a Hadoop-based big data platform. Granted, not all businesses require something that extreme, so common sense is your best friend. A typical relational database such as MySQL or PostgreSQL is perfectly capable of storing and processing relatively large analytical datasets (millions of records). Models with hundreds of millions or billions of records require specialized solutions, using either column-based data warehouses or Hadoop-like clusters with a parallel file system.