首页 > 系统相关 >海康工业相机使用Python成像,web实时预览(Linux)

海康工业相机使用Python成像,web实时预览(Linux)

时间:2023-09-11 18:01:39浏览次数:65  
标签:node web Exception MV Python self ret camera Linux

Python 实现海康机器人工业相机的实时显示视频流及拍照功能(Linux)
代码是在ubuntu系统的orin nano 板子上跑的程序,有需要的大佬自行研究更改
支持 网口相机 和 usb口相机 并且理论上window和Linux通用但是我没有试windows平台

代码如下:

import sys
from ctypes import *
import os
import numpy as np
import time
import cv2

sys.path.append("./MvImport") #打开MVS中的MvImport文件,对于不同系统打开的文件路径跟随实际文件路径变化即可
from MvCameraControl_class import * #调用了MvCameraControl_class.py文件


class HKCamera():
    def __init__(self, CameraIdx=0, log_path=None):
        # enumerate all the camera devices
        deviceList = self.enum_devices()
        self.stop_capturing = False

        # generate a camera instance
        self.camera = self.open_camera(deviceList, CameraIdx, log_path)
        self.start_camera()

    def __del__(self):
        # 结束循环取流
        self.stop_capturing = True
        # 关闭cv2窗口
        cv2.destroyAllWindows()
        if self.camera is None:
            return

        # 停止取流
        ret = self.camera.MV_CC_StopGrabbing()
        if ret != 0:
            raise Exception("stop grabbing fail! ret[0x%x]" % ret)

        # 关闭设备
        ret = self.camera.MV_CC_CloseDevice()
        if ret != 0:
            raise Exception("close deivce fail! ret[0x%x]" % ret)

        # 销毁句柄
        ret = self.camera.MV_CC_DestroyHandle()
        if ret != 0:
            raise Exception("destroy handle fail! ret[0x%x]" % ret)

    @staticmethod
    def enum_devices(device=0, device_way=False):
        """
        device = 0  枚举网口、USB口、未知设备、cameralink 设备
        device = 1 枚举GenTL设备
        """
        if device_way == False:
            if device == 0:
                cameraType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_UNKNOW_DEVICE | MV_1394_DEVICE | MV_CAMERALINK_DEVICE
                deviceList = MV_CC_DEVICE_INFO_LIST()
                # 枚举设备
                ret = MvCamera.MV_CC_EnumDevices(cameraType, deviceList)
                if ret != 0:
                    raise Exception("enum devices fail! ret[0x%x]" % ret)
                return deviceList
            else:
                pass
        elif device_way == True:
            pass

    def open_camera(self, deviceList, CameraIdx, log_path):
        # generate a camera instance
        camera = MvCamera()

        # 选择设备并创建句柄
        stDeviceList = cast(deviceList.pDeviceInfo[CameraIdx], POINTER(MV_CC_DEVICE_INFO)).contents
        if log_path is not None:
            ret = self.camera.MV_CC_SetSDKLogPath(log_path)
            if ret != 0:
                raise Exception("set Log path  fail! ret[0x%x]" % ret)

            # 创建句柄,生成日志
            ret = camera.MV_CC_CreateHandle(stDeviceList)
            if ret != 0:
                raise Exception("create handle fail! ret[0x%x]" % ret)
        else:
            # 创建句柄,不生成日志
            ret = camera.MV_CC_CreateHandleWithoutLog(stDeviceList)
            if ret != 0:
                raise Exception("create handle fail! ret[0x%x]" % ret)
        if ret != 0:
            raise Exception("Set BalanceWhiteAuto faill! ter[0x%x]" % ret)
        # 打开相机
        ret = camera.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
        if ret != 0:
            raise Exception("open device fail! ret[0x%x]" % ret)
        # # 探测网络最佳包大小(只对GigE相机有效)
        # if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
        #     nPacketSize = camera.MV_CC_GetOptimalPacketSize()
        #     if int(nPacketSize)>0:
        #         ret = camera.MV_CC_SetIntValue("GevSCPSPacketSize",nPacketSize)
        #         if ret != 0:
        #             raise Exception("Warning: Set Packet Size faill! ret[0x%x]" % ret)
        #     else:
        #         raise Exception("Warning: Get Packet Size faill! ret[0x%x]" % nPacketSize)
        # 设置触发模式为off
        ret = camera.MV_CC_SetEnumValue("TriggerMode",MV_TRIGGER_MODE_OFF)
        if ret != 0:
            raise Exception("set TriggerMode fail! ret[0x%x]" % ret)
        # 关闭自动曝光时间
        ret = camera.MV_CC_SetEnumValue("ExposureAuto",MV_EXPOSURE_AUTO_MODE_OFF)
        if ret != 0:
            raise Exception("set ExposureAuto fail! ret[0x%x]" % ret)
        # 自动增益  连续模式
        ret = camera.MV_CC_SetEnumValue("GainAuto",MV_GAIN_MODE_CONTINUOUS)
        if ret != 0:
            raise Exception("set GainAuto fail! ret[0x%x]" % ret)
        # 增益值 范围 0 - 23.98dB
        # ret = camera.MV_CC_SetFloatValue("Gain",3)
        # if ret != 0:
        #     raise Exception("set Gain fail! ret[0x%x]" % ret)
        # 设置采集帧率 范围:0.1 - 100000
        ret = camera.MV_CC_SetFloatValue("AcquisitionFrameRate", float(50.00))
        if ret != 0:
            raise Exception("Set AcquisitionFrameRate fail! ter[0x%x]" % ret)
        # 设置曝光时间 范围 15 - 9999448
        ret = camera.MV_CC_SetFloatValue("ExposureTime", float(800.00))
        if ret != 0:
            raise Exception("Set ExposureTime fail! ter[0x%x]" % ret)
        # 设置自动白平衡
        ret = camera.MV_CC_SetEnumValue("BalanceWhiteAuto", 1)
        if ret !=0:
            raise Exception("Set BalanceWhiteAuto fail! ret[0x%x]" % ret)
        # 设置亮度 范围 0 - 255
        ret = camera.MV_CC_SetIntValue("Brightness", 80)
        if ret != 0:
            raise Exception("Set Brightness fail! ret[0x%x]" % ret)
        # 设置像素格式
        ret = camera.MV_CC_SetEnumValue("PixelFormat", 0x02180014)
        if ret != 0:
            raise Exception("Set PixelFormat fail! ret[0x%x]" % ret)

        return camera

    def start_camera(self):
        stParam = MVCC_INTVALUE()
        memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))

        ret = self.camera.MV_CC_GetIntValue("PayloadSize", stParam)
        if ret != 0:
            raise Exception("get payload size fail! ret[0x%x]" % ret)

        self.nDataSize = stParam.nCurValue
        self.pData = (c_ubyte * self.nDataSize)()
        self.stFrameInfo = MV_FRAME_OUT_INFO_EX()
        memset(byref(self.stFrameInfo), 0, sizeof(self.stFrameInfo))

        self.camera.MV_CC_StartGrabbing()

    def get_Value(self, param_type, node_name):
        """
        :param cam:            相机实例
        :param_type:           获取节点值得类型
        :param node_name:      节点名 可选 int 、float 、enum 、bool 、string 型节点
        :return:               节点值
        """
        if param_type == "int_value":
            stParam = MVCC_INTVALUE_EX()
            memset(byref(stParam), 0, sizeof(MVCC_INTVALUE_EX))
            ret = self.camera.MV_CC_GetIntValueEx(node_name, stParam)
            if ret != 0:
                raise Exception("获取 int 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))
            return stParam.nCurValue

        elif param_type == "float_value":
            stFloatValue = MVCC_FLOATVALUE()
            memset(byref(stFloatValue), 0, sizeof(MVCC_FLOATVALUE))
            ret = self.camera.MV_CC_GetFloatValue(node_name, stFloatValue)
            if ret != 0:
                raise Exception("获取 float 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))
            return stFloatValue.fCurValue

        elif param_type == "enum_value":
            stEnumValue = MVCC_ENUMVALUE()
            memset(byref(stEnumValue), 0, sizeof(MVCC_ENUMVALUE))
            ret = self.camera.MV_CC_GetEnumValue(node_name, stEnumValue)
            if ret != 0:
                raise Exception("获取 enum 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))
            return stEnumValue.nCurValue

        elif param_type == "bool_value":
            stBool = c_bool(False)
            ret = self.camera.MV_CC_GetBoolValue(node_name, stBool)
            if ret != 0:
                raise Exception("获取 bool 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))
            return stBool.value

        elif param_type == "string_value":
            stStringValue = MVCC_STRINGVALUE()
            memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))
            ret = self.camera.MV_CC_GetStringValue(node_name, stStringValue)
            if ret != 0:
                raise Exception("获取 string 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))
            return stStringValue.chCurValue

        else:
            return None

    def set_Value(self, param_type, node_name, node_value):
        """
        :param cam:               相机实例
        :param param_type:        需要设置的节点值得类型
            int:
            float:
            enum:     参考于客户端中该选项的 Enum Entry Value 值即可
            bool:     对应 0 为关,1 为开
            string:   输入值为数字或者英文字符,不能为汉字
        :param node_name:         需要设置的节点名
        :param node_value:        设置给节点的值
        :return:
        """
        if param_type == "int_value":
            ret = self.camera.MV_CC_SetIntValueEx(node_name, int(node_value))
            if ret != 0:
                raise Exception("设置 int 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))

        elif param_type == "float_value":
            ret = self.camera.MV_CC_SetFloatValue(node_name, float(node_value))
            if ret != 0:
                raise Exception("设置 float 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))

        elif param_type == "enum_value":
            ret = self.camera.MV_CC_SetEnumValue(node_name, node_value)
            if ret != 0:
                raise Exception("设置 enum 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))

        elif param_type == "bool_value":
            ret = self.camera.MV_CC_SetBoolValue(node_name, node_value)
            if ret != 0:
                raise Exception("设置 bool 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))

        elif param_type == "string_value":
            ret = self.camera.MV_CC_SetStringValue(node_name, str(node_value))
            if ret != 0:
                raise Exception("设置 string 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))

    def set_exposure_time(self, exp_time):
        self.set_Value(param_type="float_value", node_name="ExposureTime", node_value=float(exp_time))

    # 获取曝光时间值
    def get_exposure_time(self):
        return self.get_Value(param_type="float_value", node_name="ExposureTime")

    # 获取自动白平衡 1开启 0关闭
    def get_balance_white_auto(self):
        return self.get_Value(param_type="enum_value", node_name="BalanceWhiteAuto")

    def get_image(self, width=None):
        """
        :param cam:     相机实例
        :active_way:主动取流方式的不同方法 分别是(getImagebuffer)(getoneframetimeout)
        :return:
        """
        ret = self.camera.MV_CC_GetOneFrameTimeout(self.pData, self.nDataSize, self.stFrameInfo, 1000)
        if ret == 0:
            # image = np.asarray(self.pData).reshape((self.stFrameInfo.nHeight, self.stFrameInfo.nWidth, 3))
            image = np.array(self.pData)
            image = image.reshape((self.stFrameInfo.nHeight,self.stFrameInfo.nWidth,3))
            # image = np.asarray(self.pData).reshape((self.stFrameInfo.nHeight, self.stFrameInfo.nWidth,2))
            if width is not None:
                image = cv2.resize(image, (width, int(self.stFrameInfo.nHeight * width / self.stFrameInfo.nWidth)))
                pass
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            return image
        else:
            return None

    def show_runtime_info(self, image):
        exp_time = self.get_exposure_time()
        cv2.putText(image, ("exposure time = %1.1fms" % (exp_time * 0.001)), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)

    def saveImage(self,filename):
        image = self.get_image()
        cv2.imwrite('./images/'+filename+'.jpg',image)

    def start(self):
        # while not self.stop_capturing:
        #  image = camera.get_image(width=800)
        #  if image is not None:
        #      camera.show_runtime_info(image)
        #      cv2.imshow("", image)
        try:

            while True:
                image = self.get_image(width=800)
                if image is not None:
                    self.show_runtime_info(image)
                    cv2.imshow("", image)
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q') or key == ord('Q'):
                    cv2.destroyAllWindows()
                    break
        except Exception as e:
            print(e)

if __name__ == '__main__':

    camera = HKCamera()
    # camera.saveImage()
    print(camera.get_exposure_time())
    # camera.set_exposure_time(1234)
    # 42715.0
    print(camera.get_balance_white_auto())
    # camera.set_exposure_time(5000.0)
    camera.start()
    # try:
    #
    #     while True:
    #      image = camera.get_image(width=800)
    #      if image is not None:
    #          camera.show_runtime_info(image)
    #          cv2.imshow("", image)
    #      key = cv2.waitKey(1) & 0xFF
    #      if key == ord('q') or key == ord('Q'):
    #          cv2.destroyAllWindows()
    #          break
    # except Exception as e:
    #     print(e)

代码主要就是设置相机的一些参数、开启相机和取流使用cv2显示,代码中会开启一个cv2窗口程序来实时显示相机内容 按q关闭cv2窗口并停止程序 注意:记得引入sdk MvImport

我的路径如下图所示 复制到你的Python项目中并且引用

如果需要在web浏览器中实时查看的话代码如下:

# -- coding: utf-8 --
import cv2
from flask import Flask, render_template, Response, request

import sys
import datetime
import logging
sys.path.append("../camera") # 相机代码的路径如果相机跟web在一个目录下可以删除
sys.path.append("../camera/MvImport") # 跟上面一样
from Raspberry程序.camera_control import HKCamera


from MvCameraControl_class import *
from JsonResponse import *
from JsonFlask import *
camera = 0

logging.basicConfig(level=logging.DEBUG,  # 控制台打印的日志级别
                    filename='hikrobot.log',
                    filemode='a',  ##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
                    # a是追加模式,默认如果不写的话,就是追加模式
                    format=
                    '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
                    # 日志格式
                    )

# 这里配置一下 template_folder为当前目录,不然可以找不到 index.html
app = JsonFlask(__name__, template_folder='.')
# index
@app.route('/')
def index():
    return render_template('./templates/index.html')


# 获取码流
def gen(camera):
    try:
        while True:
            global open
            if(not open):
                break;
            frame = camera.get_image(800) # 可自定义大小
            ret, jpeg = cv2.imencode('.jpg',frame)
            if jpeg is None:
                logging.error("jpeg is None")
                continue
            hpg_frame = jpeg.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + hpg_frame + b'\r\n')
    except Exception as e:
        logging.error("generate error: %s" % str(e))

# 开始预览
@app.route('/startPreview')
def startPreview():
    # 全局变量用于停止循环
    global open
    open = True
    # 全局变量用于获取相机
    global camera
    camera = HKCamera()
    if camera is None:
        logging.error("can't open camera")
        quit()
    else:
        logging.info("start to open camera")
    logging.info("open camera ok")
    logging.info("====================================")
    return Response(gen(camera), mimetype='multipart/x-mixed-replace;boundary=frame')
# 停止预览
@app.route('/stopPreview')
def stopPreview():
    global open
    open = False
    global camera
    camera.__del__()
    logging.info("camera closed successfully")
    logging.info("====================================")
    return  JsonResponse.success()

# 执行web服务, 端口号可自行修订
logging.info("start to run camera app, current_time: " + str(datetime.datetime.now()))
app.run(host='0.0.0.0', port=65432, debug=True, threaded=True)

还有两个规范格式的文件:
JsonFlask.py

# -- coding: utf-8 --

from flask import Flask, jsonify
from JsonResponse import *

class JsonFlask(Flask):
    def make_response(self, rv):
        """视图函数可以直接返回: list、dict、None"""
        if rv is None or isinstance(rv, (list, dict)):
            rv = JsonResponse.success(rv)

        if isinstance(rv, JsonResponse):
            rv = jsonify(rv.to_dict())

        return super().make_response(rv)

JsonResponse.py

# -- coding: utf-8 --

class JsonResponse(object):
    """
    统一的json返回格式
    """

    def __init__(self, code, msg, data):
        self.code = code
        self.msg = msg
        self.data = data

    @classmethod
    def success(cls, code=200, msg='success', data=None):
        return cls(code, msg, data)

    @classmethod
    def error(cls, code=-1, msg='error', data=None):
        return cls(code, msg, data)

    def to_dict(self):
        return {
            "code": self.code,
            "msg": self.msg,
            "data": self.data
        }

参考大佬 @龙凌云端 地址:https://www.cnblogs.com/miracle-luna/p/16960556.html 的代码

标签:node,web,Exception,MV,Python,self,ret,camera,Linux
From: https://www.cnblogs.com/HanYork/p/17694125.html

相关文章

  • Linux系统下对SD卡分区步骤
    原文:https://blog.csdn.net/qq_45159348/article/details/125584759好久没写博客了,今天浅写一下Linux系统下对SD卡的分区操作,SD卡分区对嵌入式系统移植是非常重要的,我也是一名小白,记录过程供给有需要的参考。如果有什么不对的地方,欢迎大家指正。一、准备工作(这里是我的配置)系统:U......
  • Python - 桌面自动化(PyAutoGUI)
    一、安装windows:pipinstallpyautogui-ihttps://pypi.tuna.tsinghua.edu.cn/simplemac:pipinstallpyobjc-corepipinstallpyobjcpipinstallpyautoguilinux:sudoapt-getinstallscrotpython3-tkpython3-devpipinstallpython3-xlibpipinstallpyautog......
  • linux常用的10个性能检测命令
    1、uptime该命令可以大致的看出计算机的整体负载情况,loadaverage后的数字分别表示计算机在1min、5min、15min内的平均负载。2、dmesg|tail打印内核环形缓存区中的内容,可以用来查看一些错误3、vmstat1打印进程、内存、交换分区、IO和CPU等的统计信息;vmstat第一次输出表示从开机......
  • Python的requests.post函数上传文件和其他数据
    当使用Python的requests.post函数时,可以在其中添加异常处理来捕获可能的网络错误或HTTP错误。以下是一个示例代码,演示如何使用try-except语句来处理requests.post可能抛出的异常:importrequestsurl='http://cbim.com/upload'files={'file1':('file1.txt',open('file1.t......
  • python一键过杀软
    python过杀软新利用python加载shellcode过360、火绒等杀软先上代码将以下代码保存到mt.pyimportbase64importosimportshutilbuf=b"这里替换shellcode"b64buf=base64.b64encode(buf)lzsds="""importctypesimportbase64shellcode=base64.b64decode(\"......
  • 用python爬取天气
    之前做过这么个小网站,能够爬取天气,然后感觉没什么用,有上网站的时间用手机都看完了,然后就寻思能不能发到自己微信或者qq或者邮箱里先写下怎么把数据提出来 importrequestsimportjsonurl=你自己的网址result=requests.get(url)#print(result.text)data=json.loads......
  • python读写xlsx文件
    importosimporttracebackfromopenpyxlimport*fromlogs.loginimportlogginfromopenpyxl.reader.excelimportload_workbookclassread_data_calss:file_name=r'../../Data/data.xlsx'#在当前路劲执行video_list=[]try:'&......
  • python读取yml文件
    classRead_data_class:defread_yml_def(self,dir_path):withopen(dir_path,'r',encoding='utf-8')asf:yaml_list=yaml.safe_load(f)returnyaml_listifname=='main':passclassmain():path=os.pa......
  • #yyds干货盘点#Linux系统sensors命令 – 检测服务器硬件信息
    sensors命令用于检测服务器硬件信息,例如CPU电压与温度、主板、风扇转速等数据。语法格式 :sensors参考实例检查当前CPU处理器得电压和温度信息[root@linuxcool~]#sensorscoretemp-isa-0000Core0:+48.0°C(high=+87.0°C,crit=+97.0°C)Core1:+46.0......
  • Python——15days
    双层语法糖三层语法糖(多层)装饰器的修复技术(了解)有参装饰器*装饰器的写法:自上而下           执行:自下而上双层:@login_auth@outer执行顺先先执行outer——被装饰名字作为参数传入outer里,get_time=outer(index)。通过调用返回值(前提是如果有双层及以上装饰器......