首页 > 其他分享 >Kafka的使用

Kafka的使用

时间:2022-11-18 18:34:41浏览次数:45  
标签:producer self kafka key 使用 test Kafka

一、启动Zookeeper服务

在Windows系统中打开第1个cmd窗口,启动Zookeeper服务:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

  

二、启动Kafka服务

打开第2个cmd窗口,然后输入下面命令启动Kafka服务:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-server-start.bat .\config\server.properties

  

三、创建主题

为了测试Kafka,这里创建一个主题(Topic),名称为“topic_test”,包含一个分区,只有一个副本,在第3个cmd窗口中执行如下命令:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test

  

可以继续执行如下命令,查看topic_test是否创建成功:

> .\bin\windows\kafka-topics.bat  --list  --zookeeper  localhost:2181

  

如果创建成功,就可以在执行结果中看到topic_test。

继续在第3个cmd窗口中执行如下命令创建一个生产者来产生消息:

> .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_test

  

该命令执行以后,屏幕上的光标会一直在闪烁,这时,就可以用键盘输入一些内容,比如输入:

I love Kafka
Kafka is good

新建第4个cmd窗口,执行如下命令来消费消息:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_test --from-beginning

该命令执行以后,就会在屏幕上看到刚才输入的语句“I love Kafka”和“Kafka is good”。

 

使用Python操作Kafka

使用Python操作Kafka之前,需要安装第三方模块python-kafka,命令如下:

> pip install kafka-python

安装结束以后,可以使用如下命令查看已经安装的kafka-python的版本信息:

> pip list

这个命令会显示已经安装的Pyhon第三方模块,并且给出每个模块的版本信息。

编写一个生产者程序producer_test.py用来生成消息:

from kafka import KafkaProducer
 
producer = KafkaProducer(bootstrap_servers='localhost:9092')  # 连接Kafka
 
msg = "Hello World".encode('utf-8')  # 发送内容,必须是bytes类型
producer.send('test', msg)  # 发送的topic为test
producer.close()

  

编写一个消费者程序consumer_test.py用来消费消息:

from kafka import KafkaConsumer
 
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='smallest')
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)

  

启动Zookeeper服务和Kafka服务,然后,先执行producer_test.py,再执行consumer_test.py,就可以看到屏幕上打印出“Hello World”。

有一个文件score.csv,其内容如下:

"Name","Score"
"Zhang San",99.0
"Li Si",45.5
"Wang Hong",82.5
"Liu Qian",76.0
"Ma Li",62.5
"Shen Teng",78.0
"Pu Wen",86.5

要求完成的任务是,Kafka生产者读取文件中的所有内容,然后,以JSON字符串的形式发送给Kafka消费者,消费者获得消息以后转换成表格形式打印到屏幕上,如下所示:

       Name  Score

0  Zhang San   99.0

1      Li Si   45.5

2  Wang Hong   82.5

3   Liu Qian   76.0

4      Ma Li   62.5

5  Shen Teng   78.0

6     Pu Wen   86.5

为了完成上述任务,可以编写代码文件kafka_demo.py(要求和文件score.csv在同一个目录下),其内容如下:

import sys
import json
import pandas as pd
import os
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
 
KAFKA_HOST = "localhost" #服务器地址
KAFKA_PORT = 9092    #端口号
KAFKA_TOPIC = "topic0"  #topic
 
data=pd.read_csv(os.getcwd()+'\\score.csv')
key_value=data.to_json()

class Kafka_producer():
    def __init__(self, kafkahost, kafkaport, kafkatopic, key):
      self.kafkaHost = kafkahost
      self.kafkaPort = kafkaport
      self.kafkatopic = kafkatopic
      self.key = key
      self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
          kafka_host=self.kafkaHost,
          kafka_port=self.kafkaPort)
       )
    def sendjsondata(self, params):
         try:
            parmas_message = params
            producer = self.producer
            producer.send(self.kafkatopic, key=self.key, value=parmas_message.encode('utf-8'))
            producer.flush()
         except KafkaError as e:
             print(e)
 
 
