django + Channels如何实现websocket

需要一个django demo,实现django http,websocket同时使用,且websocket能使用路由
下面是我实现websocket的方式,websocket能实现但是不能指定路由,我知道django+Channels 是可以实现websocket指定路由访问的,可是我没找到方法,希望各位指点一下
我的实现步骤
ASGI区分请求方式

img

websocket响应

img

根据方式调用对应路由实现函数
在websocket请求时直接调用了自己的websocket_application请求函数,我改怎么配置实现只有配置路由
我这种使用方式有种弊端,需要使用下面的方式启动django 这不是我想要的

uvicorn Personalnavigationsystem.asgi:application

我想在启动django的时候,websocket也启动了,这个我在网页上看到过成熟的例子,可是我按照操作没有弄出来
需求:寻求一个demo,启动后能自定义websocket路由,路由响应函数方式如djangohttp路由那样或者指点我如何操作达成这个目标

我的环境是
django ==4.0.3
Channels == 3.0.4
python ==3.10.1

路由有点困难,可以在外部实现路由再传入websocket就行了。

https://download.csdn.net/download/jackieliuqihe/10271579?spm=1005.2026.3001.5635&utm_medium=distribute.pc_relevant_ask_down.none-task-download-2~default~OPENSEARCH~Rate-5.pc_feed_download_top3ask&depth_1-utm_source=distribute.pc_relevant_ask_down.none-task-download-2~default~OPENSEARCH~Rate-5.pc_feed_download_top3ask

前后端分离项目实现Websocket

环境版本:
    django==2.0
    channels==2.2.0
    channels-redis==2.3.2

vue实现代码:
全局配置 websocket.js

const path = window.location.host
const WSS_URL = 'wss://' + path + '/ws/chat/'
let Socket = ''
let setIntervalWebsocketPush = null

/** 建立连接 */
export function createSocket(projectId) {
  if (!Socket) {
    console.log('建立websocket连接')
    Socket = new WebSocket(WSS_URL + projectId)
    Socket.onopen = onopenWS
    Socket.onmessage = onmessageWS
    Socket.onerror = onerrorWS
    Socket.onclose = oncloseWS
  } else {
    console.log('websocket已连接')
  }
}
/** 打开WS之后发送心跳 */
export function onopenWS() {
  sendPing() // 发送心跳
}
/** 连接失败重连 */
export function onerrorWS() {
  clearInterval(setIntervalWebsocketPush)
  Socket.close()
  createSocket() // 重连
}
/** WS数据接收统一处理 */
export function onmessageWS(e) {
  window.dispatchEvent(new CustomEvent('onmessageWS', {
    detail: e.data
  }))
}
/** 发送数据 */
export function sendWSPush(eventTypeArr) {
  const obj = {
    appId: 'airShip',
    cover: 0,
    event: eventTypeArr
  }
  if (Socket !== null && Socket.readyState === 3) {
    Socket.close()
    createSocket() // 重连
  } else if (Socket.readyState === 1) {
    Socket.send(JSON.stringify(obj))
  } else if (Socket.readyState === 0) {
    setTimeout(() => {
      Socket.send(JSON.stringify(obj))
    }, 3000)
  }
}
/** 关闭WS */
export function oncloseWS() {
  clearInterval(setIntervalWebsocketPush) // 取消由setInterval()设置的timeout。
  Socket = ''
  console.log('websocket已断开')
}
/** 发送心跳 */
export function sendPing() {
  Socket.send('ping')
  setIntervalWebsocketPush = setInterval(() => {
    Socket.send('ping')
  }, 5000)
}

组件内使用 Index.vue

import { createSocket } from '@/api/websocket'
destroyed() {
    // 根据需要,销毁事件监听
    window.removeEventListener('onmessageWS', this.getDataFunc)
},
created() {
    createSocket(projectId)
    // 添加事件监听
    window.addEventListener('onmessageWS', this.getDataFunc)
},
methods:{
    // 监听ws数据响应
    getDataFunc(e) {
      const tempData = JSON.parse(e.detail)
    }
}

django实现代码:
settings.py

# 在应用中注册 channels
# Channels
ASGI_APPLICATION = 'cmdb.routing.application'
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

consumers.py

import json
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

