首页 > 其他分享 >Netty实战(十六)

Netty实战(十六)

时间:2023-06-18 20:00:50浏览次数:43  
标签:实战 Netty netty 十六 LogEvent io new import channel

(UDP广播事件(二)编写广播者和监视器)

一、编写广播者

Netty 提供了大量的类来支持 UDP 应用程序的编写。下面我们列出一些要用到的类型:

名 称 描 述
interface AddressedEnvelope<M, A extends SocketAddress>extends ReferenceCounted 定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中 M 是消息类型;A 是地址类型
class DefaultAddressedEnvelope<M, A extends SocketAddress>implements AddressedEnvelope<M,A> 提供了 interface AddressedEnvelope的默认实现
class DatagramPacketextends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> 扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器
implements ByteBufHolder 扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器
interface DatagramChannelextends Channel 扩展了 Netty 的 Channel 抽象以支持 UDP 的多播组管理
class NioDatagramChannnelextends AbstractNioMessageChannelimplements DatagramChannel 定义了一个能够发送和接收 AddressedEnvelope 消息的 Channel 类型

Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。
要将 LogEvent 消息转换为 DatagramPacket,我们将需要一个编码器。但是没有必要从头开始编写我们自己的。我们将扩展 Netty 的 MessageToMessageEncoder,这个我们之前使用过。

下面我们看一下将要广播的信息:我们将展示正在广播的 3 个日志条目,每一个都将通过一个专门的 DatagramPacket进行广播。 在这里插入图片描述

这是该 LogEventBroadcaster 的 ChannelPipeline 的一个高级别视图,展示了 LogEvent 消息是如何流经它的。 在这里插入图片描述

我们会将所有的将要被传输的数据都被封装在了 LogEvent 消息中。LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远程节点(监视器)所捕获。

下面我们自定义一个MessageToMessageEncoder来执行上面所说的转换:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate:
 */
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    //LogEventEncoder 创建了即将被发送到指定的InetSocketAddress 的 DatagramPacket 消息
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc()
                .buffer(file.length + msg.length + 1);
        //将文件名写入到 ByteBuf 中
        buf.writeBytes(file);
        //添加一个SEPARATOR
        buf.writeByte(LogEvent.SEPARATOR);
        //将日志消息写入ByteBuf 中
        buf.writeBytes(msg);
        //将一个拥有数据和目的地地址的新 DatagramPacket 添加到出站的消息列表中
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}

在 LogEventEncoder 被实现之后,我们就该准备引导该服务器,其包括设置各种各样的 ChannelOption,以及在 ChannelPipeline 中安装所需要的 ChannelHandler,这些都要通过主类 LogEventBroadcaster 完成。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: 广播者组件
 */
public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;
    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        //引导该 NioDatagramChannel(无连接的)
        bootstrap.group(group).channel(NioDatagramChannel.class)
                //设置 SO_BROADCAST 套接字选项
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }
    public void run() throws Exception {
        //绑定 Channel
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //启动主处理循环
        for (;;) {
            long len = file.length();
            if (len < pointer) {
               // 文件已重置
                //如果有必要,将文件指针设置到该文件的最后一个字节
                pointer = len;
            } else if (len > pointer) {
                // 已添加内容
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                //设置当前的文件指针,以确保没有任何的旧日志被发送
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    //对于每个日志条目,写入一个LogEvent到Channel 中
                    ch.writeAndFlush(new LogEvent(null, -1,
                            file.getAbsolutePath(), line));
                }
                //存储其在文件中的当前位置
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                //休眠 1 秒,如果被中断,则退出循环;否则重新处理它
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException();
        }
        //创建并启动一个新的LogEventBroadcaster的实例
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",
                        Integer.parseInt(args[0])), new File(args[1]));
        try {
            broadcaster.run();
        }
        finally {
            broadcaster.stop();
        }
    }
}

这样就完成了该应用程序的广播者组件。

二、编写监视器

我们定义一个 LogEventMonitor。

这个程序将: (1)接收由 LogEventBroadcaster 广播的 UDP DatagramPacket; (2)将它们解码为 LogEvent 消息; (3)将 LogEvent 消息写出到 System.out。

和之前一样,该逻辑由一组自定义的 ChannelHandler 实现,我们扩展 MessageToMessageDecoder。

下面是一个LogEventMonitor 的 ChannelPipeline,展示了LogEvent 是如何流经它的 在这里插入图片描述

综上所述,我们需要2个解码器来处理LogEvent ,它们分别是LogEventDecoder 、LogEventHandler。

LogEventDecoder :

1、ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket解码为LogEvent 消息(一个用于转换入站数据的任何 Netty 应用程序的典型设置)

下面我们开始写LogEventDecoder的实现:


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.util.List;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate:LogEventDecoder
 */
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {

    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        //获取对 DatagramPacket 中的数据(ByteBuf)的引用
        ByteBuf data = datagramPacket.content();
        //获取该 SEPARATOR 的索引
        int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
        //提取文件名
        String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
        //提取日志消息
        String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
        //构建一个新的 LogEvent 对象,并且将它添加到(已经解码的消息的)列表中
        LogEvent event = new LogEvent(datagramPacket.sender(),
                System.currentTimeMillis(), filename, logMsg);
        out.add(event);
    }
}

LogEventHandler:

2、第二个 ChannelHandler 的工作是对第一个 ChannelHandler 所创建的 LogEvent 消息执行一些处理。在这个场景下,它只是简单地将它们写出到 System.out。在真实世界的应用程序中,可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。

