#!/usr/bin/python
# -*- coding: utf-8 -*-
# Kafka pass-record counter.
#
# Originally described as "extract Kafka data into the redis_mq module".
# As written, run() reads the 'txface' topic from a fixed offset and counts
# passes per checkpoint (卡口) inside a fixed time window. It appends the
# per-checkpoint totals to count_kafka_per_location.csv. The Redis push path
# is not active.
#
# Author: 王成
# Date: 2017-04-14
import MySQLdb
import time
import sys
import redis
import requests
import json
import yaml,logging
from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
import time,timeit,datetime,sys,os,threading
try:
    import Queue  # Python 2
except ImportError:  # Python 3 renamed the module
    import queue as Queue
from daemon import Daemon
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import pickle
import base64
import collections

if sys.version_info[0] == 2:
    # Python 2 only: make implicit str<->unicode conversions use UTF-8.
    # Presumably needed so non-ASCII checkpoint names from json.loads can be
    # joined with byte strings and written to the CSV; TODO confirm.
    reload(sys)
    sys.setdefaultencoding('utf-8')

# Kafka cluster this job reads from.
# Previously used cluster: 68.109.211.33-43:9092
KAFKA_BROKERS = "68.109.74.4:21005,68.109.74.5:21005,68.109.74.6:21005,68.109.74.7:21005,68.109.74.8:21005"
KAFKA_TOPIC = 'txface'
PARTITION_COUNT = 6          # partitions 0..5 are scanned one after another
START_OFFSET = 107720000     # every partition is rewound to this offset
PASS_TIME_FORMAT = "%y%m%d%H%M%S"
WINDOW_START = "220727194000"  # 2022-07-27 19:40:00, inclusive
WINDOW_END = "220727194500"    # 2022-07-27 19:45:00, inclusive
OUTPUT_CSV = "count_kafka_per_location.csv"
# Columns: checkpoint id, pass count, last pass time, checkpoint name, type_id
CSV_HEADER = ['卡口id', '过车总数', '时间', '卡口名称', 'type_id']


def _setup_logging(name):
    """Log INFO+ to the console and to a rotating /var/log/<name>.log."""
    logging.basicConfig(level=logging.INFO)
    handler = RotatingFileHandler('/var/log/%s.log' % name, maxBytes=134217728, backupCount=25)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logging.getLogger('').addHandler(handler)


def _accumulate(counts, tollgate_id, pass_time, tollgate_name, type_id):
    """Record one pass for *tollgate_id* in *counts*.

    counts maps tollgate id -> [id, count, last_time, name, type_id].
    The first occurrence fixes name and type_id. Later occurrences bump the
    count and overwrite last_time with the newest message's time.
    """
    entry = counts.get(tollgate_id)
    if entry is None:
        counts[tollgate_id] = [tollgate_id, 1, pass_time, tollgate_name, type_id]
    else:
        entry[1] += 1
        entry[2] = pass_time


def _consume_partition(consumer, par_num, counts, start_time, end_time):
    """Scan one partition from START_OFFSET and count in-window passes.

    Stops at the first message whose PASS_TIME is later than *end_time*.
    This assumes records within a partition arrive roughly in time order;
    TODO confirm.
    Per-message errors are logged, followed by a 10 s pause and a resumed
    scan. KeyboardInterrupt propagates to the caller.

    NOTE(review): iterating a KafkaConsumer blocks indefinitely by default,
    so a partition that never reaches *end_time* stalls the scan.
    """
    partition = TopicPartition(KAFKA_TOPIC, par_num)
    consumer.assign([partition])
    consumer.seek(partition, offset=START_OFFSET)
    recv_number = 0
    reached_end = False
    while not reached_end:
        try:
            for msg in consumer:
                recv_number += 1
                offset = msg.offset  # Kafka offset
                logging.debug('partition:%d', msg.partition)
                if recv_number % 5000 == 0:
                    logging.warning('offset:%d,recv:%d', offset, recv_number)
                row = json.loads(msg.value)
                logging.info(row)
                pass_time = row['PASS_TIME']
                time_stru = time.strptime(pass_time, PASS_TIME_FORMAT)
                logging.debug(pass_time)
                if time_stru > end_time:
                    reached_end = True
                    break
                if time_stru < start_time:
                    continue
                _accumulate(counts,
                            str(row['TOLLGATE_ID']),
                            time.strftime('%Y-%m-%d %H:%M:%S', time_stru),
                            row['TOLLGATE_NAME'],
                            str(row['type_id']))
                # Debug trace for one specific ECSB value (meaning of
                # 'VJ2139' not visible here). Rows without ECSB are logged.
                try:
                    if row['ECSB'] == 'VJ2139':
                        logging.info(row)
                except KeyError as e:
                    logging.info("错误 %s", str(e))
        except Exception as e:
            logging.exception('读取kafka时错误: %s', str(e))
            time.sleep(10)


