首页 > 编程语言 >Java版Flink使用指南——定制RabbitMQ数据源的序列化器

Java版Flink使用指南——定制RabbitMQ数据源的序列化器

时间:2024-07-09 10:56:46浏览次数:18  
标签:Java String 数据源 SampleData RabbitMQ import org 序列化

大纲

《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-rabbitmq</artifactId>
			<version>3.0.1-1.17</version>
		</dependency>

新增Json库依赖

		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.17.1</version>
		</dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.32</version>
            <scope>provided</scope>
        </dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {
    private Long id;
    private String name;
    private int age;
    private Boolean married;
    private Double salary;

    public String toJson() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(this);
    }

    public static SampleData fromJson(String json) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, SampleData.class);
    }
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;

import java.io.IOException;

public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {
    @Override
    public SampleData deserialize(byte[] message) throws IOException {
        return SampleData.fromJson(new String(message));
    }

    @Override
    public boolean isEndOfStream(SampleData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<SampleData> getProducedType() {
        return TypeInformation.of(SampleData.class);
    }
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";
		String host = "172.21.112.140"; // IP of the rabbitmq server
		int port = 5672;
		String username = "admin";
		String password = "fangliang";
		String virtualHost = "/";
		int parallelism = 1;

		// create a RabbitMQ source
		RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
				.setHost(host)
				.setPort(port)
				.setUserName(username)
				.setPassword(password)
				.setVirtualHost(virtualHost)
				.build();

		RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());
		final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);

		stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

标签:Java,String,数据源,SampleData,RabbitMQ,import,org,序列化
From: https://blog.csdn.net/breaksoftware/article/details/140077842

相关文章

  • Java IO
    IO概述根据UNIX网络编程对I/O模型的分类,UNIX提供了5种I/O模型,分别是阻塞I/O模型、非阻塞I/O模型、I/O复用模型、信号驱动I/O模型、异步I/O模型。阻塞I/O模型(blockingI/O):是最常用的I/O模型,缺省情形下,所有文件操作都是阻塞的。我们以套接字接口为例来理解此模型,即在进程空......
  • 1154java jsp SSM古董拍卖网站系统(源码+文档+PPT+运行视频+讲解视频)
     项目技术:SSM+Maven+Vue等等组成,B/S模式+Maven管理等等。环境需要1.运行环境:最好是javajdk1.8,我们在这个平台上运行的。其他版本理论上也可以。2.IDE环境:IDEA,Eclipse,Myeclipse都可以。推荐IDEA;3.tomcat环境:Tomcat7.x,8.x,9.x版本均可4.硬件环境:windows7/8/1......
  • Java线程之线程池
    1、什么是线程池?        线程池:用来管理线程对象的池子2、为什要使用线程池?               1、解决频繁的创建和销毁线程消耗的性能        2、解决大量创建线程而导致的内存泄露问题         注意:......
  • 1047java jsp SSM旅游管理系统旅游路线推荐特色产品酒店预约(源码+文档+PPT+运行视频+
     项目技术:SSM+Maven+Vue等等组成,B/S模式+Maven管理等等。环境需要1.运行环境:最好是javajdk1.8,我们在这个平台上运行的。其他版本理论上也可以。2.IDE环境:IDEA,Eclipse,Myeclipse都可以。推荐IDEA;3.tomcat环境:Tomcat7.x,8.x,9.x版本均可4.硬件环境:windows7/8/1......
  • Java中的类加载器
     类加载器1.什么是类加载器?启动类加载器(BootstrapClassLoader):这是JVM自带的类加载器,负责加载Java的核心类库,如rt.jar等。由于安全原因,启动类加载器加载的类不能被其他类加载器加载的类所引用。扩展类加载器(ExtensionClassLoader):负责加载Java的扩展类库,一般位于$JAVA_H......
  • java反射技术学习
    反射反射:加载类,并允许以编程的方式解剖类中的各种成分(成员变量,方法,构造器等)反射学什么?学习获取类的信息、操作他们1.反射第一步:加载类、获取类的字节码:Class对象 packagecom.itheima.reflect; ​ publicclassreflect1{   publicstaticvoidmain(String[]......
  • JavaScript介绍、初识(注释语法、书写位置、书写规范)、常量和变量、数据类型Number、
    【一】JavaScript介绍【1】什么是jsjs也是一门编程语言,他可以写后端代码【2】什么是node.js前端由于非常受制于后端,所以有一些人异想天开想要通过js来编写后端代码一统江湖由此开发了一个叫nodejs的工具(支持js跑在后端服务器上)但是并不能完美的实现【3】JavaScript......
  • Java 修饰符
    Java中的修饰符用于修饰类、方法、变量等元素,它们控制了这些元素的访问性、继承性和其他特性。主要的修饰符包括:1.**访问控制修饰符**:  -**public**:可以被任何其他类访问。  -**protected**:可以被同一包内的类及其子类访问。  -**default(nomodifier)*......
  • C#——二进制流序列化和反序列化
    C#二进制流序列化和反序列化在C#中,可以使用BinaryFormatter来进行二进制的序列化和反序列化。首先,定义一个可序列化的类[Serializable]publicclassMyObject{publicintIntProperty{get;set;}publicstringStringProperty{get;set;}}使用BinaryFo......
  • C#——XML格式序列化和反序列化
    C#—XML格式序列化和反序列化在C#中,可以使用System.Xml.Serialization命名空间下的XmlSerializer类来实现XML的序列化。首先,定义一个可序列化的对象模型:usingSystem;usingSystem.Xml.Serialization;[XmlRoot("Person")]publicclassPerson{[XmlElement("Name"......