首页 > 其他分享 >mq消费者小技巧

mq消费者小技巧

时间:2022-11-11 17:13:08浏览次数:55  
标签:ConsumerConfig 技巧 void mq props put CONFIG public 消费者

1.指定Stop接口

public interface IStop {

    void close();
}

2.指定Consumer接口

public interface IConsumer extends IStop{

    void init();

    void start() throws Exception;
}

3.定义mq消费者kafka的消费类

@Component
public class ConsumerService extends Thread implements IConsumer {
	public void init() {
	// 进行初始化动作
		logger.info("app {}, token {}, nameserver {}, topic {}", app, token, nameserver, topic);
		this.processorList.add(new RedisProcessor());
		Properties props = new Properties();
		String jaasConfigFormat = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
		String jaasConfig = String.format(jaasConfigFormat, app, token);
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, nameserver);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, app);
		props.put(ConsumerConfig.CLIENT_ID_CONFIG, app);
		props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
		props.put(CommonClientConfigs.RETRIES_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
		props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
		props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);
		props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
		props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000000);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 30000);
		this.kafkaConsumer = new KafkaConsumer<>(props);
		kafkaConsumer.subscribe(Sets.newHashSet(topic));
	}
	@Override
	public synchronized void start() {
		super.setName("DeltaKafkaThreadService");
		super.start();
    }
	@Override
    public void run() {
        while (Stopper.isRunning()) {
            try {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
                records.forEach(record -> {
                    Record record1 = converter.convert(record);
                    processorList.forEach(process -> process.process(record1));
                });
                commit();
            } catch (Exception ex) {
                logger.error("handle msg exception ", ex);
            }
        }
    }
	// 手动commit offset
	private void commit() {
        kafkaConsumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                logger.warn("offset info:={} exception: ", offsets, exception);
            }
        });
    }

    @Override
    public void close() {
        commit();
        kafkaConsumer.close();
    }
}

标签:ConsumerConfig,技巧,void,mq,props,put,CONFIG,public,消费者
From: https://www.cnblogs.com/PythonOrg/p/16881080.html

相关文章

  • Facebook广告投放技巧
    随着脸书广告的竞争越来越激烈,脸书广告越来越难获得高投资回报。这是一个潜在的问题,因为CPM每增加1%,ROI就会减少1%。在这种情况下,广告商可能需要修改他们的内容或整体营销策......
  • RocketMQ 在物流行业的应用与运维
    本文作者:丁威-中通快递资深架构师,《RocketMQ技术内幕》作者,ApacheRocketMQ社区首席布道师,公众号「中间件兴趣圈」维护者。01物流行业的业务特点物流行业有三大业务......
  • Android开发技巧四--圆角化控件,让它看起来更美
    当需要为应用程序UI控件选择背景的时候,开发者会添加自定义的颜色和形状来代替系统的默认样式,圆角边框看起来是很不错的效果,开发者只需要添加几行代码,就可以在应用程序中使用......
  • 线程通信(生产者消费者问题)
    生产者消费者问题在生产者消费者问题中,仅有synchronized是不够的:synchronized可阻止并发更新同一个共享资源,实现了同步synchronized不能实现不同线程之间的信息传递......
  • rabbitMQ安装(Linux)
    1.查看服务器对应版本 下载的rabbit跟Erlang安装包其中的el7表示RedHat7.x,即CentOS7.x 2.下载rabbitMQ注意:下载的安装包对应CentOS的版本号rabbitMQ官网:https:/......
  • ActiveMQ消息中间件的使用
    一、ActiveMQ的介绍。ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现。1、主要特点:......
  • Android--全局获取Context的技巧
    Android中很多地方都会用到Context,弹出Toast的时候需要、启动活动时需要、发送广播的时候也需要、操作数据库的时候需要、使用通知的时候也需要等等等。或许你现在还没有为......
  • 一些快速提高Android开发的脚本与技巧(终端篇)
    正所谓“工欲善其事必先利其器”,一个好的工具或者技巧能让提升工作效率,起到事半功倍的效果。在这里斗胆列出一些窃以为一些可能快速提高Android日常开发的脚本,希望可以为大......
  • 一个简单实用的Android调试应用技巧
    在应用开发中,我们常常会进行日志打印或者debug调试,以此来分析运行时的一些信息,便于发现bug和问题。AndroidStudio的Debug功能很好用,但是有时候有些情况下,就显得不是那么快......
  • 技巧篇 之 这可能是解决 Could not resolve X 最有效的办法之一咯。
    夜深人静,远离了城市的喧嚣,开启了个人的狂欢。前言本文扯皮话较多,主要记录下心路历程,勿喷。事情的开始,源于某天下发了一个维护老项目的任务。当时那个头大,依稀记得刚入职......