首页 > 其他分享 >异源数据同步 → DataX 为什么要支持 kafka?

异源数据同步 → DataX 为什么要支持 kafka?

时间:2024-09-04 16:30:09浏览次数:6  
标签:String private Key 异源 DataX conf props put kafka

开心一刻

昨天发了一条朋友圈:酒吧有什么好去的,上个月在酒吧当服务员兼职,一位大姐看上了我,说一个月给我 10 万,要我陪她去上海,我没同意

朋友评论道:你没同意,为什么在上海?

我回复到:上个月没同意

异源数据同步 → DataX 为什么要支持 kafka?_kafka

前情回顾

关于 DataX,官网有很详细的介绍,鄙人不才,也写过几篇文章

异构数据源同步之数据同步 → datax 改造,有点意思

异构数据源同步之数据同步 → datax 再改造,开始触及源码

异构数据源同步之数据同步 → DataX 使用细节

异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密

不了解的小伙伴可以按需去查看,所以了,DataX 就不做过多介绍了;官方提供了非常多的插件,囊括了绝大部分的数据源,基本可以满足我们日常需要,但数据源种类太多,DataX 插件不可能包含全部,比如 kafka,DataX 官方是没有提供读写插件的,大家知道为什么吗?你们如果对数据同步了解的比较多的话,一看到 kafka,第一反应往往想到的是 实时同步,而 DataX 针对的是 离线同步,所以 DataX 官方没提供 kafka 插件是不是也就能理解了?因为不合适嘛!

但如果客户非要离线同步也支持 kafka

异源数据同步 → DataX 为什么要支持 kafka?_数据源_02

你能怎么办?直接怼过去:实现不了?

异源数据同步 → DataX 为什么要支持 kafka?_ide_03

所以没得选,那就只能给 DataX 开发一套 kafka 插件了;基于 DataX插件开发宝典,插件开发起来还是非常简单的

kafkawriter

  1. 编程接口
    自定义 Kafkawriter 继承 DataX 的 Writer,实现 job、task 对应的接口即可
/**
 * @author 青石路
 */
public class KafkaWriter extends Writer {

    public static class Job extends Writer.Job {

        private Configuration conf = null;

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(this.conf.clone());
            }
            return configurations;
        }

        private void validateParameter() {
            this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
            this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
        }

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            this.validateParameter();
        }


        @Override
        public void destroy() {

        }
    }

    public static class Task extends Writer.Task {
        private static final Logger logger = LoggerFactory.getLogger(Task.class);
        private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

        private Producer<String, String> producer;
        private Configuration conf;
        private Properties props;
        private String fieldDelimiter;
        private List<String> columns;
        private String writeType;

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            columns = conf.getList(Key.COLUMN, String.class);
            writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);
            if (CollUtil.isEmpty(columns)) {
                throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,
                        String.format("您提供配置文件有误,[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN));
            }

            props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
            //这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
            props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));
            props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));

            Configuration saslConf = conf.getConfiguration(Key.SASL);
            if (ObjUtil.isNotNull(saslConf)) {
                logger.info("配置启用了SASL认证");
                props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));
                props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));
                String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);
                String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);
                props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
            }

            producer = new KafkaProducer<String, String>(props);
        }

        @Override
        public void prepare() {
            if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {

                ListTopicsResult topicsResult = AdminClient.create(props).listTopics();
                String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);

                try {
                    if (!topicsResult.names().get().contains(topic)) {
                        new NewTopic(
                                topic,
                                Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                        );
                        List<NewTopic> newTopics = new ArrayList<NewTopic>();
                        AdminClient.create(props).createTopics(newTopics);
                    }
                } catch (Exception e) {
                    throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());
                }
            }
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            logger.info("start to writer kafka");
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完
                //获取一行数据,按照指定分隔符 拼成字符串 发送出去
                if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {
                    producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            recordToString(record),
                            recordToString(record))
                    );
                } else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {
                    producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            recordToString(record),
                            recordToKafkaJson(record))
                    );
                }
                producer.flush();
            }
        }

        @Override
        public void destroy() {
            logger.info("producer close");
            if (producer != null) {
                producer.close();
            }
        }

        /**
         * 数据格式化
         *
         * @param record
         * @return
         */
        private String recordToString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return NEWLINE_FLAG;
            }
            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                sb.append(column.asString()).append(fieldDelimiter);
            }

            sb.setLength(sb.length() - 1);
            sb.append(NEWLINE_FLAG);

            return sb.toString();
        }

        private String recordToKafkaJson(Record record) {
            int recordLength = record.getColumnNumber();
            if (recordLength != columns.size()) {
                throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,
                        String.format("您提供配置文件有误,列数不匹配[record columns=%d, writer columns=%d]", recordLength, columns.size()));
            }
            List<KafkaColumn> kafkaColumns = new ArrayList<>();
            for (int i = 0; i < recordLength; i++) {
                KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));
                kafkaColumns.add(column);
            }
            return JSONUtil.toJsonStr(kafkaColumns);
        }
    }
}

