// Flink MySQL CDC

package com.atguigu.source;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Minimal Flink job that reads change events from one MySQL table through the
 * MySQL CDC connector and prints every event.
 *
 * <p>The monitored table needs a primary key; otherwise the job fails with:
 * {@code Caused by: org.apache.flink.table.api.ValidationException:
 * 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys}
 *
 * <p>Example table definition:
 * <pre>
 * CREATE TABLE `ws` (
 *   `id` varchar(20) COLLATE utf8mb4_general_ci NOT NULL,
 *   `ts` int DEFAULT NULL,
 *   `vc` bigint DEFAULT NULL,
 *   PRIMARY KEY (`id`)
 * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
 * </pre>
 */
public class MySQL_CDC_Demo {

    /**
     * Builds the pipeline (MySQL CDC source -> print sink) and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        // Set up the execution environment; a parallelism of 1 keeps the demo single-threaded.
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Attach the CDC source (no watermarks are generated) and print each record.
        environment
                .fromSource(createSource(), WatermarkStrategy.noWatermarks(), "MySQL Source")
                .print();

        // Submit the job.
        environment.execute("Flink MySQL CDC Example");
    }

    /**
     * Creates the MySQL CDC source for table {@code demo.ws} on the local MySQL instance.
     *
     * @return a source that emits each change record as a {@code String}
     */
    private static MySqlSource<String> createSource() {
        return MySqlSource.<String>builder()
                // Connection endpoint and credentials (hard-coded for this demo).
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("123456")
                // Database and fully qualified table to monitor.
                .databaseList("demo")
                .tableList("demo.ws")
                // NOTE(review): an earlier comment described this output as JSON, but this schema
                // appears to emit the record's plain string form -- confirm, and switch to a JSON
                // deserialization schema if JSON is actually required.
                .deserializer(new StringDebeziumDeserializationSchema())
                // initial(): per the connector documentation, snapshot the monitored table on
                // first start and then continue with the binlog -- verify against the
                // connector version in use.
                .startupOptions(StartupOptions.initial())
                .build();
    }
}