class MsgConsumer(WebsocketConsumer):
    def __init__(self, *args, **kwargs):
        self.room_group_name = ""
        super(MsgConsumer, self).__init__(*args, **kwargs)

    def connect(self):
        # 链接后将应用id作为组名,
        project_id = self.scope["path_remaining"]
        self.room_group_name = project_id
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        # 断开连接时从组里面删除
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    def receive(self, text_data=None, bytes_data=None):
        # 接受到信息时执行
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {
                'type': 'chat.message',  # 必须在MsgConsumer类中定义chat_message
                'message': message
            })

    def send_message(self, event): 
        # 发送信息是执行
        message = event['message']
        self.send(text_data=json.dumps({
            'message': message
        }))

    def chat_message(self, event):
        message = event['message']
        self.send(text_data=json.dumps({
            'message': message
        }))

在其他视图内使用
view.py

# 测试函数
def send_fun():
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer
    channels_layer = get_channel_layer()
    data = '我是追加的内容\n'
    for i in [0, 1, 2, 3, 4, 5, 6]:
        send_dic = {
            "type": "send.message",
            "message": {
                'step': i,
                'content': data
            }
        }
        if i < 5:
            import time
            for j in range(19):
                time.sleep(0.5)
                send_dic = {
                    "type": "send.message", # 必须在MsgConsumer类中定义send_message
                    "message": {
                        'step': i,
                        'content': data
                    }
                }
                time.sleep(0.5)
                async_to_sync(channels_layer.group_send)(room_group_name , send_dic)
        else:
            async_to_sync(channels_layer.group_send)(room_group_name , send_dic)

http://wuyouxiu.top/chat/chat/1
这是我线上实现的django Channels websocket聊天系统,我的电商网站的一部分,

img

具体demo私聊我

1、2楼是什么鬼,这样刷钱的吗。。。

金钱的力量,就差把文档都干过来了

WebSocket是什么?

WebSocket是一种在单个TCP连接上进行全双工通讯的协议。WebSocket允许服务端主动向客户端推送数据。在WebSocket协议中,客户端浏览器和服务器只需要完成一次握手就可以创建持久性的连接,并在浏览器和服务器之间进行双向的数据传输。

WebSocket有什么用?

WebSocket区别于HTTP协议的一个最为显著的特点是,WebSocket协议可以由服务端主动发起消息,对于浏览器需要及时接收数据变化的场景非常适合,例如在Django中遇到一些耗时较长的任务我们通常会使用Celery来异步执行,那么浏览器如果想要获取这个任务的执行状态,在HTTP协议中只能通过轮训的方式由浏览器不断的发送请求给服务器来获取最新状态,这样发送很多无用的请求不仅浪费资源,还不够优雅,如果使用WebSokcet来实现就很完美了

WebSocket的另外一个应用场景就是下文要说的聊天室,一个用户(浏览器)发送的消息需要实时的让其他用户(浏览器)接收,这在HTTP协议下是很难实现的,但WebSocket基于长连接加上可以主动给浏览器发消息的特性处理起来就游刃有余了

初步了解WebSocket之后,我们看看如何在Django中实现WebSocket

Channels
Django本身不支持WebSocket,但可以通过集成Channels框架来实现WebSocket

Channels是针对Django项目的一个增强框架,可以使Django不仅支持HTTP协议,还能支持WebSocket,MQTT等多种协议,同时Channels还整合了Django的auth以及session系统方便进行用户管理及认证。

我下文所有的代码实现使用以下python和Django版本

python==3.6.3
django==2.2
相关代码如下:
Django中使用
1、安装channels,要注意版本的对应,在channels官网中可以得到对应的django版本
pip install channels==2.1.7
2、修改settings.py文件

APPS中添加channels

INSTALLED_APPS = [
'django.contrib.staticfiles',
... ...
'channels',
]

指定ASGI的路由地址

ASGI_APPLICATION = 'webapp.routing.application' #ASGI_APPLICATION 指定主路由的位置为webapp下的routing.py文件中的application
3、setting.py的同级目录下创建routing.py路由文件,routing.py类似于Django中的url.py指明websocket协议的路由
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routing

第一种设置的方法

from channels.security.websocket import AllowedHostsOriginValidator
application = ProtocolTypeRouter({
# 普通的HTTP协议在这里不需要写,框架会自己指明
'websocket': AllowedHostsOriginValidator(
AuthMiddlewareStack(
URLRouter(
# 指定去对应应用的routing中去找路由
chat.routing.websocket_urlpatterns
)
),
)
})

