ScalarFunction
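
The demo below covers the full workflow for a user-defined scalar function: (1) define an implementation class that extends ScalarFunction and exposes a public eval method, (2) register it with the table environment, and (3) call it from SQL or from the Table API via call().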

package com.atguigu.sql;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class MyScalarFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );

        Table sensorTable = tableEnv.fromDataStream(sensorDS);
        tableEnv.createTemporaryView("sensor", sensorTable);

        tableEnv.createTemporaryFunction("HashFunction", HashFunction.class);  // TODO 2. Register the custom function

        // TODO 3. Call the custom function
        // 3.1 SQL usage
//        tableEnv.sqlQuery("select HashFunction(id) from sensor")
//                .execute()  // execute() on the query result triggers the job, so env.execute() is not needed
//                .print();

        // 3.2 Table API usage
        sensorTable
                .select(call("HashFunction", $("id")))
                .execute()
                .print();
    }

    // TODO 1. Define the implementation class of the custom function
    public static class HashFunction extends ScalarFunction {
        // Accepts input of any type (declared via @DataTypeHint(inputGroup = InputGroup.ANY)) and returns an INT result
        public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
            return o.hashCode();
        }
    }
}
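
The demo imports com.atguigu.bean.WaterSensor, which is not shown above. Below is a minimal sketch of that POJO, assuming the three fields implied by the constructor calls (String id, Long ts, Integer vc); the actual class in the course materials may differ (for example, it may use Lombok to generate the boilerplate).

package com.atguigu.bean;

// Minimal sketch of the WaterSensor POJO assumed by the demo above.
// Only "id" is confirmed by the demo ($("id")); the names "ts" and "vc" are assumptions.
public class WaterSensor {
    public String id;   // sensor id
    public Long ts;     // event timestamp (assumed name)
    public Integer vc;  // water level value (assumed name)

    // Flink requires a public no-argument constructor and public fields
    // (or getters/setters) to treat this class as a POJO type.
    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }
}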