从 Kafka Source 到 MySQL Sink
package com.atguigu.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Kafka2MySQL {
public static void main(String[] args) throws Exception {
// 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 Table API 执行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
//
// 创建 Kafka Source 表
String sourceDDL = "CREATE TABLE source_table (" +
" id INT, " +
" name STRING, " +
" age INT " +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'json-topic', " +
" 'properties.bootstrap.servers' = '192.168.1.7:9092', " +
" 'properties.group.id' = 'kafka2mysql', " +
" 'format' = 'json', " +
// " 'scan.startup.mode' = 'earliest-offset'" +
" 'scan.startup.mode' = 'latest-offset'" +
")";
tableEnv.executeSql(sourceDDL);
TableResult tableResult = tableEnv.executeSql("SELECT * FROM source_table");
// 创建 MySQL Sink 表
/*
CREATE TABLE `output_table` (
`id` int NOT NULL,
`name` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`age` int NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
*/
String sinkDDL = "CREATE TABLE sink_table (" +
" id INT, " +
" name STRING, " +
" age INT " +
") WITH (" +
" 'connector' = 'jdbc', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' = 'jdbc:mysql://localhost:3306/demo', " +
" 'table-name' = 'output_table', " +
" 'username' = 'root', " +
" 'password' = '123456'" +
")";
tableEnv.executeSql(sinkDDL);
// 使用 SQL 将数据从 Source 表写入到 Sink 表
tableEnv.executeSql("INSERT INTO sink_table SELECT id, name, age FROM source_table");
tableResult.print();
}
}