// TableFunction: user-defined table function (UDTF) example

package com.atguigu.sql;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demonstrates a user-defined table function (UDTF) in Flink SQL.
 *
 * <p>Each input sentence is split into words by {@link SplitFunction}; the produced
 * {@code (word, length)} rows are joined back to the originating sentence with a
 * lateral join and printed.
 */
public class MyTableFunctionDemo {

    /**
     * Builds a small in-memory stream of sentences, exposes it as the view {@code str}
     * with a single column {@code words}, registers {@link SplitFunction} and prints the
     * result of a lateral left join between every sentence and its words.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if setting up or executing the query fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: three sentences, one per record.
        DataStreamSource<String> sentenceStream = env.fromElements(
                "hello flink",
                "hello world hi",
                "hello java"
        );

        // Expose the stream as a table whose only column is named "words".
        Table sentenceTable = tableEnv.fromDataStream(sentenceStream, $("words"));
        tableEnv.createTemporaryView("str", sentenceTable);

        // Step 2: register the function under the name used in the SQL below.
        tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);

        // Step 3: call the user-defined function. Alternative forms of the same query:
        //   3.1 cross join:
        //       select words,word,length from str,lateral table(SplitFunction(words))
        //   3.2 left join with an "on true" condition:
        //       select words,word,length from str left join lateral table(SplitFunction(words)) on true
        // The variant executed here additionally renames the fields of the lateral table.
        String lateralJoinSql =
                "SELECT words, newWord, newLength FROM str LEFT JOIN LATERAL TABLE(SplitFunction(words)) AS T(newWord, newLength) ON TRUE";

        tableEnv
                .sqlQuery(lateralJoinSql)
                .execute()
                .print();
    }

    /**
     * Step 1: extend {@code TableFunction<T>} where {@code T} is the emitted row type.
     *
     * <p>The type hint declares that every emitted {@link Row} has two fields:
     * {@code word} (STRING) and {@code length} (INT).
     */
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {

        /**
         * Splits {@code str} on single spaces and emits one {@code (word, length)} row per
         * non-empty word. The method returns {@code void}; rows are emitted via
         * {@code collect}.
         *
         * <p>A {@code null} input emits nothing instead of throwing a
         * {@link NullPointerException}. Empty tokens (from an empty string or consecutive
         * spaces) are skipped so that no zero-length "word" is emitted.
         *
         * @param str the sentence to split; may be {@code null}
         */
        public void eval(String str) {
            if (str == null) {
                // SQL NULL in the input column: there is nothing to split.
                return;
            }
            for (String word : str.split(" ")) {
                if (word.isEmpty()) {
                    // Artifact of an empty input or repeated spaces, not a real word.
                    continue;
                }
                collect(Row.of(word, word.length()));
            }
        }
    }
}