// Jump to
//
// From Kafka Source to MySQL Sink

package com.atguigu.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Example job that declares a Kafka-backed table and a JDBC-backed table through SQL DDL,
 * copies rows from the former into the latter with an {@code INSERT ... SELECT}, and also
 * prints the rows returned by a {@code SELECT *} over the Kafka-backed table.
 */
public class Kafka2MySQL {

    /**
     * Builds the streaming table environment, registers both tables, and runs the queries.
     *
     * <p>Statement order: source DDL, {@code SELECT *} over the source, sink DDL,
     * {@code INSERT INTO sink_table ...}, and finally {@code print()} on the result of the
     * earlier {@code SELECT}.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception declared so that any failure from the calls below propagates
     */
    public static void main(String[] args) throws Exception {
        // Streaming execution environment that the table environment is created on.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Table API environment, explicitly configured for streaming mode.
        EnvironmentSettings streamingSettings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tables = StreamTableEnvironment.create(streamEnv, streamingSettings);

        // Register the Kafka source table, then issue the SELECT whose rows are printed
        // at the very end of this method.
        tables.executeSql(kafkaSourceDdl());
        TableResult sourceRows = tables.executeSql("SELECT * FROM source_table");

        // Register the MySQL (JDBC) sink table.
        tables.executeSql(jdbcSinkDdl());

        // Write the source table's rows into the sink table via SQL.
        tables.executeSql("INSERT INTO sink_table SELECT id, name, age FROM source_table");

        sourceRows.print();
    }

    /**
     * Returns the {@code CREATE TABLE} statement for {@code source_table}: columns
     * {@code id INT}, {@code name STRING}, {@code age INT}, using the {@code kafka}
     * connector on topic {@code json-topic} with {@code json} format and
     * {@code latest-offset} as the scan startup mode.
     */
    private static String kafkaSourceDdl() {
        return String.join("",
                "CREATE TABLE source_table (",
                " id INT, ",
                " name STRING, ",
                " age INT ",
                ") WITH (",
                " 'connector' = 'kafka', ",
                " 'topic' = 'json-topic', ",
                " 'properties.bootstrap.servers' = '192.168.1.7:9092', ",
                " 'properties.group.id' = 'kafka2mysql', ",
                " 'format' = 'json', ",
                " 'scan.startup.mode' = 'latest-offset'",
                ")");
    }

    /**
     * Returns the {@code CREATE TABLE} statement for {@code sink_table}: columns
     * {@code id INT}, {@code name STRING}, {@code age INT}, using the {@code jdbc}
     * connector against the MySQL table {@code output_table}.
     *
     * <p>Reference DDL kept alongside the sink definition (presumably the MySQL-side
     * definition of {@code output_table} - confirm against the database):
     *
     * <pre>
     * CREATE TABLE `output_table` (
     *   `id` int NOT NULL,
     *   `name` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
     *   `age` int NOT NULL
     * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
     * </pre>
     */
    private static String jdbcSinkDdl() {
        return String.join("",
                "CREATE TABLE sink_table (",
                " id INT, ",
                " name STRING, ",
                " age INT ",
                ") WITH (",
                " 'connector' = 'jdbc', ",
                " 'driver' = 'com.mysql.cj.jdbc.Driver', ",
                " 'url' = 'jdbc:mysql://localhost:3306/demo', ",
                " 'table-name' = 'output_table', ",
                " 'username' = 'root', ",
                " 'password' = '123456'",
                ")");
    }
}