首页 > 其他分享 >Kafka生产者

Kafka生产者

时间:2023-05-07 13:55:47浏览次数:25  
标签:String val 生产者 Kafka props put import 序列化

Kafka生产者

下面是生产者使用kafka的API发送消息的过程

image-20230420095601102

可以从上面看出,如果发送成功则会返回元数据,否者抛出异常!

创建Kafka生产者

属性介绍

bootstrap.servers:指定broker的地址清单,没有必要填写所有的地址,生产者会根据已有地址找到所有broker地址。

key.serializer:序列化方式,比如字符串,整数,数组等规定序列化的方式。

  • ByteArraySerializer:如果key为对象,如DateTime等,则序列化对象
  • StringSerializer:如果key为字符串,则序列化字符串
  • IntegerSerializer:如果key为整数,则序列化整数

value.serializer:与key.serializer相同,都是规定序列化方式

消息发送有三种方式

  • 发送并忘记:发送消息给服务器,并不关系是否正常到达

    from kafka import KafkaProducer
    import json
    
    # 创建生产者
    producer = KafkaProducer(
        topic="test",
        bootstrap_servers = ["localhost:9092"]
    )
    # 发送信息
    try:
        producer.send(key="key", value="{'name':'bob', 'age':12}")
    except:
        print("Error: Kakfa Producer")
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import java.util.Properties
    
    
    class Kafka_Producer {
    
      private var kafka_produncer : KafkaProducer[String, String] = null
      //初始化对象
      def init():Unit={
        var prop: Properties = new Properties()
        prop.put("bootstrap.servers","localhost:9092")
        // 指定序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // 指定topic
        prop.put("topic", "test")
    
        kafka_produncer = new KafkaProducer[String, String](prop)
      }
    
      def send():Unit={
        for (i <- 1 to 100){
          val msg = s"${i} : this is a linys ${i} kafka data"
          println(s"发送数据:${msg}")
          kafka_produncer.send(new ProducerRecord[String, String](s"${i}-key", msg))
    
        }
      }
    }
    
  • 同步发送:使用send()函数发送消息后,返回一个Future对象,调用get()函数进行等待。

      def send():Unit={
        for (i <- 1 to 100){
          val msg = s"${i} : this is a linys ${i} kafka data"
          println(s"发送数据:${msg}")
          try {
            val metadata: RecordMetadata = kafka_produncer.send(new ProducerRecord[String, String](s"${i}-key", msg)).get()
          }catch {
            case ex: Exception =>{
              ex.printStackTrace()
            }
          }
        }
      }
    
  • 异步发送:使用send()函数,并指定一个回调函数,服务器在返回响应时调用该函数。

      def send():Unit={
        for (i <- 1 to 100){
          val msg = s"${i} : this is a linys ${i} kafka data"
          println(s"发送数据:${msg}")
          try {
            val producer_record = new ProducerRecord[String, String](s"${i}-key", msg)
            kafka_produncer.send(producer_record, new Callback {  //回调函数
              override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
                if (metadata != null){
                  println("发送成功!")
                }
                if (exception != null){
                  println("发送失败!")
                }
              }
            })
          }catch {
            case ex: Exception =>{
              ex.printStackTrace()
            }
          }
        }
      }
    

    也可以这样实现:

    class DemoProducerCallback extends Callback{
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (metadata != null){
          println("消息发送成功!")
        }
        if (exception != null){
          println("消息发送失败!")
        }
      }
    }
    
      def send():Unit={
        for (i <- 1 to 100){
          val msg = s"${i} : this is a linys ${i} kafka data"
          println(s"发送数据:${msg}")
          try {
            val producer_record = new ProducerRecord[String, String](s"${i}-key", msg)
            kafka_produncer.send(producer_record, new DemoProducerCallback())
          }catch {
            case ex: Exception =>{
              ex.printStackTrace()
            }
          }
        }
      }
    

生产者的配置

参数 含义
acks 1. acks=0时,生产者在成功写入消息之前不会等待任何来自服务器的响应。2. acks=1时, 只要集群的首领节点收到消息,生产者就会收到来自服务器的成功响应,如果首领节点崩溃时,生产者就会收到一个错误响应,为了避免数据丢失,生产者会重新发送消息。3. acks=all时,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
buffer.memory 设置生产者内存缓冲区的大小,如果程序发送消息的速度超过发送到服务器的速度,会导致生产者缓冲区空间不足,调用send函数会触发阻塞或者抛出异常。
compression.type 可以设置为snappy,gzip,lz4,指定了消息被发送给broker之前使用哪种压缩算法进行压缩。
retries 决定了生产者可以重发消息的次数,如果超过则报错
batch.size 指定一个批次使用内存的大小
linger.ms 在发送批次之前等待更多消息加入批次的时间
client.id 服务器用它来识别消息的来源
max.in.flight.requests.per.connection 指定生产者在收到服务器响应之前可以发送多少个消息
timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms 生产者在发送数据时等待服务器返回相应的时间
max.block.ms 在调用send()或partitionsFor()方法获取元数据时生产者的阻塞时间
max.request.size 控制生产者发送的请求大小,可以指能发送的单个消息的最大值
receive.buffer.bytes 和 send.buffer.bytes 分别指TCP socket接收和发送数据包的缓冲区大小

