首页 > 其他分享 >graylog RawMessage&RawMessageEvent&MessageEvent&Message 说明

graylog RawMessage&RawMessageEvent&MessageEvent&Message 说明

时间:2022-10-05 09:55:40浏览次数:71  
标签:java graylog2 RawMessage server graylog new Message final

RawMessage 是从graylog input 组件获取到的还没解析处理的消息,是有SimpleChannelInboundHandler 处理的原始消息
RawMessageEvent 是Disruptor 处理的事件包装
MessageEvent 是经过input 之后到ProcessBuffer 中的消息事件(Disruptor 包装的)
Message 是经过input 以及ProcessBufferProcessor 之后生成的消息

参考图

MessageEvent 是包含了RawMessage 以及message经过Disruptor 的事件

 

 

RawMessage参考代码

 
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    final byte[] bytes = new byte[msg.readableBytes()];
    msg.readBytes(bytes);
    final RawMessage raw = new RawMessage(bytes, (InetSocketAddress) ctx.channel().remoteAddress());
    input.processRawMessage(raw); // 会通过MessageInput 进行处理,会放到inputbuffer 中
}

RawMessageEvent 参考代码

@Inject
    public InputBufferImpl(MetricRegistry metricRegistry,
                           BaseConfiguration configuration,
                           Provider<DirectMessageHandler> directMessageHandlerProvider,
                           Provider<RawMessageEncoderHandler> rawMessageEncoderHandlerProvider,
                           Provider<JournallingMessageHandler> spoolingMessageHandlerProvider) {
     // 基于Disruptor的事件包装,是由 RawMessage 产生的,具体可以通过
        final Disruptor<RawMessageEvent> disruptor = new Disruptor<>(
                RawMessageEvent.FACTORY,
                configuration.getInputBufferRingSize(),
                threadFactory(metricRegistry),
                ProducerType.MULTI,
                configuration.getInputBufferWaitStrategy());
        disruptor.setDefaultExceptionHandler(new LoggingExceptionHandler(LOG));
 
        final int numberOfHandlers = configuration.getInputbufferProcessors();
        if (configuration.isMessageJournalEnabled()) {
            LOG.info("Message journal is enabled.");
 
            final RawMessageEncoderHandler[] handlers = new RawMessageEncoderHandler[numberOfHandlers];
            for (int i = 0; i < numberOfHandlers; i++) {
                handlers[i] = rawMessageEncoderHandlerProvider.get();
            }
            disruptor.handleEventsWithWorkerPool(handlers).then(spoolingMessageHandlerProvider.get());
        } else {
            LOG.info("Message journal is disabled.");
            final DirectMessageHandler[] handlers = new DirectMessageHandler[numberOfHandlers];
            for (int i = 0; i < numberOfHandlers; i++) {
                handlers[i] = directMessageHandlerProvider.get();
            }
            disruptor.handleEventsWithWorkerPool(handlers);
        }
 
        ringBuffer = disruptor.start();
 
        incomingMessages = metricRegistry.meter(name(InputBufferImpl.class, "incomingMessages"));
        safelyRegister(metricRegistry, GlobalMetricNames.INPUT_BUFFER_USAGE, new Gauge<Long>() {
            @Override
            public Long getValue() {
                return InputBufferImpl.this.getUsage();
            }
        });
        safelyRegister(metricRegistry, GlobalMetricNames.INPUT_BUFFER_SIZE, constantGauge(ringBuffer.getBufferSize()));
 
        LOG.info("Initialized {} with ring size <{}> and wait strategy <{}>, running {} parallel message handlers.",
                this.getClass().getSimpleName(),
                configuration.getInputBufferRingSize(),
                configuration.getInputBufferWaitStrategy().getClass().getSimpleName(),
                numberOfHandlers);
    }

MessageEvent 参考处理

ProcessBuffer.java 通过inputbuffer 放到 ProcessBuffer 中
public void insertBlocking(@Nonnull RawMessage rawMessage) {
    final long sequence = ringBuffer.next();
    final MessageEvent event = ringBuffer.get(sequence);
    event.setRaw(rawMessage);
    ringBuffer.publish(sequence);
    afterInsert(1);
}

Message 提供的能力

后续MessageProcessor 处理的就是此类,同时后续的output 也是使用的此消息进行存储处理

 

 

参考资料

https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/plugin/journal/RawMessage.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/plugin/Message.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/shared/buffers/RawMessageEvent.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/plugin/inputs/MessageInput.java

标签:java,graylog2,RawMessage,server,graylog,new,Message,final
From: https://www.cnblogs.com/rongfengliang/p/16755106.html

相关文章

  • 10-RabbitMQ核心API-其他[Binding, Queue, Message, Virtual host]
    Binding绑定关系Exchange和Exchange,Queue之间的连接关系Binding中可以包含RouteKey或者参数Queue消息队列,实际存储消息数据Durability:是否持久化,Durable......
  • graylog OutputRouter 简单说明
    OutputRouter核心是基于stream以及消息获取到实际消息的外部输出,方便后续的存储以及处理,实现上依赖了OutputRegistry存储了MessageOutput与Stream的关系,可以用来方便......
  • graylog MessageOutput 简单说明
    MessageOutput才是graylog真正写入日志到外部存储的地方不同的实现都依赖一个MessageQueueAcknowledger,对于已经处理的消息进行确认,确保不会多次执行扩展的子类  ......
  • graylog OutputBufferProcessor 简单说明
    OutputBufferProcessor对于输出处理比较重要,包含了路由(比如不同stream写到外部put,写到不同的外部MessageOutput中OutputBufferProcessor也是Disruptor的一个handler......
  • graylog OutputBuffer 简单说明
    因为graylog比较依赖buffer,所以对于输出,套路与process类似,技术上也依赖了Disruptor消息处理上依赖一个OutputBufferProcessor,与process一致,OutputBufferProcessor后边......
  • graylog 的InputBuffer 简单说明
    InputBuffer是graylog实际处理日志的部分,内部处理基于了Disruptor,同时还依赖了Journalling能力内部实际处理是依赖LocalKafka的(而且是推荐的)InputBuffer处理的几个......
  • graylog MessageInput 简单说明
    input是graylog处理消息的核心,内部对于网络的处理是基于了netty框架,消息的核心基类是MessageInput基类的能力如下图,包含了基本常用的配置,状态,以及input节点信息,同时......
  • graylog jprofiler docker 镜像
    主要是添加jprifler方便学习参考dockerfile很简单,添加文件就行了,具体jprofiler官方下载解压就行了FROMgraylog/graylog:4.3COPYjprofiler13.0.3//op......
  • graylog rest servcie 启动&集成说明
    参考处理模块定义Graylog2ModuleprotectedvoidaddSystemRestResource(Class<?>restResourceClass){systemRestResourceBinder().addBinding().toI......
  • SendMessage SendMessage 是应用程序和应用程序之间进行消息传递的主要手段之一
    Windows是一个消息驱动模式的系统,SendMessage是应用程序和应用程序之间进行消息传递的主要手段之一。由于SendMessage函数的参数选项过于繁多,因此很有必要作一个汇总,分......