跳转至

Time Window

package com.atguigu.window;

import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class TimeWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.1.7", 9091)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // 滚动窗口,窗口长度10秒
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//滑动窗口,长度10s,步长5s
//                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));//会话窗口,间隔5s
//                .window(ProcessingTimeSessionWindows.withDynamicGap(
//                        new SessionWindowTimeGapExtractor<WaterSensor>() {
//                            @Override
//                            public long extract(WaterSensor element) {
//                                // 从数据中提取ts,作为间隔,单位ms
//                                return element.getTs() * 1000L;
//                            }
//                        }
//                ));// 会话窗口,动态间隔,每条来的数据都会更新 间隔时间

        SingleOutputStreamOperator<String> process = sensorWS
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                            /**
                             * 全窗口函数计算逻辑:  窗口触发时才会调用一次,统一计算窗口的所有数据
                             * @param s   分组的key
                             * @param context  上下文
                             * @param elements 存的数据
                             * @param out      采集器
                             * @throws Exception
                             */
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                // 上下文可以拿到window对象,还有其他东西:侧输出流 等等
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );

        process.print();

        env.execute();
    }
}

/**
 *  触发器、移除器: 现成的几个窗口,都有默认的实现,一般不需要自定义
 *
 *  以 时间类型的 滚动窗口 为例,分析原理:
    TODO 1、窗口什么时候触发 输出?
            时间进展 >= 窗口的最大时间戳(end - 1ms)

    TODO 2、窗口是怎么划分的?
            start= 向下取整,取窗口长度的整数倍
            end = start + 窗口长度

            窗口左闭右开 ==》 属于本窗口的 最大时间戳 = end - 1ms

    TODO 3、窗口的生命周期?
            创建: 属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
            销毁(关窗): 时间进展 >=  窗口的最大时间戳(end - 1ms) + 允许迟到的时间(默认0)

    remainder = (timestamp - offset) % windowSize;
 ·  (13s - 0 )% 10 = 3
    (27s - 0 )% 10 = 7
    if (remainder < 0) {
        return timestamp - (remainder + windowSize);
    } else {
        return timestamp - remainder;
        13 -3 = 10
        27 - 7 = 20
    }

 */