class Kafka_consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid,key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.key = key
        self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
           bootstrap_servers='{kafka_host}:{kafka_port}'.format(
           kafka_host=self.kafkaHost,
           kafka_port=self.kafkaPort)
         )
    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
                print(e)
 
def sortedDictValues(adict):
    items = adict.items()
    items=sorted(items,reverse=False)
    return [value for key, value in items]
 
def main(xtype, group, key):
    if xtype == "p":
        # 生产模块
        producer = Kafka_producer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, key)
        print("===========> producer:", producer)
        params =key_value        
        producer.sendjsondata(params)
    if xtype == 'c':
        # 消费模块
        consumer = Kafka_consumer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, group,key)
        print("===========> consumer:", consumer)
        message = consumer.consume_data()
        for msg in message:
            msg=msg.value.decode('utf-8')
            python_data=json.loads(msg) ##字符串转换成字典
            key_list=list(python_data)
            test_data=pd.DataFrame()
            for index in key_list:                
                if index=='Name':
                    a1=python_data[index]                    
                    data1 = sortedDictValues(a1)                    
                    test_data[index]=data1
                else:
                    a2 = python_data[index]
                    data2 = sortedDictValues(a2)
                    test_data[index] = data2
            print(test_data)
 
if __name__ == '__main__':
    main(xtype='p',group='py_test',key=None)
    main(xtype='c',group='py_test',key=None)

  

 

Kafka与MySQL的组合使用

需要完成的任务是,把JSON格式数据放入Kafka发送出去,然后,再从Kafka中获取到JSON格式数据,对其进行解析并写入到MySQL数据库。

编写一个生产者程序producer_json.py:

# producer_json.py
from kafka import KafkaProducer
import json
 
producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))  # 连接kafka
 
data={
  "sno":"95001",
  "name":"John",
  "sex":"M",
  "age":23  
 }
 
producer.send('json_topic', data)  # 发送的topic为json_topic
producer.close()

  

编写一个消费者程序consumer_json.py:

from kafka import KafkaConsumer
import json
import pymysql.cursors

consumer = KafkaConsumer('json', bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:     
    msg1=str(msg.value, encoding = "utf-8")    
    dict = json.loads(msg1)    
    # 连接数据库
    connect = pymysql.Connect(
    host='localhost',
    port=3306,
    user='root',  # 数据库用户名
    passwd='123456',  # 密码
    db='school',
    charset='utf8'
    )

    # 获取游标
    cursor = connect.cursor()

    # 插入数据
    sql = "INSERT INTO student(sno,sname,ssex,sage) VALUES ('%s', '%s', '%s', %d)"
    data = (dict['sno'],dict['name'],dict['sex'],dict['age'])    
    cursor.execute(sql % data)    
    connect.commit()
    print('成功插入数据')

    # 关闭数据库连接
    connect.close()  

  

在Windows系统中启动MySQL服务,然后,打开MySQL数据库的命令行界面,输入如下SQL语句创建数据库school:

mysql> CREATE DATABASE school;

创建好数据库school以后,可以使用如下SQL语句打开数据库:

mysql> USE school;

使用如下SQL语句创建一个表student:

mysql>CREATE TABLE student(
    -> sno char(5),
    -> sname char(10),
    -> ssex char(2),
    -> sage int);

使用如下SQL语句查看已经创建的表:

mysql> SHOW TABLES;

在Windows系统中启动Zookeeper服务和Kafka服务,然后,先执行生产者程序producer_json.py,再执行消费者程序consumer_json.py,执行成功以后,使用如下命令查看MySQL数据库中新插入的记录:

mysql> SELECT * FROM student;

可以看到,一条记录已经被成功地插入到了MySQL数据库。

 

标签:producer,self,kafka,key,使用,test,Kafka
From: https://www.cnblogs.com/qi-6666/p/16904189.html

相关文章