// Flink MySQL CDC
package com.atguigu.source;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Minimal Flink job that reads change events from the MySQL table {@code demo.ws}
 * through the MySQL CDC connector and prints every record to standard output.
 *
 * <p>Per the original author's note, the monitored table needs a primary key; without one
 * the connector fails with:
 * {@code Caused by: org.apache.flink.table.api.ValidationException:
 * 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys}
 *
 * <p>Table definition used by this demo:
 * <pre>
 * CREATE TABLE `ws` (
 *   `id` varchar(20) COLLATE utf8mb4_general_ci NOT NULL,
 *   `ts` int DEFAULT NULL,
 *   `vc` bigint DEFAULT NULL,
 *   PRIMARY KEY (`id`)
 * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
 * </pre>
 */
public class MySQL_CDC_Demo {

    /**
     * Entry point: sets up the execution environment, attaches the CDC source,
     * prints the resulting stream and submits the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is fixed to 1 for the whole job.
        execEnv.setParallelism(1);

        // Attach the CDC source; no watermarks are generated for this stream.
        DataStreamSource<String> changeStream = execEnv.fromSource(
                buildWsTableSource(),
                WatermarkStrategy.noWatermarks(),
                "MySQL Source");

        // Print every change record as it arrives.
        changeStream.print();

        // Submit the job.
        execEnv.execute("Flink MySQL CDC Example");
    }

    /**
     * Builds the MySQL CDC source for the {@code demo.ws} table on the local MySQL instance.
     *
     * @return a source that emits each change record as a {@code String}
     */
    private static MySqlSource<String> buildWsTableSource() {
        return MySqlSource.<String>builder()
                // Connection endpoint and credentials.
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("123456")
                // Database and fully qualified table to monitor.
                .databaseList("demo")
                .tableList("demo.ws")
                // NOTE(review): the original comment said this "starts reading from the binlog";
                // per the connector docs, initial() first snapshots the table and then
                // continues with the binlog - confirm this is the intended startup mode.
                .startupOptions(StartupOptions.initial())
                // NOTE(review): the original comment said records are deserialized to JSON, but
                // this schema appears to emit the record's toString() form; the connector's
                // JsonDebeziumDeserializationSchema would be needed for JSON - confirm intent.
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
    }
}