OutputBufferProcessor 对于输出处理比较重要,包含了路由(比如不同 stream 的消息写到外部 output,即写到不同的外部 MessageOutput 中)
OutputBufferProcessor 也是Disruptor 的一个handler
参考处理
/**
 * Handles one event taken from the output buffer: routes its message to the
 * matching stream outputs and to the default output, then waits for the writes.
 *
 * <p>Stream outputs are awaited up to the configured output module timeout; the
 * default output is awaited without a timeout. An event carrying no message is
 * skipped after the incoming-message meter has been marked.
 *
 * @param event the event whose message should be written to the outputs
 * @throws Exception if waiting for the stream outputs is interrupted or waiting
 *                   on the default output's future fails
 */
public void onEvent(MessageEvent event) throws Exception {
    incomingMessages.mark();

    final Message message = event.getMessage();
    if (message == null) {
        LOG.debug("Skipping null message.");
        return;
    }
    LOG.debug("Processing message <{}> from OutputBuffer.", message.getId());

    // Routing: ask the router which stream outputs this message has to go to.
    final Set<MessageOutput> streamOutputs = outputRouter.getStreamOutputsForMessage(message);
    message.recordCounter(serverStatus, "matched-outputs", streamOutputs.size());

    // Processing: the default output is always written to, in addition to the stream outputs.
    final Future<?> defaultOutputFuture = processMessage(message, defaultMessageOutput);

    // One count per stream output; each processMessage call counts down exactly once.
    final CountDownLatch pendingStreamWrites = new CountDownLatch(streamOutputs.size());
    for (final MessageOutput streamOutput : streamOutputs) {
        processMessage(message, streamOutput, pendingStreamWrites);
    }

    // Give the stream output writers a bounded amount of time to finish.
    final boolean streamWritesFinished =
            pendingStreamWrites.await(configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS);
    if (!streamWritesFinished) {
        LOG.warn("Timeout reached. Not waiting any longer for stream output writer threads to complete.");
    }

    // Block until the default output is done. Batching outputs are usually finished already
    // (their fast path is typically a queue insert), but a slow flush may block for a long
    // time; this wait is what applies back pressure to the system.
    if (defaultOutputFuture == null) {
        LOG.error("The default output future was null, this is a bug!");
    } else {
        Uninterruptibles.getUninterruptibly(defaultOutputFuture);
    }

    if (message.hasRecordings()) {
        LOG.debug("Message event trace: {}", message.recordingsAsString());
    }

    outputThroughput.inc();
    LOG.debug("Wrote message <{}> to all outputs. Finished handling.", message.getId());

    event.clearMessages();
}
incomingMessages.mark();
processMessage 处理,核心是写到MessageOutput中
/**
 * Submits a task to the executor that writes {@code msg} to {@code output}.
 *
 * <p>{@code doneSignal} is counted down exactly once on every path: immediately
 * when the output is unusable or submission fails, otherwise by the submitted
 * task once the write has finished or thrown.
 *
 * @param msg        the message to write
 * @param output     the target output; may be {@code null}
 * @param doneSignal latch to count down when this output needs no further waiting
 * @return the future of the submitted write task; an already-cancelled future if
 *         {@code output} is {@code null} or not running; {@code null} if the task
 *         could not be submitted
 */
private Future<?> processMessage(final Message msg, final MessageOutput output, final CountDownLatch doneSignal) {
    // Unusable outputs: nothing is submitted, so release the latch right away.
    if (output == null || !output.isRunning()) {
        if (output == null) {
            LOG.error("Output was null!");
        } else {
            LOG.debug("Skipping stopped output {}", output.getClass().getName());
        }
        doneSignal.countDown();
        return Futures.immediateCancelledFuture();
    }

    final Class<?> outputClass = output.getClass();
    try {
        LOG.debug("Writing message to [{}].", outputClass);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Message id for [{}]: <{}>", outputClass, msg.getId());
        }

        final Runnable writeTask = new Runnable() {
            @Override
            public void run() {
                // The timer context is closed by try-with-resources, timing the write.
                try (Timer.Context ignored = processTime.time()) {
                    output.write(msg);
                } catch (Exception e) {
                    LOG.error("Error in output [" + outputClass + "].", e);
                } finally {
                    // Always release the latch, whether the write succeeded or not.
                    doneSignal.countDown();
                }
            }
        };
        return executor.submit(writeTask);
    } catch (Exception e) {
        // The task never ran, so its finally block cannot count down for us.
        LOG.error("Could not write message batch to output [" + outputClass + "].", e);
        doneSignal.countDown();
        return null;
    }
}
if (output == null) {
参考资料
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/plugin/outputs/MessageOutput.java