首页 > 其他分享 >Flink数据源拆解分析(WikipediaEditsSource)

Flink数据源拆解分析(WikipediaEditsSource)

时间:2024-09-22 13:22:38浏览次数:3  
标签:null Socket 数据源 Flink WikipediaEditsSource new IRC public channel

在demo中,WikipediaEditsSource类作为数据源负责向Flink提供实时消息,今天咱们一起来分析其源码,了解Flink是怎么获取到来自Wiki的实时数据的,这对我们今后做自定义数据源也有很好的参考作用;

官方解释

以下是官网对消息来源的说明,维基百科提供了一个IRC协议的通道,从这个通道可以获取对维基百科所做的编辑行为的日志:

Wikipedia provides an IRC channel where all edits to the wiki are logged.

IRC是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat

继承关系

先看WikipediaEditsSource类的继承关系,做个初步了解,如下图:

在这里插入图片描述

如上图所示,RichFunction接口负责资源开启关闭以及环境上下文,而SourceFunction接口则是和数据生产行为的开始和停止有关,这些接口最终都在WikipediaEditSource实现;

构造方法

通过构造方法来了解有哪些参数被确定了:

//远程连接的域名

public static final String DEFAULT_HOST = "irc.wikimedia.org";

//远程连接的端口

public static final int DEFAULT_PORT = 6667;

//IRC协议的channel

public static final String DEFAULT_CHANNEL = "#en.wikipedia";

private final String host;

private final int port;

private final String channel;

public WikipediaEditsSource() {

this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_CHANNEL);

}

public WikipediaEditsSource(String host, int port, String channel) {

this.host = host;

this.port = port;

this.channel = Objects.requireNonNull(channel);

}

通过上述代码可以见到,数据的来源是irc.wikimedia.org这个网址;

主业务代码

主要的业务逻辑是WikipediaEditsSource的run方法,该方法在任务启动的时候会被StreamSource.run方法调用:

@Override

public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {

try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {

// 创建一个IRC协议的连接

ircStream.connect();

//进入指定的channel

ircStream.join(channel);

try {

while (isRunning) {

//从阻塞队列中获取数据

WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);

//如果取到了数据,就调用ctx.collect方法,将数据生产到Flink环境,给其他operator使用

if (edit != null) {

ctx.collect(edit);

}

}

} finally {

//结束时要向服务器发送数据表示离开

ircStream.leave(channel);

}

}

}

上面的代码,我们挑几处重要的展开看一看;

和维基百科消息服务器建立连接后做的事情

  1. 为了弄明白Flink是如何与维基百科的数据源建立连接的,先把ircStream.connect()这段代码展开,对应的是IRCConnection类的connect方法:

public void connect() throws IOException {

if (level != 0) // otherwise disconnected or connect

throw new SocketException("Socket closed or already open ("+ level +")");

IOException exception = null;

Socket s = null;

for (int i = 0; i < ports.length && s == null; i++) {

try {

//建立的是普通Socket连接

s = new Socket(host, ports[i]);

exception = null;

} catch (IOException exc) {

if (s != null)

s.close();

s = null;

exception = exc;

}

}

if (exception != null)

throw exception; // connection wasn't successful at any port

prepare(s);

}

上述代码表明,Flink与维基百科的数据源服务器之间建立的是普通的Socket连接,至于IRC协议,都是在这个Socket连接的通道里的一些读写操作;

  1. 上面的prepare方法比较关键,展开看看:

protected void prepare(Socket s) throws IOException {

if (s == null)

throw new SocketException("Socket s is null, not connected");

socket = s;

level = 1;

s.setSoTimeout(timeout);

in = new BufferedReader(new InputStreamReader(s.getInputStream(),

encoding));

out = new PrintWriter(new OutputStreamWriter(s.getOutputStream(),

encoding));

//IRCConnection是Thread的子类,执行start方法就表明会启动一个线程来执行IRCConnection的run方法

start();

//遵守IRC协议约定,发送一些注册相关的内容

register();

}

可以看出,prepare方法做了两个重要的事情:启动一个子线程、发送IRC协议的注册信息,接下来看启动的子线程做了什么;

  1. 打开IRCConnection的run方法:

