首页 > 其他分享 >graylog OutputBufferProcessor 简单说明

graylog OutputBufferProcessor 简单说明

时间:2022-10-08 13:09:48浏览次数:81  
标签:OutputBufferProcessor LOG graylog 简单 msg output null final graylog2

OutputBufferProcessor 对于输出处理比较重要,包含了路由(比如不同stream 写到外部put,写到不同的外部MessageOutput 中
OutputBufferProcessor 也是Disruptor 的一个handler

参考处理

 

public void onEvent(MessageEvent event) throws Exception {
incomingMessages.mark();

final Message msg = event.getMessage();
if (msg == null) {
LOG.debug("Skipping null message.");
return;
}
LOG.debug("Processing message <{}> from OutputBuffer.", msg.getId());
// 消息的路由
final Set<MessageOutput> messageOutputs = outputRouter.getStreamOutputsForMessage(msg);
msg.recordCounter(serverStatus, "matched-outputs", messageOutputs.size());
// 处理消息
final Future<?> defaultOutputCompletion = processMessage(msg, defaultMessageOutput);

final CountDownLatch streamOutputsDoneSignal = new CountDownLatch(messageOutputs.size());
for (final MessageOutput output : messageOutputs) {
processMessage(msg, output, streamOutputsDoneSignal);
}

// Wait until all writer threads for stream outputs have finished or timeout is reached.
if (!streamOutputsDoneSignal.await(configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS)) {
LOG.warn("Timeout reached. Not waiting any longer for stream output writer threads to complete.");
}

// now block until the default output has finished. most batching outputs will already been done because their
// fast path is really fast (usually an insert into a queue), but the slow flush path might block for a long time
// this exerts the back pressure to the system
if (defaultOutputCompletion != null) {
Uninterruptibles.getUninterruptibly(defaultOutputCompletion);
} else {
LOG.error("The default output future was null, this is a bug!");
}

if (msg.hasRecordings()) {
LOG.debug("Message event trace: {}", msg.recordingsAsString());
}

outputThroughput.inc();

LOG.debug("Wrote message <{}> to all outputs. Finished handling.", msg.getId());

event.clearMessages();
}
incomingMessages.mark();


processMessage 处理,核心是写到MessageOutput中

private Future<?> processMessage(final Message msg, final MessageOutput output, final CountDownLatch doneSignal) {
if (output == null) {
LOG.error("Output was null!");
doneSignal.countDown();
return Futures.immediateCancelledFuture();
}
if (!output.isRunning()) {
LOG.debug("Skipping stopped output {}", output.getClass().getName());
doneSignal.countDown();
return Futures.immediateCancelledFuture();
}

Future<?> future = null;
try {
LOG.debug("Writing message to [{}].", output.getClass());
if (LOG.isTraceEnabled()) {
LOG.trace("Message id for [{}]: <{}>", output.getClass(), msg.getId());
}
future = executor.submit(new Runnable() {
@Override
public void run() {
try (Timer.Context ignored = processTime.time()) {
output.write(msg);
} catch (Exception e) {
LOG.error("Error in output [" + output.getClass() + "].", e);
} finally {
doneSignal.countDown();
}
}
});
} catch (Exception e) {
LOG.error("Could not write message batch to output [" + output.getClass() + "].", e);
doneSignal.countDown();
}
return future;
}
if (output == null) {


参考资料

​https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java​​​
​​​https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/plugin/outputs/MessageOutput.java​

标签:OutputBufferProcessor,LOG,graylog,简单,msg,output,null,final,graylog2
From: https://blog.51cto.com/rongfengliang/5737208

相关文章

  • graylog MessageOutput 简单说明
    MessageOutput才是graylog真正写入日志到外部存储的地方不同的实现都依赖一个MessageQueueAcknowledger,对于已经处理的消息进行确认,确保不会多次执行扩展的子类  参考资......
  • graylog OutputBuffer 简单说明
    因为graylog比较依赖buffer,所以对于输出,套路与process类似,技术上也依赖了Disruptor消息处理上依赖一个OutputBufferProcessor,与process一致,OutputBufferProcessor后边说......
  • graylog ProcessBufferProcessor 简单说明
    ProcessBufferProcessor的核心是进行消息处理,会调用消息处理器,同时会将消息放到OutputBuffer中ProcessBufferProcessor实际上是ProcessBuffer中Disruptor的一个handl......
  • graylog 的InputBuffer 简单说明
    InputBuffer是graylog实际处理日志的部分,内部处理基于了Disruptor,同时还依赖了Journalling能力内部实际处理是依赖LocalKafka的(而且是推荐的)InputBuffer处理的几个模......
  • graylog ProcessBuffer 简单说明
    graylog在消息进入之后(对应input),选择不同的handler处理之后,会到不同的buffer中对于DirectMessageHandler的会到ProcessBuffer中,对于是否开启了Journalling,处理会不一......
  • graylog MessageInput 简单说明
    input是graylog处理消息的核心,内部对于网络的处理是基于了netty框架,消息的核心基类是MessageInput基类的能力如下图,包含了基本常用的配置,状态,以及input节点信息,同时还包......
  • C#一个简单的解析csv文件的方法
    可以将csv转成具体的类型对象,没有用序列化,需要传入转成函数手动编写类型转换代码,代码比较简单publicclassModel{publicint序号{get;set;}......
  • 力扣599(java&python)- 两个列表的最小索引总和(简单)
    题目:假设Andy和Doris想在晚餐时选择一家餐厅,并且他们都有一个表示最喜爱餐厅的列表,每个餐厅的名字用字符串表示。你需要帮助他们用最少的索引和找出他们共同喜爱的餐......
  • xm-select简单使用一例
    HTML<divid="admin_ids"class="xm-select-demo"></div><scriptsrc="/Public/js/xm-select.js"></script><script> varadmin_ids=xmSelect.render({ el:'#adm......
  • voltus的IR drop分析(简单流程)
    一:文件准备voltus的环境文件配置1:pgv库配置pgv库分别配置std库;mem库;IP库配置库命令:libgen_stdcell.tclread_lib-lefset_pg_library_mode\ -ground_pins *......