一、启动Zookeeper服务
在Windows系统中打开第1个cmd窗口,启动Zookeeper服务:
> cd c:\kafka_2.12-2.4.0 > .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties
二、启动Kafka服务
打开第2个cmd窗口,然后输入下面命令启动Kafka服务:
> cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-server-start.bat .\config\server.properties
三、创建主题
为了测试Kafka,这里创建一个主题(Topic),名称为“topic_test”,包含一个分区,只有一个副本,在第3个cmd窗口中执行如下命令:
> cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test
可以继续执行如下命令,查看topic_test是否创建成功:
> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
如果创建成功,就可以在执行结果中看到topic_test。
继续在第3个cmd窗口中执行如下命令创建一个生产者来产生消息:
> .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_test
该命令执行以后,屏幕上的光标会一直在闪烁,这时,就可以用键盘输入一些内容,比如输入:
I love Kafka
Kafka is good
新建第4个cmd窗口,执行如下命令来消费消息:
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_test --from-beginning
该命令执行以后,就会在屏幕上看到刚才输入的语句“I love Kafka”和“Kafka is good”。
使用Python操作Kafka
使用Python操作Kafka之前,需要安装第三方模块python-kafka,命令如下:
> pip install kafka-python
安装结束以后,可以使用如下命令查看已经安装的kafka-python的版本信息:
> pip list
这个命令会显示已经安装的Pyhon第三方模块,并且给出每个模块的版本信息。
编写一个生产者程序producer_test.py用来生成消息:
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') # 连接Kafka msg = "Hello World".encode('utf-8') # 发送内容,必须是bytes类型 producer.send('test', msg) # 发送的topic为test producer.close()
编写一个消费者程序consumer_test.py用来消费消息:
from kafka import KafkaConsumer consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='smallest') for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print(recv)
启动Zookeeper服务和Kafka服务,然后,先执行producer_test.py,再执行consumer_test.py,就可以看到屏幕上打印出“Hello World”。
有一个文件score.csv,其内容如下:
"Name","Score"
"Zhang San",99.0
"Li Si",45.5
"Wang Hong",82.5
"Liu Qian",76.0
"Ma Li",62.5
"Shen Teng",78.0
"Pu Wen",86.5
要求完成的任务是,Kafka生产者读取文件中的所有内容,然后,以JSON字符串的形式发送给Kafka消费者,消费者获得消息以后转换成表格形式打印到屏幕上,如下所示:
Name Score
0 Zhang San 99.0
1 Li Si 45.5
2 Wang Hong 82.5
3 Liu Qian 76.0
4 Ma Li 62.5
5 Shen Teng 78.0
6 Pu Wen 86.5
为了完成上述任务,可以编写代码文件kafka_demo.py(要求和文件score.csv在同一个目录下),其内容如下:
import sys import json import pandas as pd import os from kafka import KafkaProducer from kafka import KafkaConsumer from kafka.errors import KafkaError KAFKA_HOST = "localhost" #服务器地址 KAFKA_PORT = 9092 #端口号 KAFKA_TOPIC = "topic0" #topic data=pd.read_csv(os.getcwd()+'\\score.csv') key_value=data.to_json() class Kafka_producer(): def __init__(self, kafkahost, kafkaport, kafkatopic, key): self.kafkaHost = kafkahost self.kafkaPort = kafkaport self.kafkatopic = kafkatopic self.key = key self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format( kafka_host=self.kafkaHost, kafka_port=self.kafkaPort) ) def sendjsondata(self, params): try: parmas_message = params producer = self.producer producer.send(self.kafkatopic, key=self.key, value=parmas_message.encode('utf-8')) producer.flush() except KafkaError as e: print(e) class Kafka_consumer(): def __init__(self, kafkahost, kafkaport, kafkatopic, groupid,key): self.kafkaHost = kafkahost self.kafkaPort = kafkaport self.kafkatopic = kafkatopic self.groupid = groupid self.key = key self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid, bootstrap_servers='{kafka_host}:{kafka_port}'.format( kafka_host=self.kafkaHost, kafka_port=self.kafkaPort) ) def consume_data(self): try: for message in self.consumer: yield message except KeyboardInterrupt as e: print(e) def sortedDictValues(adict): items = adict.items() items=sorted(items,reverse=False) return [value for key, value in items] def main(xtype, group, key): if xtype == "p": # 生产模块 producer = Kafka_producer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, key) print("===========> producer:", producer) params =key_value producer.sendjsondata(params) if xtype == 'c': # 消费模块 consumer = Kafka_consumer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, group,key) print("===========> consumer:", consumer) message = consumer.consume_data() for msg in message: msg=msg.value.decode('utf-8') python_data=json.loads(msg) ##字符串转换成字典 key_list=list(python_data) test_data=pd.DataFrame() for index in key_list: if index=='Name': a1=python_data[index] data1 = sortedDictValues(a1) test_data[index]=data1 else: a2 = python_data[index] data2 = sortedDictValues(a2) test_data[index] = data2 print(test_data) if __name__ == '__main__': main(xtype='p',group='py_test',key=None) main(xtype='c',group='py_test',key=None)
Kafka与MySQL的组合使用
需要完成的任务是,把JSON格式数据放入Kafka发送出去,然后,再从Kafka中获取到JSON格式数据,对其进行解析并写入到MySQL数据库。
编写一个生产者程序producer_json.py:
# producer_json.py from kafka import KafkaProducer import json producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8')) # 连接kafka data={ "sno":"95001", "name":"John", "sex":"M", "age":23 } producer.send('json_topic', data) # 发送的topic为json_topic producer.close()
编写一个消费者程序consumer_json.py:
from kafka import KafkaConsumer import json import pymysql.cursors consumer = KafkaConsumer('json', bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest') for msg in consumer: msg1=str(msg.value, encoding = "utf-8") dict = json.loads(msg1) # 连接数据库 connect = pymysql.Connect( host='localhost', port=3306, user='root', # 数据库用户名 passwd='123456', # 密码 db='school', charset='utf8' ) # 获取游标 cursor = connect.cursor() # 插入数据 sql = "INSERT INTO student(sno,sname,ssex,sage) VALUES ('%s', '%s', '%s', %d)" data = (dict['sno'],dict['name'],dict['sex'],dict['age']) cursor.execute(sql % data) connect.commit() print('成功插入数据') # 关闭数据库连接 connect.close()
在Windows系统中启动MySQL服务,然后,打开MySQL数据库的命令行界面,输入如下SQL语句创建数据库school:
mysql> CREATE DATABASE school;
创建好数据库school以后,可以使用如下SQL语句打开数据库:
mysql> USE school;
使用如下SQL语句创建一个表student:
mysql>CREATE TABLE student(
-> sno char(5),
-> sname char(10),
-> ssex char(2),
-> sage int);
使用如下SQL语句查看已经创建的表:
mysql> SHOW TABLES;
在Windows系统中启动Zookeeper服务和Kafka服务,然后,先执行生产者程序producer_json.py,再执行消费者程序consumer_json.py,执行成功以后,使用如下命令查看MySQL数据库中新插入的记录:
mysql> SELECT * FROM student;
可以看到,一条记录已经被成功地插入到了MySQL数据库。
标签:producer,self,kafka,key,使用,test,Kafka From: https://www.cnblogs.com/qi-6666/p/16904189.html