第二种设置的方法,需要手动指定可以访问的IP

from channels.security.websocket import OriginValidator
application = ProtocolTypeRouter({
# 普通的HTTP协议在这里不需要写,框架会自己指明
'websocket': OriginValidator(
AuthMiddlewareStack(
URLRouter(
# 指定去对应应用的routing中去找路由
chat.routing.websocket_urlpatterns
)
),
# 设置可以访问的IP列表
['*']
)
})

ProtocolTypeRouter:ASIG支持多种不同的协议,在这里可以指定特定协议的路由信息,我们只使用了websocket协议,这里只配置websocket即可

AllowedHostsOriginValidator:指定允许访问的IP,设置后会去Django中的settings.py中去查找ALLOWED_HOSTS设置的IP

AuthMiddlewareStack:用于WebSocket认证,继承了Cookie Middleware,SessionMiddleware,SessionMiddleware。django的channels封装了django的auth模块,使用这个配置我们就可以在consumer中通过下边的代码获取到用户的信息

def connect(self):
self.user = self.scope["user"]
self.scope类似于django中的request,包含了请求的type、path、header、cookie、session、user等等有用的信息

URLRouter: 指定路由文件的路径,也可以直接将路由信息写在这里,代码中配置了路由文件的路径,会去对应应用下的routeing.py文件中查找websocket_urlpatterns

chat/routing.py内容如下
from django.urls import path
from chat.consumers import ChatConsumer

websocket_urlpatterns = [
path('ws/chat/', EchoConsumer), # 这里可以定义自己的路由
path('ws/str:username/',MessagesConsumer) # 如果是传参的路由在连接中获取关键字参数方法:self.scope['url_route']['kwargs']['username']
]

routing.py路由文件跟django的url.py功能类似,语法也一样,意思就是访问ws/chat/都交给ChatConsumer处理。

4、在要使用WebSocket的应用中创建consumers.py,consumers.py是用来开发ASGI接口规范的python应用,而Django中的view.py是用来开发符合WSGI接口规范的python应用。

首先了解下面的意思:
event loop事件循环、event handler事件处理器、sync同步、async异步

下面是一个同步的consumers.py:

from channels.consumer import SyncConsumer

class EchoConsumer(SyncConsumer):
def websocket_connect(self, event):
self.send({
'type': "websocket.accept" # 这里是固定的写法,type不可以改变,是ASGI的接口规范,
})

def websocket_receive(self, event):
    user = self.scope['user'] # 获取当前用户,没有登录显示匿名用户
    path = self.scope['path'] # Request请求的路径,HTTP,WebSocket
    
    # ORM 同步代码 假如要查询数据库
    user = User.objects.filter(username=username)
    self.send({
        "type": "websocket.send",  # 这里是固定的写法,type不可以改变
        "text": event['text']  # 把前端返回过来的text返回回去
    })

异步的consumers.py
from channels.consumer import AsyncConsumer

class EchoConsumer(AsyncConsumer):
async def websocket_connect(self, event):
await self.send({
'type': "websocket.accept"
})
async def websocket_receive(self, event):
# 在异步中所有的操作都需要异步执行,比如发送请求,操作ORM
# 对于异步的请求可以使用模块aiohttp实现异步的request请求
# ORM 异步代码 假如要查询数据库
# 第一种方式 使用channels通过的模块
from channels.db import database_sync_to_async
user = await database_sync_to_async(User.objects.filter(username=username))
# 第二种方式 使用装饰器
@database_sync_to_async
def get_username():
return User.objects.filter(username=username)
await self.send({
"type": "websocket.send",
"text": event['text']
})
继承WebSocketConsumer的连接
from channels.generic.websocket import AsyncWebsocketConsumer

class MessageConsumer(AsyncWebsocketConsumer):
def connect(self):
if self.scope['user'].is_anonymous:
# 没有登陆的用户直接断开连接
self.close()
else:
# 加入聊天组,并监听对应的频道
# self.channel_layer进行监听频道
# self.scope['user'].username以用户名作为组名,
# self.channel_name 要进行监听的频道,会自己生成唯一的频道
self.channel_layer.group_add(self.scope['user'].username,self.channel_name)
self.accept()

