首页 > 其他分享 >graylog OutputBufferProcessor 简单说明

graylog OutputBufferProcessor 简单说明

时间:2022-10-04 09:55:58浏览次数:73  
标签:OutputBufferProcessor LOG graylog 简单 msg output null final graylog2

OutputBufferProcessor 对于输出处理比较重要,包含了路由(比如不同stream 写到外部put,写到不同的外部MessageOutput 中
OutputBufferProcessor 也是Disruptor 的一个handler

参考处理

 
public void onEvent(MessageEvent event) throws Exception {
        incomingMessages.mark();
 
        final Message msg = event.getMessage();
        if (msg == null) {
            LOG.debug("Skipping null message.");
            return;
        }
        LOG.debug("Processing message <{}> from OutputBuffer.", msg.getId());
       // 消息的路由
        final Set<MessageOutput> messageOutputs = outputRouter.getStreamOutputsForMessage(msg);
        msg.recordCounter(serverStatus, "matched-outputs", messageOutputs.size());
       // 处理消息
        final Future<?> defaultOutputCompletion = processMessage(msg, defaultMessageOutput);
 
        final CountDownLatch streamOutputsDoneSignal = new CountDownLatch(messageOutputs.size());
        for (final MessageOutput output : messageOutputs) {
            processMessage(msg, output, streamOutputsDoneSignal);
        }
 
        // Wait until all writer threads for stream outputs have finished or timeout is reached.
        if (!streamOutputsDoneSignal.await(configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS)) {
            LOG.warn("Timeout reached. Not waiting any longer for stream output writer threads to complete.");
        }
 
        // now block until the default output has finished. most batching outputs will already been done because their
        // fast path is really fast (usually an insert into a queue), but the slow flush path might block for a long time
        // this exerts the back pressure to the system
        if (defaultOutputCompletion != null) {
            Uninterruptibles.getUninterruptibly(defaultOutputCompletion);
        } else {
            LOG.error("The default output future was null, this is a bug!");
        }
 
        if (msg.hasRecordings()) {
            LOG.debug("Message event trace: {}", msg.recordingsAsString());
        }
 
        outputThroughput.inc();
 
        LOG.debug("Wrote message <{}> to all outputs. Finished handling.", msg.getId());
 
        event.clearMessages();
    }

processMessage 处理,核心是写到MessageOutput中

   private Future<?> processMessage(final Message msg, final MessageOutput output, final CountDownLatch doneSignal) {
        if (output == null) {
            LOG.error("Output was null!");
            doneSignal.countDown();
            return Futures.immediateCancelledFuture();
        }
        if (!output.isRunning()) {
            LOG.debug("Skipping stopped output {}", output.getClass().getName());
            doneSignal.countDown();
            return Futures.immediateCancelledFuture();
        }
 
        Future<?> future = null;
        try {
            LOG.debug("Writing message to [{}].", output.getClass());
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message id for [{}]: <{}>", output.getClass(), msg.getId());
            }
            future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    try (Timer.Context ignored = processTime.time()) {
                        output.write(msg);
                    } catch (Exception e) {
                        LOG.error("Error in output [" + output.getClass() + "].", e);
                    } finally {
                        doneSignal.countDown();
                    }
                }
            });
        } catch (Exception e) {
            LOG.error("Could not write message batch to output [" + output.getClass() + "].", e);
            doneSignal.countDown();
        }
        return future;
    }

参考资料

https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/plugin/outputs/MessageOutput.java

标签:OutputBufferProcessor,LOG,graylog,简单,msg,output,null,final,graylog2
From: https://www.cnblogs.com/rongfengliang/p/16753280.html

相关文章

  • graylog OutputBuffer 简单说明
    因为graylog比较依赖buffer,所以对于输出,套路与process类似,技术上也依赖了Disruptor消息处理上依赖一个OutputBufferProcessor,与process一致,OutputBufferProcessor后边......
  • 一个简单的看板娘
    添加一个看板娘一、博客想要添加一个看板娘需要三个条件:①准备原材料(原材料可以认为是看板娘的模型,本次看板娘较为简单)②准备CSS/JSON文件 ③网页的HTML代码 二、......
  • 学会 Git 02: Git 分支管理之简单操作分支进行开发
    Git分支管理:简单操作分支进行开发什么是分支?上一篇,我们gitbranch显示的信息就是本地数据库中的各个分支名。分支是为了在多人协作的时候,防止互相干扰,协同开发而诞生......
  • 力扣205(java)-同构字符串(简单)
    题目:给定两个字符串 s 和 t ,判断它们是否是同构的。如果 s 中的字符可以按某种映射关系替换得到 t ,那么这两个字符串是同构的。每个出现的字符都应当映射到另一......
  • 简单代码雨
    @echoofftitledigitalraincolor02setlocalENABLEDELAYEDEXPANSIONfor/l%%iin(0)do(set"line="for/l%%jin(1,1,80)do(set/aDown%%j-=2......
  • Feign的简单介绍及配置参数
    contextId用于区分实例,类似beanName作者:DATA_MONK​,转载请注明原文链接......
  • Nacos 注册中心简单使用
    config-service配置相关naming-service服务相关Nacos也基地版获取host下服务作者:DATA_MONK​,转载请注明原文链接​......
  • Nacos 注册中心简单使用
    config-service配置相关naming-service服务相关Nacos也基地版获取host下服务作者:DATA_MONK​,转载请注明原文链接......
  • graylog 的InputBuffer 简单说明
    InputBuffer是graylog实际处理日志的部分,内部处理基于了Disruptor,同时还依赖了Journalling能力内部实际处理是依赖LocalKafka的(而且是推荐的)InputBuffer处理的几个......
  • graylog MessageInput 简单说明
    input是graylog处理消息的核心,内部对于网络的处理是基于了netty框架,消息的核心基类是MessageInput基类的能力如下图,包含了基本常用的配置,状态,以及input节点信息,同时......