首页 > 其他分享 >yisa_get_msg_from_kafka_per_pn.py

yisa_get_msg_from_kafka_per_pn.py

时间:2023-10-06 22:34:29 浏览次数:40
标签:web get mysql self py per sys time import

 

 

#!/usr/bin/python
#-*- coding: utf-8 -*-
# 抽取kafka数据到redis_mq模块
# 作者:王成
# 日期:2017-04-14
import MySQLdb
import time
import sys
import redis
import requests
import json
import yaml,logging
from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
import time,timeit,datetime,sys,os,threading
import Queue
from daemon import Daemon
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import pickle
import base64
#import pandas as pd

# Python 2 idiom: re-import ``sys`` and then switch the interpreter-wide
# default string encoding to UTF-8. This is process-global, so every implicit
# str/unicode conversion in this script uses UTF-8 from here on.
reload(sys)
sys.setdefaultencoding('utf-8')

class MyDaemon(Daemon):
    def execute_sql(self,sql,action='select',params=None):
        """Run one SQL statement against the ``mysql_web`` database.

        The connection is created lazily on first use and re-created when
        ``ping()`` raises ``MySQLdb.Error``.

        Args:
            sql: SQL text to execute.
            action: Selects what is returned on success:
                'select' -> all rows from ``fetchall()`` (DictCursor rows),
                'insert' -> ``self.db.insert_id()``,
                anything else (e.g. 'update') -> the value returned by
                ``cursor.execute``.
            params: Optional values forwarded to ``cursor.execute`` so the
                driver does the quoting. Defaults to ``None``, which executes
                ``sql`` exactly as given (the original behavior).

        Returns:
            See ``action``. On any error the exception is logged and a
            fallback is returned instead of raising: ``[]`` for 'select',
            ``0`` for 'insert', ``None`` for every other action.
        """
        try:
            if self.db is None:
                self.db = self._connect_mysql_web()
            try:
                self.db.ping()
            except MySQLdb.Error:
                # The cached connection is no longer usable; open a fresh one.
                self.db = self._connect_mysql_web()

            cursor = self.db.cursor(MySQLdb.cursors.DictCursor)
            try:
                result = cursor.execute(sql, params)
                if action == 'select':
                    result = cursor.fetchall()
                elif action == 'insert':
                    result = self.db.insert_id()
                # NOTE(review): no commit() is issued here; confirm the
                # connection runs with autocommit if 'update'/'insert'
                # statements are expected to persist.
            finally:
                # Close the cursor even when execute()/fetchall() raises.
                cursor.close()
            return result
        except Exception as e:
            logging.exception('连接数据库时错误: %s', str(e))
            if action == 'select':
                return []
            if action == 'insert':
                return 0
            return None

    def _connect_mysql_web(self):
        """Open a new MySQL connection using the ``mysql_web`` config section."""
        settings = self.config['mysql_web']
        return MySQLdb.connect(host=settings['host'], port=settings['port'],
                               user=settings['user'], passwd=settings['password'],
                               db=settings['database'], charset="utf8")
   
    def format_msg(self,msg):
        """Flatten a raw capture message into the record dict used downstream.

        Args:
            msg: Mapping that provides 'plateNumber', 'plateType',
                'locationId', 'deviceId', 'directionId', 'captureTime'
                (must support ``strftime``), 'imageUrl' and 'plateColor'.

        Returns:
            A dict with the normalized fields, or ``None`` when anything goes
            wrong while building it (the error is logged, not raised).
        """
        # Plate texts that mean "no plate was recognized".
        unrecognized_plates = ('未识别', '无', '无车牌')
        try:
            plate = msg['plateNumber']
            if len(plate) < 7 or plate in unrecognized_plates:
                license_plate = '无牌'
            else:
                license_plate = plate
            return {
                'license_plate': license_plate,
                'plate_type_id': msg['plateType'],
                'region_id': '420100',
                # Checkpoint id: base64-encoded form plus the raw value.
                'location_id': base64.b64encode(msg['locationId']),
                'loc_id': msg['locationId'],
                # Device id: base64-encoded form plus the raw value.
                'device_id': base64.b64encode(msg['deviceId']),
                'dev_id': msg['deviceId'],
                'lane_id': 0,
                'speed': 0,
                'direction_id': msg['directionId'],
                'capture_time': msg['captureTime'].strftime('%Y-%m-%d %H:%M:%S'),
                'image_url': msg['imageUrl'],
                'plate_color': msg['plateColor'],
            }
        except Exception as e:
            logging.exception('格式化信息时错误: %s', str(e))
            return None
            
   
        
    def run(self):
        """Count in-window vehicle passes per tollgate from Kafka into a CSV.

        Steps:
          1. Load ``config.yaml`` (next to this file) into ``self.config``.
          2. Attach a rotating log file under ``/var/log``.
          3. For partitions 0-5 of topic ``txface``: seek to a fixed offset
             and read messages until one has a ``PASS_TIME`` later than the
             window end (2022-07-27 19:45:00). Messages inside
             19:40:00-19:45:00 are counted per ``TOLLGATE_ID``.
          4. Append a header line plus one line per tollgate to
             ``count_kafka_per_location.csv`` in the current directory.

        Returns early, without writing the CSV, when interrupted with Ctrl+C.
        Any other unexpected error is logged and the process exits with
        status 1.
        """
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.yaml')
        with open(config_path) as config_file:
            self.config = yaml.safe_load(config_file)

        name = 'yisa_get_msg_kafka_loc_2'
        logging.basicConfig(level=logging.INFO)
        handler = RotatingFileHandler('/var/log/%s.log' % name, maxBytes=134217728, backupCount=25)
        handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logging.getLogger('').addHandler(handler)
        logging.warning('启动 [%s]', name)

        self.db = None
        self.err_count = 0  # number of device-id errors
        self.err_dev_time = {}  # per-device time-error counters
        self.crop_config = {
            "420100999999999":{"crop_x":0,"crop_y":0,"crop_w":0,"crop_h":0},
        }
        self.MSG_QUEEN = Queue.Queue(0)
        try:
            logging.warning('创建连接Kafka...')
            kafka_brokers = "68.109.74.4:21005,68.109.74.5:21005,68.109.74.6:21005,68.109.74.7:21005,68.109.74.8:21005"
            offset_start = 107720000
            # Counting window; parsed once instead of once per message.
            start_time = time.strptime("220727194000","%y%m%d%H%M%S")
            end_time = time.strptime("220727194500","%y%m%d%H%M%S")

            consumer = KafkaConsumer(bootstrap_servers=kafka_brokers)
            # Output rows: [tollgate id, pass count, last pass time, name, type_id].
            pn_list = [['卡口id','过车总数','时间','卡口名称','type_id']]
            # Exact-match index from tollgate id to its row in pn_list.
            rows_by_id = {}
            for par_num in range(6):
                partition = TopicPartition('txface',par_num)
                consumer.assign([partition])
                consumer.seek(partition,offset=offset_start)
                if not self._consume_partition(consumer, pn_list, rows_by_id, start_time, end_time):
                    return

            with open("count_kafka_per_location.csv",'a') as f:
                for record in pn_list:
                    f.write(','.join(record))
                    f.write('\n')
        except Exception as e:
            logging.exception('取数据时错误: %s', str(e))
            # Non-zero status so callers can tell the run failed.
            sys.exit(1)

    def _consume_partition(self, consumer, pn_list, rows_by_id, start_time, end_time):
        """Read the assigned partition until a message is past ``end_time``.

        Returns ``True`` when the window end was passed, ``False`` when the
        user interrupted with Ctrl+C. Any other error is logged and reading
        resumes after a 10 second pause.
        """
        recv_number = 0
        while True:
            last_pass_time = None
            try:
                # NOTE(review): without a consumer timeout this iteration
                # blocks indefinitely if the partition has no message later
                # than end_time.
                for msg in consumer:
                    recv_number += 1
                    print(msg.partition)
                    if recv_number%5000==0:
                        logging.warning('offset:%d,recv:%d',msg.offset,recv_number)
                    row = json.loads(msg.value)
                    logging.info(row)
                    pass_time_text = row['PASS_TIME']
                    pass_time = time.strptime(pass_time_text,"%y%m%d%H%M%S")
                    last_pass_time = pass_time
                    print(pass_time_text)
                    if pass_time > end_time:
                        break
                    if pass_time < start_time:
                        continue
                    self._tally_pass(row, pass_time, pn_list, rows_by_id)
                    try:
                        if row['ECSB'] == 'VJ2139':
                            logging.info(row)
                    except Exception as e:
                        logging.info("错误 %s",str(e))
            except KeyboardInterrupt:
                logging.error('Ctrl+C,终止运行')
                return False
            except Exception as e:
                logging.exception('读取kafka时错误: %s', str(e))
                time.sleep(10)
            if last_pass_time is not None and last_pass_time > end_time:
                return True

    def _tally_pass(self, row, pass_time, pn_list, rows_by_id):
        """Add one in-window pass to the row of its tollgate, creating the row if needed."""
        # Read every field first so a malformed message is rejected before
        # any counter is touched.
        tollgate_id = str(row['TOLLGATE_ID'])
        seen_at = time.strftime('%Y-%m-%d %H:%M:%S',pass_time)
        tollgate_name = row['TOLLGATE_NAME']
        type_id = str(row['type_id'])

        record = rows_by_id.get(tollgate_id)
        if record is None:
            record = [tollgate_id, '0', seen_at, tollgate_name, type_id]
            rows_by_id[tollgate_id] = record
            pn_list.append(record)
        # Counts are kept as strings so each row can be joined straight into CSV.
        record[1] = str(int(record[1]) + 1)
        record[2] = seen_at
    
