// TableFunction
package com.atguigu.sql;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
/**
 * Demo of a user-defined table function (UDTF) used from Flink SQL.
 *
 * <p>A small stream of sentences is registered as the view {@code str} with a
 * single column {@code words}. {@link SplitFunction} is registered under the
 * name {@code SplitFunction} and joined laterally against that view so that
 * every sentence is expanded into one row per word together with the word's
 * length. The result is printed to standard output.
 */
public class MyTableFunctionDemo {

    /**
     * Builds the table environment, registers the view and the function, runs
     * the lateral-join query and prints its result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> strDS = env.fromElements(
                "hello flink",
                "hello world hi",
                "hello java"
        );

        // Expose the stream as a one-column table ("words") and register it as view "str".
        Table wordsTable = tableEnv.fromDataStream(strDS, $("words"));
        tableEnv.createTemporaryView("str", wordsTable);

        // Step 2: register the function under the name used in the SQL below.
        tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);

        // Step 3: call the user-defined function. Equivalent ways to write the join:
        //   3.1 cross join:
        //       select words,word,length from str,lateral table(SplitFunction(words))
        //   3.2 left join with an ON TRUE condition:
        //       select words,word,length from str left join lateral table(SplitFunction(words)) on true
        //   3.3 (used here) left join that also renames the lateral table's fields via AS T(...).
        tableEnv
                .sqlQuery("SELECT words, newWord, newLength FROM str LEFT JOIN LATERAL TABLE(SplitFunction(words)) AS T(newWord, newLength) ON TRUE")
                .execute()
                .print();
    }

    /**
     * Step 1: extend {@code TableFunction<T>} where {@code T} is the emitted row type.
     *
     * <p>Splits a sentence on single spaces and emits one row per word. The
     * type hint declares the emitted {@link Row} as two fields:
     * {@code word STRING} and {@code length INT}.
     */
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {

        /**
         * Emits a {@code (word, length)} row for every non-empty space-separated
         * token of {@code str}. The method returns {@code void}; rows are
         * produced through {@code collect}.
         *
         * @param str the sentence to split; {@code null} produces no rows
         */
        public void eval(String str) {
            // A NULL column value would otherwise throw a NullPointerException and fail the job.
            if (str == null) {
                return;
            }
            for (String word : str.split(" ")) {
                // Leading or consecutive spaces yield empty tokens; they are not words.
                if (word.isEmpty()) {
                    continue;
                }
                collect(Row.of(word, word.length()));
            }
        }
    }
}