序列化器

自定义序列器

序列化的关键就是将对象变成二进制进行网络传输:

序列化过程:对象转化为二进制

val objectMapper = new ObjectMapper()
objectMapper.writeValueAsBytes(data)

反序列化过程:二进制转化为对象

val mapper = new ObjectMapper()
val user:User = mapper.readValue[User](data, classOf[User])

下面是对User类进行序列化和反序列化的案例:

// 序列化对象
class User {
  private var name:String = ""
  private var age:Int = 0

  def this(name:String, age:Int) {
    this()
    this.name=name
    this.age
  }
  def getName:String = this.name
  def getAge:Int = this.age

  override def toString: String = s"User(${name}, ${age})"
}

// User的序列化工具
class UserSerializer extends Serializer[User] {
  override def serialize(topic: String, data: User): Array[Byte] = {
    if(data==null)return null
    else{
      val objectMapper = new ObjectMapper()
      objectMapper.writeValueAsBytes(data)
    }
  }
}
// User的反序列化工具
class UserDeserializer extends Deserializer[User]{
  override def deserialize(topic: String, data: Array[Byte]): User = {
    val mapper = new ObjectMapper()
    val user:User = mapper.readValue[User](data, classOf[User])
    return user
  }
}

下面则是对该序列化的使用(生产者+消费者)

package com.kone.test

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

import java.util.Properties
import java.util.concurrent.Future

object KafkaProducerWithUserObject {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("key.serializer", "com.kone.test.User.UserSerializer")    //这是序列化器的路径
    props.put("acks", "all")

    val producer: KafkaProducer[String, User] = new KafkaProducer[String, User](props)
    try{
      for(i <- 0 to 100){
        val user = new User(s"name:${i}", i)
        val record = new ProducerRecord[String, User]("topic", i.toString, user)
        val metadata: Future[RecordMetadata] = producer.send(record)
      }
    }catch {
      case e:Exception => e.printStackTrace()
    }finally {
      producer.close()
    }
  }
}
package com.kone.test

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

import java.util.Properties
import scala.collection.JavaConverters._

object KafkaConsumerWithUserObject {
  def main(args:Array[String]):Unit={
    val props: Properties = new Properties()
    props.put("group.id","test")
    props.put("bootstrap.servers","localhost:9092")
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer","om.kone.test.User.UserDeserializer")
    props.put("enable.auto.commit","true")
    props.put("auto.commit.interval.ms","1000")

    val consumer: KafkaConsumer[String, User] = new KafkaConsumer[String, User](props)

    try{
      consumer.subscribe(List("topic").asJava)
      while(true){
        val records: ConsumerRecords[String, User] = consumer.poll(10)
        for(record <- records.asScala){
          println(s"topic: ${record.topic()}  key:${record.key()}  value:${record.value()}")
        }
      }
    }catch {
      case e:Exception => e.printStackTrace()
    }finally {
      consumer.close()
    }
  }
}

使用Avro序列化

kakfa使用Avro序列化才是常用且高效的序列化方案;在使用avro序列化之前需要定义schema信息。负责读取数据的应用程序使用标识符从注册表里拉取schema来反序列记录。序列化器和反序列化器分别负责处理schema的注册和拉取。

image-20230502104056630

下面是一个案例,所涉及的文件分别为:Customer.avsc, KafkaConsumerAvroSerializer.scala, KafkaProducerAvroSerializer.scala 和 UserBehavior.scala

  • Customer.avsc:schema信息
  • UserBehavior.scala:schema信息所对应的user类
  • KafkaConsumerAvroSerializer.scala:kafka消费者
  • KafkaProducerAvroSerializer.scala:kafka生产者
{
    "namespace":"com.kone.test.UserBehavior",
    "type":"record",
    "name": "Stock",
    "fields": [
        {"name": "userid","type": "long"},
        {"name": "item","type": "long"},
        {"name": "categoryid","type": "long"},
        {"name": "behavior","type": "string"},
        {"name": "timestamp","type": "long"}
    ]
}
package com.kone.test

case class UserBehavior(
                       userid:Long, item:Long, categoryid:Long, behavior:String, timestamp:Long
                       ) extends Serializable{

}
object UserBehavior{
  def apply(userArray:Array[String]):UserBehavior ={
    return new UserBehavior(userArray(0).toLong,userArray(1).toLong,userArray(2).toLong,userArray(3),userArray(4).toLong)
  }
}
package com.kone.test

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

