Stream your database changes: CDC (Change Data Capture)

We are in the era of data. No matter which department you are in within an organisation, from sales and marketing to development and machine learning, data is everywhere. The concept of data warehousing (DW) has been around for decades, with the goal of assembling all of that data in one place.

Moving data from a database, say MySQL, to another place (the DW) is a very common problem in data warehousing. Scheduled batch loads are the most common way of doing it. However, database tables can be huge, and batch loading leaves the data in the DW less up to date than it could be. Streaming database changes in real time has therefore become a real need.

In this article, we will go through an end-to-end change data capture (CDC) pipeline from MySQL to PostgreSQL, streamed via Kafka. The tech we will use:

  • MySQL
  • PostgreSQL
  • Kafka
  • StreamSets Data Collector

StreamSets Data Collector is an enterprise-grade, open source, continuous big data ingestion infrastructure. It makes building data pipelines very easy.

1. Kafka

Let us set up the Kafka streaming pipeline first. With Docker, this could not be easier.

Create a docker-compose.yml file as follows:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: <your IP>
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

For <your IP>, fill in the IP address of your machine, which you can find via the ipconfig command (ifconfig on Mac), under "en0" -> "inet".

Then we just need to run the following command to start up our Kafka stack.

docker-compose up -d --build

You can use the following command to test whether it is up successfully:

echo dump | nc localhost 2181 | grep brokers

In our case, you will see:

/brokers/ids/1002

This means our Kafka broker is registered and running.
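You can also create the CDC topic up front instead of relying on auto-creation. A minimal sketch, assuming the Kafka container name shown by docker ps (it depends on your compose project name) and the /opt/kafka install path used by the wurstmeister image:

# Create the topic we will stream the binlog records into
# (container name and install path are assumptions; adjust to your setup)
docker exec -it <kafka container> /opt/kafka/bin/kafka-topics.sh \
  --create --zookeeper zookeeper:2181 \
  --replication-factor 1 --partitions 1 --topic mysql_cdc_log

# List topics to confirm it exists
docker exec -it <kafka container> /opt/kafka/bin/kafka-topics.sh \
  --list --zookeeper zookeeper:2181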

2. MySQL and PostgreSQL

To start up the SQL databases, the easiest route is a cloud platform. In our case we use Google Cloud Platform (GCP); after a few button clicks, you have your databases up and running.

mysql and postgresql on gcp

For the database and table, we create a database called "testdb" on both instances and a table named "test" within testdb. You can of course create your own.

create table test (
id int not null primary key,
name varchar(50) not null
)
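One MySQL-side prerequisite worth noting: the StreamSets MySQL Binary Log origin reads the binlog, so binary logging must be enabled in ROW format (on Cloud SQL this is controlled through instance settings and flags), and the connecting user needs replication privileges. A rough sketch; the user name and password below are placeholders, not from this setup:

-- Check that binary logging is enabled and row-based
SHOW VARIABLES LIKE 'log_bin';        -- should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW

-- A dedicated user for the CDC origin (name and password are placeholders)
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'change-me';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';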

3. StreamSets Data Collector

With Docker, we can easily set up the StreamSets Data Collector.

docker run --restart on-failure -p 18630:18630 -d --name sdc streamsets/datacollector

streamsets

The default username and password are both "admin".

Now, let us start building our pipelines. Since we are streaming, there will be two pipelines: one producer and one consumer.

Producer

CDC-producer pipeline

As we can see above, the producer pipeline is pretty simple. First we read the binary log (binlog) from our source MySQL database; the binlog is the most common way to do CDC for MySQL. Then we publish the binlog records to a Kafka topic.

mysql bin log setting

Above is the MySQL Binary Log origin setting. Make sure the MySQL driver jar has been uploaded under "External Libraries", and that "Include Tables" lists the tables you want to sync. The other settings are straightforward.

Above is the Kafka Producer setting. To use the Kafka components, you have to install the Kafka libraries via the StreamSets Package Manager. The settings are easy: put in the IP you set in the docker-compose.yml file, and enter the topic name, in our case "mysql_cdc_log". For the data format, just use "SDC Record".
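To sanity-check that records are actually landing on the topic, you can tail it with the console consumer. A sketch, with the same container-name caveat as before; note that SDC Record is a binary format, so the payload will not be fully human-readable, but you should see records appear whenever the source table changes:

# Tail the CDC topic (container name is an assumption; check docker ps)
docker exec -it <kafka container> /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server <your IP>:9092 --topic mysql_cdc_log --from-beginning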

Consumer

As we can see above, the consumer pipeline is a bit more complex. The StreamSets "JDBC Producer" cannot handle the MySQL binlog records directly very well, which is why we split "Delete" from "Insert/Update". It is then easy to extract the useful information from the raw log for these two categories.

kafka consumer setting

The Kafka Consumer setting is simple: give the consumer group a name, and set the data format to "SDC Record" as well.

set table name setting

The table name carried in the MySQL binlog records can vary, so we retrieve the table name from the log record itself rather than hard-coding it.
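As an illustration, the expression below is a sketch, assuming the binlog record exposes the table name in a /Table field alongside the /Data and /OldData fields used later; verify the actual record structure via pipeline preview:

# assumes the origin writes the table name to a /Table field (verify via preview)
${record:value('/Table')}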

choose operation setting

Since we need to split "Delete" from "Insert/Update", we route records based on the sdc.operation.type header; for the detailed type mapping, refer to the StreamSets website.
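For reference, a condition of roughly this shape routes deletes to their own stream. This is a sketch based on StreamSets' documented CRUD mapping (2 = DELETE); double-check the mapping on their site before relying on it:

# route DELETE records (2 = DELETE in the StreamSets CRUD mapping)
${record:attribute('sdc.operation.type') == '2'}

All other records (insert/update) fall through to the default stream.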

For “Delete”, we get data from the “oldData” field. For “Insert/Update”, we get data from the “Data” field directly.

write postgresql setting

The final step writes to the destination PostgreSQL database. Set "Change Log Format" to "MySQL Binary Log", and for "Default Operation" select "INSERT".
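The JDBC connection string follows the standard PostgreSQL JDBC format; the host below is a placeholder for your Cloud SQL instance's IP:

jdbc:postgresql://<postgres host>:5432/testdb

As with the MySQL origin, make sure the PostgreSQL JDBC driver jar is available to the stage (via "External Libraries") if it is not already bundled with your Data Collector version.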

4. Testing

Now that we have set up all our pipelines, let us test the CDC behaviour.

At the beginning, the "test" tables in both the source and target databases are empty. Now we insert one record into the source database.
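For example, run this against the source MySQL database (the id value is a placeholder):

-- insert a record on the source side
INSERT INTO test (id, name) VALUES (1, 'Peter');

-- confirm it arrives on the target PostgreSQL database
SELECT * FROM test;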

test insert one record

As we can see above, the record got synced perfectly. It works!

Then we update this record's name from "Peter" to "Tom".
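Again on the source database:

-- id is the placeholder value from the insert above
UPDATE test SET name = 'Tom' WHERE id = 1;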

cdc test update

As we can see above, the record got synced perfectly. It works!

Then we delete this record.
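And on the source database:

-- id is the placeholder value from the insert above
DELETE FROM test WHERE id = 1;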

cdc test delete

As we can see above, the record got synced perfectly. It works!

In this article, we walked through how to use open source tools to do change data capture (CDC) streaming. StreamSets can certainly do batch ETL as well. With streaming and batch operations, you can easily sync different data sources into a single data warehouse, which is an important step that unblocks downstream tasks like data analysis, visualisation, and machine learning.

StreamSets also has rich support for various cloud products; please check their website for further information.

I hope you find this article useful. Thank you all.
