首页 > 其他分享 >异源数据同步 → DataX 为什么要支持 kafka?

异源数据同步 → DataX 为什么要支持 kafka?

时间:2024-08-26 08:56:31浏览次数:13  
标签:String private Key 异源 DataX conf props kafka

开心一刻

昨天发了一条朋友圈:酒吧有什么好去的,上个月在酒吧当服务员兼职,一位大姐看上了我,说一个月给我 10 万,要我陪她去上海,我没同意

朋友评论道:你没同意,为什么在上海?

我回复到:上个月没同意

嘴真硬

前情回顾

关于 DataX,官网有很详细的介绍,鄙人不才,也写过几篇文章

异构数据源同步之数据同步 → datax 改造,有点意思

异构数据源同步之数据同步 → datax 再改造,开始触及源码

异构数据源同步之数据同步 → DataX 使用细节

异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密

不了解的小伙伴可以按需去查看,所以了,DataX 就不做过多介绍了;官方提供了非常多的插件,囊括了绝大部分的数据源,基本可以满足我们日常需要,但数据源种类太多,DataX 插件不可能包含全部,比如 kafka,DataX 官方是没有提供读写插件的,大家知道为什么吗?你们如果对数据同步了解的比较多的话,一看到 kafka,第一反应往往想到的是 实时同步,而 DataX 针对的是 离线同步,所以 DataX 官方没提供 kafka 插件是不是也就能理解了?因为不合适嘛!

但如果客户非要离线同步也支持 kafka

人家要嘛

你能怎么办?直接怼过去:实现不了?

实现不了

所以没得选,那就只能给 DataX 开发一套 kafka 插件了;基于 DataX插件开发宝典,插件开发起来还是非常简单的

kafkawriter

  1. 编程接口

    自定义 Kafkawriter 继承 DataX 的 Writer,实现 job、task 对应的接口即可

    /**
     * @author 青石路
     */
    public class KafkaWriter extends Writer {
    
        public static class Job extends Writer.Job {
    
            private Configuration conf = null;
    
            @Override
            public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
                for (int i = 0; i < mandatoryNumber; i++) {
                    configurations.add(this.conf.clone());
                }
                return configurations;
            }
    
            private void validateParameter() {
                this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
                this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            }
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.validateParameter();
            }
    
    
            @Override
            public void destroy() {
    
            }
        }
    
        public static class Task extends Writer.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
            private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
    
            private Producer<String, String> producer;
            private Configuration conf;
            private Properties props;
            private String fieldDelimiter;
            private List<String> columns;
            private String writeType;
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                columns = conf.getList(Key.COLUMN, String.class);
                writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);
                if (CollUtil.isEmpty(columns)) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有误,[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN));
                }
    
                props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                //这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
                props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));
                props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));
                props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
                props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
    
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("配置启用了SASL认证");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
    
                producer = new KafkaProducer<String, String>(props);
            }
    
            @Override
            public void prepare() {
                if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
    
                    ListTopicsResult topicsResult = AdminClient.create(props).listTopics();
                    String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
    
                    try {
                        if (!topicsResult.names().get().contains(topic)) {
                            new NewTopic(
                                    topic,
                                    Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                    Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                            );
                            List<NewTopic> newTopics = new ArrayList<NewTopic>();
                            AdminClient.create(props).createTopics(newTopics);
                        }
                    } catch (Exception e) {
                        throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());
                    }
                }
            }
    
            @Override
            public void startWrite(RecordReceiver lineReceiver) {
                logger.info("start to writer kafka");
                Record record = null;
                while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完
                    //获取一行数据,按照指定分隔符 拼成字符串 发送出去
                    if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToString(record))
                        );
                    } else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToKafkaJson(record))
                        );
                    }
                    producer.flush();
                }
            }
    
            @Override
            public void destroy() {
                logger.info("producer close");
                if (producer != null) {
                    producer.close();
                }
            }
    
            /**
             * 数据格式化
             *
             * @param record
             * @return
             */
            private String recordToString(Record record) {
                int recordLength = record.getColumnNumber();
                if (0 == recordLength) {
                    return NEWLINE_FLAG;
                }
                Column column;
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < recordLength; i++) {
                    column = record.getColumn(i);
                    sb.append(column.asString()).append(fieldDelimiter);
                }
    
                sb.setLength(sb.length() - 1);
                sb.append(NEWLINE_FLAG);
    
                return sb.toString();
            }
    
            private String recordToKafkaJson(Record record) {
                int recordLength = record.getColumnNumber();
                if (recordLength != columns.size()) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,
                            String.format("您提供配置文件有误,列数不匹配[record columns=%d, writer columns=%d]", recordLength, columns.size()));
                }
                List<KafkaColumn> kafkaColumns = new ArrayList<>();
                for (int i = 0; i < recordLength; i++) {
                    KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));
                    kafkaColumns.add(column);
                }
                return JSONUtil.toJsonStr(kafkaColumns);
            }
        }
    }
    

    DataX 框架按照如下的顺序执行 Job 和 Task 的接口

    job_task 接口执行顺序

    重点看 Task 的接口实现

    • init:读取配置项,然后创建 Producer 实例

    • prepare:判断 Topic 是否存在,不存在则创建

    • startWrite:通过 RecordReceiver 从 Channel 获取 Record,然后写入 Topic

      支持两种写入格式:textjson,细节请看下文中的 kafkawriter.md

    • destroy:关闭 Producer 实例

    实现不难,相信大家都能看懂

  2. 插件定义

    resources 下新增 plugin.json

    {
        "name": "kafkawriter",
        "class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter",
        "description": "write data to kafka",
        "developer": "qsl"
    }
    

    强调下 class,是 KafkaWriter 的全限定类名,如果你们没有完全拷贝我的,那么要改成你们自己的

  3. 配置文件

    resources 下新增 plugin_job_template.json

    {
        "name": "kafkawriter",
        "parameter": {
            "bootstrapServers": "",
            "topic": "",
            "ack": "all",
            "batchSize": 1000,
            "retries": 0,
            "fieldDelimiter": ",",
            "writeType": "json",
            "column": [
                "const_id",
                "const_field",
                "const_field_value"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": ""
            }
        }
    }
    

    配置项说明:kafkawriter.md

  4. 打包发布

    可以参考官方的 assembly 配置,利用 assembly 来打包

