Real-World Change Data Capture Example

Change data capture (CDC) is a necessity. With CDC, changes are synced instantly or near-instantly. In practice, CDC is often used to replicate data between databases in real time. Essentially, CDC is a perfect solution for non-intrusive, low-impact real-time data integration.

I will illustrate how the Etlworks change data capture solution works for an eLearning company that needs to load data from 1600+ MySQL databases into a Snowflake data warehouse. Thanks to Etlworks, the company can now make everyday operations data available to non-technical business users. The data warehouse in Snowflake is updated every 5 minutes, so users can gain quick, current business insights.

The company requirements

  • Set up a data pipeline to load incremental updates from 1600+ MySQL databases into the Snowflake data warehouse.
  • The data pipeline must support INSERTs, UPDATEs, and DELETEs.
  • The data pipeline must be able to automatically pick up new databases and adjust the destination schema if new columns are added to the source tables.
  • The expected volume of data: hundreds of gigabytes, billions of records on initial load, tens of millions of updates every day.
  • The expected number of tables across all MySQL databases: 55000.
  • The number of tables in Snowflake: 35.
  • The data pipeline must be extremely resilient to extract and load errors.
  • The data pipeline is expected to work in a fully automated mode.

Etlworks Solution

Step 1: Setting up MySQL read replica instance in RDS

The Etlworks team recommended streaming CDC events from a MySQL read replica. Our customer set up the MySQL instance in Amazon RDS and configured native MySQL replication from the production instance to the replica. Creating a read replica is an optional step, but it significantly lessens the replication load on your MySQL production instance.

The following permissions need to be configured for the MySQL user in the read replica MySQL instance.

  • SELECT: enables the connector to select rows from tables in databases.
  • RELOAD: enables the connector to use the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks.
  • SHOW DATABASES: enables the connector to see database names by issuing the SHOW DATABASES statement.
  • REPLICATION SLAVE: enables the connector to connect to and read the MySQL server binlog.
  • REPLICATION CLIENT: enables the connector to use the following statements: SHOW MASTER STATUS, SHOW SLAVE STATUS, and SHOW BINARY LOGS.
  • LOCK TABLES: on Amazon RDS or Amazon Aurora, which do not allow a global read lock, table-level locks are used to create a consistent snapshot.
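
As a rough illustration, the privileges listed above could be granted with a single MySQL GRANT statement. The sketch below only builds the statement text. The account name cdc_user and the host '%' are placeholders, not values from this project.

```python
# Global privileges the CDC connector needs, as listed above.
CDC_PRIVILEGES = [
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
    "LOCK TABLES",
]


def build_grant(user: str, host: str = "%") -> str:
    """Return a GRANT statement giving `user` the CDC privileges on all databases."""
    privileges = ", ".join(CDC_PRIVILEGES)
    return f"GRANT {privileges} ON *.* TO '{user}'@'{host}';"


if __name__ == "__main__":
    # 'cdc_user' is a hypothetical account name.
    print(build_grant("cdc_user"))
```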

Binlog replication for the MySQL read replica instance needs to be enabled by setting the following Amazon RDS parameters:

binlog_format: ROW
log_bin_use_v1_row_events: 1
net_read_timeout: 3600
net_write_timeout: 3600
wait_timeout: 86400

Step 2: Setting up flows to extract data from MySQL using CDC

In Etlworks, there are two options for extracting data using CDC and loading it into Snowflake:

  1. Create a flow to extract data from the database and create CSV files in the local storage, and another flow to load the CSV files into Snowflake.
  2. Create a flow to extract data from the database and ingest it into a message queue, such as Kafka. Create another flow to load data from the queue into Snowflake.

For this project, we decided to use option 1. Option 2 requires a more complicated setup with a separate message queue such as Kafka or Azure Event Hubs.

A typical CDC flow can extract data from multiple tables in multiple databases, but having a single flow pull data from 55000+ tables would be a major bottleneck, as it would be limited to a single blocking queue with limited capacity. It would also create a single point of failure.

A better approach would be to create multiple parallel extract flows, each pulling all 35 tables from a single MySQL database. However, considering that the pipeline must extract data from 1600+ MySQL databases, this would be impractical and very resource-demanding: we would have to run 1600+ parallel extract flows.

We decided to create 35 extract flows, where each flow pulls data from a single table in all 1600+ databases.

There are other possible topologies to consider as well, for example:

  • group tables and databases alphabetically;
  • create separate extract flows for the large and high-traffic tables;
  • for the rest of the tables, split flows into chunks, each extracting data from a significant number of tables across multiple databases (for example, 1000).

We recommend selecting the topology that works best for you, keeping in mind the performance and the maintenance overhead.

Given this design choice, we created a table in Snowflake that holds the list of databases to extract data from. This table is used to automatically populate the list of included tables in the format database1.table_abc,database2.table_abc,...database1500.table_abc and the list of included databases in the format database1,database2,...database1500.

The basic idea is that we can use a single MySQL CDC connection where the Included Databases and Included Tables are set as {tokens}, populated at run-time by JavaScript.
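
To make the token idea concrete, here is a minimal sketch of how the two comma-separated lists could be built for one extract flow. In the project this was done with JavaScript inside Etlworks; the Python below only illustrates the logic, and the function name and dictionary keys are hypothetical.

```python
def build_cdc_tokens(databases, table):
    """Build the comma-separated include lists for one extract flow.

    `databases` is the list of database names (in the project it came from
    a table in Snowflake); `table` is the single table this flow extracts.
    """
    included_databases = ",".join(databases)
    included_tables = ",".join(f"{db}.{table}" for db in databases)
    return {
        "Included Databases": included_databases,
        "Included Tables": included_tables,
    }


if __name__ == "__main__":
    tokens = build_cdc_tokens(["database1", "database2"], "table_abc")
    print(tokens["Included Databases"])
    print(tokens["Included Tables"])
```

Because the lists are rebuilt on every run, a newly added database is picked up the next time the flow restarts.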

Step 3: Setting up flows to load data in Snowflake

Once again, there are multiple options to consider.

Option 1. Initially we set up a flow that loads the staged CSV files into Snowflake using the CDC MERGE action, which applies INSERTs/UPDATEs/DELETEs in the order in which the CDC events originated in the source database.

For each CSV file the flow does the following:

  1. Creates a temporary table in Snowflake.
  2. Executes the COPY INTO command to load the file “as is” into the temp table.
  3. Uses the Snowflake MERGE command to merge data in the temp table with the data in the actual table.
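
The three steps above can be sketched as generated SQL. This is not the SQL that Etlworks produces; it is a simplified illustration that assumes each CSV file carries an extra column (here called cdc_op, a hypothetical name) in which 'd' marks a delete, and that a file holds at most one event per key.

```python
def build_merge_statements(target, stage_file, key, columns, op_column="cdc_op"):
    """Return the three SQL statements for loading one staged CSV file.

    `columns` maps column name to Snowflake type and must include `key`.
    `op_column` is a hypothetical extra CSV column holding the CDC
    operation, where 'd' marks a delete.
    """
    temp = f"{target}_tmp"
    names = list(columns)
    column_defs = ", ".join(f"{name} {sql_type}" for name, sql_type in columns.items())

    # 1. Temporary table with the target columns plus the operation column.
    create = f"CREATE TEMPORARY TABLE {temp} ({column_defs}, {op_column} VARCHAR)"

    # 2. Load the staged file "as is" into the temporary table.
    copy = (
        f"COPY INTO {temp} FROM @{stage_file} "
        "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"
    )

    # 3. Merge: delete, update, or insert depending on the operation.
    updates = ", ".join(f"t.{n} = s.{n}" for n in names if n != key)
    insert_cols = ", ".join(names)
    insert_vals = ", ".join(f"s.{n}" for n in names)
    merge = (
        f"MERGE INTO {target} t USING {temp} s ON t.{key} = s.{key} "
        f"WHEN MATCHED AND s.{op_column} = 'd' THEN DELETE "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED AND s.{op_column} <> 'd' THEN INSERT ({insert_cols}) "
        f"VALUES ({insert_vals})"
    )
    return [create, copy, merge]


if __name__ == "__main__":
    # Table, stage, and column names are made up for the example.
    for sql in build_merge_statements(
        "orders", "my_stage/orders_1.csv", "id", {"id": "NUMBER", "amount": "NUMBER"}
    ):
        print(sql)
```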

This approach guarantees the lowest latency but consumes more resources and requires more Snowflake credits (it can be more expensive from the Snowflake standpoint).

Option 2. Skip DELETE events and only apply INSERTs and UPDATEs. It can be a bit faster than option 1.

Option 3. Always INSERT data into the staging tables in Snowflake, then periodically execute a SQL script to populate the actual tables by de-duplicating the staging tables and removing ‘d’ (delete) records. 

This option is very popular when customers are OK with longer delays between data being updated in the source and finally available in the data warehouse for consumption by the BI tools. 
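
The de-duplication step in option 3 is done in practice by a SQL script in Snowflake. The Python sketch below only illustrates the logic on in-memory records: keep the most recent event per key, then drop keys whose latest event is a delete. The field names (key, seq, op, row) are assumptions made for the example.

```python
def latest_state(events):
    """Collapse staged CDC events into the current rows.

    Each event is a dict with hypothetical fields: 'key' (primary key),
    'seq' (position of the event in the source log), 'op' ('d' marks a
    delete) and 'row' (the column values).
    """
    latest = {}
    for event in events:
        current = latest.get(event["key"])
        if current is None or event["seq"] > current["seq"]:
            latest[event["key"]] = event
    # Drop keys whose most recent event is a delete.
    return {key: event["row"] for key, event in latest.items() if event["op"] != "d"}


if __name__ == "__main__":
    staged = [
        {"key": 1, "seq": 1, "op": "c", "row": {"name": "a"}},
        {"key": 1, "seq": 2, "op": "u", "row": {"name": "b"}},
        {"key": 2, "seq": 3, "op": "c", "row": {"name": "x"}},
        {"key": 2, "seq": 4, "op": "d", "row": {"name": "x"}},
    ]
    print(latest_state(staged))
```

Key 1 survives with its updated values, while key 2 disappears because its last event is a delete.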

After extensive testing, our customer decided to use option 3. The main reason was that the [almost] real-time approach (option 1) uses Snowflake cloud services and consumes extra Snowflake credits.

Step 4: Scheduling extract and load flows

The Extract Flows

In Etlworks, it is possible to schedule a flow to run continuously until there is nothing to do, then stop for a configurable number of seconds and restart. We recommended this schedule type for the 35 extract flows. The extract flows run until they are automatically stopped to let the system add new databases. The customer set the delay between restarts to 2 hours.
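
The schedule type described above behaves roughly like the loop below. This is a simplified model, not the Etlworks scheduler: extract_batch is a hypothetical callable that returns how many events it processed, and max_restarts exists only so that the sketch terminates.

```python
import time


def run_continuously(extract_batch, delay_seconds, max_restarts, sleep=time.sleep):
    """Run `extract_batch` until it reports no work, pause, then restart.

    `extract_batch` is a hypothetical callable returning the number of
    events it processed; 0 means there is nothing left to do.
    `max_restarts` bounds the loop so the sketch terminates.
    """
    total = 0
    for _ in range(max_restarts):
        while True:
            processed = extract_batch()
            if processed == 0:
                break  # nothing to do: stop this run
            total += processed
        sleep(delay_seconds)  # wait before the next restart
    return total


if __name__ == "__main__":
    batches = iter([5, 3, 0, 2, 0])
    # A zero delay keeps the demo fast; the customer used 2 hours (7200 seconds).
    print(run_continuously(lambda: next(batches), 0, 2))
```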

The load flow

The load flow loads files into Snowflake in batches. We recommended running it every few minutes so it could clear the queue as often as possible. The customer set it to run every 5 minutes.

On average, the pipelines load tens of millions of records into Snowflake daily, but there are days when the number of records jumps to hundreds of millions. As configured, the pipelines can easily handle extracting and loading billions of records a day.

Conclusion

The Etlworks platform enables Snowflake users to leverage the low-impact, real-time change data capture from all major databases: MySQL, Postgres, SQL Server, Oracle, DB2, MongoDB.

Moving data continuously, as new database transactions occur, makes it possible for Snowflake users to maintain real-time data pipelines that feed Snowflake’s fast and flexible storage and analytics solutions.

If you would like a brief demo of CDC to Snowflake, please schedule a demo.

Etlworks Marketo Integration

What is Marketo?

Marketo is a cloud-based lead management and marketing solution. The Marketo product range is provided on a subscription basis and covers Lead Management, Sales Insights, Revenue Cycle Analytics, and Social Marketing applications. It helps organizations automate and measure marketing engagement, tasks, and workflows, including those for email, mobile, social, and digital ads.

What is Etlworks?

Etlworks is a cloud-native integration platform that helps businesses automate manual data management tasks, keep data far more accurate, accelerate business workflows, and provide greater operational visibility to an organization.

After a few minutes of setup, Etlworks replicates all your applications, databases, events, and files into a high-performance data warehouse like Snowflake or Amazon Redshift, so that you can then use your favorite BI or analytics tools. Create reports, monitor custom dashboards, and more, instantly from the cloud.

Connect Marketo to Anything

Etlworks offers connectivity to Marketo’s APIs enabling you to work with key Marketo entities including Lead, Activity, List, Opportunity, OpportunityRole as well as Custom Objects. Etlworks exposes both the SOAP and REST APIs for Marketo ensuring you can handle any integration task.

Use the Etlworks Marketo connector for data integration between Marketo and your CRM system, such as Salesforce, MS Dynamics, SugarCRM, HubSpot, and NetSuite; collaboration or survey tools; webinar platforms; data services; marketing databases; and more.

The Etlworks Marketo connector frees you to focus on insights, so your company will be faster and more efficient at optimizing your marketing performance and improving your campaigns’ ROI.

Etlworks partnered with CData to provide access to the Marketo API using the industry-standard JDBC protocol.

Let’s do it!

Connecting to Marketo

Step 1. Obtaining the OAuthClientId and OAuthClientSecret values. To obtain the OAuthClientId and OAuthClientSecret, navigate to the LaunchPoint option in the Admin area. Click the View Details link for the desired service. A window containing the authentication credentials is displayed.

Step 2. Obtaining the REST Endpoint URL. The RESTEndpoint can be found in your Marketo Admin area under Integration -> Web Services, in the REST API section. Note that the Identity Endpoint will not be needed.

Step 3. Enable the Marketo connector for your Etlworks account. Contact support@etlworks.com to enable the connector.

Step 4. Create a Marketo connection to work with data in Marketo.

Stored Procedures

Stored procedures are available to complement the data available from the REST Data Model. Sometimes it is necessary to update data available from a view using a stored procedure because the data does not provide for direct, table-like, two-way updates. In these situations, the data is retrieved using the appropriate view or table, while the update is done by calling a stored procedure. Stored procedures take a list of parameters and return a dataset that contains the collection of tuples that constitute the response.

To call a stored procedure from the SQL flow or from Before/After SQL, use the EXEC sp_name params=value syntax. Example:

EXEC SelectEntries ObjectName = 'Account'

Extracting data from Marketo

Note: extracting data from Marketo is similar to extracting data from a relational database.

Step 1. Create a Marketo connection which will be used as a source (FROM).

Step 2. Create a destination connection, for example, a connection to the relational database, and if needed a format (format is not needed if the destination is a database or well-known API).

Step 3. Create a flow where the source is a database and the destination is the connection created in step 2, for example, a relational database.

Step 4. Add new source-to-destination transformation.

Step 5. Select the Marketo connection created in step 1 as the source connection and select the Marketo object you are extracting data from.

Step 6. Select TO connection, format (if needed) and object (for example database table) to load data into.

Step 7. Click MAPPING and optionally enter Source Query (you don’t need a query if you are extracting data from the Marketo object unconditionally).

Step 8. Optionally define the per-field mapping.

Step 9. Add more transformations if needed.

Loading data in Marketo

Note: loading data in Marketo is similar to loading data into a relational database.

Step 1. Create a source connection and a format (if needed).

Step 2. Create a destination Marketo connection.

Step 3. Create a flow where the destination is a database.

Step 4. Add new source-to-destination transformation.

Step 5. Select FROM and TO connections and objects (also a FROM format if needed).

Step 6. Optionally define the per-field mapping.

Step 7. Add more transformations if needed.

Browsing data in Marketo

You must have a Marketo connection to browse objects and run SQL queries.

Use Explorer to browse data and metadata in Marketo, as well as to execute DML and SELECT queries against the Marketo connection.

Ready to get started?

Contact Etlworks today to connect your Marketo instance with Etlworks and unlock the ability to read and replicate many of the objects to your data destination.

Loading data in Snowflake

In this blog post, I will be talking about building a reliable data ingestion pipeline for Snowflake.

Snowflake is a data warehouse built for the cloud. It works across multiple clouds and combines the power of data warehousing, the flexibility of big data platforms, and the elasticity of the cloud.

Based on the Snowflake documentation, loading data is a two-step process:
  1. Upload (i.e. stage) one or more data files into either an internal stage (i.e. within Snowflake) or an external location.
  2. Use the COPY INTO command to load the contents of the staged file(s) into a Snowflake database table.

It is obvious that one step is missing: preparing data files to be loaded in Snowflake.
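
Here is a minimal sketch of all three steps for a single file: preparing a gzip-compressed CSV, then building the PUT and COPY INTO statements for an internal stage. It only generates the statements and does not connect to Snowflake. The stage name, table name, and POSIX-style local path are assumptions made for the example.

```python
import csv
import gzip
import os
import tempfile


def write_gzipped_csv(path, header, rows):
    """Prepare a gzip-compressed CSV data file."""
    with gzip.open(path, "wt", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(rows)


def stage_and_copy_sql(local_path, stage, table):
    """Build the PUT (stage) and COPY INTO (load) statements for one file."""
    file_name = os.path.basename(local_path)
    # The file is already compressed, so PUT should not compress it again.
    put = f"PUT file://{local_path} @{stage} AUTO_COMPRESS = FALSE"
    copy = (
        f"COPY INTO {table} FROM @{stage}/{file_name} "
        "FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP SKIP_HEADER = 1)"
    )
    return put, copy


if __name__ == "__main__":
    # 'my_stage' and 'orders' are hypothetical names.
    path = os.path.join(tempfile.gettempdir(), "orders.csv.gz")
    write_gzipped_csv(path, ["id", "amount"], [[1, 10], [2, 20]])
    for statement in stage_and_copy_sql(path, "my_stage", "orders"):
        print(statement)
```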

If steps 1-3 do not look complicated to you, let’s add more details.

Typically, developers tasked with loading data into a data warehouse deal with the following issues:

  • How to build a reliable ingestion pipeline, which loads hundreds of millions of records every day.
  • How to load only recent changes (incremental replication).
  • How to transform data before loading into the data warehouse.
  • How to transform data after loading into the data warehouse.
  • How to deal with changed metadata (table structure) in both the source and in the destination.
  • How to load data from nested datasets, typically returned by the web services (in addition to loading data from the relational databases).

This is just a short list of hand-picked problems. The good news is that Snowflake is built from the ground up to help with bulk-loading data, thanks to the very robust COPY INTO command, and with continuous loading using Snowpipe.

Any Snowflake ingestion pipeline should at least utilize the COPY INTO command and, possibly, Snowpipe.

The simplest ETL process that loads data into Snowflake will look like this:
  1. Extract data from the source and create CSV (also JSON, XML, and other formats) data files.
  2. Archive files using the gz compression algorithm.
  3. Copy data files into the Snowflake stage in an Amazon S3 bucket (also Azure Blob storage or the local file system).
  4. Execute the COPY INTO command using a wildcard file mask to load data into the Snowflake table.
  5. Repeat 1-4 for multiple data sources. Ingestion ends here.
  6. If needed, execute SQL statements in the Snowflake database to transform data. For example, populate dimensions from the staging tables.

The part where you need to build a “reliable data ingestion pipeline” typically includes:
  • Performance considerations and data streaming.
  • Error-handling and retries.
  • Notifications on success and failure.
  • Reliability when moving files to the staging area in S3 or Azure.

The COPY INTO command can load data from files archived using the gz compression algorithm. So, it makes sense to archive all the data files before copying or moving them to the staging area.

  • Cleaning up: what to do with all these data files after they have been loaded (or not loaded) into Snowflake.
  • Dealing with changing table structure in the source and in the destination.

Snowflake supports transforming data while loading it into a table using the COPY INTO <table> command but it will not allow you to load data with inconsistent structure.
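
One common way to cope with a changed source structure is to compare the source and destination column lists before loading and add whatever is missing. The sketch below illustrates that idea only; it is not a description of how Etlworks implements it, and the table and column names are invented.

```python
def add_column_statements(table, source_columns, destination_columns):
    """Return ALTER TABLE statements for columns missing in the destination.

    Both column arguments map column name to SQL type. Names are compared
    case-insensitively, since Snowflake stores unquoted identifiers in
    upper case.
    """
    existing = {name.upper() for name in destination_columns}
    return [
        f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}"
        for name, sql_type in source_columns.items()
        if name.upper() not in existing
    ]


if __name__ == "__main__":
    source = {"id": "NUMBER", "amount": "NUMBER", "coupon": "VARCHAR"}
    destination = {"ID": "NUMBER", "AMOUNT": "NUMBER"}
    for statement in add_column_statements("orders", source, destination):
        print(statement)
```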

Add the need to handle incremental updates in the source (change replication) and you've got yourself a [relatively] complicated project on your hands.

As always, there are two options:
  1. Develop a home-grown ETL using a combination of scripts and in-house tools.
  2. Develop a solution using a third-party ETL tool or service.

Assuming that you are ready to choose option 2 (if not, go to paragraph one), let’s discuss

The requirements for the right ETL tool for the job

When selecting the ETL tool or service the questions you should be asking yourself are:

  • How much are you willing to invest in learning?
  • Do you prefer the code-first or the drag&drop approach?
  • Do you need to extract data from semi-structured and unstructured data sources (typically web services), or is all your data in relational databases?
  • Are you looking for point-to-point integration between well-known data sources (for example, Salesforce->Snowflake) with minimum customization, or do you need to build a custom integration?
  • Do you need your tool to support change replication?
  • How about real-time or almost real-time ETL?
  • Are you looking for a hosted and managed service running in the cloud, or an on-premise solution?

Why is Etlworks the best tool for loading data in Snowflake?

First, just like Snowflake, Etlworks is a cloud-first data integration service. It works perfectly well when installed on-premise, but it really shines in the cloud. When subscribing to the service, you can choose the region that is closest to your Snowflake instance which will make all the difference as far as the fast data load is concerned. Also, you won’t have to worry about managing the service.

Second, in Etlworks you can build even the most complicated data integration flows and transformations using a simple drag&drop interface. No need to learn a new language and no complicated build-test-deploy process.

Third, if you are dealing with heterogeneous data sources, web services, semi-structured or unstructured data, or transformations which go beyond simple point-to-point, pre-baked integrations, you are probably limited to just a few tools. Etlworks is one of them.

Last but not least, if you need your tool to support native change (incremental) replication from relational databases or web services, Etlworks can handle this as well. No programming required. And it is fast.

How it works

In Etlworks, you can choose from several highly configurable data integration flows, optimized for Snowflake:

  • Extract data from databases and load in Snowflake.
  • Extract data from data objects (including web services) and load in Snowflake.
  • Extract data from well-known APIs (such as Google Analytics) and load in Snowflake.
  • Load existing files in Snowflake.
  • Execute any SQL statement or multiple SQL statements.

Behind the scenes, the flows perform complicated transformations and create data files for Snowflake, archive files using the gz algorithm before copying them to the Snowflake stage in the cloud or in the server storage, automatically create and execute the COPY INTO <table> command, and much more. For example, the flow can automatically create a table in Snowflake if it does not exist, or it can purge the data files in case of error (Snowflake can automatically purge the file in case of success).

You can find the actual, step-by-step instructions on how to build Snowflake data integration flows in Etlworks in our documentation.

The extra bonus is that in Etlworks you can connect to the Snowflake database, discover the schemas, tables, and columns, run SQL queries, and share queries with the team. All without ever using Snowflake SQL workbench.  Even better – you can connect to all your data sources and destinations, regardless of the format and location to discover the data and the metadata. Learn more about Etlworks Explorer.