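Graylog's message journal is an important module because it is how Graylog handles backpressure. The notes below walk through its implementation.
Functional interface diagram
Simple call relationships
Startup entry point
The LocalKafkaJournal class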
The core part is shown below (an excerpt from the LocalKafkaJournal constructor); the other read and write methods use the objects set up here.
try {
    final BrokerState brokerState = new BrokerState();
    brokerState.newState(RunningAsBroker.state());
    kafkaScheduler = new KafkaScheduler(2, "kafka-journal-scheduler-", false); // TODO make thread count configurable
    kafkaScheduler.startup();
    // The configuration values are derived from Graylog's configuration file
    logManager = new LogManager(
            new File[]{journalDirectory.toFile()},
            Map$.MODULE$.<String, LogConfig>empty(),
            defaultConfig,
            cleanerConfig,
            NUM_IO_THREADS,
            SECONDS.toMillis(60L),
            SECONDS.toMillis(60L),
            SECONDS.toMillis(60L),
            kafkaScheduler, // Broker state
            brokerState,
            JODA_TIME);
    final TopicAndPartition topicAndPartition = new TopicAndPartition("messagejournal", 0);
    final Option<Log> messageLog = logManager.getLog(topicAndPartition);
    if (messageLog.isEmpty()) {
        kafkaLog = logManager.createLog(topicAndPartition, logManager.defaultConfig());
    } else {
        kafkaLog = messageLog.get();
    }
    // Set up more metrics
    this.metricRegistry.register(name(metricPrefix, METRIC_NAME_SIZE), (Gauge<Long>) kafkaLog::size);
    this.metricRegistry.register(name(metricPrefix, METRIC_NAME_LOG_END_OFFSET), (Gauge<Long>) kafkaLog::logEndOffset);
    this.metricRegistry.register(name(metricPrefix, METRIC_NAME_NUMBER_OF_SEGMENTS), (Gauge<Integer>) kafkaLog::numberOfSegments);
    this.metricRegistry.register(name(metricPrefix, METRIC_NAME_UNFLUSHED_MESSAGES), (Gauge<Long>) kafkaLog::unflushedMessages);
    this.metricRegistry.register(name(metricPrefix, METRIC_NAME_RECOVERY_POINT), (Gauge<Long>) kafkaLog::recoveryPoint);
    this.metricRegistry.register(name(metricPrefix, METRIC_NAME_LAST_FLUSH_TIME), (Gauge<Long>) kafkaLog::lastFlushTime);
    // must not be a lambda, because the serialization cannot determine the proper Metric type :(
    this.metricRegistry.register(getOldestSegmentMetricName(), (Gauge<Date>) new Gauge<Date>() {
        @Override
        public Date getValue() {
            long oldestSegment = Long.MAX_VALUE;
            for (final LogSegment segment : LocalKafkaJournal.this.getSegments()) {
                oldestSegment = Math.min(oldestSegment, segment.created());
            }
            return new Date(oldestSegment);
        }
    });
    LOG.info("Initialized Kafka based journal at {}", journalDirectory);
    offsetFlusher = new OffsetFileFlusher();
    dirtyLogFlusher = new DirtyLogFlusher();
    recoveryCheckpointFlusher = new RecoveryCheckpointFlusher();
    logRetentionCleaner = new LogRetentionCleaner();
} catch (KafkaException e) {
    // most likely failed to grab lock
    LOG.error("Unable to start logmanager.", e);
    throw new RuntimeException(e);
}
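The anonymous Gauge<Date> in the constructor reduces all segments to the smallest creation timestamp. The following is a minimal sketch of that reduction alone, using only the standard library. The class name OldestSegmentSketch and the method oldestSegment are hypothetical, and plain Long values (epoch milliseconds) stand in for LogSegment.created(). It is not Graylog's code, only an illustration of the loop.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;

// Hypothetical stand-in for the oldest-segment gauge; not part of Graylog.
final class OldestSegmentSketch {

    /**
     * Returns the earliest creation time among the given segment timestamps.
     * Starts from Long.MAX_VALUE, exactly like the loop in the constructor,
     * so an empty list yields new Date(Long.MAX_VALUE).
     */
    static Date oldestSegment(List<Long> segmentCreatedMillis) {
        long oldest = Long.MAX_VALUE;
        for (long created : segmentCreatedMillis) {
            oldest = Math.min(oldest, created);
        }
        return new Date(oldest);
    }

    public static void main(String[] args) {
        // Three made-up creation timestamps (assumption: epoch millis).
        Date oldest = oldestSegment(Arrays.asList(3000L, 1000L, 2000L));
        System.out.println("oldest segment millis: " + oldest.getTime());

        // With no segments the loop body never runs.
        Date none = oldestSegment(Collections.<Long>emptyList());
        System.out.println("no segments millis: " + none.getTime());
    }
}
```

One consequence of starting from Long.MAX_VALUE is that a reduction over zero segments reports a date far in the future rather than failing, which is worth keeping in mind when reading this metric.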
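Services that use LocalKafkaJournal
As the figure below shows, several core services depend on LocalKafkaJournal. The call chain above shows the same thing: internally, the journal is used in essentially the same way as Kafka itself.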
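To make the Kafka-like usage pattern concrete, here is a minimal in-memory sketch of how a journal can sit between message producers and the processing side: writers append and receive an offset, readers pull batches, and processing acknowledges the last offset it has finished. Everything in it is an assumption made for illustration: the class InMemoryJournalSketch and the methods write, read, markCommitted and uncommittedEntries are hypothetical simplifications, and nothing here touches disk or Kafka the way LocalKafkaJournal does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a journal; not Graylog's implementation.
final class InMemoryJournalSketch {

    /** One journaled message together with the offset it was assigned. */
    static final class Entry {
        final long offset;
        final String payload;

        Entry(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    private final List<Entry> log = new ArrayList<>();
    private int nextReadIndex = 0;       // first entry not yet handed to a reader
    private long committedOffset = -1L;  // last offset acknowledged by processing

    /** Appends a message and returns the offset assigned to it. */
    synchronized long write(String payload) {
        long offset = log.size();
        log.add(new Entry(offset, payload));
        return offset;
    }

    /** Returns up to maxCount entries that have not been read yet. */
    synchronized List<Entry> read(int maxCount) {
        int to = Math.min(log.size(), nextReadIndex + maxCount);
        List<Entry> batch = new ArrayList<>(log.subList(nextReadIndex, to));
        nextReadIndex = to;
        return batch;
    }

    /** Records that everything up to and including offset has been processed. */
    synchronized void markCommitted(long offset) {
        committedOffset = Math.max(committedOffset, offset);
    }

    /** Number of written entries that processing has not acknowledged yet. */
    synchronized long uncommittedEntries() {
        return log.size() - (committedOffset + 1);
    }

    public static void main(String[] args) {
        InMemoryJournalSketch journal = new InMemoryJournalSketch();
        for (int i = 0; i < 5; i++) {
            journal.write("message-" + i);
        }
        List<Entry> batch = journal.read(3);
        journal.markCommitted(batch.get(batch.size() - 1).offset);
        System.out.println("backlog: " + journal.uncommittedEntries());
    }
}
```

The backlog figure is what makes this shape useful for backpressure: inputs can keep writing while processing is slow, and the gap between written and committed offsets shows how far behind the consumers are.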
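Graylog's CLI also depends on LocalKafkaJournal. Its main job there is to operate on journal messages, for example cleaning them up or querying journal information.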
References
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/shared/journal/LocalKafkaJournal.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/commands/journal/AbstractJournalCommand.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/shared/messageq/localkafka