下面是部分代码,已做到客户端域服务器的收发,但是目前为止未找到服务器向客户端发送一条或者多条消息的实现方法,
需要实现的内容有,接受到一条消息后,可以实现向客户端发送一条或多条消息,或者只要客户端与服务器保持链接,就可以无限发送消息
想要实现上诉方式,该如何修改代码,或者需要什么模块
目前实现django websocket的方式是
channels ==3.0.4
from channels.generic.websocket import AsyncWebsocketConsumer
class MessagesConsumer(AsyncWebsocketConsumer):
async def connect(self):
#acceptlist.append(self)
#await self.accept()
await self.accept()
async def receive(self,text_data = None,bytes_data = None):
'''接收到后端发来的私信'''
import json
jsonobj = json.loads(text_data)
# for i in acceptlist:
# if i == self:
# if not acceptdict.get(jsonobj['id']):
# acceptdict[jsonobj['id']] = i
# await self.channel_layer.group_add(
# jsonobj['id'],
# self.channel_name
# )
if jsonobj['title'] == "port":
if jsonobj['option'] == "get":
jsonstr = getportname(jsonobj)
await self.send(text_data = jsonstr)
if jsonobj['option'] == "read":
jsonobj['data'] = "打开操作成功"
jsonstr = json.dumps(jsonobj)
await self.send(text_data = jsonstr)
async def disconnect(self,code):
'''离开聊天组'''
# await self.channel_layer.group_discard(self.scope['user'].username,self.channel_name)
# 关闭
await self.channel_layer.group_discard(self.scope,self.channel_name)
# await self.close()
你要做的就是:当客户端和服务器端连上时会有个session, 服务器上做一个链接池将客户端session缓存下来,然后要给哪个客户端发信息就调取哪个来发送就可以了
提供一个思路:
在async def receive(self,text_data = None,bytes_data = None)
函数里写一个for
循环。
在不需要回复消息的时候,将循环阻塞;需要回复消息的时候, 将循环解除组阻塞。定义一个类成员存储要发送的数据,从外部传入要发送的数据即可,这样就可以实现不断传输数据了。
另外,我不太懂你的需求:你是要做一个服务端向客户端推送消息系统吗?还是保持长连接,不断相互交换消息?如果是后者,最好双发使用心跳,保证超时断开连接,免得一直占用资源;如果是前者,要注意等判断对方上线,然后传输数据。
可以采用长连接,定时发送消息给客户端。需要处理好客户关闭后处理,不然服务器会被无效连接撑满。
前后端分离项目实现Websocket
环境版本:
django==2.0
channels==2.2.0
channels-redis==2.3.2
vue实现代码:
全局配置 websocket.js
const path = window.location.host
const WSS_URL = 'wss://' + path + '/ws/chat/'
let Socket = ''
let setIntervalWebsocketPush = null
/** 建立连接 */
export function createSocket(projectId) {
if (!Socket) {
console.log('建立websocket连接')
Socket = new WebSocket(WSS_URL + projectId)
Socket.onopen = onopenWS
Socket.onmessage = onmessageWS
Socket.onerror = onerrorWS
Socket.onclose = oncloseWS
} else {
console.log('websocket已连接')
}
}
/** 打开WS之后发送心跳 */
export function onopenWS() {
sendPing() // 发送心跳
}
/** 连接失败重连 */
export function onerrorWS() {
clearInterval(setIntervalWebsocketPush)
Socket.close()
createSocket() // 重连
}
/** WS数据接收统一处理 */
export function onmessageWS(e) {
window.dispatchEvent(new CustomEvent('onmessageWS', {
detail: e.data
}))
}
/** 发送数据 */
export function sendWSPush(eventTypeArr) {
const obj = {
appId: 'airShip',
cover: 0,
event: eventTypeArr
}
if (Socket !== null && Socket.readyState === 3) {
Socket.close()
createSocket() // 重连
} else if (Socket.readyState === 1) {
Socket.send(JSON.stringify(obj))
} else if (Socket.readyState === 0) {
setTimeout(() => {
Socket.send(JSON.stringify(obj))
}, 3000)
}
}
/** 关闭WS */
export function oncloseWS() {
clearInterval(setIntervalWebsocketPush) // 取消由setInterval()设置的timeout。
Socket = ''
console.log('websocket已断开')
}
/** 发送心跳 */
export function sendPing() {
Socket.send('ping')
setIntervalWebsocketPush = setInterval(() => {
Socket.send('ping')
}, 5000)
}
组件内使用 Index.vue
import { createSocket } from '@/api/websocket'
destroyed() {
// 根据需要,销毁事件监听
window.removeEventListener('onmessageWS', this.getDataFunc)
},
created() {
createSocket(projectId)
// 添加事件监听
window.addEventListener('onmessageWS', this.getDataFunc)
},
methods:{
// 监听ws数据响应
getDataFunc(e) {
const tempData = JSON.parse(e.detail)
}
}
django实现代码:
settings.py
# 在应用中注册 channels
# Channels
ASGI_APPLICATION = 'cmdb.routing.application'
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [('127.0.0.1', 6379)],
},
},
}
consumers.py
import json
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync
class MsgConsumer(WebsocketConsumer):
def __init__(self, *args, **kwargs):
self.room_group_name = ""
super(MsgConsumer, self).__init__(*args, **kwargs)
def connect(self):
# 链接后将应用id作为组名,
project_id = self.scope["path_remaining"]
self.room_group_name = project_id
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name
)
self.accept()
def disconnect(self, close_code):
# 断开连接时从组里面删除
async_to_sync(self.channel_layer.group_discard)(
self.room_group_name,
self.channel_name
)
def receive(self, text_data=None, bytes_data=None):
# 接受到信息时执行
text_data_json = json.loads(text_data)
message = text_data_json['message']
async_to_sync(self.channel_layer.group_send)(
self.room_group_name, {
'type': 'chat.message', # 必须在MsgConsumer类中定义chat_message
'message': message
})
def send_message(self, event):
# 发送信息是执行
message = event['message']
self.send(text_data=json.dumps({
'message': message
}))
def chat_message(self, event):
message = event['message']
self.send(text_data=json.dumps({
'message': message
}))
在其他视图内使用
view.py
# 测试函数
def send_fun():
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
channels_layer = get_channel_layer()
data = '我是追加的内容\n'
for i in [0, 1, 2, 3, 4, 5, 6]:
send_dic = {
"type": "send.message",
"message": {
'step': i,
'content': data
}
}
if i < 5:
import time
for j in range(19):
time.sleep(0.5)
send_dic = {
"type": "send.message", # 必须在MsgConsumer类中定义send_message
"message": {
'step': i,
'content': data
}
}
time.sleep(0.5)
async_to_sync(channels_layer.group_send)(room_group_name , send_dic)
else:
async_to_sync(channels_layer.group_send)(room_group_name , send_dic)
下载码来一个
https://www.cnblogs.com/godlover/p/12699057.html
参考这个博客就可以搞定
采纳哦