完整代码: https://gitee.com/mom925/django-system
项目结构:
先安装所需库:
pip install channels channels_redis
下面把 WebSocket 封装成插件式的结构,使用时只需在项目中引入配置即可
asgi.py 文件:HTTP 请求的处理保持不变,只修改 WebSocket 请求的路由入口
"""ASGI entry point: HTTP is served by Django, WebSocket traffic by the plugin routes."""
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Wchime.settings')

# Build the Django ASGI application first. get_asgi_application() calls
# django.setup(), so the app registry is ready before the routing module
# (and the consumers it imports) is loaded. It also replaces
# channels.http.AsgiHandler, which is deprecated in Channels 3 and removed
# in Channels 4.
django_asgi_app = get_asgi_application()

from channels.routing import ProtocolTypeRouter, URLRouter  # noqa: E402

from . import routing  # noqa: E402

application = ProtocolTypeRouter({
    # Plain HTTP requests keep going through the regular Django stack.
    "http": django_asgi_app,
    # WebSocket connections are dispatched by the plugin's URL patterns.
    "websocket": URLRouter(routing.websocket_urlpatterns),
})
同级目录下的请求路由文件 routing.py
"""WebSocket URL routes exposed by the plugin."""
from django.urls import path, re_path

from .consumers import CustomRollCallConsumer, RollCallConsumer

websocket_urlpatterns = [
    # ws/ -> every client joins the consumer's default group.
    path('ws/', RollCallConsumer.as_asgi()),
    # ws/<group>/ -> the client joins the group named in the URL.
    # The pattern is anchored at both ends: an unanchored regex is matched
    # with search(), so it would also accept paths such as "foo/ws/x/bar".
    re_path(r'^ws/(?P<group>\w+)/$', CustomRollCallConsumer.as_asgi()),
]
创建一个consumers.py 定义收发消息
"""WebSocket consumers and a helper for broadcasting to a channel-layer group."""
import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer


class RollCallConsumer(AsyncJsonWebsocketConsumer):
    """Consumer that joins a channel-layer group and relays group broadcasts.

    Every connection joins ``group_name``. Frames received from the client
    are echoed back unchanged; events sent to the group with type
    ``push.messages`` are forwarded to the client as a JSON string.
    """

    # Group joined on connect; subclasses may override it per connection.
    group_name = "default_group"

    async def connect(self):
        """Join the group, then accept the WebSocket handshake."""
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the group when the socket closes."""
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None, **kwargs):
        """Echo the received frame back to the sender.

        Whichever payload arrived (text or binary) is echoed. Forwarding
        only ``text_data`` would call ``send(None)`` for a binary frame,
        which raises ``ValueError``.
        """
        if text_data is None and bytes_data is None:
            return
        await self.send(text_data=text_data, bytes_data=bytes_data)

    async def send(self, text_data=None, bytes_data=None, close=False):
        """Send a frame to the client by delegating to the base consumer."""
        await super().send(text_data, bytes_data, close)

    async def push_messages(self, message):
        """Handle a ``push.messages`` group event and forward it to the client.

        :param message: event dict as built by ``send_message``; it carries
            the keys ``message`` and ``message_type``.
        """
        payload = {
            # The event key is 'message_type'. Reading 'default_group'
            # raised KeyError on every broadcast.
            'message_type': message['message_type'],
            'message': message['message'],
        }
        # ensure_ascii=False keeps non-ASCII text readable on the wire.
        await self.send(json.dumps(payload, ensure_ascii=False))


class CustomRollCallConsumer(RollCallConsumer):
    """Same as ``RollCallConsumer``, but the group is taken from the URL.

    The group name comes from the ``group`` URL keyword argument and falls
    back to ``"default_group"`` when the route does not supply one.
    """

    def _group_from_route(self):
        """Return the group named in the URL route, or the default group."""
        return self.scope['url_route']['kwargs'].get('group', 'default_group')

    async def connect(self):
        """Resolve the group from the URL, join it and accept the socket."""
        self.group_name = self._group_from_route()
        await super().connect()

    async def disconnect(self, close_code):
        """Resolve the group from the URL again and leave it."""
        self.group_name = self._group_from_route()
        await super().disconnect(close_code)


def send_message(group_name="default_group", message=None, message_type=None):
    """Broadcast a message to every consumer in a group.

    Intended to be called from synchronous code outside the consumers.
    ``async_to_sync`` cannot be used from a thread that is already running
    an event loop.

    :param group_name: name of the channel-layer group to broadcast to
    :param message: message content
    :param message_type: message type tag passed through to the client
    :return: None
    """
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        group_name,
        {
            # Channels maps 'push.messages' to the push_messages handler.
            'type': 'push.messages',
            'message': message,
            'message_type': message_type,
        },
    )
定义一个配置文件settings.py,里面主要是注册app,添加asgi等配置
"""Channels settings for the WebSocket plugin.

Meant to be star-imported by the Django project's settings module. It
defines the ASGI application path and the Redis channel layer, and appends
the required apps to ``settings.INSTALLED_APPS`` as an import side effect.
"""
import os
import sys

from django.conf import settings

# Expected to supply REDIS_PWD, REDIS_HOST, REDIS_PORT and REDIS_LIBRARY.
from conf.env import *  # noqa: F401,F403

# Make the directory that contains this plugin package importable, so that
# 'websocket_plugin.asgi' below resolves as a top-level module.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

ASGI_APPLICATION = 'websocket_plugin.asgi.application'

# Password-protected Redis URL. An f-string is used so that numeric
# REDIS_PORT / REDIS_LIBRARY values work; "+" concatenation raises TypeError
# for them. For a Redis without a password the URL would be
# redis://HOST:PORT/DB.
redis_host = f"redis://:{REDIS_PWD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_LIBRARY}"

# NOTE(review): hard-coded symmetric encryption key; consider loading it
# from the environment or a secrets store instead of committing it.
secret_k = 'wchime'

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            # Point this at your own Redis instance.
            "hosts": [redis_host],
            "symmetric_encryption_keys": [secret_k],
        },
    },
}

# Apps this plugin needs; only the ones not already installed are appended.
apps = ['channels']
settings.INSTALLED_APPS += [app for app in apps if app not in settings.INSTALLED_APPS]
最后只要在Django项目的settings.py文件中引入配置即可
# Pulls in the plugin's ASGI_APPLICATION and CHANNEL_LAYERS; importing the module also appends 'channels' to settings.INSTALLED_APPS.
# NOTE(review): presumably this line has to come after INSTALLED_APPS is defined in the project settings — confirm.
from plugin.websocket_plugin.settings import *
成功启动服务
标签:websocket,name,data,self,Django,import,group,message,安装 From: https://www.cnblogs.com/moon3496694/p/17967244