从 DataGen Source 到 Print Sink
package com.atguigu.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class DataGen2Print {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 1.创建表环境
// 1.1 写法一:
// EnvironmentSettings settings = EnvironmentSettings.newInstance()
// .inStreamingMode()
// .build();
// StreamTableEnvironment tableEnv = TableEnvironment.create(settings);
// 1.2 写法二
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// TODO 2.创建表
tableEnv.executeSql("CREATE TABLE source ( \n" +
" id INT, \n" +
" ts BIGINT, \n" +
" vc INT\n" +
") WITH ( \n" +
" 'connector' = 'datagen', \n" +
" 'rows-per-second'='1', \n" +
" 'fields.id.kind'='random', \n" +
" 'fields.id.min'='1', \n" +
" 'fields.id.max'='10', \n" +
" 'fields.ts.kind'='sequence', \n" +
" 'fields.ts.start'='1', \n" +
" 'fields.ts.end'='1000000', \n" +
" 'fields.vc.kind'='random', \n" +
" 'fields.vc.min'='1', \n" +
" 'fields.vc.max'='100'\n" +
");\n");
tableEnv.executeSql("CREATE TABLE sink (\n" +
" id INT, \n" +
" sumVC INT \n" +
") WITH (\n" +
"'connector' = 'print'\n" +
");\n");
// TODO 3.执行查询
// 3.1 使用sql进行查询
// Table table = tableEnv.sqlQuery("select id,sum(vc) as sumVC from source where id>5 group by id ;");
// 把table对象,注册成表名
// tableEnv.createTemporaryView("tmp", table);
// tableEnv.sqlQuery("select * from tmp where id > 7");
// 3.2 用table api来查询
Table source = tableEnv.from("source");
Table result = source
.where($("id").isGreater(5))
.groupBy($("id"))
.aggregate($("vc").sum().as("sumVC"))
.select($("id"), $("sumVC"));
// TODO 4.输出表
// 4.1 sql用法
// tableEnv.executeSql("insert into sink select * from tmp");
// 4.2 tableapi用法
result.executeInsert("sink");
}
}