下面我们开始写LogEventHandler的实现:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: LogEventHandler
 */
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //扩展 SimpleChannelInboundHandler 以处理 LogEvent 消息

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //当异常发生时,打印栈跟踪信息,并关闭对应的 Channe
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        //创建 StringBuilder,并且构建输出的字符串
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        //打印 LogEvent 的数据
        System.out.println(builder.toString());
    }
}

LogEventHandler 将以一种简单易读的格式打印 LogEvent 消息,包括以下的各项:

  • 发送方的 InetSocketAddress,其由 IP 地址和端口组成;
  • 生成 LogEvent 消息的日志文件的绝对路径名;
  • 实际上的日志消息,其代表日志文件中的一行。

按照我们之前讲过的内容,下面我们要做的是将LogEventDecoder 和LogEventHandler 安装到ChannelPipeline。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.net.InetSocketAddress;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: 监视器
 */
public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                //引导该 NioDatagramChannel
                .channel(NioDatagramChannel.class)
                //设置套接字选项 SO_BROADCAST
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //将 LogEventDecoder 和 LogEventHandler 添加到 ChannelPipeline 中
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                } )
                .localAddress(address);
    }
    
    //绑定 Channel。注意,DatagramChannel 是无连接的
    public Channel bind() {
        return bootstrap.bind().syncUninterruptibly().channel();
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }
        //构造一个新的LogEventMonitor
        LogEventMonitor monitor = new LogEventMonitor(
                new InetSocketAddress(Integer.parseInt(args[0])));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        } finally {
            monitor.stop();
        }
    }
}

三、运行 LogEventBroadcaster 和 LogEventMonitor

编写完广播者和监视器后,我们在idea中分别启动它们,当我们看到控制台打印:”LogEventMonitor running“时,说明已经启动成功!

标签:实战,Netty,netty,十六,LogEvent,io,new,import,channel
From: https://blog.51cto.com/TiMi/6509363

相关文章

  • Vue实战(09)-列表渲染:让你的页面秒变爆款!
    1最基础的循环<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>Vue中的列表渲染</title><scriptsrc="../vue.js"></script></head><body>......
  • 【人工智能】国产开源大模型聊天 AquilaChat 快速开始上手实战&效果评测
    【人工智能】国产开源大模型聊天AquilaChat快速开始上手实战&效果评测文章目录【人工智能】国产开源大模型聊天AquilaChat快速开始上手实战&效果评测禅与计算机程序设计艺术:评测结论——AquilaChat在写作水平上跟ChatGLM-6B差不多,但是AquilaChat多编程语言写代码能力还是......
  • TensorFlow05.3 神经网络-FashionMNIST实战
    一.数据的加载:(x,y),(x_test,y_test)=datasets.fashion_mnist.load_data()print(x.shape,y.shape)二.数据的处理defpreprocess(x,y):x=tf.cast(x,dtype=tf.float32)/255.#归一化y=tf.cast(y,dtype=tf.int32)returnx,y batchsz=128db......
  • Zookeeper入门实战(5)-分布式锁
    在分布式环境中,当需要控制对某一资源的不同进程并发访问时就需要使用分布式锁;可以使用 ZooKeeper+Curator来实现分布式锁,本文主要介绍 Curator中分布式锁的使用,文中所使用到的软件版本:Java1.8.0_341、Zookeeper3.7.1、curator5.4.0。1、引入依赖<dependency><groupId......
  • 十六、区块量化 MACD策略
    新增cross_macd_order.py#-*-coding:utf-8-*-importtalibimportcross_orderasorderimporttimedefmain():print("任务开始时间:",time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(time.time())))forsymbolinorder.symbol_pool:#设置杠杆倍数......
  • 第1课 基于ChatGPT的端到端语音聊天机器人项目实战
    第1课基于ChatGPT的端到端语音聊天机器人项目实战1.1ChatGPTAPI后台开发实战本节主要是跟大家分享一个端到端的基于模型驱动的对话机器人,会有前端和后端,也会有一些具体模型的调用,读者需具有Python语言编程的基础,这是前置性的条件,有了这个基础,理论上讲本节所有的内容,读者都可以......
  • 基于ChatGPT的端到端语音聊天机器人项目实战
     企业级ChatGPT开发入门实战第1课基于ChatGPT的端到端语音聊天机器人项目实战1.4使用FastAPI构建语音聊天机器人后端实战在后端代码(backend)中调用了OpenAIAPI及其他的服务,如图1-10所示。图1-10后端代码调用OpenAIAPIopenai_requests.py是一个相对比较简单的代码文件,在生产级......
  • SpringBatch从入门到实战(一):简介和环境搭建
    一:简介SpringBatch是一个轻量级的批处理框架,适合处理大批量的数据(如百万级别)。功能就是从一个地方读数据写到另一个地方去。一般都是系统之间不能直接访问同一个数据库,需要通过文件来交换数据。二:从文件中读然后写到数据库这代码谁都会写,那么为什么还要使用框架?try(BufferedReader......
  • Python3网络爬虫开发实战阅读笔记
    基本库的使用网络请求库urllib(HTTP/1.1)Python自带请求库,繁琐基础使用:略requests(HTTP/1.1)Python常用第三方请求库,便捷基础使用:略httpx(HTTP/2.0)Python第三方库,支持HTTP/2.0,支持异步请求,支持Python的async请求模式pipinstall'httpx[http2]'基础使用:与requests相似,默认......
  • Go设计模式实战--用状态模式实现系统工作流和状态机
    大家好,这里是每周都在陪你进步的网管~!本节我们讲一个行为型的设计模式--状态模式,并通过Golang示例进行实战演示。状态模式(StatePattern)也叫作状态机模式(StateMachinePattern)状态模式允许对象的内部状态发生改变时,改变它的行为,就好像对象看起来修改了它实例化的类,状态模式是一种......