Stream data from MySQL to Iceberg using the following Flink SQL:
-- Flink SQL
Flink SQL> INSERT INTO all_users_sink select * from user_source;
This starts a streaming job that continuously synchronizes both historical and incremental data from MySQL to Iceberg.
The running job can be found in the Flink UI.
Then, we can use the following command to see the files written to Iceberg:
docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/
The output should show the table directory with its data and metadata subdirectories. The actual files may differ in your environment, but the directory structure should be similar.
Next, make some changes in the MySQL databases. The data in the Iceberg table
all_users_sink will change accordingly in real time.
(3.1) Insert a new user into table db_1.user_1:
INSERT INTO db_1.user_1 VALUES (111,"user_111","Shanghai","123567891234","email@example.com");
(3.2) Update a user in table db_1.user_2:
UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;
(3.3) Delete a user from table db_2.user_2:
DELETE FROM db_2.user_2 WHERE id=220;
After each step, query the table with
SELECT * FROM all_users_sink in the Flink SQL CLI to see the changes.
In the final query result from Iceberg, there is a new record
(db_1, user_1, 111), the address of
(db_1, user_2, 120) has been updated to Beijing,
and the record
(db_2, user_2, 220) has been deleted. The result matches exactly the changes we made in MySQL.
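To check only the three rows touched in steps (3.1) to (3.3) instead of scanning the full result, a narrower query can be run in the Flink SQL CLI. This is a minimal sketch: the column names database_name, table_name, id, and address are assumptions inferred from the (database, table, id) tuples and the UPDATE statement above, so adjust them to match the actual definition of all_users_sink.

```sql
-- Flink SQL
-- Assumption: all_users_sink has columns database_name, table_name, id, and address.
-- Adjust the names if your sink table is defined differently.
SELECT database_name, table_name, id, address
FROM all_users_sink
WHERE (database_name = 'db_1' AND table_name = 'user_1' AND id = 111)  -- inserted in (3.1)
   OR (database_name = 'db_1' AND table_name = 'user_2' AND id = 120)  -- updated in (3.2)
   OR (database_name = 'db_2' AND table_name = 'user_2' AND id = 220); -- deleted in (3.3)
```

Once all three steps have been synchronized, this query should return the rows for ids 111 and 120 (the latter with address Beijing) and no row for id 220.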