跳转至

从自定义的 Source 中读取

CustomEvent

package com.atguigu.source;

/**
 * Immutable record of a single event: which user accessed which URL, and when.
 *
 * <p>All three values are fixed at construction time and are only exposed
 * through {@link #toString()}.
 */
public class CustomEvent {

    /** Name of the user the event belongs to. */
    private final String user;
    /** URL associated with the event. */
    private final String url;
    /** Event time as a {@code long}; the unit is decided by the caller. */
    private final long timestamp;

    /**
     * Creates an event from its three components.
     *
     * @param user      user name stored as-is (no null check is performed)
     * @param url       URL stored as-is (no null check is performed)
     * @param timestamp event time value stored as-is
     */
    public CustomEvent(String user, String url, long timestamp) {
        this.timestamp = timestamp;
        this.url = url;
        this.user = user;
    }

    /**
     * Renders the event as
     * {@code CustomEvent{user='<user>', url='<url>', timestamp=<timestamp>}}.
     *
     * @return the textual form of this event
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("CustomEvent{");
        text.append("user='").append(user).append('\'');
        text.append(", url='").append(url).append('\'');
        text.append(", timestamp=").append(timestamp);
        text.append('}');
        return text.toString();
    }
}

CustomSource

package com.atguigu.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * Source that emits one randomly generated {@link CustomEvent} per interval
 * until it is cancelled.
 *
 * <p>Each event combines a user and a URL picked at random from two fixed
 * sample sets with the current wall-clock time in milliseconds.
 *
 * @param <C> unused by this class; retained so that existing instantiations
 *            such as {@code new CustomSource<CustomEvent>()} keep compiling
 */
public class CustomSource<C> implements SourceFunction<CustomEvent> {
    /** Pause between two consecutive events, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000L;

    /**
     * Loop flag: {@code true} while the source should keep producing.
     *
     * <p>Declared {@code volatile} because {@link #cancel()} may be called from
     * a thread other than the one executing {@link #run}; without it the
     * generating loop is not guaranteed to observe the update and could run
     * forever.
     */
    private volatile boolean runFlag = true;

    /**
     * Generates events in a loop until {@link #cancel()} clears the flag.
     *
     * @param ctx context used to emit the generated events
     * @throws Exception if emitting fails or the thread is interrupted while
     *                   sleeping between events
     */
    @Override
    public void run(SourceContext<CustomEvent> ctx) throws Exception {
        // Random generator used to pick a user and a URL for every event.
        Random random = new Random();
        // Sample values the generated events are drawn from.
        String[] users = {"令狐冲", "依琳", "宁中则", "任盈盈", "岳灵珊"};
        String[] urls = {"./home", "./cret", "./pro?id=21", "./fav"};

        // Keep producing data until the source is cancelled.
        while (runFlag) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            // Current time in epoch milliseconds; avoids allocating a Calendar
            // and boxing a Long for every single event.
            long timeStamp = System.currentTimeMillis();
            ctx.collect(new CustomEvent(user, url, timeStamp));
            Thread.sleep(EMIT_INTERVAL_MS);
        }

    }

    /**
     * Requests the generating loop in {@link #run} to stop. The loop exits the
     * next time it checks the flag, i.e. after the current sleep finishes.
     */
    @Override
    public void cancel() {
        runFlag = false;
    }
}

主程序

package com.atguigu.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Entry point of the demo: reads events from {@link CustomSource} and prints
 * every one of them.
 */
public class CustomSourceDemo {

    /**
     * Builds the streaming job (custom source followed by a print sink) and
     * runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 for the whole job.
        environment.setParallelism(1);

        // Register the custom generator as the job's source.
        final CustomSource<CustomEvent> generator = new CustomSource<CustomEvent>();
        final DataStreamSource<CustomEvent> events = environment.addSource(generator);

        // Print each event, then start the job.
        events.print();
        environment.execute();
    }

}

参考