E:\song\Flask-SocketIO-Chat-master\run.py
#!/bin/env python
from app import create_app, socketio
app = create_app(debug=False)
if __name__ == '__main__':
socketio.run(app, host="127.0.0.1", port=8000)
# app.run(host="127.0.0.1", port=8000, threaded=True)
E:\song\Flask-SocketIO-Chat-master\app\__init__.py
import os
from flask import Flask
from flask_socketio import SocketIO
from flask_cors import CORS
socketio = SocketIO()
def create_app(debug=False):
"""Create an application."""
app = Flask(__name__, template_folder='templates')
app.debug = debug
app.config['SECRET_KEY'] = 'gjr39dkjn344_!67#'
os.environ['OPENCV_CAMERA_SOURCE'] = '0'
from app.routes.main import main as main_blueprint
from app.routes.camera import camrea as camera_blueprint
app.register_blueprint(main_blueprint, url_prefix='/')
app.register_blueprint(camera_blueprint, url_prefix='/camera')
socketio.init_app(app, async_mode='threading', cors_allowed_origins='*')
CORS(app)
return app
E:\song\Flask-SocketIO-Chat-master\app\camera\base_camera.py
import time
import threading
import cv2
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from threading import get_ident
except ImportError:
from _thread import get_ident
class CameraEvent(object):
"""An Event-like class that signals all active clients when a new frame is
available.
"""
def __init__(self):
self.events = {}
def wait(self):
"""Invoked from each client's thread to wait for the next frame."""
ident = get_ident()
if ident not in self.events:
# this is a new client
# add an entry for it in the self.events dict
# each entry has two elements, a threading.Event() and a timestamp
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()
def set(self):
"""Invoked by the camera thread when a new frame is available."""
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
# if this client's event is not set, then set it
# also update the last set timestamp to now
event[0].set()
event[1] = now
else:
# if the client's event is already set, it means the client
# did not process a previous frame
# if the event stays set for more than 5 seconds, then assume
# the client is gone and remove it
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]
def clear(self):
"""Invoked from each client's thread after a frame was processed."""
self.events[get_ident()][0].clear()
class BaseCamera(object):
thread = None # background thread that reads frames from camera
img = None
frame = None # current frame is stored here by background thread
last_access = 0 # time of last client access to the camera
event = CameraEvent()
def __init__(self):
"""Start the background camera thread if it isn't running yet."""
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()
# start background frame thread
BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()
# wait until first frame is available
BaseCamera.event.wait()
def get_frame(self):
"""Return the current camera frame."""
BaseCamera.last_access = time.time()
# wait for a signal from the camera thread
BaseCamera.event.wait()
BaseCamera.event.clear()
return BaseCamera.frame
def get_pic(self):
BaseCamera.last_access = time.time()
# wait for a signal from the camera thread
BaseCamera.event.wait()
BaseCamera.event.clear()
return BaseCamera.frame
@staticmethod
def frames():
""""Generator that returns frames from the camera."""
raise RuntimeError('Must be implemented by subclasses.')
@classmethod
def _thread(cls):
"""Camera background thread."""
# print('Starting camera thread.')
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
# BaseCamera.img = cv2.imdecode(frame, cv2.IMREAD_UNCHANGED)
# cv2.imencode('.jpg', img)[1].tobytes()
BaseCamera.event.set() # send signal to clients
time.sleep(0)
# if there hasn't been any clients asking for frames in
# the last 10 seconds then stop the thread
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print('Stopping camera thread due to inactivity.')
break
BaseCamera.thread = None
E:\song\Flask-SocketIO-Chat-master\app\camera\camera_opencv.py
import os
import cv2
from .base_camera import BaseCamera
class Camera(BaseCamera):
video_source = 0
current_img = None
def __init__(self):
if os.environ.get('OPENCV_CAMERA_SOURCE'):
Camera.set_video_source(int(os.environ['OPENCV_CAMERA_SOURCE']))
super().__init__()
@staticmethod
def set_video_source(source):
Camera.video_source = source
@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError('Could not start camera.')
while True:
# read current frame
_, img = camera.read()
# res = cv2.imwrite('app/cache/test.jpg', img)
# print(res)
# encode as a jpeg image and return it
# Camera.current_img = img
yield cv2.imencode('.jpg', img)[1].tobytes()
E:\song\Flask-SocketIO-Chat-master\app\routes\camera\events.py
from flask_socketio import emit
from app import socketio as sio
@sio.on('camera')
def camera(data):
print(data)
print('截图')
# emit('camera', {'msg': "msg from serve"})
E:\song\Flask-SocketIO-Chat-master\app\routes\camera\routes.py
import cv2
from flask import session, redirect, url_for, render_template, request, Response
from flask import send_file
from . import camrea
from app.camera.camera_opencv import Camera
def gen(camera):
"""Video streaming generator function."""
yield b'--frame\r\n'
while True:
frame = camera.get_frame()
yield b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n--frame\r\n'
@camrea.route('/video', methods=['GET'])
def get():
"""Video streaming route. Put this in the src attribute of an img tag."""
return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')
def get_picture():
return Camera().get_pic()
@camrea.route('/picture', methods=['get'])
def post():
return Response(get_picture(), mimetype='image/jpeg')
E:\song\Flask-SocketIO-Chat-master\app\routes\camera\__init__.py
from flask import Blueprint
camrea = Blueprint('camera', __name__)
from . import routes,events
E:\song\Flask-SocketIO-Chat-master\app\routes\main\events.py
from flask import session
from flask_socketio import emit, join_room, leave_room
from app import socketio as sio
# 连接 事件
@sio.event
def connect(msg):
# 连接成功之后,会自动恢复一个connect事件给前端
print(f'~~~~msg={msg} 连接成功~~~~')
# 断开事件
@sio.event
def disconnect():
print(f'~~~~断开连接~~~~')
# 消息事件
# @sio.event
# def message(msg):
# print(f'~~~~msg = {msg},接受消息 ~~~~')
E:\song\Flask-SocketIO-Chat-master\app\routes\main\routes.py
from flask import session, redirect, url_for, render_template, request
from . import main
@main.route('/', methods=['GET'])
def get():
return 'main get'
# return render_template(url_for('chat.html'))
@main.route('/', methods=['POST'])
def post():
return 'main post'
E:\song\Flask-SocketIO-Chat-master\app\routes\main\__init__.py
from flask import Blueprint
main = Blueprint('main', __name__)
from . import routes, events
E:\song\Flask-SocketIO-Chat-master\app\templates\chat.html
<html>
<head>
<title>Flask-SocketIO-Chat: {{ room }}</title>
<script type="text/javascript" src="//code.jquery.com/jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="//cdn.socket.io/4.4.1/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
var socket;
$(document).ready(function(){
socket = io.connect('http://' + document.domain + ':' + location.port + '/chat');
socket.on('connect', function() {
socket.emit('joined', {});
});
socket.on('status', function(data) {
$('#chat').val($('#chat').val() + '<' + data.msg + '>\n');
$('#chat').scrollTop($('#chat')[0].scrollHeight);
});
socket.on('message', function(data) {
$('#chat').val($('#chat').val() + data.msg + '\n');
$('#chat').scrollTop($('#chat')[0].scrollHeight);
});
$('#text').keypress(function(e) {
var code = e.keyCode || e.which;
if (code == 13) {
text = $('#text').val();
$('#text').val('');
socket.emit('text', {msg: text});
}
});
});
function leave_room() {
socket.emit('left', {}, function() {
socket.disconnect();
// go back to the login page
window.location.href = "{{ url_for('main.index') }}";
});
}
</script>
</head>
<body>
<h1>Flask-SocketIO-Chat: {{ room }}</h1>
<textarea id="chat" cols="80" rows="20"></textarea><br><br>
<input id="text" size="80" placeholder="Enter your message here"><br><br>
<a href="#" onclick="leave_room();">Leave this room</a>
</body>
</html>
E:\song\Flask-SocketIO-Chat-master\app\templates\index.html
<html>
<head>
<title>Flask-SocketIO-Chat</title>
</head>
<body>
<h1>Flask-SocketIO-Chat</h1>
<form method="POST">
{{ form.hidden_tag() }}
{{ form.name.label }}: {{ form.name() }} {% for error in form.name.errors %}{{ error }}{% endfor %}<br>
{{ form.room.label }}: {{ form.room() }} {% for error in form.room.errors %}{{ error }}{% endfor %}<br>
{{ form.submit() }}
</form>
</body>
</html>
E:\song\Flask-SocketIO-Chat-master\app\test\osenv.py
import os
print(os.environ.get('OPENCV_CAMERA_SOURCE'))
print(os.environ.get('OPENCV_CAMERA_SOURCE'))
E:\song\Flask-SocketIO-Chat-master\app\test\shot.py
import cv2
#基本绘图
# import numpy
#
cv2.namedWindow("Image") #创建窗口
#抓取摄像头视频图像
cap = cv2.VideoCapture(0) #创建内置摄像头变量
while(cap.isOpened()): #isOpened() 检测摄像头是否处于打开状态
ret,img = cap.read() #把摄像头获取的图像信息保存之img变量
if ret == True: #如果摄像头读取图像成功
# cv2.imshow('Image',img)
k = cv2.waitKey(100)
if k == ord('a') or k == ord('A'):
cv2.imwrite('test.jpg',img)
break
cap.release() #关闭摄像头
cv2.waitKey(0)
cv2.destroyAllWindow()
标签:thread,stream,socketio,flask,app,camera,import,frame,def
From: https://www.cnblogs.com/zhuoss/p/17123858.html