从自定义的 Source 中读取
CustomEvent
package com.atguigu.source;
public class CustomEvent {
private final String user;
private final String url;
private final long timestamp;
public CustomEvent(String user, String url, long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "CustomEvent{" +
"user='" + user + '\'' +
", url='" + url + '\'' +
", timestamp=" + timestamp +
'}';
}
}
CustomSource
package com.atguigu.source;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
public class CustomSource<C> implements SourceFunction<CustomEvent> {
//声明一个标志位
private boolean runFlag = true;
@Override
public void run(SourceContext<CustomEvent> ctx) throws Exception {
//顶一个随机数
Random random = new Random();
//选举范围数据集
String[] users = {"令狐冲", "依琳", "宁中则", "任盈盈", "岳灵珊"};
String[] urls = {"./home", "./cret", "./pro?id=21", "./fav"};
//循环不停的生成数据
while (runFlag) {
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
Long timeStamp = Calendar.getInstance().getTimeInMillis();
ctx.collect(new CustomEvent(user, url, timeStamp));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
runFlag = false;
}
}
主程序
package com.atguigu.source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<CustomEvent> eventStream = env.addSource(new CustomSource<CustomEvent>());//.setParallelism(12);
eventStream.print();
env.execute();
}
}
参考