public void run() {

try {

String line;

while (!isInterrupted()) {

line = in.readLine();

if (line != null)

get(line);

else

close();

}

} catch (IOException exc) {

close();

}

}

run方法中的内容很简单,就是让这个子线程负责读取远端发送的字符串,每读到一行就调用get方法去处理;

  1. get方法的内容很多,做的事情是根据IRC协议解析这个字符串再做不同的处理,这里我们只要关注下面这段,就是收到一条业务消息后如何处理:

//每当有人编辑了维基百科,这里就会收到一条command为PRIVMSG的记录

标签:null,Socket,数据源,Flink,WikipediaEditsSource,new,IRC,public,channel
From: https://blog.51cto.com/u_17015008/12080180

相关文章

  • Java中的多数据源管理:如何在单个应用中集成多数据库
    Java中的多数据源管理:如何在单个应用中集成多数据库大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在现代软件架构中,应用往往需要访问多个数据库以支持不同的业务需求。本文将介绍如何在Java应用中实现多数据源管理,包括配置、使用和切换数据源的最佳......
  • Flink的反压机制:底层原理、产生原因、排查思路与解决方案
            反压(Backpressure)是流处理框架(如ApacheFlink)中非常重要的概念。反压的产生和有效处理,直接影响整个流处理作业的稳定性和性能。本文将从Flink的底层原理、反压产生的原因、如何排查反压问题,以及如何解决反压问题等方面进行详细讨论。1.Flink反压的底层原......
  • Flink 中 Checkpoint 的底层原理和机制
            Flink的Checkpoint机制是ApacheFlink在流式处理中的一个核心特性,保证了分布式数据流处理系统的 容错性。通过定期保存 状态快照(checkpoint),即使在发生故障时,Flink也可以恢复到之前的状态,确保处理的正确性。为了全面解释Flink的Checkpoint底层实现......
  • flink 启动Job加载外部jar都有哪些方法?
    flink启动Job加载外部jar都有哪些方法在ApacheFlink版本中,启动Job时加载外部Jar包有几种不同的方法。这些方法允许用户引入自定义的UDF(用户定义函数)或其他依赖项。以下是几种常见的方法:1.使用flinkrun命令直接启动你可以通过命令行工具flinkrun来指定你的Job......
  • 如何基于Flink CDC与OceanBase构建实时数仓,实现简化链路,高效排查
    本文作者:阿里云FlinkSQL负责人,伍翀,ApacheFlinkPMCMember&Committer众多数据领域的专业人士都很熟悉ApacheFlink,它作为流式计算引擎,流批一体,其核心在于其强大的分布式流数据处理能力,同时巧妙地融合了流计算与批计算的能力,因此成为了众多企业在进行流式计算业务时的首......
  • 大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他
    点一下关注吧!!!非常感谢!!持续更新!!!目前已经更新到了:Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis(已更完)Kafka(已更完)Spark(已更完)Flink(已更完)ClickHouse(正在更新···)章节内容上节我们完成了如下的内容:MergeTre......
  • Flink-cdc丢失数据排查
    一、获取任务信息任务id:i01f51582-d8be-4262-aefa-000000任务名称:ods_test1234丢失的数据时间:2024-09-1609:28:47 二、数据同步查看日志1、筛选日志筛选2024-09-1609:28:47到5分钟后数据2、查找快照id,筛选内容Committedsnapshot7258609197164498019(BaseRowDelt......
  • 大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
    点一下关注吧!!!非常感谢!!持续更新!!!目前已经更新到了:Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis(已更完)Kafka(已更完)Spark(已更完)Flink(正在更新!)章节内容上节我们完成了如下的内容:ManageOperatorStateStateBackendCheckpoint......
  • 大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
    点一下关注吧!!!非常感谢!!持续更新!!!目前已经更新到了:Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis(已更完)Kafka(已更完)Spark(已更完)Flink(正在更新!)章节内容上节我们完成了如下的内容:Flink并行度Flink并行度详解Flink并行度......
  • 大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置
    点一下关注吧!!!非常感谢!!持续更新!!!目前已经更新到了:Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis(已更完)Kafka(已更完)Spark(已更完)Flink(正在更新!)章节内容上节我们完成了如下的内容:FlinkTimeWatermarkJava代码实例测试简单介......