首页 > 其他分享 >graylog 的journal message 实现简单说明

graylog 的journal message 实现简单说明

时间:2022-10-01 21:33:56浏览次数:73  
标签:kafkaLog graylog2 journal LocalKafkaJournal graylog Gauge new message

graylog 的journal message 做为graylog 背压的处理还是比较重要的一个模块,以下说明下具体实现

功能接口图

 

 

简单调用关系

 

 

启动入口

LocalKafkaJournal 类
核心部分如下(LocalKafkaJournal 构造函数部分),对于其他read 已经write 的使用了

 
try {
        final BrokerState brokerState = new BrokerState();
        brokerState.newState(RunningAsBroker.state());
        kafkaScheduler = new KafkaScheduler(2, "kafka-journal-scheduler-", false); // TODO make thread count configurable
        kafkaScheduler.startup();
       // 配置部分基于了graylog的 配置文件进行处理
        logManager = new LogManager(
                new File[]{journalDirectory.toFile()},
                Map$.MODULE$.<String, LogConfig>empty(),
                defaultConfig,
                cleanerConfig,
                NUM_IO_THREADS,
                SECONDS.toMillis(60L),
                SECONDS.toMillis(60L),
                SECONDS.toMillis(60L),
                kafkaScheduler, // Broker state
                brokerState,
                JODA_TIME);
 
        final TopicAndPartition topicAndPartition = new TopicAndPartition("messagejournal", 0);
        final Option<Log> messageLog = logManager.getLog(topicAndPartition);
        if (messageLog.isEmpty()) {
            kafkaLog = logManager.createLog(topicAndPartition, logManager.defaultConfig());
        } else {
            kafkaLog = messageLog.get();
        }
 
        // Set up more metrics
        this.metricRegistry.register(name(metricPrefix, METRIC_NAME_SIZE), (Gauge<Long>) kafkaLog::size);
        this.metricRegistry.register(name(metricPrefix, METRIC_NAME_LOG_END_OFFSET), (Gauge<Long>) kafkaLog::logEndOffset);
        this.metricRegistry.register(name(metricPrefix, METRIC_NAME_NUMBER_OF_SEGMENTS), (Gauge<Integer>) kafkaLog::numberOfSegments);
        this.metricRegistry.register(name(metricPrefix, METRIC_NAME_UNFLUSHED_MESSAGES), (Gauge<Long>) kafkaLog::unflushedMessages);
        this.metricRegistry.register(name(metricPrefix, METRIC_NAME_RECOVERY_POINT), (Gauge<Long>) kafkaLog::recoveryPoint);
        this.metricRegistry.register(name(metricPrefix, METRIC_NAME_LAST_FLUSH_TIME), (Gauge<Long>) kafkaLog::lastFlushTime);
 
        // must not be a lambda, because the serialization cannot determine the proper Metric type :(
        this.metricRegistry.register(getOldestSegmentMetricName(), (Gauge<Date>) new Gauge<Date>() {
            @Override
            public Date getValue() {
                long oldestSegment = Long.MAX_VALUE;
                for (final LogSegment segment : LocalKafkaJournal.this.getSegments()) {
                    oldestSegment = Math.min(oldestSegment, segment.created());
                }
 
                return new Date(oldestSegment);
            }
        });
 
        LOG.info("Initialized Kafka based journal at {}", journalDirectory);
 
        offsetFlusher = new OffsetFileFlusher();
        dirtyLogFlusher = new DirtyLogFlusher();
        recoveryCheckpointFlusher = new RecoveryCheckpointFlusher();
        logRetentionCleaner = new LogRetentionCleaner();
    } catch (KafkaException e) {
        // most likely failed to grab lock
        LOG.error("Unable to start logmanager.", e);
        throw new RuntimeException(e);
    }

使用LocalKafkaJournal的服务

如下图,这几个比较核心都依赖LocalKafkaJournal 服务,实际上从上边的调用链也可以看出来,内部与使用kafka 的模式以基本一样的

 

 

graylog 的cli 部分也是依赖LocalKafkaJournal ,核心是进行消息的一些处理,比如清理,查询信息。。。

参考资料

https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/shared/journal/LocalKafkaJournal.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/commands/journal/AbstractJournalCommand.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/shared/messageq/localkafka

标签:kafkaLog,graylog2,journal,LocalKafkaJournal,graylog,Gauge,new,message
From: https://www.cnblogs.com/rongfengliang/p/16747814.html

相关文章