首页 > 其他分享 >deepsort主要代码

deepsort主要代码

时间:2023-05-05 14:46:30浏览次数:27  
标签:blue deepsort polygon 代码 list yellow 主要 image id

app.py

import json
import os
import time

import numpy as np
import requests

import objtracker
from exts import passers_by_requests_post, video_post, data_post, capture_path
from objdetector import Detector
from objdetector_car import Detector as CarDetector
import cv2

from pydub import AudioSegment
from pydub.playback import play

from flask import Flask, Response, request

from urls import host_ip
import _thread

app = Flask(__name__)


def run(VIDEO_PATH, data_post_url, data_dict, label):
    # 根据视频尺寸,填充供撞线计算使用的polygon
    width = 1920
    height = 1080
    mask_image_temp = np.zeros((height, width), dtype=np.uint8)

    # 用于记录轨迹信息
    pts = {}

    # 填充第一个撞线polygon(蓝色)
    list_pts_blue = [[204, 305], [227, 431], [605, 522], [1101, 464], [1900, 601], [1902, 495], [1125, 379], [604, 437],
                     [299, 375], [267, 289]]
    ndarray_pts_blue = np.array(list_pts_blue, np.int32)
    polygon_blue_value_1 = cv2.fillPoly(mask_image_temp, [ndarray_pts_blue], color=1)
    polygon_blue_value_1 = polygon_blue_value_1[:, :, np.newaxis]

    # 填充第二个撞线polygon(黄色)
    mask_image_temp = np.zeros((height, width), dtype=np.uint8)
    list_pts_yellow = [[181, 305], [207, 442], [603, 544], [1107, 485], [1898, 625], [1893, 701], [1101, 568],
                       [594, 637], [118, 483], [109, 303]]
    ndarray_pts_yellow = np.array(list_pts_yellow, np.int32)
    polygon_yellow_value_2 = cv2.fillPoly(mask_image_temp, [ndarray_pts_yellow], color=2)
    polygon_yellow_value_2 = polygon_yellow_value_2[:, :, np.newaxis]

    # 撞线检测用的mask,包含2个polygon,(值范围 0、1、2),供撞线计算使用
    polygon_mask_blue_and_yellow = polygon_blue_value_1 + polygon_yellow_value_2

    # 缩小尺寸,1920x1080->960x540
    polygon_mask_blue_and_yellow = cv2.resize(polygon_mask_blue_and_yellow, (width // 2, height // 2))

    # 蓝 色盘 b,g,r
    blue_color_plate = [255, 0, 0]
    # 蓝 polygon图片
    blue_image = np.array(polygon_blue_value_1 * blue_color_plate, np.uint8)

    # 黄 色盘
    yellow_color_plate = [0, 255, 255]
    # 黄 polygon图片
    yellow_image = np.array(polygon_yellow_value_2 * yellow_color_plate, np.uint8)

    # 彩色图片(值范围 0-255)
    color_polygons_image = blue_image + yellow_image

    # 缩小尺寸,1920x1080->960x540
    color_polygons_image = cv2.resize(color_polygons_image, (width // 2, height // 2))

    # list 与蓝色polygon重叠
    list_overlapping_blue_polygon = []

    # list 与黄色polygon重叠
    list_overlapping_yellow_polygon = []

    # 下行数量
    down_count = 0
    # 上行数量
    up_count = 0

    font_draw_number = cv2.FONT_HERSHEY_SIMPLEX
    draw_text_postion = (int((width / 2) * 0.01), int((height / 2) * 0.05))

    # 实例化yolov5检测器
    if label == "person":
        detector = Detector()
    elif label == "car":
        detector = CarDetector()

    # 打开视频
    capture = cv2.VideoCapture(VIDEO_PATH)

    # song = AudioSegment.from_wav("程序已经启动.wav")
    # play(song)
    # os.system("mpg123 程序已经启动.mp3")
    # try:
    #     _thread.start_new_thread(os.system, ("mpg123 程序已经启动.mp3",))
    # except:
    #     print("Error: 无法启动线程")

    # speak_old_time = time.time()
    old_time = time.time()
    while True:
        # 读取每帧图片
        _, im = capture.read()
        if im is None:
            break

        # 缩小尺寸,1920x1080->960x540
        im = cv2.resize(im, (width // 2, height // 2))

        list_bboxs = []
        # 更新跟踪器
        output_image_frame, list_bboxs = objtracker.update(detector, im)
        source_output_image_frame = output_image_frame
        # print(list_bboxs)
        # 输出图片
        output_image_frame = cv2.add(output_image_frame, color_polygons_image)

        if len(list_bboxs) > 0:
            # ----------------------判断撞线----------------------
            for item_bbox in list_bboxs:
                # print(item_bbox)
                x1, y1, x2, y2, _, track_id = item_bbox
                # 撞线检测点,(x1,y1),y方向偏移比例 0.0~1.0
                y1_offset = int(y1 + ((y2 - y1) * 0.5))
                x1_offset = int(x1 + ((x2 - x1) * 0.5))
                # 撞线的点
                y = y1_offset
                x = x1_offset

                # 然后每检测出一个预测框,就将中心点加入队列
                center = (x, y)
                if track_id in pts:
                    pts[track_id].append(center)
                else:
                    pts[track_id] = []
                    pts[track_id].append(center)

                thickness = 2
                cv2.circle(output_image_frame, (center), 1, [255, 255, 255], thickness)

                for j in range(1, len(pts[track_id])):
                    if pts[track_id][j - 1] is None or pts[track_id][j] is None:
                        continue
                    cv2.line(output_image_frame, (pts[track_id][j - 1]), (pts[track_id][j]), [255, 255, 255], thickness)

                if polygon_mask_blue_and_yellow[y, x] == 1:
                    # 如果撞 蓝polygon
                    if track_id not in list_overlapping_blue_polygon:
                        list_overlapping_blue_polygon.append(track_id)
                    # 判断 黄polygon list里是否有此 track_id
                    # 有此track_id,则认为是 UP (上行)方向
                    if track_id in list_overlapping_yellow_polygon:
                        # 上行+1
                        up_count += 1
                        # TODO print('up count:', up_count, ', up id:', list_overlapping_yellow_polygon)
                        print('up count:', up_count, ', up id:', list_overlapping_yellow_polygon)
                        # try:
                        #     passers_by_requests_post({
                        #         'up_or_down': 'up',
                        #         'count': down_count,
                        #         'id': list_overlapping_blue_polygon
                        #     })
                        # except:
                        #     print("无法连接到服务器")
                        # 删除 黄polygon list 中的此id
                        list_overlapping_yellow_polygon.remove(track_id)

                elif polygon_mask_blue_and_yellow[y, x] == 2:
                    # 如果撞 黄polygon
                    if track_id not in list_overlapping_yellow_polygon:
                        list_overlapping_yellow_polygon.append(track_id)
                    # 判断 蓝polygon list 里是否有此 track_id
                    # 有此 track_id,则 认为是 DOWN(下行)方向
                    if track_id in list_overlapping_blue_polygon:
                        # 下行+1
                        down_count += 1
                        # TODO print('down count:', down_count, ', down id:', list_overlapping_blue_polygon)
                        print('down count:', down_count, ', down id:', list_overlapping_blue_polygon)
                        # try:
                        #     passers_by_requests_post({
                        #         'up_or_down': 'down',
                        #         'count': down_count,
                        #         'id': list_overlapping_blue_polygon
                        #     })
                        # except:
                        #     print("无法连接到服务器")
                        # 删除 蓝polygon list 中的此id
                        list_overlapping_blue_polygon.remove(track_id)
            # ----------------------清除无用id----------------------
            list_overlapping_all = list_overlapping_yellow_polygon + list_overlapping_blue_polygon
            for id1 in list_overlapping_all:
                is_found = False
                for _, _, _, _, _, bbox_id in list_bboxs:
                    if bbox_id == id1:
                        is_found = True
                if not is_found:
                    # 如果没找到,删除id
                    if id1 in list_overlapping_yellow_polygon:
                        list_overlapping_yellow_polygon.remove(id1)

                    if id1 in list_overlapping_blue_polygon:
                        list_overlapping_blue_polygon.remove(id1)
            list_overlapping_all.clear()
            # 清空list
            list_bboxs.clear()
        else:
            # 如果图像中没有任何的bbox,则清空list
            list_overlapping_blue_polygon.clear()
            list_overlapping_yellow_polygon.clear()

        # 输出计数信息
        text_draw = 'lane one: ' + str(down_count) + \
                    ' , lane two: ' + str(up_count)
        source_output_image_frame = cv2.putText(img=source_output_image_frame, text=text_draw,
                                         org=draw_text_postion,
                                         fontFace=font_draw_number,
                                         fontScale=0.75, color=(0, 0, 255), thickness=2)
        output_image_frame = cv2.putText(img=output_image_frame, text=text_draw,
                                         org=draw_text_postion,
                                         fontFace=font_draw_number,
                                         fontScale=0.75, color=(0, 0, 255), thickness=2)
        # TODO send data
        res_dict = {"down_count": down_count, "up_count": up_count}
        print("send data:", res_dict)
        # data_post(res_dict, data_post_url)
        # 创建线程
        try:
            _thread.start_new_thread(data_post, (res_dict, data_post_url,))
        except:
            print("Error: 无法启动线程")

        # cv2.imshow('Counting Demo', output_image_frame)
        # cv2.waitKey(1)
        # TODO 上传图片到服务器
        if time.time() - old_time > 5:
            old_time = time.time()
            img_path = os.path.join(os.getcwd(), "results", "result.png")
            print('img_path:', img_path)
            cv2.imwrite(img_path, output_image_frame)
            files = {'file': open(img_path, 'rb')}
            # 创建线程
            try:
                _thread.start_new_thread(video_post, (files, data_dict,))
            except:
                print("Error: 无法启动线程")

        cv2.imshow("f", source_output_image_frame)
        cv2.waitKey(1)
        # TODO stream results
        ret, jpeg = cv2.imencode('.jpg', source_output_image_frame)
        frame = jpeg.tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')

        # if time.time() - old_time > 10:
        #     old_time = time.time()
        #     img_path = os.path.join(os.getcwd(), "result.png")
        #     # cv2.imwrite(img_path, cv2.resize(output_image_frame, (160, 160)))
        #     cv2.imwrite(img_path, output_image_frame)
        #     files = {'file': open(img_path, 'rb')}
        #     try:
        #         status_ = video_post(files)
        #         # print(status_)
        #     except:
        #         print('无法连接到服务器')

    capture.release()
    cv2.destroyAllWindows()


@app.route("/")
def person():
    source_ = request.args.get("source")
    # realtime = request.args.get("realtime")
    if source_ is not None:
        source = source_
    else:
        source = "video/test_person.mp4"

    opt_dict = {
        "VIDEO_PATH": source,
        "data_post_url": "/api/passers_by/set_data",
        'data_dict': {"pic_type": 'deepsort_person'},
        'label': "person"
    }

    return Response(run(**opt_dict), mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route("/car")
def car():
    realtime = request.args.get("realtime")
    if realtime == "true":
        source = capture_path()
    else:
        source = "video/test_traffic.mp4"

    opt_dict = {
        "VIDEO_PATH": source,
        "data_post_url": "/api/car/set_data",
        'data_dict': {"pic_type": 'deepsort_car'},
        'label': "car"
    }

    return Response(run(**opt_dict), mimetype='multipart/x-mixed-replace; boundary=frame')


if __name__ == "__main__":
    app.run(
        port=5002,
        host='0.0.0.0'
    )

    # with open('../../configs.json', 'r') as f:
    #     configs = json.load(f)
    #
    # if configs["function"]["5002"] is True:
    #     app.run(
    #         port=5002,
    #         host='0.0.0.0'
    #     )
    # else:
    #     print("请在配置文件中开启该功能 !!")

标签:blue,deepsort,polygon,代码,list,yellow,主要,image,id
From: https://www.cnblogs.com/bitterteaer/p/17374091.html

相关文章

  • pipeline 多个代码库到不同目录
    pipeline{agentanystages{stage('CloneRepository1'){steps{dir('repo1'){gitbranch:'main',url:'https://github.com/example/repo1.git'......
  • Typora+MinIO+Python代码打造舒适协作环境
    作者:IT王小二博客:https://itwxe.com不知不觉大半年没更新了...前面小二介绍过使用Typora+MinIO+Java代码打造舒适写作环境,然后有很多大佬啊,说用Java来实现简直是杀鸡用上牛刀,小二想了想,确实有点...正好小二最近在学习Python,所以咱们就改用Python实现一版。安装MinIO安装参考......
  • 如何通过C#/VB.NET代码将PowerPoint转换为HTML
    利用PowerPoint可以很方便的呈现多媒体信息,且信息形式多媒体化,表现力强。但难免在某些情况下我们会需要将PowerPoint转换为HTML格式。因为HTML文档能独立于各种操作系统平台(如Unix,Windows等)。并且它可以加入图片、声音、动画、影视等内容,还能从一个文件跳转到另一个文件,与世界各地......
  • 城市“一网统管”智能视频平台EasyCVR配置中心代码细节优化
    EasyCVR视频融合平台基于云边端一体化架构,具有强大的数据接入、处理及分发能力,平台支持海量视频汇聚管理,能在复杂的网络环境中,将分散的各类视频资源进行统一汇聚、整合、集中管理,实现视频直播、云端录像、云存储、检索回看、智能告警、平台级联、服务器集群、云台控制与语音对讲、......
  • Flutter 如何将代码显示到界面上
    前言如何优雅的将项目中的代码,亦或是你的demo代码展示到界面上?本文对使用简单、便于维护且通用的解决方案,进行相关的对比和探究为了节省大家的时间,把最终解决方案的相关接入和用法写在前面预览代码快速开始接入:pub,githubdependencies:code_preview:^0.1.5用法:CodeP......
  • [vscode] 代码提示不能默认选中第一项问题
    [vscode]代码提示不能默认选中第一项问题码代码时发现一个问题,有些代码提示无法选中第一项。如果是所有代码提示都无法选中第一项,直接百度。这里说的是另一种,部分代码无法默认选中。在输入类似class=“”这类代码的代码自动补全回车后,输入代码触发代码提示后不会默认选中第......
  • 命令行编译和执行java代码
    虽然现在IDE很强大又很智能,但是平常随意写点练手的代码的时候,直接在命令行中使用vim和java命令更为方便快捷,可以做到无鼠标纯键盘的操作。首先保证将java相关指令添加到了环境变量中;1.编译class文件:javac-d./Test.java编译好的class文件会放置到环境当前目录(./)中。-d......
  • 从代码上解析Meta-learning
    文章目录1.背景2.Meta-learning理解2.1Meta-learning到底做什么2.2MAML算法2.3MAML算法步骤2.4MAML代码分析和实现3.参考文章1.背景meta-learning区别于pretraining,它主要通过多个task来学习不同任务之间的内在联系,通俗点说,也即是通过多个任务来学习共同的参数。举个例子,人类在......
  • 代码自动生成:Github Copilot
    2021年,Github和OpenAI合作,基于GPT-3模型推出了可以代码自动编码的插件:githubcopilot。1.安装在vscode软件中,找到githubcopilot进行安装:因为目前copilot还是在测试阶段,需要进行测试人员申请才能够真正使用:https://copilot.github.com/2.代码生成例子自动生成在python代码自动生......
  • 基于深度神经网络的图像分类与训练系统(MATLAB GUI版,代码+图文详解)
    摘要:本博客详细介绍了基于深度神经网络的图像分类与训练系统的MATLAB实现代码,包括GUI界面和数据集,可选择模型进行图片分类,支持一键训练神经网络。首先介绍了基于GoogleNet、ResNet进行图像分类的背景、意义,系统研究现状及相关算法。然后展示了系统的界面演示效果,包括选择图片分......