

Using Spring for Apache Kafka



Sending Messages



KafkaTemplate



The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. Both asynchronous and synchronous methods are provided, with the asynchronous methods returning a Future.



ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);

ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}



The first three (sendDefault) methods require that a default topic has been provided to the template.



The metrics and partitionsFor methods simply delegate to the same methods on the underlying Producer. The execute method provides direct access to the underlying Producer.
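
For example, a minimal sketch of using execute() for direct access to the native Producer (the topic name, key, and value here are illustrative):

template.execute(new ProducerCallback<Integer, String, RecordMetadata>() {

    @Override
    public RecordMetadata doInKafka(Producer<Integer, String> producer) {
        try {
            // use the native client directly and wait for the broker's metadata
            return producer.send(new ProducerRecord<>("myTopic", 1, "foo")).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

});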



To use the template, configure a producer factory and provide it in the template’s constructor:



@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ...
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}



The template can also be configured using standard <bean/> definitions.



Then, to use the template, simply invoke one of its methods.
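
For example (a minimal sketch; the topic name and values are illustrative):

template.setDefaultTopic("myTopic");
template.sendDefault(1, "foo");                // uses the default topic
template.send("myTopic", 0, 1, "bar");         // topic, partition, key, value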



When using the methods with a Message<?> parameter, topic, partition and key information is provided in a message header:



  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION_ID
  • KafkaHeaders.MESSAGE_KEY



with the message payload being the data.
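
For example, such a message can be built with the spring-messaging MessageBuilder (a sketch; the topic, partition, and key are illustrative):

Message<String> message = MessageBuilder.withPayload("foo")
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.PARTITION_ID, 0)
        .setHeader(KafkaHeaders.MESSAGE_KEY, 1)
        .build();
template.send(message);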



Optionally, you can configure the KafkaTemplate with a ProducerListener to get an async callback with the results of the send (success or failure) instead of waiting for the Future to complete.



public interface ProducerListener<K, V> {

    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

    void onError(String topic, Integer partition, K key, V value, Exception exception);

    boolean isInterestedInSuccess();

}



By default, the template is configured with a LoggingProducerListener which logs errors and does nothing when the send is successful.



onSuccess is only called if isInterestedInSuccess returns true.



For convenience, the abstract ProducerListenerAdapter is provided in case you only want to implement one of the methods. It returns false for isInterestedInSuccess.
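
For example, a listener that only reacts to failed sends might look like this (a sketch; the class name and error handling are illustrative):

public class ErrorOnlyProducerListener extends ProducerListenerAdapter<Integer, String> {

    @Override
    public void onError(String topic, Integer partition, Integer key, String value, Exception exception) {
        // take action on the failed send, e.g. alert or persist for replay
    }

}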



Notice that the send methods return a ListenableFuture<SendResult>. You can register a callback with the future to receive the result of the send asynchronously.



ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "foo");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

    @Override
    public void onSuccess(SendResult<Integer, String> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }

});



The SendResult has two properties, a ProducerRecord and RecordMetadata; refer to the Kafka API documentation for information about those objects.



If you wish to block the sending thread to await the result, you can invoke the future's get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter which will cause the template to flush() on each send. Note, however, that flushing will likely significantly reduce performance.
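
For example, a blocking send might look like this (a sketch; get() throws an ExecutionException if the send ultimately fails):

SendResult<Integer, String> sendResult = template.send("myTopic", "foo").get();
RecordMetadata metadata = sendResult.getRecordMetadata();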



Receiving Messages



Messages can be received by configuring a MessageListenerContainer and providing a Message Listener, or by using the @KafkaListener annotation.



Message Listeners



When using a Message Listener Container you must provide a listener to receive data. There are currently four supported interfaces for message listeners:



public interface MessageListener<K, V> { (1)

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { (2)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface BatchMessageListener<K, V> { (3)

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { (4)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}



  1. Use this for processing individual ConsumerRecords received from the Kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.
  2. Use this for processing individual ConsumerRecords received from the Kafka consumer poll() operation when using one of the manual commit methods.
  3. Use this for processing all ConsumerRecords received from the Kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.
  4. Use this for processing all ConsumerRecords received from the Kafka consumer poll() operation when using one of the manual commit methods.



Message Listener Containers



Two MessageListenerContainer implementations are provided:



  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer



The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainers to provide multi-threaded consumption.



KafkaMessageListenerContainer



The following constructors are available.



public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionInitialOffset... topicPartitions)



Each takes a ConsumerFactory and information about topics and partitions, as well as other configuration, in a ContainerProperties object. The second constructor is used by the ConcurrentMessageListenerContainer (see below) to distribute TopicPartitionInitialOffset across the consumer instances. ContainerProperties has the following constructors:



public ContainerProperties(TopicPartitionInitialOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)



The first takes an array of TopicPartitionInitialOffset arguments to explicitly instruct the container which partitions to use (using the consumer assign() method), with an optional initial offset: a positive value is an absolute offset by default; a negative value is relative to the current last offset within a partition by default. A constructor for TopicPartitionInitialOffset is provided that takes an additional boolean argument. If this is true, the initial offsets (positive or negative) are relative to the current position for this consumer. The offsets are applied when the container is started. The second takes an array of topics, and Kafka allocates the partitions based on the group.id property, distributing partitions across the group. The third uses a regex Pattern to select the topics.
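
For example, a container listening to explicitly assigned partitions might look like this (a sketch assuming a consumerFactory() bean such as the one defined in the @KafkaListener configuration below; topic names and offsets are illustrative):

ContainerProperties containerProps = new ContainerProperties(
        new TopicPartitionInitialOffset("topic1", 0),        // current committed offset
        new TopicPartitionInitialOffset("topic1", 1, 10L),   // absolute offset 10
        new TopicPartitionInitialOffset("topic2", 0, -5L));  // 5 before the current end
containerProps.setMessageListener(new MessageListener<Integer, String>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        ...
    }

});
KafkaMessageListenerContainer<Integer, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory(), containerProps);
container.start();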



Refer to the JavaDocs for ContainerProperties for more information about the various properties that can be set.



ConcurrentMessageListenerContainer



The single constructor is similar to the first KafkaMessageListenerContainer constructor:



public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)



It also has a concurrency property; for example, container.setConcurrency(3) will create three KafkaMessageListenerContainers.



For the first constructor, Kafka will distribute the partitions across the consumers. For the second constructor, the ConcurrentMessageListenerContainer distributes the TopicPartitions across the delegate KafkaMessageListenerContainers.



If, say, six TopicPartitions are provided and the concurrency is 3, each container will get two partitions. For five TopicPartitions, two containers will get two partitions and the third will get one. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
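
A minimal sketch of the concurrent container, reusing the containerProps from the explicit-assignment example above (the concurrency value is illustrative):

ConcurrentMessageListenerContainer<Integer, String> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps);
container.setConcurrency(3); // three delegate KafkaMessageListenerContainers
container.start();

With the three explicitly assigned partitions above and a concurrency of 3, each delegate container receives exactly one partition.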



Committing Offsets



Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka will auto-commit the offsets according to its configuration. If it is false, the containers support the following AckModes.



The consumer poll() method will return one or more ConsumerRecords; the MessageListener is called for each record. The following describes the action taken by the container for each AckMode:



  • RECORD - commit the offset when the listener returns after processing the record.
  • BATCH - commit the offset when all the records returned by the poll() have been processed.
  • TIME - commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.
  • COUNT - commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.
  • COUNT_TIME - similar to TIME and COUNT, but the commit is performed if either condition is true.
  • MANUAL - the message listener is responsible for calling acknowledge() on the Acknowledgment; after which, the same semantics as BATCH are applied.
  • MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.



Note

MANUAL and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener; see Message Listeners.



The commitSync() or commitAsync() method on the consumer is used, depending on the syncCommits container property.
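
For example, to use MANUAL acknowledgment with synchronous commits (a sketch, assuming the AckMode enum nested on AbstractMessageListenerContainer in this version):

ContainerProperties containerProps = new ContainerProperties("myTopic");
containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
containerProps.setSyncCommits(true); // use commitSync() rather than commitAsync()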



The Acknowledgment has this method:



public interface Acknowledgment {

    void acknowledge();

}



This gives the listener control over when offsets are committed.



@KafkaListener Annotation



The @KafkaListener annotation provides a mechanism for simple POJO listeners:



public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic")
    public void listen(String data) {
        ...
    }

}



This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer; by default, a bean with name kafkaListenerContainerFactory is expected.



@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ...
        return props;
    }
}



Notice that to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container.



You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets):



@KafkaListener(id = "bar", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}



Each partition can be specified in the partitions or partitionOffsets attribute, but not both.



When using manual AckMode, the listener can also be provided with the Acknowledgment; this example also shows how to use a different container factory.



@KafkaListener(id = "baz", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
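
The kafkaManualAckListenerContainerFactory referenced above is not defined in this document; one possible sketch, assuming the consumerFactory() bean shown earlier:

@Bean
public KafkaListenerContainerFactory<?> kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}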



Finally, metadata about the message is available from message headers:



@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}



Starting with version 1.1, @KafkaListener methods can be configured to receive the entire batch of consumer records received from the consumer poll. To configure the listener container factory to create batch listeners, set the batchListener property:



@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // enable batch listeners for this factory
    return factory;
}



To receive a simple list of payloads:



@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}



The topic, partition, offset, etc., are available in headers that parallel the payloads:



@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}



Alternatively, you can receive a List of Message<?> objects, with each offset, etc., in each message, but it must be the only parameter (aside from an optional Acknowledgment when using manual commits) defined on the method:



@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}



You can also receive a list of ConsumerRecord<?, ?> objects, but it must be the only parameter (aside from an optional Acknowledgment when using manual commits) defined on the method:



@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}



Filtering Messages



In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed. The framework cannot know whether such a message has been processed or not; that is an application-level function. This is known as the Idempotent Receiver pattern, and Spring Integration provides an implementation thereof.



The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter class, which can wrap your MessageListener. This class takes an implementation of RecordFilterStrategy where you implement the filter method to signal that a message is a duplicate and should be discarded.
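
A minimal sketch of a strategy that discards records already seen, keyed by topic/partition/offset (the class name and in-memory de-duplication are illustrative; a real idempotent receiver would use a durable store):

public class DeduplicatingRecordFilter implements RecordFilterStrategy<Integer, String> {

    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    @Override
    public boolean filter(ConsumerRecord<Integer, String> consumerRecord) {
        // return true to discard the record as a duplicate
        String id = consumerRecord.topic() + "-" + consumerRecord.partition()
                + "-" + consumerRecord.offset();
        return !seen.add(id);
    }

}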



FilteringAcknowledgingMessageListenerAdapter is also provided for wrapping an AcknowledgingMessageListener. This has an additional property ackDiscarded which indicates whether the adapter should acknowledge the discarded record; it is true by default.



When using @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory and the listener will be wrapped in the appropriate filtering adapter.
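
For example (a sketch; factory is a ConcurrentKafkaListenerContainerFactory as in the earlier configuration):

factory.setRecordFilterStrategy(new DeduplicatingRecordFilter());
factory.setAckDiscarded(true); // acknowledge records that the filter discards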



Finally, FilteringBatchMessageListenerAdapter and FilteringBatchAcknowledgingMessageListenerAdapter are provided, for when using a batch message listener.



Retrying Deliveries



If your listener throws an exception, the default behavior is to invoke the ErrorHandler, if configured, or to log the exception otherwise.



Note

Two error handler interfaces are provided, ErrorHandler and BatchErrorHandler; the appropriate type must be configured to match the Message Listener.



To retry deliveries, convenient listener adapters - RetryingMessageListenerAdapter and RetryingAcknowledgingMessageListenerAdapter - are provided, depending on whether you are using a MessageListener or an AcknowledgingMessageListener.



These can be configured with a RetryTemplate and RecoveryCallback<Void> - see the spring-retry project for information about these components. If a recovery callback is not provided, the exception is thrown to the container after retries are exhausted. In that case, the ErrorHandler will be invoked, if configured, or logged otherwise.



When using @KafkaListener, set the RetryTemplate (and optionally recoveryCallback) on the container factory and the listener will be wrapped in the appropriate retrying adapter.
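
For example, a sketch of a factory configured with three attempts and a fixed back-off (the bean name and policy values are illustrative):

@Bean
public KafkaListenerContainerFactory<?> retryingFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));   // at most 3 attempts
    FixedBackOffPolicy backOff = new FixedBackOffPolicy();
    backOff.setBackOffPeriod(1000L);                          // milliseconds between attempts
    retryTemplate.setBackOffPolicy(backOff);
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        // invoked after retries are exhausted
        return null;
    });
    return factory;
}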



A retry adapter is not provided for any of the batch message listeners.



Detecting Idle Asynchronous Consumers



While efficient, one problem with asynchronous consumers is detecting when they are idle - users might want to take some action if no messages arrive for some period of time.



You can configure the listener container to publish a ListenerContainerIdleEvent when some time passes with no message delivery. While the container is idle, an event will be published every idleEventInterval milliseconds.



To configure this feature, set the idleEventInterval on the container:



@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}



Or, for a @KafkaListener:



@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}



In each of these cases, an event will be published once per minute while the container is idle.



Event Consumption



You can capture these events by implementing ApplicationListener - either a general listener, or one narrowed to only receive this specific event. You can also use @EventListener, introduced in Spring Framework 4.2.



The following example combines the @KafkaListener and @EventListener into a single class. It’s important to understand that the application listener will get events for all containers so you may need to check the listener id if you want to take specific action based on which container is idle. You can also use the @EventListener condition for this purpose.



The events have 4 properties:



  • source - the listener container instance
  • id - the listener id (or container bean name)
  • idleTime - the time the container had been idle when the event was published
  • topicPartitions - the topics/partitions that the container was assigned at the time the event was generated



public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        this.event = event;
        eventLatch.countDown();
    }

}



Important

Event listeners will see events for all containers; so, in the example above, we narrow the events received based on the listener ID. Since containers created for the @KafkaListener support concurrency, the actual containers are named id-n where the n is a unique value for each instance to support the concurrency. Hence we use startsWith in the condition.



Caution

If you wish to use the idle event to stop the listener container, you should not call container.stop() on the thread that calls the listener - it will cause delays and unnecessary log messages. Instead, you should hand off the event to a different thread that can then stop the container. Also, you should not stop() the container instance in the event if it is a child container; you should stop the concurrent container instead.



Current Positions when Idle



Note that you can obtain the current positions when idle is detected by implementing ConsumerSeekAware in your listener; see onIdleContainer() in Seeking to a Specific Offset.



Topic/Partition Initial Offset



There are several ways to set the initial offset for a partition.



When manually assigning partitions, simply set the initial offset (if desired) in the configured TopicPartitionInitialOffset arguments (see Message Listener Containers). You can also seek to a specific offset at any time.



When using group management where the broker assigns partitions:



  • For a new group.id, the initial offset is determined by the auto.offset.reset consumer property (earliest or latest); see the snippet after this list.
  • For an existing group id, the initial offset is the current offset for that group id. You can, however, seek to a specific offset during initialization (or at any time thereafter).
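
For the first case, the property is simply set in the consumer configuration, for example (a one-line sketch using the props map from the consumer configuration above):

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");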



Seeking to a Specific Offset



In order to seek, your listener must implement ConsumerSeekAware which has the following methods:



void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);



The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.



When using group management, the second method is called when assignments change. You can use this method, for example, to set initial offsets for the partitions by calling the callback; you must use the callback argument, not the one passed into registerSeekCallback. This method will never be called if you explicitly assign partitions yourself; use the TopicPartitionInitialOffset in that case.



The callback has one method:



void seek(String topic, int partition, long offset);



You can also perform seek operations from onIdleContainer() when an idle container is detected; see Detecting Idle Asynchronous Consumers for how to enable idle container detection.



To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback for the appropriate thread.
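
Putting the pieces together, a sketch of a seek-aware listener (the class name, the ThreadLocal handling, and the seek-to-beginning logic are illustrative):

public class SeekingListener implements MessageListener<Integer, String>, ConsumerSeekAware {

    private static final ThreadLocal<ConsumerSeekCallback> seekCallback = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // save the callback for arbitrary seeks later, keyed by consumer thread
        seekCallback.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // e.g. start every newly assigned partition from the beginning
        for (TopicPartition tp : assignments.keySet()) {
            callback.seek(tp.topic(), tp.partition(), 0);
        }
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // optionally reposition when the container is idle
    }

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        ...
    }

}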



Serialization/Deserialization and Message Conversion



Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys. It is present with the org.apache.kafka.common.serialization.Serializer<T> and org.apache.kafka.common.serialization.Deserializer<T> abstractions, with some built-in implementations. Meanwhile, we can specify simple (de)serializer classes using Producer and/or Consumer configuration properties, e.g.:



props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);



For more complex or particular cases, the KafkaConsumer and, therefore, the KafkaProducer provide overloaded constructors to accept (De)Serializer instances for keys and/or values, respectively.



To meet this API, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory also provide properties to allow injecting a custom (De)Serializer into the target Producer/Consumer.
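
For example, a consumer factory with explicitly injected deserializer instances might look like this (a sketch using the DefaultKafkaConsumerFactory constructor that accepts key and value Deserializers):

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new IntegerDeserializer(), new StringDeserializer());
}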



For this purpose, Spring for Apache Kafka also provides JsonSerializer/JsonDeserializer implementations based on the Jackson JSON object mapper. The JsonSerializer is quite simple and just allows writing any Java object as a JSON byte[]; the JsonDeserializer requires an additional Class<?> targetType argument to allow the deserialization of a consumed byte[] to the proper target object.



JsonDeserializer<Bar> barDeserializer = new JsonDeserializer<>(Bar.class);



Both JsonSerializer and JsonDeserializer can be customized with an ObjectMapper. You can also extend them to implement some particular configuration logic in the configure(Map<String, ?> configs, boolean isKey) method.



Although the Serializer/Deserializer API is quite simple and flexible from the low-level Kafka Consumer and Producer perspective, you might need more flexibility at the Spring Messaging level, either when using @KafkaListener or Spring Integration. To easily convert to/from org.springframework.messaging.Message, Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter customization. The MessageConverter can be injected into a KafkaTemplate instance directly and via the AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property:



@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Foo foo) {
    ...
}



When using a @KafkaListener, the parameter type is provided to the message converter to assist with the conversion.



Note

When using the StringJsonMessageConverter, you should use a StringDeserializer in the Kafka consumer configuration and a StringSerializer in the Kafka producer configuration, when using Spring Integration or the KafkaTemplate.send(Message<?> message) method.



Log Compaction



When using Log Compaction, it is possible to send and receive messages with null payloads, which identify the deletion of a key.



Starting with version 1.0.3, this is now fully supported.



To send a null payload using the KafkaTemplate, simply pass null into the value argument of the send() methods. One exception to this is the send(Message<?> message) variant. Since a spring-messaging Message<?> cannot have a null payload, a special payload type KafkaNull is used and the framework will send null. For convenience, the static KafkaNull.INSTANCE is provided.
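
For example, sending a "tombstone" for a key via the Message<?> variant (a sketch; the topic and key are illustrative):

template.send(MessageBuilder.withPayload(KafkaNull.INSTANCE)
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.MESSAGE_KEY, 1)
        .build());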



When using a message listener container, the received ConsumerRecord will have a null value().



To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false; you will usually also need the key so your application knows which key was "deleted":



@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
    // value == null represents key deletion
}



When using a class-level @KafkaListener, some additional configuration is needed - a @KafkaHandler method with a KafkaNull payload:



@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

    Kafka可以通过消费者组来查找数据。消费者组是一组消费者的集合,它们共同读取一个或多个主题。消费者组可以使用Kafka提供的命令行工具或KafkaAPI来实现。使用命令行工具kafka-console-consumer可以查找数据。例如,以下命令可以从名为test的主题中读取消息:kafka-console-consum......