TableStream
package com.atguigu.sql;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<WaterSensor> sensorDS = env.fromElements(
new WaterSensor("s1", 1L, 1),
new WaterSensor("s1", 2L, 2),
new WaterSensor("s2", 2L, 2),
new WaterSensor("s3", 3L, 3),
new WaterSensor("s3", 4L, 4)
);
// TODO 1. 流转表
Table sensorTable = tableEnv.fromDataStream(sensorDS);
tableEnv.createTemporaryView("sensor", sensorTable);
Table filterTable = tableEnv.sqlQuery("SELECT id, ts, vc FROM sensor WHERE ts > 2");
// TODO 2. 表转流
// 2.1 追加流
tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");
// 2.2 changelog流(结果需要更新)
Table sumTable = tableEnv.sqlQuery("SELECT id, SUM(vc) FROM sensor GROUP BY id");
tableEnv.toChangelogStream(sumTable).print("sum");
// 只要代码中调用了 DataStreamAPI,就需要 execute,否则不需要
env.execute();
}
}