DataX 框架按照如下的顺序执行 Job 和 Task 的接口

异源数据同步 → DataX 为什么要支持 kafka?_数据源_04

重点看 Task 的接口实现

  • init:读取配置项,然后创建 Producer 实例
  • prepare:判断 Topic 是否存在,不存在则创建
  • startWrite:通过 RecordReceiver 从 Channel 获取 Record,然后写入 Topic
    支持两种写入格式:textjson,细节请看下文中的 kafkawriter.md
  • destroy:关闭 Producer 实例

实现不难,相信大家都能看懂

  1. 插件定义
    resources 下新增 plugin.json
{
    "name": "kafkawriter",
    "class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter",
    "description": "write data to kafka",
    "developer": "qsl"
}

强调下 class,是 KafkaWriter 的全限定类名,如果你们没有完全拷贝我的,那么要改成你们自己的

  1. 配置文件
    resources 下新增 plugin_job_template.json
{
    "name": "kafkawriter",
    "parameter": {
        "bootstrapServers": "",
        "topic": "",
        "ack": "all",
        "batchSize": 1000,
        "retries": 0,
        "fieldDelimiter": ",",
        "writeType": "json",
        "column": [
            "const_id",
            "const_field",
            "const_field_value"
        ],
        "sasl": {
            "securityProtocol": "SASL_PLAINTEXT",
            "mechanism": "PLAIN",
            "username": "",
            "password": ""
        }
    }
}

配置项说明:kafkawriter.md

  1. 打包发布
    可以参考官方的 assembly 配置,利用 assembly 来打包

至此,kafkawriter 就算完成了

kafkareader

  1. 编程接口
    自定义 Kafkareader 继承 DataX 的 Reader,实现 job、task 对应的接口即可
/**
 * @author 青石路
 */
public class KafkaReader extends Reader {

    public static class Job extends Reader.Job {

        private Configuration originalConfig = null;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.validateParameter();
        }

        @Override
        public void destroy() {

        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> configurations = new ArrayList<>(adviceNumber);
            for (int i=0; i<adviceNumber; i++) {
                configurations.add(this.originalConfig.clone());
            }
            return configurations;
        }