def receive(self, text_data=None, bytes_data=None):
    '''接受私信'''
    self.send(text_data=json.dumps(text_data)) # 将接收到的信息返回出去
def disconnect(self, code):
    '''离开聊天组'''
    # self.scope['user'].username要结束的组名
     self.channel_layer.group_discard(self.scope['user'].username,self.channel_name)

项目中可以在视图中直接推送信息给用户
view.py
from channels.layers import get_channel_layer
def send_message(request):
... ...
channel_layer = get_channel_layer()
payload = {
'type':'receive', # 这里的写法是固定的,receive代表的是consumers中的receive函数
'message':'要发送的信息',
'sender':sender.username, # 发送者的昵称
}
channel_layer.group_send(receiver_username,payload)
前端实现WebSocket
WebSocket对象一个支持四个消息:onopen,onmessage,oncluse和onerror,我们这里用了两个onmessage和onclose

onopen: 当浏览器和websocket服务端连接成功后会触发onopen消息

onerror: 如果连接失败,或者发送、接收数据失败,或者数据处理出错都会触发onerror消息

onmessage: 当浏览器接收到websocket服务器发送过来的数据时,就会触发onmessage消息,参数e包含了服务端发送过来的数据
% extends "base.html" %}

{% block content %}





{% endblock %}

{% block js %}

{% endblock %}

文章目录
WebSocket
ajax轮询
long poll
Websocket
Channels
WSGI
ASGI
Django中使用
信息交互的周期
前端实现WebSocket
前后端分离项目实现Websocket
WebSocket
在讲Websocket之前,先了解下 long poll 和 ajax轮询 的原理。

ajax轮询
ajax轮询的原理非常简单,让浏览器隔个几秒就发送一次请求,询问服务器是否有新信息。

long poll
long poll 其实原理跟 ajax轮询 差不多,都是采用轮询的方式,不过采取的是阻塞模型(一直打电话,没收到就不挂电话),也就是说,客户端发起连接后,如果没消息,就一直不返回Response给客户端。直到有消息才返回,返回完之后,客户端再次建立连接,周而复始。

ajax轮询 需要服务器有很快的处理速度和资源(速度)。long poll 需要有很高的并发,也就是说同时接待客户的能力(场地大小)。

Websocket
WebSocket是一种在单个TCP连接上进行全双工通讯的协议。WebSocket允许服务端主动向客户端推送数据。在WebSocket协议中,客户端浏览器和服务器只需要完成一次握手就可以创建持久性的连接,并在浏览器和服务器之间进行双向的数据传输。

WebSocket的请求头中重要的字段:

Connection和Upgrade:表示客户端发起的WebSocket请求
Sec-WebSocket-Version:客户端所使用的WebSocket协议版本号,服务端会确认是否支持该版本号
Sec-WebSocket-Key:一个Base64编码值,由浏览器随机生成,用于升级request
WebSocket的响应头中重要的字段:

HTTP/1.1 101 Swi tching Protocols:切换协议,WebSocket协议通过HTTP协议来建立运输层的TCP连接
Connection和Upgrade:表示服务端发起的WebSocket响应
Sec-WebSocket-Accept:表示服务器接受了客户端的请求,由Sec-WebSocket-Key计算得来
WebSocket协议的优点:

支持双向通信,实时性更强
数据格式比较轻量,性能开销小,通信高效
支持扩展,用户可以扩展协议或者实现自定义的子协议(比如支持自定义压缩算法等)
WebSocket协议的优点:

少部分浏览器不支持,浏览器支持的程度与方式有区别
长连接对后端处理业务的代码稳定性要求更高,后端推送功能相对复杂
成熟的HTTP生态下有大量的组件可以复用,WebSocket较少
WebSocket的应用场景:

即时聊天通信,网站消息通知
在线协同编辑,如腾讯文档
多玩家在线游戏,视频弹幕,股票基金实施报价
Channels
Django本身不支持WebSocket,但可以通过集成Channels框架来实现WebSocket

Channels是针对Django项目的一个增强框架,可以使Django不仅支持HTTP协议,还能支持WebSocket,MQTT等多种协议,同时Channels还整合了Django的auth以及session系统方便进行用户管理及认证。

