首页 > 其他分享 >消费样例

消费样例

时间:2024-11-11 14:21:37浏览次数:1  
标签:消费 java 样例 private record static import consumer

  1. kafka消费消息
  2. 多线程带重试功能的异步处理
  3. 错误补偿机制,当超过最大重试次数后,消息扔到数据库表中
  4. 拉取一批消息异步处理,批量提交ack
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class KafkaBatchConsumer {

    private static final String TOPIC = "your_topic";
    private static final String BOOTSTRAP_SERVERS = "your_kafka_bootstrap_servers";
    private static final String GROUP_ID = "your_group_id";
    private static final String JDBC_URL = "your_jdbc_url";
    private static final String JDBC_USER = "your_db_user";
    private static final String JDBC_PASSWORD = "your_db_password";

    private static KafkaConsumer<String, String> consumer;
    private static ExecutorService executorService;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        executorService = Executors.newFixedThreadPool(10); // Adjust the pool size as needed

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (!records.isEmpty()) {
                AtomicInteger remainingTasks = new AtomicInteger(records.count());

                for (ConsumerRecord<String, String> record : records) {
                    CompletableFuture.runAsync(() -> processRecordWithRetry(record, 3), executorService)
                            .whenComplete((result, throwable) -> {
                                if (throwable != null) {
                                    System.err.println("Error processing record: " + throwable.getMessage());
                                }
                                if (remainingTasks.decrementAndGet() == 0) {
                                    consumer.commitSync(); // Commit offsets after all records in the batch are processed
                                }
                            });
                }
            }
        }
    }

    private static void processRecordWithRetry(ConsumerRecord<String, String> record, int maxRetries) {
        int attempt = 0;
        boolean success = false;
        while (attempt < maxRetries && !success) {
            try {
                processRecord(record);
                success = true;
            } catch (Exception e) {
                attempt++;
                System.err.println("Attempt " + attempt + " failed for record: " + record.value());
                if (attempt >= maxRetries) {
                    logInvalidRecord(record, e);
                } else {
                    try {
                        Thread.sleep(1000); // Optional: Wait between retries
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    private static void processRecord(ConsumerRecord<String, String> record) throws Exception {
        // Simulate processing time
        Thread.sleep(5000);
        // Implement your message processing logic here
        System.out.println("Processed record: " + record.value());
    }

    private static void logInvalidRecord(ConsumerRecord<String, String> record, Exception e) {
        // Log the invalid record to the database
        try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
            String sql = "INSERT INTO t_invalid_mq (topic, partition, offset, key, value, error_message) VALUES (?, ?, ?, ?, ?, ?)";
            try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
                pstmt.setString(1, record.topic());
                pstmt.setInt(2, record.partition());
                pstmt.setLong(3, record.offset());
                pstmt.setString(4, record.key());
                pstmt.setString(5, record.value());
                pstmt.setString(6, e.getMessage());
                pstmt.executeUpdate();
            }
        } catch (SQLException sqlException) {
            System.err.println("Failed to log invalid record: " + sqlException.getMessage());
        }
    }
}

标签:消费,java,样例,private,record,static,import,consumer
From: https://www.cnblogs.com/PythonOrg/p/18539588

相关文章

  • 2025年第五届消费电子与计算机工程国际学术会议(ICCECE 2025) 2025 5th International
    @目录一、会议详情二、重要信息三、大会介绍四、出席嘉宾五、征稿主题一、会议详情二、重要信息大会官网:https://ais.cn/u/vEbMBz三、大会介绍四、出席嘉宾五、征稿主题如想"投稿"请点击如下图片......
  • 生产消费者模型
               线程同步互斥锁(互斥量)条件变量生产/消费者模型一、互斥锁C++11提供了四种互斥锁:mutex:互斥锁。timed_mutex:带超时机制的互斥锁。recursive_mutex:递归互斥锁。recursive_timed_mutex:带超时机制的递归互斥锁。包含头文件:#include<mutex>1、mutex类......
  • 私域流量圈层在新消费时代的机遇与挑战:兼论开源 AI 智能名片、2 + 1 链动模式、S2B2C
    摘要:本文剖析了私域流量圈层在新消费时代呈现出的独特温度与信任优势,阐述了从传统销售到新消费转型中用户心理的变化。同时,强调了内容对于私域流量的关键作用,并分析开源AI智能名片、2+1链动模式、S2B2C商城小程序在私域流量发展中的应用,探讨私域流量作为流量优化方式的局......
  • 面向大规模队列,百万并发的多优先级消费系统设计
    引言HTTP是一种常用的通信协议,除了常见网站访问、上传下载,HTTP协议还经常被用在消息推送场景上。设想你搭建了一个电商平台,有很多大型商家入驻了该电商平台并售卖各类商品,在消费者购买某个商品后,平台会通过HTTP协议将消费者购买商品的信息通知商家,商家则会在后台接收平台推......
  • 双 11 如何实现高效营销,火山引擎 VeDI 助力实现消费者“量”“质”双提升
    自2009年至今,电商领域年度盛事的“双11”,已经历经了十五个年头,随着时间的推移,越来越多的平台和品牌商家,都加入到了这场狂欢当中。愈发垂直的品类、琳琅满目的商品、不断加码的促销玩法,让消费者们的可选择性越来越多。 那么,品牌商家应该如何在既有的“双11”玩法上推陈出新,......
  • 多生产者-多消费者问题
    多生产者-多消费者问题一、问题背景及分析这里的“多”指种类多而不是数量多。多种生产者和多种消费者共享同一片缓冲区,且一种消费者只接受特定的一种生产者生产的产品。​​‍二、问题实现​​同样,实现互斥的P操作一定要在实现同步的P操作之后,否则可能引起“死锁”......
  • 【专题】2024摇摆的消费者-消费者体验营销报告汇总PDF洞察(附原数据表)
    原文链接: https://tecdat.cn/?p=38173在当今经济社会的多元发展格局下,消费领域呈现出复杂且多变的态势。从日常购物到各类大宗商品消费,从国内市场到跨境交易,消费者的行为、需求以及市场趋势都在不断演变。一方面,消费者对于购物体验的重视程度愈发凸显,其不仅关注产品本身,更在意......
  • 数智化实践案例 | 数据赋能业务决策,探索消费品行业财务数智化转型
    爱慕股份是中国知名品牌企业,公司通过全渠道布局渗透细分市场以获取较好的市场份额。截至2020年12月31日,公司零售网络由2156个线下销售终端和以天猫、唯品会为主的线上渠道所组成,其中公司线下直营终端达1725个,2020年直营渠道贡献营收占比59.28%。公司的线下零售网点......
  • 【大数据学习 | kafka】消费者的分区分配规则
    1.概述上面我们提到过,消费者有的时候会少于或者多于分区的个数,那么如果消费者少了有的消费者要消费多个分区的数据,如果消费者多了,有的消费者就可能没有分区的数据消费。那么这个关系是如何分配的呢?现在我们知道kafka中存在一个coordinator可以管理这么一堆消费者,它可以帮......
  • 【大数据学习 | kafka】简述kafka的消费者consumer
    1.消费者的结构能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。这里面要涉及到一个动作叫做拉取。首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个......