def _write_counts(counts, path=OUTPUT_CSV):
    """Append the header plus one line per checkpoint to *path*.

    Fields are comma-joined without quoting, matching the previous output.
    A checkpoint name that contains a comma will therefore shift the columns.
    """
    with open(path, 'a') as f:
        f.write(','.join(CSV_HEADER))
        f.write('\n')
        for tollgate_id, count, last_time, name, type_id in counts.values():
            f.write(','.join([tollgate_id, str(count), last_time, name, type_id]))
            f.write('\n')


class MyDaemon(Daemon):
    """Daemon that counts Kafka pass records per checkpoint in a time window."""

    def _connect(self):
        """Open a new MySQL connection from the ``mysql_web`` config section."""
        cfg = self.config['mysql_web']
        return MySQLdb.connect(host=cfg['host'], port=cfg['port'],
                               user=cfg['user'], passwd=cfg['password'],
                               db=cfg['database'], charset="utf8")

    def execute_sql(self, sql, action='select', params=None):
        """Run *sql* on the cached connection, reconnecting if needed.

        Args:
            sql: SQL text. Prefer ``%s`` placeholders plus *params* over
                building the string by hand, to avoid SQL injection.
            action: 'select' returns all rows as dicts. 'insert' returns
                the last insert id. 'update' (or any other value) returns
                what ``cursor.execute`` returned. 'update' and 'insert'
                are committed.
            params: optional sequence or mapping bound by MySQLdb.

        Returns:
            On any error the exception is logged, and the method returns
            [] for 'select', 0 for 'insert', and None otherwise.
        """
        try:
            if getattr(self, 'db', None) is None:
                self.db = self._connect()
            try:
                self.db.ping()
            except MySQLdb.Error:
                self.db = self._connect()
            cursor = self.db.cursor(MySQLdb.cursors.DictCursor)
            try:
                r = cursor.execute(sql, params)
                if action == 'select':
                    r = cursor.fetchall()
                elif action == 'insert':
                    r = self.db.insert_id()
                if action in ('update', 'insert'):
                    # MySQLdb starts with autocommit off; commit explicitly
                    # so writes are not discarded.
                    self.db.commit()
            finally:
                cursor.close()
            return r
        except Exception as e:
            logging.exception('连接数据库时错误: %s', str(e))
            return {'select': [], 'insert': 0}.get(action)

    def format_msg(self, msg):
        """Map a raw capture message to the downstream row dict.

        Plates shorter than 7 characters, or one of the "unrecognised" or
        "none" markers, become '无牌'. Location and device ids are included
        both base64-encoded and raw. captureTime must provide ``strftime``.
        Returns None (after logging) if any field is missing or invalid.
        """
        try:
            plate = msg['plateNumber']
            if len(plate) < 7 or plate in ('未识别', '无', '无车牌'):
                plate = '无牌'
            return {
                'license_plate': plate,
                'plate_type_id': msg['plateType'],
                'region_id': '420100',
                # checkpoint id
                'location_id': base64.b64encode(msg['locationId']),
                'loc_id': msg['locationId'],
                # device id
                'device_id': base64.b64encode(msg['deviceId']),
                'dev_id': msg['deviceId'],
                'lane_id': 0,
                'speed': 0,
                'direction_id': msg['directionId'],
                'capture_time': msg['captureTime'].strftime('%Y-%m-%d %H:%M:%S'),
                'image_url': msg['imageUrl'],
                'plate_color': msg['plateColor'],
            }
        except Exception as e:
            logging.exception('格式化信息时错误: %s', str(e))
            return None

    def run(self):
        """Load config, scan all partitions, write the CSV, then exit(0)."""
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.yaml')
        with open(config_path) as config_file:
            self.config = yaml.safe_load(config_file)
        name = 'yisa_get_msg_kafka_loc_2'
        _setup_logging(name)
        logging.warning('启动 [%s]', name)
        self.db = None
        start_time = time.strptime(WINDOW_START, PASS_TIME_FORMAT)
        end_time = time.strptime(WINDOW_END, PASS_TIME_FORMAT)
        counts = collections.OrderedDict()  # keeps first-seen order for the CSV
        try:
            logging.warning('创建连接Kafka...')
            consumer = KafkaConsumer(bootstrap_servers=KAFKA_BROKERS)
            for par_num in range(PARTITION_COUNT):
                _consume_partition(consumer, par_num, counts, start_time, end_time)
            _write_counts(counts)
        except KeyboardInterrupt:
            logging.error('Ctrl+C,终止运行')
            return
        except Exception as e:
            logging.exception('取数据时错误: %s', str(e))
        sys.exit(0)


if __name__ == "__main__":
    daemon = MyDaemon('/var/run/yisa_get_msg_kafka_test_2.pid')
    if len(sys.argv) == 2:
        actions = {
            'start': daemon.start,
            'stop': daemon.stop,
            'restart': daemon.restart,
        }
        action = actions.get(sys.argv[1])
        if action is None:
            print("Unknown command")
            sys.exit(2)
        action()
        sys.exit(0)
    else:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)
# 标签 (tags): web, get, mysql, self, py, per, sys, time, import
# From: https://www.cnblogs.com/lfxx/p/17745201.html