在demo中,WikipediaEditsSource类作为数据源负责向Flink提供实时消息,今天咱们一起来分析其源码,了解Flink是怎么获取到来自Wiki的实时数据的,这对我们今后做自定义数据源也有很好的参考作用;
官方解释
以下是官网对消息来源的说明,维基百科提供了一个IRC协议的通道,从这个通道可以获取对维基百科所做的编辑行为的日志:
Wikipedia provides an IRC channel where all edits to the wiki are logged.
IRC是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat
继承关系
先看WikipediaEditsSource类的继承关系,做个初步了解,如下图:
如上图所示,RichFunction接口负责资源开启关闭以及环境上下文,而SourceFunction接口则是和数据生产行为的开始和停止有关,这些接口最终都在WikipediaEditSource实现;
构造方法
通过构造方法来了解有哪些参数被确定了:
//远程连接的域名
public static final String DEFAULT_HOST = "irc.wikimedia.org";
//远程连接的端口
public static final int DEFAULT_PORT = 6667;
//IRC协议的channel
public static final String DEFAULT_CHANNEL = "#en.wikipedia";
private final String host;
private final int port;
private final String channel;
public WikipediaEditsSource() {
this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_CHANNEL);
}
public WikipediaEditsSource(String host, int port, String channel) {
this.host = host;
this.port = port;
this.channel = Objects.requireNonNull(channel);
}
通过上述代码可以见到,数据的来源是irc.wikimedia.org这个网址;
主业务代码
主要的业务逻辑是WikipediaEditsSource的run方法,该方法在任务启动的时候会被StreamSource.run方法调用:
@Override
public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {
try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {
// 创建一个IRC协议的连接
ircStream.connect();
//进入指定的channel
ircStream.join(channel);
try {
while (isRunning) {
//从阻塞队列中获取数据
WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);
//如果取到了数据,就调用ctx.collect方法,将数据生产到Flink环境,给其他operator使用
if (edit != null) {
ctx.collect(edit);
}
}
} finally {
//结束时要向服务器发送数据表示离开
ircStream.leave(channel);
}
}
}
上面的代码,我们挑几处重要的展开看一看;
和维基百科消息服务器建立连接后做的事情
- 为了弄明白Flink是如何与维基百科的数据源建立连接的,先把ircStream.connect()这段代码展开,对应的是IRCConnection类的connect方法:
public void connect() throws IOException {
if (level != 0) // otherwise disconnected or connect
throw new SocketException("Socket closed or already open ("+ level +")");
IOException exception = null;
Socket s = null;
for (int i = 0; i < ports.length && s == null; i++) {
try {
//建立的是普通Socket连接
s = new Socket(host, ports[i]);
exception = null;
} catch (IOException exc) {
if (s != null)
s.close();
s = null;
exception = exc;
}
}
if (exception != null)
throw exception; // connection wasn't successful at any port
prepare(s);
}
上述代码表明,Flink与维基百科的数据源服务器之间建立的是普通的Socket连接,至于IRC协议,都是在这个Socket连接的通道里的一些读写操作;
- 上面的prepare方法比较关键,展开看看:
protected void prepare(Socket s) throws IOException {
if (s == null)
throw new SocketException("Socket s is null, not connected");
socket = s;
level = 1;
s.setSoTimeout(timeout);
in = new BufferedReader(new InputStreamReader(s.getInputStream(),
encoding));
out = new PrintWriter(new OutputStreamWriter(s.getOutputStream(),
encoding));
//IRCConnection是Thread的子类,执行start方法就表明会启动一个线程来执行IRCConnection的run方法
start();
//遵守IRC协议约定,发送一些注册相关的内容
register();
}
可以看出,prepare方法做了两个重要的事情:启动一个子线程、发送IRC协议的注册信息,接下来看启动的子线程做了什么;
- 打开IRCConnection的run方法:
public void run() {
try {
String line;
while (!isInterrupted()) {
line = in.readLine();
if (line != null)
get(line);
else
close();
}
} catch (IOException exc) {
close();
}
}
run方法中的内容很简单,就是让这个子线程负责读取远端发送的字符串,每读到一行就调用get方法去处理;
- get方法的内容很多,做的事情是根据IRC协议解析这个字符串再做不同的处理,这里我们只要关注下面这段,就是收到一条业务消息后如何处理:
//每当有人编辑了维基百科,这里就会收到一条command为PRIVMSG的记录
标签:null,Socket,数据源,Flink,WikipediaEditsSource,new,IRC,public,channel From: https://blog.51cto.com/u_17015008/12080180