channels中文件和配置的含义

asgi.py:介于网络协议服务和Python应用之间的接口,能够处理多种通用协议类型,包括HTTP、HTTP2和WebSocket
channel_layers:在settings.py中配置。类似于一个通道,发送者(producer)在一段发送消息,消费者(consumer)在另一端进行监听
routings.py:相当于Django中的urls.py
consumers.py:相当于Django中的views.py
WSGI
WSGI(Python Web Server Gateway Interface):为Python语言定义的Web服务器和Web应用程序或者框架之间的一种简单而通用的接口。

ASGI
ASGI(Asynchronous Web Server Gateway Interface):异步网关协议接口,一个介于网络协议服务和Python应用之间的标准接口,能够处理多种通用的协议类型,包括HTTP,HTTP2和WebSocket。

WSGI是基于HTTP协议模式的,不支持WebSocket,而ASGI的诞生则是为了解决Python常用的WSGI不支持当前Web开发中的一些新的协议标准。同时,ASGI对于WSGI原有的模式的支持和WebSocket的扩展,即ASGI是WSGI的扩展。

Django中使用
1、安装channels,要注意版本的对应,在channels官网中可以得到对应的django版本

pip install channels==2.1.7
1
2、修改settings.py文件,

APPS中添加channels

INSTALLED_APPS = [
'django.contrib.staticfiles',
... ...
'channels',
]

指定ASGI的路由地址

ASGI_APPLICATION = 'webapp.routing.application' #ASGI_APPLICATION 指定主路由的位置为webapp下的routing.py文件中的application
1
2
3
4
5
6
7
8
3、setting.py的同级目录下创建routing.py路由文件,routing.py类似于Django中的url.py指明websocket协议的路由

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routing

第一种设置的方法

from channels.security.websocket import AllowedHostsOriginValidator
application = ProtocolTypeRouter({
# 普通的HTTP协议在这里不需要写,框架会自己指明
'websocket': AllowedHostsOriginValidator(
AuthMiddlewareStack(
URLRouter(
# 指定去对应应用的routing中去找路由
chat.routing.websocket_urlpatterns
)
),
)
})

第二种设置的方法,需要手动指定可以访问的IP