if __name__ == "__main__":
    # Command-line entry point: exactly one argument, start|stop|restart.
    daemon = MyDaemon('/var/run/yisa_get_msg_kafka_test_2.pid')
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)

    command = sys.argv[1]
    if command not in ('start', 'stop', 'restart'):
        print("Unknown command")
        sys.exit(2)
    # The command name is also the name of the daemon method to invoke.
    getattr(daemon, command)()
    sys.exit(0)

 

标签:web,get,mysql,self,py,per,sys,time,import
From: https://www.cnblogs.com/lfxx/p/17745201.html

相关文章

  • dl_images_4.py
      #!/usr/bin/envpython3importosimportsysimportpandasaspdimportrequestsfromrequests.packages.urllib3.utilimportRetryfromrequests.adaptersimportHTTPAdapterfromrequestsimportSessionimporttimeimportloggingfromlogging.handlersi......
  • dl_images_gt.py
      #!/usr/bin/envpython3importosimportsysimportdatetimeimportpandasaspdimportrequestsfromrequestsimportSessionfromrequests.packages.urllib3.utilimportRetryfromrequests.adaptersimportHTTPAdapterimporttimeimportloggingfromlo......
  • pn_recognize_fail_3.py
      #!/usr/bin/python3importosimportsysimportreimportpymysqlimporttimeimportloggingimportpandasaspdimportrequestsfromclickhouse_driverimportClient"""统计佛山市市级卡口的港澳过车总数,识别率及格的总数"""if__name__==&#......
  • pn_recognize_fail_YLKK.py
      #!/usr/bin/python3importosimportsysimportreimportpymysqlimporttimefromdatetimeimporttimedeltafromdatetimeimportdatetimeimportloggingimportpandasaspdimportrequestsfromclickhouse_driverimportClient"""统......
  • pn_recognize_fail_SJKK_2.py
      #!/usr/bin/python3importosimportsysimportreimportpymysqlimporttimefromdatetimeimporttimedeltafromdatetimeimportdatetimeimportloggingimportpandasaspdimportrequestsfromclickhouse_driverimportClientfrompathlibimportPath......
  • pn_recognize_xny2.py
      #!/usr/bin/python3importosimportsysimportreimportpymysqlimporttimeimportloggingimportpandasaspdimportrequestsfromclickhouse_driverimportClientfrompathlibimportPath"""统计佛山市所有卡口的港澳过车总数,识别率""&quo......
  • tcc_pn_recognize_fail.py
      #!/usr/bin/python3importosimportsysimportreimportpymysqlimporttimeimportloggingimportpandasaspdimportrequestsfromclickhouse_driverimportClientfrompathlibimportPath"""统计佛山市停车场的所有卡口的过车总数"""......
  • pn_recognize_fail_SJKK_4.py
      #!/usr/bin/python3importos,statimportsysimportreimportpymysqlimporttimefromdatetimeimporttimedeltafromdatetimeimportdatetimeimportloggingimportpandasaspdimportrequestsfromclickhouse_driverimportClientfrompathlibimport......
  • Python 元组完全指南1
    元组用于在单个变量中存储多个项目。mytuple = ("apple", "banana", "cherry") 元组是 Python 中的 4 种内置数据类型之一,用于存储数据集合,另外还有列表、集合和字典,它们都具有不同的特性和用途。元组是有序且不可更改的集合。元组使用圆括号表示。示例,创建一个元组:thistuple=......
  • area_recognize_fail.py
      #!/usr/bin/python3importosimportsysimportreimportpymysqlimporttimeimportloggingimportpandasaspdimportrequestsfromclickhouse_driverimportClientif__name__=='__main__':logging.basicConfig(filename=os.path.dirname......