import java.io.File
import java.util.Properties
import scala.util.Try

object KafkaConsumerAvroSerializer {
  def main(args: Array[String]): Unit = {
    // avro schema 解析
    val schema: Schema = new Schema.Parser().parse(new File("Customer.avsc"))
    val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary[GenericRecord](schema)

    val props: Properties = new Properties()
    props.put("bootstrap.servers","localhost:9092")
    props.put("group.id","test")
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer")

    //创建kakfa消费者
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](props)

    try{
      while(true){
        val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(10)
        for(record <- records){
          val genericRecord: Try[GenericRecord] = recordInjection.invert(record)
          println(genericRecord.toString)
        }
      }
    }catch {
      case e:Exception => e.printStackTrace()
    }finally {
      consumer.close()
    }
  }
}
package com.kone.test

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

import java.io.File
import java.util.Properties
import scala.io.BufferedSource

object KafkaProducerAvroSerializer {
  //数据解析 将csv中的数据 -> list数据
  def csv2list():List[UserBehavior]={
    //用户数据
    val source: BufferedSource = scala.io.Source.fromURI(this.getClass.getResource("/user.csv").toURI)
    //将数据解析为User对象
    val data: List[UserBehavior] = source.getLines().toList.map(_.split(",")).map(arr => UserBehavior(arr))
    return data
  }
  def main(args:Array[String]):Unit={
    // avro schema 解析
    val schema: Schema = new Schema.Parser().parse(new File("Customer.avsc"))
    val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary[GenericRecord](schema)

    //获取数据
    val data: List[UserBehavior] = csv2list()

    //kafka生产者配置
    val props: Properties = new Properties()
    props.put("bootstrap.servers","localhost:9092")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer: KafkaProducer[String, Array[Byte]] = new KafkaProducer[String, Array[Byte]](props)

    //发送数据
    data.foreach(user=>{
      val avroRecord: GenericData.Record = new GenericData.Record(schema)
      avroRecord.put("userid",user.userid)
      avroRecord.put("item",user.item)
      avroRecord.put("categoryid",user.categoryid)
      avroRecord.put("behavior",user.behavior)
      avroRecord.put("timestamp",user.timestamp)

      val bytes: Array[Byte] = recordInjection.apply(avroRecord)

      try{
        val record: ProducerRecord[String, Array[Byte]] = new ProducerRecord[String, Array[Byte]]("test", bytes)
        val metadata: RecordMetadata = producer.send(record).get()
      }catch {
        case e:Exception => e.printStackTrace()
      }finally {
        producer.close()
      }
    })

  }
}

分区

kafka发送的是一对键值对,其中相同key会被写到相同分区,也就是说,如果一个进程只从一个主题的分区读取数据,那么相同键的所有记录都可以被该进程读取。如果,键key为null,则会被随机的分配到各个分区。分区器会使用轮询算法将消息均匀分布到各个分区上。

自定义分区策略

下面这个分区策略如下:

  • 如果key没有被指定,则分为2种情况:1.topic有可用的分区,2.topic没有可用分区
    • 情况1:获取可用分区的数量num,使用map存储<topic, counter>,其中counter初始化为随机值,后续调用自增即可,分区为counter%num。
    • 情况2:给一个不可分配分区即可。
  • key有被指定,直接使用key的hash值
package com.kone.test

import io.netty.util.internal.ThreadLocalRandom
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Cluster, PartitionInfo}

import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

class KafkaPartitioners extends Partitioner {
  // 用于存储topic对应的计数器, key:topic, value:计数器
  private val topicCounterMap: ConcurrentHashMap[String, AtomicInteger] = new ConcurrentHashMap[String, AtomicInteger]()

  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
      //通过cluster从元数据中获取topic的所有分区数据
    val partitionInfos: util.List[PartitionInfo] = cluster.partitionsForTopic(topic)

