uniapp websocket 发送 sub 监听后,代码自动发送了 unsub ,解除了监听

项目背景

uniapp 项目使用 websocket 监听数据,使用了 rxjs 中的 Observable

相关代码

封装的 GlobalWebsocket 代码


import store from '@/store/index.js';
import Config from '@/core/config'
import {
    Observable
} from "rxjs";

// 后端api地址
const wsHost = Config.get('wsUrl')
let ws;
let count = 0;
var subs = {};
let timer = {};
const MAX_RETRIES = 2000;
let trySendCount = 0;
let tempQueue = [];
let socketOpen = false;
const initWebSocket = () => {
    let token = store.state.token ? store.state.token : store.getters.token;
    const wsUrl = `${wsHost}/ws/messaging?:X_Access_Token=${token}&:X_Type=2`;
    try {
        //微信websocket最大并发不能超过5个
        //https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html

        if (count > 0) {
            return ws;
        }
        clearInterval(timer);
        ws = uni.connectSocket({
            url: wsUrl,
            complete: () => {}
        });
        count += 1;
        uni.onSocketClose(function() {
            socketOpen = false;
            ws = undefined;
            setTimeout(initWebSocket, 5000 * count);
        });
        uni.onSocketOpen(function() {
            socketOpen = true;
        });
        uni.onSocketMessage(function(msg) {
            var data = JSON.parse(msg.data);
            if (data.type === 'error') {
                uni.showToast({
                    title: data.message,
                    icon: "none",
                    duration: 3500
                })
            }
            if (subs[data.requestId]) {
                if (data.type === 'complete') {
                    subs[data.requestId].forEach(function(element) {
                        element.complete();
                    });;
                } else if (data.type === 'result') {
                    subs[data.requestId].forEach(function(element) {
                        element.next(data);
                    });;
                }
            }
        });
    } catch (error) {
        setTimeout(initWebSocket, 5000 * count);
    }

    timer = setInterval(function() {
        try {
            ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({
                "type": "ping"
            })) : 0;
        } catch (error) {
            console.error(error, '发送心跳错误');
        }
        //同时判断
        if (tempQueue.length > 0 && ws && ws.readyState === 1) {
            sendSocketMessage(tempQueue[0], 1);
        }
    }, 2000);
    return ws;
};

//flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
function sendSocketMessage(msg, flag) {
    if (socketOpen) {
        uni.sendSocketMessage({
            data: msg
        });
        if (flag === 1) {
            tempQueue.splice(0, 1);
        }
    } else {
        if (flag != 1) {
            tempQueue.push(msg);
        }
    }
}


const getWebsocket = (id, topic, parameter) => {
    return Observable.create(function(observer) {
        if (!subs[id]) {
            subs[id] = [];
        }
        subs[id].push({
            next: function(val) {
                observer.next(val);
            },
            complete: function() {
                observer.complete();
            }
        });
        var msg = JSON.stringify({
            id: id,
            topic: topic,
            parameter: parameter,
            type: 'sub'
        });
        var thisWs = initWebSocket();
        if (thisWs) {
            try {
                sendSocketMessage(msg);
            } catch (error) {
                initWebSocket();
                uni.showToast({
                    title: 'websocket服务连接失败',
                    icon: "none",
                    duration: 3500
                })
            }
        } else {
            tempQueue.push(msg);
            ws = undefined
            count = 0
            initWebSocket();
        }
        return function() {
            console.log("这里")
            
            var unsub = JSON.stringify({
                id: id,
                type: "unsub"
            });
            
            console.log(subs[id])
            console.log(unsub)
            delete subs[id];
            if (thisWs) {
                sendSocketMessage(unsub)
            }
        };
    });
};
exports.getWebsocket = getWebsocket;

使用 websocket 的位置

    // 初始化 WebSocket
            initWebSocket() {
                const app = this
                const productId = 'DC-TOWER';
                const deviceId = '2209001';
                const groupId = '$TOWER_SENSOR';
                // deviceOnlineStatus && deviceOnlineStatus.unsubscribe();
                app.deviceOnlineStatus = getWebsocket(
                  `location-info-status-online-${deviceId}`,
                  `/device/${productId}/${deviceId}/status`,
                ).subscribe((resp) => {
                  const { payload } = resp;

                  if (resp.requestId === `location-info-status-${deviceId}`) {

                  }
                });
                

                
                app.propertyStatus = getWebsocket(
                  `location-info-message-property-${deviceId}`,
                  `/device/${productId}/${deviceId}/${groupId}/message/property/report`,
                ).subscribe((resp) => {
                    console.log(resp)
                  const { payload } = resp;
                  if (resp.requestId === `location-info-message-property-${deviceId}`) {

                  }
                });
            },
            removeSubscribe() {
                console.log("解绑了")
                const app = this
                console.log(app.deviceOnlineStatus)
                console.log(app.propertyStatus)
                if (app.deviceOnlineStatus) {
                    
                    app.deviceOnlineStatus.unsubscribe();
                }
                if (app.propertyStatus) {
                    app.propertyStatus.unsubscribe();
                }
            },

运行结果及报错内容

在 initWebsocket 方法中,两次调用 GlobalWebsocket 中的 getWebsocket() ,应该会通过传递的参数,发送两次监听
但是现在在 sub 之后,直接自己 unsub 了,而且在 unsub 之后按说不该接收到对应的 payload ,但是还是接收到了(这里没有调用 unsubscribe,它自己执行了)

img

我的解答思路和尝试过的方法

现在的分析是 : websocket ,没有调用 unsub 结果它自己执行 unsub,调用的时候反而没执行
发送 sub 是通过 GlobalWebsocket 中的 getWebsocket() 中发送的
而 unsub 则应该是调用 getWebsocket 返回的对象的unsubscribe来执行的
但是在上面的代码当中没有调用过 unsubscribe ,它自己在 返回 completed 之后 unsub 了,而且 debugg 的时候,在 getWebsocket 中返回的函数中打了断点,断点没有卡上,不过当中的输出语句还正常输出,正常执行了 unsub

我想要达到的结果

在调用 getWebsocket() 之后,通过传递的参数,执行 send 方法,发送 sub 订阅,当不需要的时候通过调用 unsubscribe 来发送 unsub

1、你的做法有点尴尬;
2、uniapp跟其他前端的操作方式有点不一样;
3、你可以试试搞一个全局变量,然后根据状态来处理ws即可。

参考一下

参考

uni-app中websocket的使用 断开重连、心跳机制 '前言' 前言 最近关于H5和APP的开发中使用到了webSocket,由于web/app有时候会出现网络不稳定或者服务端主动断开,这时候导致消息推送不了的情况,需要客户端进行重连。查阅资料后发现了一个心跳机制,也就是客户端间隔一段时间就向服务器发送一条消息,如果服务器收到消息就回复一条信息过来,如果一定时间内没有回复,则表示已经与服务器断开连接了,这个时候就需要进行重连。 被动断开则进行重连,主动断开的不重连。 说明:下图针对两个Tab项(Open Trades 和 Closed Trades),只希望在 tabIndex = 0 (Open Trades 高亮时)触发webSocket , 如果点击第二个栏目 , tabIndex = 1(Closed Trades高亮时)则主动关闭webSodket连接。 TabIndex = 0 时 ,被动断开则自动重连。 '效果' 效果 1. webScoket连接并接收推送的消息 https://cdn.jsdelivr.net/gh/tzy13755126023/BLOG SOURCE/theme f/loading.gif 1. 将接收的消息转换成目标数据 https://copyfuture.com/blogs-details/202112190252213199