至此,kafkawriter 就算完成了

kafkareader

  1. 编程接口

    自定义 Kafkareader 继承 DataX 的 Reader,实现 job、task 对应的接口即可

    /**
     * @author 青石路
     */
    public class KafkaReader extends Reader {
    
        public static class Job extends Reader.Job {
    
            private Configuration originalConfig = null;
    
            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
                this.validateParameter();
            }
    
            @Override
            public void destroy() {
    
            }
    
            @Override
            public List<Configuration> split(int adviceNumber) {
                List<Configuration> configurations = new ArrayList<>(adviceNumber);
                for (int i=0; i<adviceNumber; i++) {
                    configurations.add(this.originalConfig.clone());
                }
                return configurations;
            }
    
            private void validateParameter() {
                this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
                this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
            }
        }
    
        public static class Task extends Reader.Task {
    
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
    
            private Consumer<String, String> consumer;
            private String topic;
            private Configuration conf;
            private int maxPollRecords;
            private String fieldDelimiter;
            private String readType;
            private List<Column.Type> columnTypes;
    
            @Override
            public void destroy() {
                logger.info("consumer close");
                if (Objects.nonNull(consumer)) {
                    consumer.close();
                }
            }
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.topic = conf.getString(Key.TOPIC);
                this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
                if (!ReadType.JSON.name().equalsIgnoreCase(readType)
                        && !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有误,不支持的readType[%s]", readType));
                }
                if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                    List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
                    if (CollUtil.isEmpty(columnTypeList)) {
                        throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                                String.format("您提供配置文件有误,readType是JSON时[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN_TYPE));
                    }
                    convertColumnType(columnTypeList);
                }
                Properties props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("配置启用了SASL认证");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
                consumer = new KafkaConsumer<>(props);
            }
    
            @Override
            public void startRead(RecordSender recordSender) {
                consumer.subscribe(CollUtil.newArrayList(topic));
                int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
                int retries = conf.getInt(Key.RETRIES, 5);
                if (retries < 0) {
                    logger.info("joinGroupSuccessRetries 配置有误[{}], 重置成默认值[5]", retries);
                    retries = 5;
                }
                /**
                 * consumer 每次都是新创建,第一次poll时会重新加入消费者组,加入过程会进行Rebalance,而 Rebalance 会导致同一 Group 内的所有消费者都不能工作
                 * 所以 poll 拉取的过程中,即使topic中有数据也不一定能拉到,因为 consumer 正在加入消费者组中
                 * kafka-clients 没有对应的API、事件机制来知道 consumer 成功加入消费者组的确切时间
                 * 故增加重试
                 */
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                int i = 0;
                if (CollUtil.isEmpty(records)) {
                    for (; i < retries; i++) {
                        records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                        logger.info("第 {} 次重试,获取消息记录数[{}]", i + 1, records.count());
                        if (!CollUtil.isEmpty(records)) {
                            break;
                        }
                    }
                }
                if (i >= retries) {
                    logger.info("重试 {} 次后,仍未获取到消息,请确认是否有数据、配置是否正确", retries);
                    return;
                }
                transferRecord(recordSender, records);
                do {
                    records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                    transferRecord(recordSender, records);
                } while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
            }
    
            private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
                if (CollUtil.isEmpty(records)) {
                    return;
                }
                for (ConsumerRecord<String, String> record : records) {
                    Record sendRecord = recordSender.createRecord();
                    String msgValue = record.value();
                    if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                        transportJsonToRecord(sendRecord, msgValue);
                    } else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                        // readType = text,全当字符串类型处理
                        String[] columnValues = msgValue.split(fieldDelimiter);
                        for (String columnValue : columnValues) {
                            sendRecord.addColumn(new StringColumn(columnValue));
                        }
                    }
                    recordSender.sendToWriter(sendRecord);
                }
                consumer.commitAsync();
            }
    
            private void convertColumnType(List<String> columnTypeList) {
                columnTypes = new ArrayList<>();
                for (String columnType : columnTypeList) {
                    switch (columnType.toUpperCase()) {
                        case "STRING":
                            columnTypes.add(Column.Type.STRING);
                            break;
                        case "LONG":
                            columnTypes.add(Column.Type.LONG);
                            break;
                        case "DOUBLE":
                            columnTypes.add(Column.Type.DOUBLE);
                        case "DATE":
                            columnTypes.add(Column.Type.DATE);
                            break;
                        case "BOOLEAN":
                            columnTypes.add(Column.Type.BOOL);
                            break;
                        case "BYTES":
                            columnTypes.add(Column.Type.BYTES);
                            break;
                        default:
                            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                    String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnType));
                    }
                }
            }
    
            private void transportJsonToRecord(Record sendRecord, String msgValue) {
                List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
                if (columnTypes.size() != kafkaColumns.size()) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                            String.format("您提供的配置文件有误,readType是JSON时[%s列数=%d]与[json列数=%d]的数量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
                }
                for (int i=0; i<columnTypes.size(); i++) {
                    KafkaColumn kafkaColumn = kafkaColumns.get(i);
                    switch (columnTypes.get(i)) {
                        case STRING:
                            sendRecord.setColumn(i, new StringColumn(kafkaColumn.getColumnValue()));
                            break;
                        case LONG:
                            sendRecord.setColumn(i, new LongColumn(kafkaColumn.getColumnValue()));
                            break;
                        case DOUBLE:
                            sendRecord.setColumn(i, new DoubleColumn(kafkaColumn.getColumnValue()));
                            break;
                        case DATE:
                            // 暂只支持时间戳
                            sendRecord.setColumn(i, new DateColumn(Long.parseLong(kafkaColumn.getColumnValue())));
                            break;
                        case BOOL:
                            sendRecord.setColumn(i, new BoolColumn(kafkaColumn.getColumnValue()));
                            break;
                        case BYTES:
                            sendRecord.setColumn(i, new BytesColumn(kafkaColumn.getColumnValue().getBytes(StandardCharsets.UTF_8)));
                            break;
                        default:
                            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                    String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnTypes.get(i)));
                    }
                }
            }
        }
    }
    

    重点看 Task 的接口实现

    • init:读取配置项,然后创建 Consumer 实例

    • startWrite:从 Topic 拉取数据,通过 RecordSender 写入到 Channel 中

      这里有几个细节需要注意下

      1. Consumer 每次都是新创建的,拉取数据的时候,如果消费者还未加入到指定的消费者组中,那么它会先加入到消费者组中,加入过程会进行 Rebalance,而 Rebalance 会导致同一消费者组内的所有消费者都不能工作,此时即使 Topic 中有可拉取的消息,也拉取不到消息,所以引入了重试机制来尽量保证那一次同步任务拉取的时候,消费者能正常拉取消息
      2. 一旦 Consumer 拉取到消息,则会循环拉取消息,如果某一次的拉取数据量小于最大拉取量(maxPollRecords),说明 Topic 中的消息已经被拉取完了,那么循环终止;这与常规使用(Consumer 会一直主动拉取或被动接收)是有差别的
      3. 支持两种读取格式:textjson,细节请看下文的配置文件说明
      4. 为了保证写入 Channel 数据的完整,需要配置列的数据类型(DataX 的数据类型)
    • destroy:

      关闭 Consumer 实例

  2. 插件定义

    resources 下新增 plugin.json

    {
        "name": "kafkareader",
        "class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader",
        "description": "read data from kafka",
        "developer": "qsl"
    }
    

    classKafkaReader 的全限定类名

  3. 配置文件

    resources 下新增 plugin_job_template.json

    {
        "name": "kafkareader",
        "parameter": {
            "bootstrapServers": "",
            "topic": "test-kafka",
            "groupId": "test1",
            "writeType": "json",
            "pollTimeoutMs": 2000,
            "columnType": [
                "LONG",
                "STRING",
                "STRING"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": "2"
            }
        }
    }
    

    配置项说明:kafkareader.md

  4. 打包发布

    可以参考官方的 assembly 配置,利用 assembly 来打包