        private void validateParameter() {
            this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
            this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
        }
    }

    public static class Task extends Reader.Task {

        private static final Logger logger = LoggerFactory.getLogger(Task.class);

        private Consumer<String, String> consumer;
        private String topic;
        private Configuration conf;
        private int maxPollRecords;
        private String fieldDelimiter;
        private String readType;
        private List<Column.Type> columnTypes;

        @Override
        public void destroy() {
            logger.info("consumer close");
            if (Objects.nonNull(consumer)) {
                consumer.close();
            }
        }

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            this.topic = conf.getString(Key.TOPIC);
            this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
            if (!ReadType.JSON.name().equalsIgnoreCase(readType)
                    && !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                        String.format("您提供配置文件有误,不支持的readType[%s]", readType));
            }
            if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
                if (CollUtil.isEmpty(columnTypeList)) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有误,readType是JSON时[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN_TYPE));
                }
                convertColumnType(columnTypeList);
            }
            Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
            props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            Configuration saslConf = conf.getConfiguration(Key.SASL);
            if (ObjUtil.isNotNull(saslConf)) {
                logger.info("配置启用了SASL认证");
                props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
                props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
                String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
                String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
                props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
            }
            consumer = new KafkaConsumer<>(props);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            consumer.subscribe(CollUtil.newArrayList(topic));
            int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
            int retries = conf.getInt(Key.RETRIES, 5);
            if (retries < 0) {
                logger.info("joinGroupSuccessRetries 配置有误[{}], 重置成默认值[5]", retries);
                retries = 5;
            }
            /**
             * consumer 每次都是新创建,第一次poll时会重新加入消费者组,加入过程会进行Rebalance,而 Rebalance 会导致同一 Group 内的所有消费者都不能工作
             * 所以 poll 拉取的过程中,即使topic中有数据也不一定能拉到,因为 consumer 正在加入消费者组中
             * kafka-clients 没有对应的API、事件机制来知道 consumer 成功加入消费者组的确切时间
             * 故增加重试
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
            int i = 0;
            if (CollUtil.isEmpty(records)) {
                for (; i < retries; i++) {
                    records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                    logger.info("第 {} 次重试,获取消息记录数[{}]", i + 1, records.count());
                    if (!CollUtil.isEmpty(records)) {
                        break;
                    }
                }
            }
            if (i >= retries) {
                logger.info("重试 {} 次后,仍未获取到消息,请确认是否有数据、配置是否正确", retries);
                return;
            }
            transferRecord(recordSender, records);
            do {
                records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                transferRecord(recordSender, records);
            } while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
        }

        private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
            if (CollUtil.isEmpty(records)) {
                return;
            }
            for (ConsumerRecord<String, String> record : records) {
                Record sendRecord = recordSender.createRecord();
                String msgValue = record.value();
                if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                    transportJsonToRecord(sendRecord, msgValue);
                } else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                    // readType = text,全当字符串类型处理
                    String[] columnValues = msgValue.split(fieldDelimiter);
                    for (String columnValue : columnValues) {
                        sendRecord.addColumn(new StringColumn(columnValue));
                    }
                }
                recordSender.sendToWriter(sendRecord);
            }
            consumer.commitAsync();
        }

        private void convertColumnType(List<String> columnTypeList) {
            columnTypes = new ArrayList<>();
            for (String columnType : columnTypeList) {
                switch (columnType.toUpperCase()) {
                    case "STRING":
                        columnTypes.add(Column.Type.STRING);
                        break;
                    case "LONG":
                        columnTypes.add(Column.Type.LONG);
                        break;
                    case "DOUBLE":
                        columnTypes.add(Column.Type.DOUBLE);
                    case "DATE":
                        columnTypes.add(Column.Type.DATE);
                        break;
                    case "BOOLEAN":
                        columnTypes.add(Column.Type.BOOL);
                        break;
                    case "BYTES":
                        columnTypes.add(Column.Type.BYTES);
                        break;
                    default:
                        throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnType));
                }
            }
        }

        private void transportJsonToRecord(Record sendRecord, String msgValue) {
            List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
            if (columnTypes.size() != kafkaColumns.size()) {
                throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                        String.format("您提供的配置文件有误,readType是JSON时[%s列数=%d]与[json列数=%d]的数量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
            }
            for (int i=0; i<columnTypes.size(); i++) {
                KafkaColumn kafkaColumn = kafkaColumns.get(i);
                switch (columnTypes.get(i)) {
                    case STRING:
                        sendRecord.setColumn(i, new StringColumn(kafkaColumn.getColumnValue()));
                        break;
                    case LONG:
                        sendRecord.setColumn(i, new LongColumn(kafkaColumn.getColumnValue()));
                        break;
                    case DOUBLE:
                        sendRecord.setColumn(i, new DoubleColumn(kafkaColumn.getColumnValue()));
                        break;
                    case DATE:
                        // 暂只支持时间戳
                        sendRecord.setColumn(i, new DateColumn(Long.parseLong(kafkaColumn.getColumnValue())));
                        break;
                    case BOOL:
                        sendRecord.setColumn(i, new BoolColumn(kafkaColumn.getColumnValue()));
                        break;
                    case BYTES:
                        sendRecord.setColumn(i, new BytesColumn(kafkaColumn.getColumnValue().getBytes(StandardCharsets.UTF_8)));
                        break;
                    default:
                        throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnTypes.get(i)));
                }
            }
        }
    }
}

重点看 Task 的接口实现

  • init:读取配置项,然后创建 Consumer 实例
  • startWrite:从 Topic 拉取数据,通过 RecordSender 写入到 Channel 中
    这里有几个细节需要注意下
  1. Consumer 每次都是新创建的,拉取数据的时候,如果消费者还未加入到指定的消费者组中,那么它会先加入到消费者组中,加入过程会进行 Rebalance,而 Rebalance 会导致同一消费者组内的所有消费者都不能工作,此时即使 Topic 中有可拉取的消息,也拉取不到消息,所以引入了重试机制来尽量保证那一次同步任务拉取的时候,消费者能正常拉取消息
  2. 一旦 Consumer 拉取到消息,则会循环拉取消息,如果某一次的拉取数据量小于最大拉取量(maxPollRecords),说明 Topic 中的消息已经被拉取完了,那么循环终止;这与常规使用(Consumer 会一直主动拉取或被动接收)是有差别的
  3. 支持两种读取格式:textjson,细节请看下文的配置文件说明
  4. 为了保证写入 Channel 数据的完整,需要配置列的数据类型(DataX 的数据类型)
  • destroy:
    关闭 Consumer 实例
  1. 插件定义
    resources 下新增 plugin.json
{
    "name": "kafkareader",
    "class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader",
    "description": "read data from kafka",
    "developer": "qsl"
}

classKafkaReader 的全限定类名

  1. 配置文件
    resources 下新增 plugin_job_template.json
{
    "name": "kafkareader",
    "parameter": {
        "bootstrapServers": "",
        "topic": "test-kafka",
        "groupId": "test1",
        "writeType": "json",
        "pollTimeoutMs": 2000,
        "columnType": [
            "LONG",
            "STRING",
            "STRING"
        ],
        "sasl": {
            "securityProtocol": "SASL_PLAINTEXT",
            "mechanism": "PLAIN",
            "username": "",
            "password": "2"
        }
    }
}

配置项说明:kafkareader.md

  1. 打包发布
    可以参考官方的 assembly 配置,利用 assembly 来打包

至此,kafkareader 也完成了

总结

  1. 完整代码:qsl-datax
  2. kafkareader 重试机制只能降低拉取不到数据的概率,并不能杜绝;另外,如果上游一直往 Topic 中发消息,kafkareader 每次拉取的数据量都等于最大拉取量,那么同步任务会一直进行而不会停止,这还是离线同步吗?
  3. 离线同步,不推荐走 kafka,因为用 kafka 走实时同步更香



标签:String,private,Key,异源,DataX,conf,props,put,kafka
From: https://blog.51cto.com/u_13423706/11918689

相关文章

  • 【大数据】Kafka与RocketMQ:消息队列界的“绝代双骄”
    文章目录一、开场白:消息队列江湖的“风云际会”二、正文1.Kafka与RocketMQ的由来:两颗璀璨的明星2.发展历程:各自的成长轨迹3.区别:各有千秋,各领风骚4.使用场景:谁的主场,谁的地盘?5.如何选择:挑花了眼怎么办?6.市场占用情况:谁更受欢迎?三、结尾:携手共创,消息队列的未来......
  • python操作kafka
    一、参考阿里云的官方链接:        使用PythonSDK接入Kafka收发消息_云消息队列Kafka版(Kafka)-阿里云帮助中心二、安装python环境  三、添加python依赖库pipinstallconfluent-kafka==1.9.2四、新建一个setting.py文件配置信息kafka_setting={'sas......
  • Apache zookeeper kafka 开启SASL安全认证_kafka开启认证
    如果使用PLAIN认证有个问题,就是不能动态新增用户,每次添加用户后,需要重启正在运行的Kafka集群才能生效。因此,在生产环境中,这种认证方式不符合实际业务场景,不利于后期扩展。然而使用SCRAM认证,可以动态新增用户,添加用户后,可以不用重启正在运行的Kafka集群即可进行鉴权。所以生产环境......
  • kafka基础知识(持续更新中~)
    #broker.id属性在kafka集群中必须要是唯⼀broker.id=0#kafka部署的机器ip和提供服务的端⼝号listeners=PLAINTEXT://192.168.65.60:9092#kafka的消息存储⽂件log.dir=/usr/local/data/kafka-logs#kafka连接zookeeper的地址zookeeper.connect=192.168.65.60:2181./......
  • Kafka 常用的传输和序列化数据方式
    Kafka常用的传输和序列化数据方式。不同的方式有不同的优缺点,选择哪种方式通常取决于具体的应用场景、性能要求、数据兼容性需求等。以下是几种常见的方式,包括:1.ProtoBuf(ProtocolBuffers)概述:ProtoBuf是Google开发的一种语言中立、平台中立的高效二进制序列化格......
  • 一场 Kafka CRC 异常引发的血案rD
    一、问题概述客户的生产环境突然在近期间歇式的收到了KafkaCRC的相关异常,异常内容如下Recordbatchforpartitionskywalking-traces-0atoffset292107075isinvalid,cause:Recordiscorrupt(storedcrc=1016021496,computecrc=1981017560)JAVA复制全屏报错......
  • 一场 Kafka CRC 异常引发的血案
    一、问题概述客户的生产环境突然在近期间歇式的收到了KafkaCRC的相关异常,异常内容如下Recordbatchforpartitionskywalking-traces-0atoffset292107075isinvalid,cause:Recordiscorrupt(storedcrc=1016021496,computecrc=1981017560)报错没有规律性,有可......
  • [Kafka]binlog kafka并行消费提升小窍门
      线上库存Process实例配置详情:  背景:1.业务是通过监听上游mysqlbinlog完成的2.binlog是通过DDHkafka下发的3.consumer消费已经做到了7ms的单条消息消费性能优化4.怎样还能再提醒消费方的消费速率呢?5.当先consumer实例蓝绿组共12个实例2C2G(CPU2......
  • DataX + DataXWeb 初使用过程记录
    版本:DataXv202309 DataXWeb2.1.3预发布版DataX:Github:https://github.com/alibaba/DataX 功能介绍文档:https://github.com/alibaba/DataX/blob/master/introduction.md文档上虽然只写了Linux系统,但实际部署Windows也可以JDK版本使用1.8即可Python如果环境的版本可以选......
  • Kafka事务实现原理
    1Kafka的事务V.SRocketMQRocketMQ事务主要解决问题:确保执行本地事务和发消息这俩操作都成功/失败。RocketMQ还有事务反查机制兜底,更提高事务执行的成功率和数据一致性。而Kafka事务,是为确保在一个事务中发送的多条消息,要么都成功,要么都失败。这里的多条消息不一定在同一个top......