跳转至

RichFunction

package com.atguigu.transfrom;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunctionDemo {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(2);

        DataStreamSource<String> source = env.socketTextStream("192.168.1.7", 9091);
        SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<String, Integer>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println(
                        "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
                                + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
                                + ",调用open()");
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println(
                        "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
                                + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
                                + ",调用close()");
            }

            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value) + 1;
            }
        });

        /**
         * TODO RichXXXFunction: 富函数
         * 1、多了生命周期管理方法:
         *    open(): 每个子任务,在启动时,调用一次
         *    close():每个子任务,在结束时,调用一次
         *      => 如果是flink程序异常挂掉,不会调用close
         *      => 如果是正常调用 cancel命令,可以close
         * 2、多了一个 运行时上下文
         *    可以获取一些运行时的环境信息,比如 子任务编号、名称、其他的.....
         */
//        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
//        SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {
//
//            @Override
//            public void open(Configuration parameters) throws Exception {
//                super.open(parameters);
//                System.out.println(
//                        "子任务编号="+getRuntimeContext().getIndexOfThisSubtask()
//                                +",子任务名称="+getRuntimeContext().getTaskNameWithSubtasks()
//                                +",调用open()");
//            }
//
//            @Override
//            public void close() throws Exception {
//                super.close();
//                System.out.println(
//                        "子任务编号="+getRuntimeContext().getIndexOfThisSubtask()
//                                +",子任务名称="+getRuntimeContext().getTaskNameWithSubtasks()
//                                +",调用close()");
//            }
//
//            @Override
//            public Integer map(Integer value) throws Exception {
//                return value + 1;
//            }
//        });

        map.print();

        env.execute();
    }
}