至此,kafkareader 也完成了

总结

  1. 完整代码:qsl-datax
  2. kafkareader 重试机制只能降低拉取不到数据的概率,并不能杜绝;另外,如果上游一直往 Topic 中发消息,kafkareader 每次拉取的数据量都等于最大拉取量,那么同步任务会一直进行而不会停止,这还是离线同步吗?
  3. 离线同步,不推荐走 kafka,因为用 kafka 走实时同步更香

标签:String,private,Key,异源,DataX,conf,props,kafka
From: https://www.cnblogs.com/youzhibing/p/18378097

相关文章

  • linux下试验中间件canal的example示例-binlog日志的实时获取显示以及阿里巴巴中间件ca
    一、linux下试验中间件canal的example示例-binlog日志的实时获取显示    今天重装mysql后,进行了canal的再次试验,原来用的mysql5.7,今天重装直接换了5.6算了。反正测试服务器的mysql也不常用。canal启动后日志显示examplepreparetofindstartpositionjustshowmaste......
  • rocketmq 是参考了 kafka架构, 为什么rocketmq吞吐量是10万/秒, kafka吞吐量是17万/秒
    我们都知道,为了防止消息在服务器丢失,一般都是进行持久化(保存在磁盘),在发送消失时那就涉及到从磁盘拷贝到内核空间,从内核空间到用户态,再从用户态到socket缓存区,从socket缓存区到网卡四次拷贝。kafka使用的是零拷贝-sendfile,把内核态数据发送到网卡,减少两次拷......
  • kafka
    消息队列的流派MQ是什么MessageQueue(MQ)是一种消息队列中间件。MQ的主要作用是通过分离消息的发送和接收来实现应用程序的异步和解耦。然而,MQ的核心目的是通信:它屏蔽了底层复杂的通信协议,并定义了一套更简单的应用层通信协议。在分布式系统中,模块间通信通常使用HTTP......
  • centos7安装Kafka单节点环境部署三-安装Logstash
    1、下载Logstashwgethttps://artifacts.elastic.co/downloads/logstash/logstash-7.17.7-linux-x86_64.tar.gz2、解压到/usr/local/mkdir-p/usr/local/logstash7.17tar-zxflogstash-7.17.7-linux-x86_64.tar.gz-C/usr/local/logstash7.17/--strip-components=1#--......
  • [消息队列]kafka
    Kafka如何保证消息的消费顺序?我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了2个消息,这2个消息对应的操作分别对应的数据库操作是:更改用户会员等级。根据会员等级计算订单价格。假如这两条消息的消费顺序不一样造成的最终结果就会......
  • 浅谈Kafka(一)
    浅谈Kafka(一)文章目录浅谈Kafka(一)Kafa的设计是什么样的数据传输的事务定义消息队列的应用场景Kafka怎么样判断节点是否存活Kafka的消息是采用pull模式还是push模式Kafka在磁盘上的消息格式Kafka高效文件存储设计特点Kafka与传统消息系统之间的区别Kafka的分区数据怎样保......
  • centos7安装Kafka单节点环境部署一-ZooKeeper安装与配置
    由于Kafka运行需要zookeeper配合,zookeeper需要运行在JVM上,所以需要安装JDK,zookeeper。Kafka从2.0.0版本开始就不再支持JDK7及以下版本,就以CentOS764位JDK8为例1、下载ZooKeeperwgethttps://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.ta......
  • 时间轮算法理解、Kafka实现
    概述TimingWheel,时间轮,简单理解就是一种用来存储若干个定时任务的环状队列(或数组),工作原理和钟表的表盘类似。关于环形队列,请参考环形队列。时间轮由两个部分组成,一个环状数组,一个遍历环状数组的指针。首先定义一个固定长度的环状数组,队列中的每一个元素代表一个时间格(可以精确......
  • kafka集群扩容
    环境:节点 zookeeper端口 kafka端口 备注172.17.0.81 12181 19092 原有节点172.17.0.82 12181 19092 原有节点172.17.0.83 12181 19092 原有节点172.17.0.90 12181 19092 扩容节点172.17.0.91 12181 19092 扩容节点步骤:一、扩容zookeeper节点#修改配置文件,需注意新节点my......
  • Linux CentOS 7 Kafka 单机版安装
    Kafka从2.6.0开始,默认使用Java11,3.0.0开始,不再支持Java8,详见:https://kafka.apache.org/downloadsProducer:消息生产者,就是向kafkabroker发消息的客户端:Consumer:消息消费者,向kafkabroker取消息的客户端;ConsumerGroup:消费者组,由多个consumer组成。消费者组......