from channels.security.websocket import OriginValidator
application = ProtocolTypeRouter({
# 普通的HTTP协议在这里不需要写,框架会自己指明
'websocket': OriginValidator(
AuthMiddlewareStack(
URLRouter(
# 指定去对应应用的routing中去找路由
chat.routing.websocket_urlpatterns
)
),
# 设置可以访问的IP列表
['*']
)
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ProtocolTypeRouter:ASIG支持多种不同的协议,在这里可以指定特定协议的路由信息,我们只使用了websocket协议,这里只配置websocket即可

AllowedHostsOriginValidator:指定允许访问的IP,设置后会去Django中的settings.py中去查找ALLOWED_HOSTS设置的IP

AuthMiddlewareStack:用于WebSocket认证,继承了Cookie Middleware,SessionMiddleware,SessionMiddleware。django的channels封装了django的auth模块,使用这个配置我们就可以在consumer中通过下边的代码获取到用户的信息

def connect(self):
self.user = self.scope["user"]
1
2
self.scope类似于django中的request,包含了请求的type、path、header、cookie、session、user等等有用的信息

URLRouter: 指定路由文件的路径,也可以直接将路由信息写在这里,代码中配置了路由文件的路径,会去对应应用下的routeing.py文件中查找websocket_urlpatterns

chat/routing.py内容如下

from django.urls import path
from chat.consumers import ChatConsumer

websocket_urlpatterns = [
path('ws/chat/', EchoConsumer), # 这里可以定义自己的路由
path('ws/str:username/',MessagesConsumer) # 如果是传参的路由在连接中获取关键字参数方法:self.scope['url_route']['kwargs']['username']
]
1
2
3
4
5
6
7
routing.py路由文件跟django的url.py功能类似,语法也一样,意思就是访问ws/chat/都交给ChatConsumer处理。

4、在要使用WebSocket的应用中创建consumers.py,consumers.py是用来开发ASGI接口规范的python应用,而Django中的view.py是用来开发符合WSGI接口规范的python应用。

首先了解下面的意思:
event loop事件循环、event handler事件处理器、sync同步、async异步

下面是一个同步的consumers.py:

from channels.consumer import SyncConsumer

class EchoConsumer(SyncConsumer):
def websocket_connect(self, event):
self.send({
'type': "websocket.accept" # 这里是固定的写法,type不可以改变,是ASGI的接口规范,
})

def websocket_receive(self, event):
    user = self.scope['user'] # 获取当前用户,没有登录显示匿名用户
    path = self.scope['path'] # Request请求的路径,HTTP,WebSocket
    
    # ORM 同步代码 假如要查询数据库
    user = User.objects.filter(username=username)
    self.send({
        "type": "websocket.send",  # 这里是固定的写法,type不可以改变
        "text": event['text']  # 把前端返回过来的text返回回去
    })

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
异步的consumers.py

from channels.consumer import AsyncConsumer

class EchoConsumer(AsyncConsumer):
async def websocket_connect(self, event):
await self.send({
'type': "websocket.accept"
})
async def websocket_receive(self, event):
# 在异步中所有的操作都需要异步执行,比如发送请求,操作ORM
# 对于异步的请求可以使用模块aiohttp实现异步的request请求
# ORM 异步代码 假如要查询数据库
# 第一种方式 使用channels通过的模块
from channels.db import database_sync_to_async
user = await database_sync_to_async(User.objects.filter(username=username))
# 第二种方式 使用装饰器
@database_sync_to_async
def get_username():
return User.objects.filter(username=username)
await self.send({
"type": "websocket.send",
"text": event['text']
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
需要注意的是在异步中所有的逻辑都应该是异步的,不可以那同步的和异步的代码混合使用。

继承WebSocketConsumer的连接

from channels.generic.websocket import AsyncWebsocketConsumer

class MessageConsumer(AsyncWebsocketConsumer):
def connect(self):
if self.scope['user'].is_anonymous:
# 没有登陆的用户直接断开连接
self.close()
else:
# 加入聊天组,并监听对应的频道
# self.channel_layer进行监听频道
# self.scope['user'].username以用户名作为组名,
# self.channel_name 要进行监听的频道,会自己生成唯一的频道
self.channel_layer.group_add(self.scope['user'].username,self.channel_name)
self.accept()

def receive(self, text_data=None, bytes_data=None):
    '''接受私信'''
    self.send(text_data=json.dumps(text_data)) # 将接收到的信息返回出去
def disconnect(self, code):
    '''离开聊天组'''
    # self.scope['user'].username要结束的组名
     self.channel_layer.group_discard(self.scope['user'].username,self.channel_name)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
要改为异步和前面的方法一致

信息交互的周期

项目中可以在视图中直接推送信息给用户

view.py
from channels.layers import get_channel_layer
def send_message(request):
... ...
channel_layer = get_channel_layer()
payload = {
'type':'receive', # 这里的写法是固定的,receive代表的是consumers中的receive函数
'message':'要发送的信息',
'sender':sender.username, # 发送者的昵称
}
channel_layer.group_send(receiver_username,payload)
1
2
3
4
5
6
7
8
9
10
11
前端实现WebSocket
WebSocket对象一个支持四个消息:onopen,onmessage,oncluse和onerror,我们这里用了两个onmessage和onclose

onopen: 当浏览器和websocket服务端连接成功后会触发onopen消息

onerror: 如果连接失败,或者发送、接收数据失败,或者数据处理出错都会触发onerror消息

onmessage: 当浏览器接收到websocket服务器发送过来的数据时,就会触发onmessage消息,参数e包含了服务端发送过来的数据

onclose: 当浏览器接收到websocket服务器发送过来的关闭连接请求时,会触发onclose消息载请注明出处。

% extends "base.html" %}

{% block content %}





{% endblock %}

{% block js %}

{% endblock %}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
参考博客:https://juejin.im/post/5cb67fc3e51d456e6a1d0237

前后端分离项目实现Websocket
环境版本:
django==2.0
channels==2.2.0
channels-redis==2.3.2
1
2
3
4
vue实现代码:

全局配置 websocket.js

const path = window.location.host
const WSS_URL = 'wss://' + path + '/ws/chat/'
let Socket = ''
let setIntervalWebsocketPush = null

/** 建立连接 /
export function createSocket(projectId) {
if (!Socket) {
console.log('建立websocket连接')
Socket = new WebSocket(WSS_URL + projectId)
Socket.onopen = onopenWS
Socket.onmessage = onmessageWS
Socket.onerror = onerrorWS
Socket.onclose = oncloseWS
} else {
console.log('websocket已连接')
}
}
/
* 打开WS之后发送心跳 /
export function onopenWS() {
sendPing() // 发送心跳
}
/
* 连接失败重连 /
export function onerrorWS() {
clearInterval(setIntervalWebsocketPush)
Socket.close()
createSocket() // 重连
}
/
* WS数据接收统一处理 /
export function onmessageWS(e) {
window.dispatchEvent(new CustomEvent('onmessageWS', {
detail: e.data
}))
}
/
* 发送数据 /
export function sendWSPush(eventTypeArr) {
const obj = {
appId: 'airShip',
cover: 0,
event: eventTypeArr
}
if (Socket !== null && Socket.readyState === 3) {
Socket.close()
createSocket() // 重连
} else if (Socket.readyState === 1) {
Socket.send(JSON.stringify(obj))
} else if (Socket.readyState === 0) {
setTimeout(() => {
Socket.send(JSON.stringify(obj))
}, 3000)
}
}
/
* 关闭WS /
export function oncloseWS() {
clearInterval(setIntervalWebsocketPush) // 取消由setInterval()设置的timeout。
Socket = ''
console.log('websocket已断开')
}
/
* 发送心跳 */
export function sendPing() {
Socket.send('ping')
setIntervalWebsocketPush = setInterval(() => {
Socket.send('ping')
}, 5000)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
组件内使用 Index.vue

import { createSocket } from '@/api/websocket'
destroyed() {
// 根据需要,销毁事件监听
window.removeEventListener('onmessageWS', this.getDataFunc)
},
created() {
createSocket(projectId)
// 添加事件监听
window.addEventListener('onmessageWS', this.getDataFunc)
},
methods:{
// 监听ws数据响应
getDataFunc(e) {
const tempData = JSON.parse(e.detail)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
django实现代码:

settings.py

在应用中注册 channels

Channels

ASGI_APPLICATION = 'cmdb.routing.application'
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [('127.0.0.1', 6379)],
},
},
}
1
2
3
4
5
6
7
8
9
10
11
consumers.py

import json
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

class MsgConsumer(WebsocketConsumer):
def init(self, *args, **kwargs):
self.room_group_name = ""
super(MsgConsumer, self).init(*args, **kwargs)

def connect(self):
    # 链接后将应用id作为组名,
    project_id = self.scope["path_remaining"]
    self.room_group_name = project_id
    async_to_sync(self.channel_layer.group_add)(
        self.room_group_name,
        self.channel_name
    )
    self.accept()

def disconnect(self, close_code):
    # 断开连接时从组里面删除
    async_to_sync(self.channel_layer.group_discard)(
        self.room_group_name,
        self.channel_name
    )

def receive(self, text_data=None, bytes_data=None):
    # 接受到信息时执行
    text_data_json = json.loads(text_data)
    message = text_data_json['message']
    async_to_sync(self.channel_layer.group_send)(
        self.room_group_name, {
            'type': 'chat.message',  # 必须在MsgConsumer类中定义chat_message
            'message': message
        })

def send_message(self, event): 
    # 发送信息是执行
    message = event['message']
    self.send(text_data=json.dumps({
        'message': message
    }))

def chat_message(self, event):
    message = event['message']
    self.send(text_data=json.dumps({
        'message': message
    }))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
在其他视图内使用

view.py

测试函数

def send_fun():
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
channels_layer = get_channel_layer()
data = '我是追加的内容\n'
for i in [0, 1, 2, 3, 4, 5, 6]:
send_dic = {
"type": "send.message",
"message": {
'step': i,
'content': data
}
}
if i < 5:
import time
for j in range(19):
time.sleep(0.5)
send_dic = {
"type": "send.message", # 必须在MsgConsumer类中定义send_message
"message": {
'step': i,
'content': data
}
}
time.sleep(0.5)
async_to_sync(channels_layer.group_send)(room_group_name , send_dic)
else:
async_to_sync(channels_layer.group_send)(room_group_name , send_dic)
————————————————
版权声明:本文为CSDN博主「LinWoW」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/linwow/article/details/99119445