通过 SideOutput 分流
package com.atguigu.split;
import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* TODO 分流: 奇数、偶数拆分成不同流
*/
public class SideOutputDemo {
public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 默认端口 8081
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
// s1,4,6
.socketTextStream("192.168.1.7", 9091)
.map(new WaterSensorMapFunction());
/**
* TODO 使用侧输出流 实现分流
* 需求: watersensor的数据,s1、s2的数据分别分开
*
* TODO 总结步骤:
* 1、使用 process算子
* 2、定义 OutputTag对象
* 3、调用 ctx.output
* 4、通过主流 获取 测流
*/
/**
* 创建OutputTag对象
* 第一个参数: 标签名
* 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation
*/
OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
SingleOutputStreamOperator<WaterSensor> process = sensorDS
.process(
new ProcessFunction<WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
String id = value.getId();
if ("s1".equals(id)) {
// 如果是 s1,放到侧输出流s1中
/**
* 上下文ctx 调用ouput,将数据放入侧输出流
* 第一个参数: Tag对象
* 第二个参数: 放入侧输出流中的 数据
*/
ctx.output(s1Tag, value);
} else if ("s2".equals(id)) {
// 如果是 s2,放到侧输出流s2中
ctx.output(s2Tag, value);
} else {
// 非s1、s2的数据,放到主流中
out.collect(value);
}
}
}
);
// 从主流中,根据标签 获取 侧输出流
SideOutputDataStream<WaterSensor> s1 = process.getSideOutput(s1Tag);
SideOutputDataStream<WaterSensor> s2 = process.getSideOutput(s2Tag);
// 打印主流
process.print("主流-非s1、s2");
//打印 侧输出流
s1.printToErr("s1");
s2.printToErr("s2");
env.execute();
}
}