Window Aggregate
package com.atguigu.window;
import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class WindowAggregateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("192.168.1.7", 9091)
.map(new WaterSensorMapFunction());
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
// 1. 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
// 2. 窗口函数: 增量聚合 Aggregate
/**
* 1、属于本窗口的第一条数据来,创建窗口,创建累加器
* 2、增量聚合: 来一条计算一条, 调用一次add方法
* 3、窗口输出时调用一次getresult方法
* 4、输入、中间累加器、输出 类型可以不一样,非常灵活
*/
SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(
/**
* 第一个类型: 输入数据的类型
* 第二个类型: 累加器的类型,存储的中间计算结果的类型
* 第三个类型: 输出的类型
*/
new AggregateFunction<WaterSensor, Integer, String>() {
/**
* 创建累加器,初始化累加器
* @return
*/
@Override
public Integer createAccumulator() {
System.out.println("创建累加器");
return 0;
}
/**
* 聚合逻辑
* @param value
* @param accumulator
* @return
*/
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("调用add方法,value="+value);
return accumulator + value.getVc();
}
/**
* 获取最终结果,窗口触发时输出
* @param accumulator
* @return
*/
@Override
public String getResult(Integer accumulator) {
System.out.println("调用getResult方法");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
// 只有会话窗口才会用到
System.out.println("调用merge方法");
return null;
}
}
);
aggregate.print();
env.execute();
}
}