    // 如果keyBytes中没有指定key
    if (keyBytes == null){
      //获得一个自增的值
      val nextvalue: Int = nextValue(topic)
      //通过cluster拿到所有可用的分区
      val availablePartitions: util.List[PartitionInfo] = cluster.availablePartitionsForTopic(topic)
      // 如果topic存在可用分区
      if (availablePartitions.size()>0){
        val part: Int = Utils.toPositive(nextvalue) % availablePartitions.size()
        //然后从可用分区中返回一个分区
        return availablePartitions.get(part).partition()
      }else{
        //如果不存在分区时,那么就从所有不可用分区中通过取余的方式返回一个不可用分区
        return Utils.toPositive(nextvalue)%partitionInfos.size()
      }
    }else{
      // 如果key有被指定
      // 则使用该key进行hash操作,然后对所有分区数据进行取余
      Utils.toPositive(Utils.murmur2(keyBytes))%partitionInfos.size()
    }
  }

  //根据topic获取自增函数
  def nextValue(topic:String):Int = {
    var counter: AtomicInteger = topicCounterMap.get(topic) //
    if (null == counter){
      // 如果是第一次,该topic还没有对应的计数,会生成一个随机数
      counter = new AtomicInteger(ThreadLocalRandom.current().nextInt())
      //然后,将该随机数与topic对应起来存入map中
      val currentCounter: AtomicInteger = topicCounterMap.putIfAbsent(topic, counter)
      if(currentCounter!=null){
        counter=currentCounter
      }
    }
    return counter.getAndIncrement()
  }


  override def close(): Unit = {}

  override def configure(configs: util.Map[String, _]): Unit = {}
}

标签:String,val,生产者,Kafka,props,put,import,序列化
From: https://www.cnblogs.com/ALINGMAOMAO/p/17379231.html

相关文章

  • Kafka消费者
    Kafka消费者消费者和消费者群组Kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收一部分分区的消息。若分区的数量大于等于消费者的数量,则消费者会消费一个或多个分区的数据。若分区的数量小于消费者的数量就会出现闲置消费者。上面为1个组......
  • Rabbitmq介绍,安装,基于queue实现消费者生产者,基本使用,消息安全,持久化,闲置消费,发布订阅,
    内容详细Rabbmit介绍消息队列中间件概念很大,准确一些叫消息队列中间件消息队列中间件使用redis当作消息队列来用,blpop阻塞式弹出,实现队列,先进先出MQ,消息队列,MessageQueue是什么?消息队列就是基础数据结构中先进先出(队列)的一种数据机制,类比于生活中,买东西,需要排队,先排队的人......
  • 数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)
    数据湖Iceberg-简介(1)数据湖Iceberg-存储结构(2)数据湖Iceberg-Hive集成Iceberg(3)数据湖Iceberg-SparkSQL集成(4)数据湖Iceberg-FlinkSQL集成(5)数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)数据湖Iceberg-FlinkDataFrame集成(7)数据湖Iceberg-FlinkSQL-kafka类型表......
  • 消息队列Rabbitmq介绍、rabbitmq安装、基于queue实现生产者消费者、基本使用、消息安
    目录1消息队列Rabbitmq介绍2rabbitmq安装3基于queue实现生产者消费者4基本使用4.1发送者4.2消费者5消息安全(详见笔记)6持久化(详见笔记)7闲置消费(详见笔记)8发布订阅(详见笔记)9发布订阅高级之Routing(按关键字匹配)(详见笔记)1消息队列Rabbitmq介绍#消息队列 -......
  • 互斥锁 读写锁 条件变量 生产者消费者问题
    #互斥锁/*#include<pthread.h>intpthread_mutex_init(pthread_mutex_t*restrictmutex,constpthread_mutexattr_t*restrictattr);功能:初始化一个互斥变量mutex参数:mutex:需要初始化的互斥变量......
  • filebeat+kafka_logstash+es进行日志分析
    filebeat+kafka_logstash+es进行日志分析目录一.将安装包上传至目标服务器(即日志所在的服务器)二.解压安装三.配置filebeat1.配置采集日志到logstash,这种配置适用于日志量较小的场景,Filebeat--->logstash,logstash直接解析filebeat2.配置采集日志至kafka,file......
  • Rabbitmq 介绍 、安装、基于Queue实现生产者消费者模型、基本使用、消息安全之ack、du
    师承老刘llnb一、消息队列介绍1.1介绍消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”1.2MQ解决什么问题MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。应用解耦......
  • Kafka基础阶段与集群搭建详细教程
    Kafka第一天课堂笔记一.Kafka简介1.1消息队列消息队列——用于存放消息的组件程序员可以将消息放入到队列中,也可以从消息队列中获取消息很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)消息队列中间件:消息队列的组件,例如:Kafk......
  • C#操作kafka
    1.手动设置TopicPartition=>offset//手动设置TopicPartition=>offsetforeach(TopicPartitionpartitioninconsumer.Assignment){if(partition.Partition.Value==13){TopicPartitionOffsetoffset=newTopicPartitionOffset(partition,27060);......
  • Kafka2.4安装与配置
    一、安装zookeeper集群1、安装jdk1.82、下载apache-zookeeper-3.5.7-bin.tar.gz并解压第1台机器:mkdir/usr/local/zookeeper/datamv/usr/local/zookeeper/conf/zoo_sample.cfg/usr/local/zookeeper/conf/zoo.cfgvim/usr/local/zookeeper/conf/zoo.cfgdataDir=/usr/l......