Real-World Change Data Capture Example

Change data capture (CDC) is a necessity. With CDC, changes are synced instantly or near-instantly. In practice, CDC is often used to replicate data between databases in real time. Essentially, CDC is a perfect solution for non-intrusive, low-impact, real-time data integration.

I will illustrate how the Etlworks change data capture solution works for an eLearning company that needs to load data from 1600+ MySQL databases into a Snowflake data warehouse. Thanks to Etlworks, the company can now make everyday operational data available to non-technical business users. The data warehouse in Snowflake is updated every 5 minutes, so users can gain quick, current business insights.

The company's requirements

  • Set up a data pipeline to load incremental updates from 1600+ MySQL databases into the Snowflake data warehouse.
  • The data pipeline must support INSERTs, UPDATEs, and DELETEs.
  • The data pipeline must be able to automatically pick up new databases and adjust the destination schema if new columns are added to the source tables.
  • The expected volume of data: hundreds of gigabytes and billions of records on the initial load, and tens of millions of updates every day.
  • The expected number of tables across all MySQL databases: 55,000.
  • The number of tables in Snowflake: 35.
  • The data pipeline must be extremely resilient to extract and load errors.
  • The data pipeline is expected to work in fully automated mode.

Etlworks Solution

Step 1: Setting up a MySQL read replica instance in RDS

The Etlworks team recommended streaming CDC events from a MySQL read replica. Our customer set up the MySQL instance in Amazon RDS and configured native MySQL replication from the production instance to the replica. Creating a read replica is optional, but it significantly reduces the replication load on the MySQL production instance.

The following permissions need to be configured for the MySQL user in the read replica MySQL instance.

  • SELECT – enables the connector to select rows from tables in databases.
  • RELOAD – enables the connector to use the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks.
  • SHOW DATABASES – enables the connector to see database names by issuing the SHOW DATABASES statement.
  • REPLICATION SLAVE – enables the connector to connect to and read the MySQL server binlog.
  • REPLICATION CLIENT – enables the connector to use the following statements: SHOW MASTER STATUS, SHOW SLAVE STATUS, and SHOW BINARY LOGS.
  • LOCK TABLES – on Amazon RDS or Amazon Aurora, which do not allow a global read lock, table-level locks are used to create a consistent snapshot.
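For reference, the privileges above can be granted with a single MySQL GRANT statement. The Python sketch below only assembles that statement as a string; the user name etlworks_cdc and the host wildcard '%' are hypothetical placeholders, so adjust them (and the privilege list) to your own security policy before running the statement on the replica.

```python
# Privileges from the list above. RELOAD, SHOW DATABASES, REPLICATION SLAVE and
# REPLICATION CLIENT are global privileges, so the grant is made ON *.*.
CDC_PRIVILEGES = [
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
    "LOCK TABLES",
]


def build_grant_statement(user: str, host: str = "%") -> str:
    """Return a MySQL GRANT statement for a (hypothetical) CDC user."""
    privileges = ", ".join(CDC_PRIVILEGES)
    return f"GRANT {privileges} ON *.* TO '{user}'@'{host}';"


if __name__ == "__main__":
    # 'etlworks_cdc' is a hypothetical user name used for illustration only.
    print(build_grant_statement("etlworks_cdc"))
```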

Binlog replication for the MySQL read replica instance needs to be enabled by setting the following Amazon RDS parameters:

binlog_format: ROW
log_bin_use_v1_row_events: 1
net_read_timeout: 3600
net_write_timeout: 3600
wait_timeout: 86400
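Once the parameters are applied, one way to double-check them is to run SHOW VARIABLES on the replica and compare the result with the list above. The sketch below assumes the variables have already been fetched into a Python dict (with any MySQL client); the function name is hypothetical. Because SHOW VARIABLES reports boolean settings as ON/OFF, the comparison treats ON as 1 and OFF as 0.

```python
# Expected values from the Amazon RDS parameter list above.
REQUIRED_SETTINGS = {
    "binlog_format": "ROW",
    "log_bin_use_v1_row_events": "1",
    "net_read_timeout": "3600",
    "net_write_timeout": "3600",
    "wait_timeout": "86400",
}

# Boolean system variables are displayed as ON/OFF rather than 1/0.
_BOOLEAN_ALIASES = {"ON": "1", "OFF": "0"}


def _normalize(value) -> str:
    text = str(value).strip().upper()
    return _BOOLEAN_ALIASES.get(text, text)


def find_misconfigured(variables: dict) -> dict:
    """Return {name: (expected, actual)} for every setting that does not match.

    `variables` is assumed to map variable names to values, as returned by
    SHOW VARIABLES; a missing variable is reported with actual=None.
    """
    problems = {}
    for name, expected in REQUIRED_SETTINGS.items():
        actual = variables.get(name)
        if actual is None or _normalize(actual) != _normalize(expected):
            problems[name] = (expected, actual)
    return problems
```

An empty result means the replica matches the recommended settings; anything else names the parameter to fix in the RDS parameter group.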

Step 2: Setting up flows to extract data from MySQL using CDC

In Etlworks, there are two options for extracting data using CDC and loading it into Snowflake:

  1. Create one flow that extracts data from the database and creates CSV files in local storage, and another flow that loads the CSV files into Snowflake.
  2. Create one flow that extracts data from the database and ingests it into a message queue, such as Kafka, and another flow that loads data from the queue into Snowflake.

For this project, we decided to use option 1. Option 2 requires a complicated setup with a separate message queue such as Kafka or Azure Event Hubs.

A typical CDC flow can extract data from multiple tables in multiple databases, but having a single flow pull data from 55,000+ tables would create a major bottleneck, because the flow would be limited to a single blocking queue with limited capacity. It would also create a single point of failure.

A better approach would be to create multiple parallel extract flows, each pulling data for all 35 Snowflake tables from a single database. However, because data must be extracted from 1600+ MySQL databases, this would be impractical and very resource-demanding: we would have to run 1600+ parallel extract flows.

We decided to create 35 extract flows, where each flow pulls data from a single table across all 1600+ databases.

There are other possible topologies to consider as well, for example:

  • group tables and databases alphabetically;
  • create separate extract flows for the large and high-traffic tables;
  • for the rest of the tables, split the flows into chunks, each extracting data from a significant number of tables (for example, 1,000) across multiple databases.
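The chunked topology is easy to reason about in code. The sketch below is a hypothetical helper, not an Etlworks feature: it takes a list of fully qualified database.table names and splits it into groups of at most chunk_size entries, one group per extract flow. Sorting first also gives the alphabetical grouping mentioned in the first bullet.

```python
from typing import List


def split_into_flows(qualified_tables: List[str], chunk_size: int = 1000) -> List[List[str]]:
    """Split database.table names into groups, one group per extract flow."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    ordered = sorted(qualified_tables)  # alphabetical grouping
    return [ordered[i:i + chunk_size] for i in range(0, len(ordered), chunk_size)]
```

For example, 55,000 tables in chunks of 1,000 produce 55 extract flows; a smaller chunk size means more flows running in parallel, but also more flows to maintain.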

We recommend selecting the topology that works best for you, keeping in mind both performance and maintenance overhead.

Given this design, we created a table in Snowflake that contains the list of databases to extract data from. This table is used to automatically populate the list of included tables in the format database1.table_abc,database2.table_abc,...database1500.table_abc, and the list of included databases in the format database1,database2,...database1500.

The basic idea is that we can use a single MySQL CDC connection where the Included Databases and Included Tables are set as {tokens}, populated at run-time by JavaScript.
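In the customer's setup the tokens are populated by JavaScript inside Etlworks; the Python sketch below only illustrates the string-building logic. It assumes the database names have already been read from the Snowflake control table, and the function name is hypothetical. It produces values in the two formats described above for one extract flow.

```python
from typing import Iterable, Tuple


def build_cdc_tokens(databases: Iterable[str], table: str) -> Tuple[str, str]:
    """Return (included_databases, included_tables) for one extract flow.

    Each extract flow handles a single table across all databases, so the
    included tables are formed by qualifying that table with every database.
    """
    names = [db.strip() for db in databases if db.strip()]  # skip blank entries
    included_databases = ",".join(names)
    included_tables = ",".join(f"{db}.{table}" for db in names)
    return included_databases, included_tables
```

Because the list is rebuilt from the control table on every run, adding a row for a new database is enough for the next run of each extract flow to pick it up.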

Step 3: Setting up flows to load data into Snowflake

Once again, there are multiple options to consider.

Option 1. Initially, we set up a flow that loads the staged CSV files into Snowflake using the CDC MERGE action, which applies INSERTs/UPDATEs/DELETEs in the order in which the CDC events originated in the source database.

For each CSV file the flow does the following:

  1. Creates a temporary table in Snowflake.
  2. Executes the COPY INTO command to load the file “as is” into the temp table.
  3. Uses the Snowflake MERGE command to merge data in the temp table with the data in the actual table.
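The key property of the CDC MERGE approach is that events are applied in the order in which they originated, so the final state matches the source. The Python model below illustrates those semantics in memory; it is not the SQL that Etlworks generates. It assumes each event carries an operation code ('c' insert, 'u' update, 'd' delete, borrowed from the Debezium convention) and a primary key; the field names are hypothetical.

```python
from typing import Dict, List


def apply_events(table: Dict[int, dict], events: List[dict]) -> Dict[int, dict]:
    """Apply CDC events to an in-memory table keyed by primary key, in order.

    Each event is assumed to look like {"op": "c" | "u" | "d", "id": ..., "row": {...}}.
    """
    result = dict(table)
    for event in events:  # order matters: a later event overrides an earlier one
        key = event["id"]
        if event["op"] == "d":
            result.pop(key, None)  # delete; ignore if the row is already gone
        elif event["op"] in ("c", "u"):
            result[key] = event["row"]  # insert or update behaves like an upsert
        else:
            raise ValueError(f"unknown op: {event['op']!r}")
    return result
```

Applying a delete before the insert of the same key would leave the row in place, which is why the MERGE must respect the original event order.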

This approach provides the lowest latency, but it is more resource-intensive and requires more Snowflake credits, so it can be more expensive from the Snowflake standpoint.

Option 2. Skip DELETE events and apply only INSERTs and UPDATEs. This can be a bit faster than option 1.

Option 3. Always INSERT data into the staging tables in Snowflake, then periodically execute a SQL script to populate the actual tables by de-duplicating the staging tables and removing ‘d’ (delete) records. 

This option is very popular when customers are OK with longer delays between data being updated in the source and becoming available in the data warehouse for consumption by BI tools.
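The periodic script in option 3 boils down to two rules: for each primary key keep only the most recent staged record, then drop keys whose most recent record is a delete. The sketch below expresses that logic in Python over rows already read from a staging table; the column names id, op, and event_seq (a monotonically increasing ordering column) are assumptions. In Snowflake itself the same logic would typically be written in SQL.

```python
from typing import Dict, Iterable


def deduplicate_staging(rows: Iterable[dict]) -> Dict[object, dict]:
    """Collapse staged CDC rows into the current state of the actual table.

    For each primary key only the row with the highest event_seq survives;
    keys whose latest row is a delete ('d') are removed entirely.
    """
    latest: Dict[object, dict] = {}
    for row in rows:
        key = row["id"]
        current = latest.get(key)
        if current is None or row["event_seq"] > current["event_seq"]:
            latest[key] = row
    return {key: row for key, row in latest.items() if row["op"] != "d"}
```

Because the winner is chosen by event_seq rather than by arrival order, the result does not depend on the order in which rows were staged.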

After extensive testing, our customer decided to use option 3. The main reason was that the near-real-time approach (option 1) uses Snowflake cloud services and consumes extra Snowflake credits.

Step 4: Scheduling the extract and load flows

The extract flows

In Etlworks, it is possible to schedule a flow to run continuously until there is nothing to do, then stop for a configurable number of seconds and restart. We recommended this schedule type for all 35 extract flows. The extract flows run until they are automatically stopped, which lets the system add new databases. The customer set the delay between restarts to 2 hours.

The load flow

The load flow loads files into Snowflake in batches. We recommended running it every few minutes so it could clear the queue as often as possible. The customer set it to run every 5 minutes.

On average, the pipelines load tens of millions of records into Snowflake daily, but on some days the number of records jumps to hundreds of millions. As configured, the pipelines can easily handle extracting and loading billions of records a day.

Conclusion

The Etlworks platform enables Snowflake users to leverage low-impact, real-time change data capture from all major databases: MySQL, Postgres, SQL Server, Oracle, DB2, and MongoDB.

Moving data continuously, as new database transactions occur, makes it possible for Snowflake users to maintain real-time data pipelines that feed Snowflake’s fast and flexible storage and analytics solutions.

If you would like a brief demo of CDC to Snowflake, please schedule a demo.

Change Data Capture (CDC) & Why It Matters

What is change data capture? Does it have any importance or bearing on the work that you do? We’ll answer the first question shortly, but the answer to the second question is most certainly “yes.”

What Is Change Data Capture?

Change data capture (CDC) solves data integration problems by monitoring and capturing the inserts, updates, and deletes made to table data.

The first step in this process is to identify that a change has been made to a data source. To do so, a CDC procedure will inspect the relevant table to see if there is a delta between the old values and new values, based on what’s stored in the source systems, then update the pertinent record accordingly.

There are several good methods for doing this, for example high watermark, diff, and database triggers. However, one of the best approaches to tracking data changes is log-based CDC.

While interpreting the changes in the transaction log is difficult, the biggest benefit of log-based change data capture is its asynchronous nature: changes are captured independently of the source application performing them. With log-based CDC, new database transactions (including inserts, updates, and deletes) are read from the source databases’ transaction or redo logs. The changes are captured without making application-level changes and without having to scan operational tables, both of which add workload and reduce source systems’ performance.

By allowing you to capture and extract only changed data, change data capture eliminates or reduces the need for batch windows, replacing bulk load updates with continuous streaming or incremental loading of data. Change data capture helps you improve efficiency by reducing redundant data replication and delivery, reducing data warehousing costs and facilitating real-time data integration across your data stores.

Why Change Data Capture Matters

There are many advantages to using transaction logs for CDC:

  • This solution is transparent to databases and has minimal impact on the database.
  • Near real-time publishing of the changes to the target system. This means the business can make accurate decisions based on the most current data.
  • No impact on the transactions at the source application, and no additional SQL load on the system.
  • Because CDC transfers only incremental changes, it reduces the cost of transferring data.
  • This approach maintains the order in which the original transactions were committed. This is important when the target application depends on the order of transactions in the source system, and these ordering guarantees are what target applications most often want.
  • The target system can take its time to process the messages, because capturing the changes does not depend on how quickly the target consumes them.

Together, these advantages enable building streaming data pipelines that help share application data across a business. This means that businesses get up-to-date, accurate insights based on the latest data from across many systems. The decisions made from these insights help businesses remain competitive in their respective markets.

How Etlworks can help

Etlworks is a high-performance data integration platform with next-generation change data capture technology.

Etlworks supports native log-based change data capture for PostgreSQL, SQL Server, MySQL, Oracle, DB2, and MongoDB databases. Our intuitive visual interface makes it easy to set up, monitor, and manage your data pipelines, eliminating the need for scripting and ensuring quick time-to-value. Unlike with other tools that support CDC, there are only two moving parts: your database and Etlworks. You will be up and running with CDC in a matter of minutes!

Learn More about Log-Based CDC with Etlworks for Real-Time Data Replication

Contact Us! 

Etlworks Marketo Integration

What is Marketo?

Marketo is a cloud-based lead management and marketing solution. Marketo’s products are offered on a subscription basis and cover Lead Management, Sales Insights, Revenue Cycle Analytics, and Social Marketing applications. It helps organizations automate and measure marketing engagement, tasks, and workflows, including those for email, mobile, social, and digital ads.

What is Etlworks?

Etlworks is a cloud-native integration platform that helps businesses automate manual data management tasks, ensure far more accurate data, accelerate business workflows, and provide greater operational visibility to an organization.

After a few minutes of setup, Etlworks replicates all your applications, databases, events, and files into a high-performance data warehouse like Snowflake or Amazon Redshift, so that you can then use your favorite BI or analytics tools. Create reports, monitor custom dashboards, and more, instantly from the cloud.

Connect Marketo to Anything

Etlworks offers connectivity to Marketo’s APIs enabling you to work with key Marketo entities including Lead, Activity, List, Opportunity, OpportunityRole as well as Custom Objects. Etlworks exposes both the SOAP and REST APIs for Marketo ensuring you can handle any integration task.

Use the Etlworks Marketo connector for data integration between Marketo and your CRM system, such as Salesforce, MS Dynamics, SugarCRM, HubSpot, and NetSuite; collaboration or survey tools; webinar platforms; data services; marketing databases; and more.

The Etlworks Marketo connector frees you to focus on insights, so your company will be faster and more efficient at optimizing your marketing performance and improving your campaigns’ ROI.

Etlworks partnered with CData to provide access to the Marketo API using the industry-standard JDBC protocol.

Let’s do it!

Connecting to Marketo

Step 1. Obtaining the OAuthClientId and OAuthClientSecret values. To obtain the OAuthClientId and OAuthClientSecret, navigate to the LaunchPoint option in the Admin area. Click the View Details link for the desired service. A window containing the authentication credentials is displayed.

Step 2. Obtaining the REST endpoint URL. The RESTEndpoint can be found in your Marketo Admin area under Integration -> Web Services, in the REST API section. Note that the Identity Endpoint is not needed.

Step 3. Enable the Marketo connector for your Etlworks account. Contact support@etlworks.com to enable the connector.

Step 4. Create a Marketo connection to work with data in Marketo.

Stored Procedures

Stored procedures are available to complement the data available from the REST Data Model. Sometimes it is necessary to update data available from a view using a stored procedure, because the data does not provide for direct, table-like, two-way updates. In these situations, the data is retrieved using the appropriate view or table, while the update is done by calling a stored procedure. Stored procedures take a list of parameters and return a dataset that contains the collection of tuples that constitute the response.

To call a stored procedure from the SQL flow or from Before/After SQL, use the EXEC sp_name param = value syntax. For example:

EXEC SelectEntries ObjectName = 'Account'
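If you generate these calls programmatically, a small helper can keep the quoting consistent. The hypothetical Python sketch below builds the EXEC sp_name param = value text shown above. It assumes string values are wrapped in single quotes with embedded quotes doubled, and that several parameters are separated by commas; both are assumptions to check against the connector documentation.

```python
def build_exec(procedure: str, **params) -> str:
    """Build an 'EXEC name param = value' call (hypothetical helper).

    Strings are wrapped in single quotes with embedded quotes doubled;
    other values are emitted with str(). Separating several parameters
    with commas is an assumption.
    """
    parts = []
    for name, value in params.items():
        if isinstance(value, str):
            literal = "'" + value.replace("'", "''") + "'"
        else:
            literal = str(value)
        parts.append(f"{name} = {literal}")
    if not parts:
        return f"EXEC {procedure}"
    return f"EXEC {procedure} " + ", ".join(parts)
```

Calling build_exec("SelectEntries", ObjectName="Account") reproduces the example above.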

Extracting data from Marketo

Note: extracting data from Marketo is similar to extracting data from a relational database.

Step 1. Create a Marketo connection which will be used as a source (FROM).

Step 2. Create a destination connection, for example, a connection to the relational database, and if needed a format (format is not needed if the destination is a database or well-known API).

Step 3. Create a flow where the source is a database and the destination is the connection created in step 2, for example, a relational database.

Step 4. Add new source-to-destination transformation.

Step 5. Select the Marketo connection created in step 1 as the source connection and select the Marketo object you are extracting data from.

Step 6. Select the TO connection, the format (if needed), and the object (for example, a database table) to load data into.

Step 7. Click MAPPING and optionally enter Source Query (you don’t need a query if you are extracting data from the Marketo object unconditionally).

Step 8. Optionally define the per-field mapping.

Step 9. Add more transformations if needed.

Loading data in Marketo

Note: loading data into Marketo is similar to loading data into a relational database.

Step 1. Create a source connection and a format (if needed).

Step 2. Create destination Marketo connection.

Step 3. Create a flow where the destination is a database (the Marketo connection created in step 2 is used as one).

Step 4. Add new source-to-destination transformation.

Step 5. Select FROM and TO connections and objects (also a FROM format if needed).

Step 6. Optionally define the per-field mapping.

Step 7. Add more transformations if needed.

Browsing data in Marketo

You must have a Marketo connection to browse objects and run SQL queries.

Use Explorer to browse data and metadata in Marketo, as well as to execute DML and SELECT queries against the Marketo connection.

Ready to get started?

Contact Etlworks today to connect your Marketo instance with Etlworks and unlock the ability to read and replicate many of the objects to your data destination.

ETL/ELT all your data into Amazon Redshift DW

Amazon Redshift is fast, scalable, and easy-to-use, making it a popular data warehouse solution. Redshift is straightforward to query with SQL, efficient for analytical queries and can be a simple add-on for any organization operating its tech stack on AWS.

Amazon Web Services has many benefits. Whether you choose it for its pay-as-you-go pricing, its high performance and speed, or its versatile and flexible services, we are here to present the data loading approaches that work best for us.

Etlworks lets you load data from cloud storage and APIs, SQL and NoSQL databases, and web services into the Amazon Redshift data warehouse in a few simple steps. You can configure and schedule the flow using the intuitive drag-and-drop interface and let Etlworks do the rest.

Etlworks supports more than one-time data loading operations. It can help you integrate your data sources with Amazon Redshift and automatically keep your Amazon Redshift data warehouse updated with fresh data, with no additional effort or involvement!

Today we are going to examine how to load data into Amazon Redshift.

A typical Redshift flow performs the following operations:

  • Extract data from the source.
  • Create CSV files.
  • Compress files using the gzip algorithm.
  • Copy files into Amazon S3 bucket.
  • Check whether the destination Amazon Redshift table exists and, if it does not, create the table using metadata from the source.
  • Execute the Amazon Redshift COPY command.
  • Clean up the remaining files.
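The CSV, compression, and COPY steps can be sketched with the Python standard library. This is an illustration under stated assumptions, not what Etlworks runs internally: the table name, S3 URI, and IAM role ARN are hypothetical, and the upload to S3 is left out. The COPY options used (IAM_ROLE, FORMAT AS CSV, GZIP, IGNOREHEADER) are standard Redshift COPY parameters.

```python
import csv
import gzip
import io
from typing import Iterable, Sequence


def rows_to_gzipped_csv(header: Sequence[str], rows: Iterable[Sequence]) -> bytes:
    """Write rows as CSV (with one header line) and gzip-compress the result."""
    text = io.StringIO()
    writer = csv.writer(text, lineterminator="\n")  # plain \n line endings
    writer.writerow(header)
    writer.writerows(rows)
    return gzip.compress(text.getvalue().encode("utf-8"))


def build_copy_command(table: str, s3_uri: str, iam_role: str) -> str:
    """Redshift COPY for a gzip-compressed CSV file that has one header line."""
    return (
        f"COPY {table} FROM '{s3_uri}' "
        f"IAM_ROLE '{iam_role}' "
        "FORMAT AS CSV GZIP IGNOREHEADER 1;"
    )
```

The header line is skipped by IGNOREHEADER 1, so the same file stays readable for humans and loadable by Redshift.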

There are some prerequisites that have to be met before you can design a flow that loads data into Amazon Redshift:

Now, you are ready to create a Redshift flow. Start by opening the Flows window, clicking the + button, and typing redshift into the search field.

Continue by selecting the flow type, adding source-to-destination transformations, and entering the transformation parameters.

You can select one of the following sources (FROM) for the Redshift flow:

  • API – use any appropriate string as the source (FROM) name
  • Web Service – use any appropriate string as the source (FROM) name
  • File – use the source file name or a wildcard filename as the source (FROM) name
  • Database – use the table name as the source (FROM) name
  • CDC – use the fully qualified table name as the source (FROM) name
  • Queue – use the queue topic name as the source (FROM) name

For most Redshift flows, the destination (TO) is going to be an Amazon S3 connection. To configure the final destination, click the Connections tab and select the available Amazon Redshift connection.

Amazon Redshift can load data from CSV, JSON, and Avro formats, but Etlworks supports loading only from CSV, so you will need to create a new CSV format and set it as the destination format. If you are loading large datasets into Amazon Redshift, consider configuring the format to split the document into smaller files. Amazon Redshift can load files in parallel, and transferring smaller files over the network can be faster.
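Splitting can be illustrated with a few lines of standard-library Python. The hypothetical sketch below breaks rows into parts of at most max_rows rows and gzips each part with the same header, producing names such as orders_part000.csv.gz. If the parts are uploaded under a common S3 key prefix, a single Redshift COPY pointed at that prefix loads all of them, because COPY treats the FROM path as a key prefix.

```python
import csv
import gzip
import io
from typing import Dict, List, Sequence


def split_to_gzipped_parts(
    base_name: str, header: Sequence[str], rows: List[Sequence], max_rows: int
) -> Dict[str, bytes]:
    """Return {file_name: gzipped CSV bytes}, each file holding <= max_rows rows."""
    if max_rows < 1:
        raise ValueError("max_rows must be at least 1")
    parts: Dict[str, bytes] = {}
    for index, start in enumerate(range(0, len(rows), max_rows)):
        text = io.StringIO()
        writer = csv.writer(text, lineterminator="\n")
        writer.writerow(header)  # repeat the header so every part is self-describing
        writer.writerows(rows[start:start + max_rows])
        name = f"{base_name}_part{index:03d}.csv.gz"
        parts[name] = gzip.compress(text.getvalue().encode("utf-8"))
    return parts
```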

If necessary, you can create a mapping between the source and destination (Redshift) fields.

Mapping is not required, but remember that if a source field name is not supported by Redshift, Redshift will return an error and the data will not be loaded into the database. For example, if you are loading data from Google Analytics, the output (source) is going to include fields with the prefix ga: (ga:user, ga:browser, etc.). Unfortunately, Amazon Redshift does not support field names containing a colon (:), so the data will be rejected. If that happens, you can use mapping to rename the destination fields.
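When many fields need renaming, the mapping can be derived mechanically. The Python sketch below is one hypothetical way to do it: it lowercases each source name, replaces every character other than a letter, digit, or underscore with an underscore (so ga:user becomes ga_user), prefixes names that start with a digit, and appends a numeric suffix if two source fields collapse to the same name. The exact character rules are an assumption; adjust them to the naming rules you want to enforce.

```python
import re
from typing import Dict, List

_INVALID = re.compile(r"[^a-z0-9_]")


def build_field_mapping(source_fields: List[str]) -> Dict[str, str]:
    """Map each source field name to a Redshift-friendly destination name."""
    mapping: Dict[str, str] = {}
    used = set()
    for field in source_fields:
        candidate = _INVALID.sub("_", field.lower())
        if candidate and candidate[0].isdigit():
            candidate = "_" + candidate  # avoid names that start with a digit
        name, suffix = candidate, 1
        while name in used:  # keep destination names unique
            suffix += 1
            name = f"{candidate}_{suffix}"
        used.add(name)
        mapping[field] = name
    return mapping
```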

ELT for Amazon Redshift

Amazon Redshift provides affordable and nearly unlimited computing power, which allows you to load data into Amazon Redshift as-is, without pre-aggregation, and to process and transform all of it quickly when executing analytical queries. Thus, the ETL (Extract-Transform-Load) approach becomes ELT (Extract-Load-Transform). This can greatly simplify loading data into Amazon Redshift, because you don’t need to think about the necessary transformations up front.

Etlworks supports executing complex ELT scripts directly in Amazon Redshift, which greatly improves the performance and reliability of data ingestion.

I hope this has been helpful. Go forth and load large amounts of data.

Data Replication Methods

Data replication takes data from your source databases — Oracle, MySQL, Microsoft SQL Server, PostgreSQL, MongoDB, etc. — and copies it into your destination data warehouse. After you have identified the data you want to bring in, you need to determine the best way to replicate the data so it meets your business needs.

Choosing the right method

The method you choose impacts the end state of your data. Fortunately, there are data replication methods built to integrate with today’s data warehouses and suit a variety of use cases. At Etlworks, we believe in providing users with as much flexibility as possible. Let’s discuss each of the five methods of data replication and outline the option that may be best for you.

High Watermark (HWM)

The concept of a watermark comes from the aftermath of a flood, when you look at the water stains on a wall to figure out how high the water got. That is pretty much what we want to do here: figure out which item was updated last and move on from there. A high watermark therefore simplifies querying for updated objects, which is a very common use case when synchronizing data.
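In code, the high watermark approach is "select everything newer than the last value you saw, then remember the new maximum". The Python sketch below shows that step over rows already fetched from the source; the updated_at column and the function name are hypothetical. It also shows why deletes are not supported: a deleted row simply never appears in the result.

```python
from datetime import datetime
from typing import Iterable, List, Optional, Tuple


def extract_since(
    rows: Iterable[dict], watermark: Optional[datetime]
) -> Tuple[List[dict], Optional[datetime]]:
    """Return rows changed after `watermark` and the new watermark value.

    A real extract would push the filter into the source query, for example
    WHERE updated_at > <last watermark>; here the rows are filtered in memory.
    """
    changed = [r for r in rows if watermark is None or r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in changed), default=watermark)
    return changed, new_watermark
```

The returned watermark is stored between runs; on the first run (watermark None) every row is extracted.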

Pros:

  • fast
  • works for all data sources, including all databases, files, and APIs

Cons:

  • does not support deletes
  • requires a dedicated high watermark field in each table

Change Data Capture (CDC)

CDC is an approach to data integration that is based on the identification, capture, and delivery of the changes made to the source database and stored in the database ‘redo log’, also called ‘transaction log’. CDC or Log Replication is the fastest and most reliable way to replicate. It involves querying your database’s internal change log every few seconds, copying the changes into the data warehouse, and incorporating them frequently. CDC is the best method for databases that are being updated continually and fully supports deletes.

Pros:

  • fast
  • no polling from database tables – uses database redo log instead
  • supports deletes
  • enables near real-time replication

Cons:

  • currently supports only Postgres, MySQL, SQL Server, and Oracle
  • some older versions of the databases above do not support CDC
  • requires extra setup in the source database

Database Triggers

Trigger-based change replication can be implemented in many ways, but the basic idea is that each table that participates in change replication as a source has triggers for INSERT, UPDATE, and optionally DELETE. The triggers update the shadow table (or tables). The shadow tables may store the entire row, to keep track of every single column change, or only the primary key and the operation type (insert, update, or delete).
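As an illustration of the primary-key-plus-operation variant, the Python sketch below generates MySQL trigger definitions for one table. The shadow table name (<table>_changes), its columns (pk, op, changed_at), and the single-column primary key are assumptions made for the example, and the shadow table itself must be created separately.

```python
from typing import List

# (trigger suffix, event, row alias, operation code stored in the shadow table)
_EVENTS = [
    ("ai", "INSERT", "NEW", "I"),
    ("au", "UPDATE", "NEW", "U"),
    ("ad", "DELETE", "OLD", "D"),
]


def build_shadow_triggers(table: str, pk: str, include_deletes: bool = True) -> List[str]:
    """Return CREATE TRIGGER statements that log key + operation to <table>_changes."""
    shadow = f"{table}_changes"
    statements = []
    for suffix, event, alias, op in _EVENTS:
        if event == "DELETE" and not include_deletes:
            continue  # the DELETE trigger is optional
        statements.append(
            f"CREATE TRIGGER {table}_{suffix} AFTER {event} ON {table} "
            f"FOR EACH ROW INSERT INTO {shadow} (pk, op, changed_at) "
            f"VALUES ({alias}.{pk}, '{op}', NOW());"
        )
    return statements
```

Each trigger fires on every row change, which is exactly the per-write overhead noted in the cons.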

Pros:

  • works for any source database which has triggers
  • no extra requirements for the specific version of the database or extra field in each table

Cons:

  • requires adding triggers to all database tables
  • triggers can negatively impact performance

Real-time CDC with Kafka

Apache Kafka is a popular technology to share data between systems and build applications that respond to data events. Etlworks completes Apache Kafka solutions by delivering high-performance real-time data integration.

Etlworks parses the CDC events emitted to the Kafka topic, automatically transforms events to the DML SQL statements (INSERT/UPDATE/DELETE), and executes SQL statements in the target database in the order they were created. It also handles the collisions and errors, ensuring that the solution is 100% reliable.
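To make the event-to-SQL step concrete, the sketch below converts a single Debezium-style change event into a parameterized SQL statement. It relies on the standard Debezium envelope fields op, before, and after, where op is 'c' for create, 'u' for update, 'd' for delete, and 'r' for snapshot reads (when the converter includes schemas, these fields sit under payload). The key_column argument and the %s placeholder style are assumptions. Production pipelines also need the collision and error handling described above, which this sketch omits.

```python
from typing import Any, List, Tuple


def event_to_sql(table: str, event: dict, key_column: str = "id") -> Tuple[str, List[Any]]:
    """Turn one Debezium-style change event into (sql, params).

    Creates and snapshot reads ('c', 'r') become INSERTs, updates ('u') become
    UPDATEs keyed on key_column (assuming the key itself does not change), and
    deletes ('d') use the 'before' image to find the row.
    """
    op = event["op"]
    if op in ("c", "r"):
        row = event["after"]
        columns = list(row)
        placeholders = ", ".join(["%s"] * len(columns))
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        return sql, [row[c] for c in columns]
    if op == "u":
        row = event["after"]
        columns = [c for c in row if c != key_column]
        assignments = ", ".join(f"{c} = %s" for c in columns)
        sql = f"UPDATE {table} SET {assignments} WHERE {key_column} = %s"
        return sql, [row[c] for c in columns] + [row[key_column]]
    if op == "d":
        before = event["before"]
        return f"DELETE FROM {table} WHERE {key_column} = %s", [before[key_column]]
    raise ValueError(f"unsupported op: {op!r}")
```

Executing the resulting statements one by one, in the order the events were read from the topic, preserves the source ordering.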

Pros:

  • fast
  • no polling from database tables
  • supports deletes
  • supports real-time replication

Cons:

  • complex setup (requires Kafka, Zookeeper, Kafka Connect, and Debezium)
  • supports only Postgres, MySQL, SQL Server, Oracle, and MongoDB
  • some older versions of the databases above do not support CDC
  • requires extra setup in the source database

Full refresh

Sometimes the simplest approach is the best. The full refresh replication method is best for small tables, static data, or one-time imports. Because it copies the entire dataset on every run, it’s typically slower than the other methods.

Pros:

  • the simplest to set up
  • can be quite fast for relatively small datasets (<100K records)
  • works for all data sources

Cons:

  • not recommended for large datasets

Want to learn more about our replication options and what’s best for your data? Talk to us!
