Demo: PolarDB-X CDC to Elasticsearch
This tutorial shows how to quickly build a streaming ETL pipeline for PolarDB-X with Flink CDC.
Assume we are running an e-commerce business whose product and order data are stored in PolarDB-X. We want to enrich the orders with the product table and then load the enriched orders into Elasticsearch in real time.
In the following sections, we describe how to implement this with Flink CDC for PolarDB-X. All exercises in this tutorial are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or an IDE installation.
Prepare a Linux or macOS computer with Docker installed.
Starting the required components
The components required in this demo are all managed in containers, so we will use docker-compose to start them.
Create a docker-compose.yml file with the following contents:
```yaml
version: '2.1'
services:
  polardbx:
    image: polardbx/polardb-x:2.0.1
    container_name: polardbx
    ports:
      - "8527:8527"
  elasticsearch:
    image: 'elastic/elasticsearch:7.6.0'
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      - discovery.type=single-node
    ports:
      - '9200:9200'
      - '9300:9300'
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: 'elastic/kibana:7.6.0'
    container_name: kibana
    ports:
      - '5601:5601'
    volumes:
      - '/var/run/docker.sock:/var/run/docker.sock'
```
The Docker Compose environment consists of the following containers:
PolarDB-X: stores the products and orders tables, which will be joined to enrich the orders.
Elasticsearch: mainly used as a data sink to store enriched orders.
Kibana: used to visualize the data in Elasticsearch.
To start all containers, run the following command in the directory that contains the docker-compose.yml file:

```shell
docker-compose up -d
```
This command starts all the containers defined in the Docker Compose configuration in detached mode. Run docker ps to check whether these containers are running properly. We can also visit http://localhost:5601/ to check whether Kibana is running normally.
Preparing data in databases
Preparing data in PolarDB-X
Connect to the PolarDB-X database:

```shell
mysql -h127.0.0.1 -P8527 -upolardbx_root -p"123456"
```
Create a database and tables, then populate the data:

```sql
-- PolarDB-X
-- The client above connects without a default database, so create and select one first.
-- The name mydb is only an example; any database name works.
CREATE DATABASE IF NOT EXISTS mydb;
USE mydb;

CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
) AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
Enriching orders and loading them into Elasticsearch
Use Flink SQL to join the orders table with the products table to enrich the orders and write them to Elasticsearch.
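The INSERT statement below reads from Flink tables named orders and products and writes to a table named enriched_orders, so these tables have to be declared in the Flink SQL CLI first. The DDL here is a minimal sketch under stated assumptions: it reads PolarDB-X through the mysql-cdc connector (PolarDB-X is MySQL-compatible) with the host, port, and credentials from the mysql command above, assumes the tables live in a database named mydb, and assumes an Elasticsearch index named enriched_orders. Adjust these names to your setup, and make sure the matching connector JARs are on the Flink classpath.

```sql
-- Flink SQL
-- Enable checkpointing; the CDC source relies on completed checkpoints to move
-- from the initial snapshot to streaming binlog changes.
-- Quoted SET syntax as in recent Flink versions; older SQL CLIs accept:
--   SET execution.checkpointing.interval = 3s;
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- Source table over the PolarDB-X orders table (database name mydb is an assumption)
Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = '127.0.0.1',
   'port' = '8527',
   'username' = 'polardbx_root',
   'password' = '123456',
   'database-name' = 'mydb',
   'table-name' = 'orders'
 );

-- Source table over the PolarDB-X products table
Flink SQL> CREATE TABLE products (
   id INT,
   name STRING,
   description STRING,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = '127.0.0.1',
   'port' = '8527',
   'username' = 'polardbx_root',
   'password' = '123456',
   'database-name' = 'mydb',
   'table-name' = 'products'
 );

-- Sink table in Elasticsearch (index name enriched_orders is an assumption);
-- product_name and product_description receive p.name and p.description.
Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'elasticsearch-7',
   'hosts' = 'http://localhost:9200',
   'index' = 'enriched_orders'
 );
```

Because enriched_orders declares a primary key, the Elasticsearch connector works in upsert mode and derives the document id from order_id. That is what lets later updates and deletes in PolarDB-X overwrite or remove the matching documents instead of appending new ones.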
```sql
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
 SELECT o.order_id,
        o.order_date,
        o.customer_name,
        o.price,
        o.product_id,
        o.order_status,
        p.name,
        p.description
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id;
```
Now, the enriched orders should be shown in Kibana.
Visit http://localhost:5601/app/kibana#/management/kibana/index_pattern to create an index pattern for the Elasticsearch index that holds the enriched orders.
Visit http://localhost:5601/app/kibana#/discover to find the enriched orders.
Next, make some changes in the database; the enriched orders shown in Kibana will be updated in real time after each step.
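Optionally, the raw change events behind these updates can also be watched from the Flink SQL CLI. The sketch below assumes the Flink orders source table has already been declared with the mysql-cdc connector; it switches the CLI to tableau result mode and starts a continuous query, which should be started before the steps below and stopped with Ctrl+C afterward.

```sql
-- Flink SQL
-- Print results directly in the terminal, including the change kind of each row.
-- Older SQL CLIs use the unquoted form:
--   SET sql-client.execution.result-mode = tableau;
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';

-- Continuous query: it keeps running and prints a row for every change to orders.
-- The op column shows the change kind: typically +I for inserts,
-- -U/+U for the before/after images of updates, and -D for deletes.
Flink SQL> SELECT order_id, customer_name, product_id, order_status FROM orders;
```

The initial snapshot appears first as +I rows for the three existing orders; each of the following steps then adds its own change rows for the affected order.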
Insert a new order in PolarDB-X (with the auto-increment settings above, it should be assigned order_id 10004, which the next two steps use):

```sql
-- PolarDB-X
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
```

Update the order status in PolarDB-X:

```sql
-- PolarDB-X
UPDATE orders SET order_status = true WHERE order_id = 10004;
```

Delete the order in PolarDB-X:

```sql
-- PolarDB-X
DELETE FROM orders WHERE order_id = 10004;
```
After each step, the enriched orders shown in Kibana change accordingly.
After finishing the tutorial, run docker-compose down in the directory that contains docker-compose.yml to stop all containers.
To stop the Flink cluster, run ./bin/stop-cluster.sh in the Flink directory.