import json
import traceback
from datetime import datetime
import time
from pykafka import KafkaClient
from utils import *


def create_data():
    """Build a sample ``process_message_event`` HTTP packet message.

    Starts from a fixed template and then overwrites the request/response
    timestamps with the current time, the ``id`` with a fresh random
    32-character upper-case string (via ``getRandomstr`` from ``utils``),
    and the host/port/uri/method fields with the values configured below.

    Returns:
        dict: the message, ready to be serialized and sent to Kafka.
    """
    data = {
        "message": {
            "project": "scan_api",
            "packetType": "http",
            "data": {
                "id": "4BBZ3FE977D042Z2972AD75D70Z33E01",
                "httpType": 1,
                "host": "www.para.com.cn1",
                "port": 31200,
                "httpDirection": 2,
                "uri": "/",
                "fullUri": "http://www.para.com.cn1/",
                "requestIp": "20.10.10.11",
                "requestMethod": "GET",
                "requestTime": "2023-09-12 15:04:14",
                "requestTimestamp": 1694502254000,
                "requestTimestampStr": None,
                "requestContentLength": 110,
                "requestContentType": "application/json",
                "requestHeader": "{\"Referer\":\"http://10.10.2.236:31200\",\"origin\":\"www.httpbin.org\",\"User-Agent\":\"ly-Apache-HttpClient/4.5.13 (Java/1.8.0_151)\",\"Authorization\":\"Para&158*abc34f11111111111111<script >\",\"Accept\":\"text/plain, application/json, application/*+json, */*\",\"Connection\":\"Keep-Alive\",\"Host\":\"10.10.2.236:31269\",\"Accept-Encoding\":\"gzip,deflate\",\"Content-Length\":\"110\",\"Content-Type\":\"application/xml\"}",
                "requestBody": "{\"password\":\"123456\",\"@type\":\"200\",\"a\":\"1\",\"LOL\":\"Lee Sin\",\"age\":\"18\",\"hero\":\"bigman\"}",
                "sensitiveChars": None,
                "statusCode": 200,
                "costTime": 18,
                "responseIp": "10.10.2.236",
                "responseTime": "2023-09-12 15:04:14",
                "responseTimestamp": 1694502254000,
                "responseTimestampStr": None,
                "responseContentLength": 0,
                "responseContentType": "application/json",
                "responseHeader": "{\"Set-Cookie\":\"name=123123;SameSite=Lax\",\"content-dispositoin\":\"attachment\",\"access-control-allow-origin\":\"www.httpbin.org\",\"Transfer-Encoding\":\"chunked\",\"Keep-Alive\":\"timeout=60\",\"Connection\":\"keep-alive\",\"Vary\":\"Access-Control-Request-Headers\",\"Date\":\"Mon, 10 Oct 2022 13:54:38 GMT\",\"Content-Type\":\"application/json\",\"Server\":\"nginx/1.2.9\",\"Access-Control-Allow-Origin\":\"*\"}",
                "responseBody": "{\"name\":\"elasticsearch<script >\",\"cluster_name\":\"elasticsearch\",\"cluster_uuid\":\"2c248XANRVClgOVPd91OiA\",\"version\":{\"number\":\"7.11.5\",\"build_flavor\":\"default\",\"build_type\":\"docker\",\"build_hash\":\"8d61b4f7ddf931f219e3745f295ed2bbc50c8e84\",\"build_date\":\"2022-06-23T21:57:28.736740635Z\",\"build_snapshot\":false,\"lucene_version\":\"8.11.1\",\"minimum_wire_compatibility_version\":\"6.8.0\",\"minimum_index_compatibility_version\":\"6.0.0-beta1\"},\"tagline\":\"You Know, for Search\"}",
                "complete": True,
                "hasIllegalAsset": False,
                "url": "http://confluence.bhaf.com.cn/plugins/servlet/confluence/placeholder/macro",
                "referer": "http://10.10.2.236:31200",
                "userAgent": "ly-Apache-HttpClient/4.5.13 (Java/1.8.0_151)"
            }
        },
        "callBack": None,
        "messageType": "http"
    }

    # Fields overridden on every call.
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    current_timestamp = int(time.time() * 1000)

    uri = "/anything/test/abc/autoly/AAA/1010-hh-Q4"
    # Alternative host kept from the original script: "192.169.1.255:888"
    host = "www.mingan.com.cn"
    port = 31200
    method = "POST"

    packet = data["message"]["data"]

    packet["requestTime"] = current_time
    packet["requestTimestamp"] = current_timestamp
    packet["responseTime"] = current_time
    packet["responseTimestamp"] = current_timestamp

    packet["id"] = getRandomstr(32).upper()

    packet["host"] = host
    # `port` was assigned but never applied in the original; it equals the
    # template value, so writing it keeps the output unchanged.
    packet["port"] = port
    packet["uri"] = uri
    packet["fullUri"] = "http://" + host + uri
    packet["requestMethod"] = method

    # To inspect the message on disk, uncomment:
    # with open("data.json", "w") as file:
    #     json.dump(data, file, indent=4)
    print("***报文准备就绪***")
    return data


def _topic_key(pro_topic):
    """Return the topic name as bytes, the key form used to index ``client.topics``."""
    return pro_topic.encode() if isinstance(pro_topic, str) else pro_topic


def get_msg(client, pro_topic="testly"):
    """Consume messages from ``pro_topic`` and print each offset and value.

    Args:
        client: a connected ``pykafka.KafkaClient``.
        pro_topic: topic name (``str`` or ``bytes``).
    """
    topic = client.topics[_topic_key(pro_topic)]
    consumer = topic.get_simple_consumer(consumer_group=b'test_group',
                                         auto_commit_enable=True,
                                         auto_commit_interval_ms=1,
                                         consumer_id=b'test_id')

    for message in consumer:
        if message is not None:
            print(message.offset, message.value.decode('utf-8'))


def send_data_to_kafka(client, data, pro_topic="testly"):
    """Serialize ``data`` and publish it to ``pro_topic``.

    Dicts and other JSON-serializable objects are sent as JSON text rather
    than as their Python ``repr``, so consumers can parse the payload.
    ``str`` and ``bytes`` payloads are sent unchanged.

    A synchronous producer is used so that the success message is printed
    only after the broker has acknowledged the message; the ``with`` block
    stops the producer even when ``produce`` raises.

    Args:
        client: a connected ``pykafka.KafkaClient``.
        data: the message to send.
        pro_topic: topic name (``str`` or ``bytes``).

    Errors are caught and printed with a traceback; nothing is raised.
    """
    try:
        topic = client.topics[_topic_key(pro_topic)]

        if isinstance(data, (bytes, bytearray)):
            payload = bytes(data)
        elif isinstance(data, str):
            payload = data.encode('utf-8')
        else:
            payload = json.dumps(data, ensure_ascii=False).encode('utf-8')

        # get_sync_producer() blocks until delivery is confirmed;
        # get_producer() would be the asynchronous variant.
        with topic.get_sync_producer() as producer:
            producer.produce(payload)
        print("发送数据到kafka成功")

    except Exception as e:
        print("生产kafka数据失败:%s, %s" % (e.__traceback__.tb_lineno, traceback.format_exc()))


if __name__ == "__main__":
    client = KafkaClient(hosts='192.168.6.101:9092')  # connect to Kafka
    send_data_to_kafka(client, create_data(), pro_topic="testly")
# Tags: current, http, python, topic, json, pyKafka, data, message
# Source: https://www.cnblogs.com/yaner2018/p/18021706