spring boot集群如何集成websocket?

现在项目有几个接口是通过轮询来查询的现在需要改造成web socket。以前没接触过,现在有几个疑问,望解答

例子:比如商品详情页,评价列表可以实时更新。

1.已有的评价内容也走web socket吗?

2.集群环境下会不会存在重复推送的情况呢?如果有,如何解决呢?

3.网上大多是聊天室demo,现在困惑的地方是,后端如何得知前端需要商品A的评价?如果用户打开了商品A和商品B两个页面,如何区分推送呢?

1.评论内容不需要实时刷新,所以不需要websocket,只有聊天窗口,系统消息提醒才需要websocket,评论内容只需要每次用户进入页面时获取就可以。但是可以通过websocket在系统消息处做提醒。

2.不会,集群的原理是,客户端A,B,C,D分别与集群环境下的主机E,F,建立websocket连接(A->E,B->E,C->E,D->F),集群环境下的E,F共同监听消息队列或者缓存数据库(比如redis)的频道,
以redis举例:
当用户A想与D通信时,
第一步A发送消息到主机E。
第二步主机E通过redis进行广播,对所有监听频道的主机E,F同时广播。
第三步主机F收到广播,发现自己与D没有建立链接,消息停止发送,主机E收到广播发现自己与D有链接,发送消息成功。
由于客户端只能与集群环境的唯一一台主机建立连接,所以不会针对客户端发送多条消息。

3.websocket消息分为一对一,一对多模式。
用户在客户端接收消息是属于一对多模式,由系统群发商品被评论的消息,不需要根据页面进行分别推送。
第一,用户是客户端,一定与集群环境的一台主机建立的websocket链接。
第二,当用户C打开商品A,B页面,如果用户D评论A,D通过他的websocket连接将信息发送至服务端,服务端通过群发消息,即向C,D群发A被评论的信息,
如果还有用户E也打开了商品A页面,他也能收到
第三,消息内容可以定义(D评论了A),所以D客户端可以选择性过滤,不接收此消息。

推荐一款可做websocket集成的框架,在gitee上star已达5.1k,bug较少,稳定性较高。并且具有强制下线客户端,客户端登录唯一性控制等功能
CIM是一套基于netty框架下的推送系统,或许有一些企业有着自己一套即时通讯系统的需求,那么CIM为您提供了一个解决方案,目前CIM支持websocket,android,ios,桌面应用,系统应用等多端接入支持,可应用于移动应用,物联网,智能家居,嵌入式开发,桌面应用,WEB应用以及后台系统之间的即时消服务

用MQTT推送更简单稳定,一个商品一个topic;首次获取商品评价通过接口获取。

可以参考

  1. 可以全部走websocket
  2. 不会重复推送,因为集群环境下,只需要控制一个前端只发起一个websocket请求,只会与集群中的一台机器建立websocket连接,当发起第二个的时候,断开第一个
  3. websocket只是一个通讯协议,交互的过程是通过接口和参数来确定要做的业务操作,请求参数可以带上商品ID,后端根据商品ID查询评论返回给前端

如有帮助,请采纳,十分感谢!

直接根据商品ID 查询处理后通过WebSocket返回给前端。打开哪个商品详情,建立的连接就发哪个商品的ID

先回答你的几个问题
1、已有的评价内容可以走websocket,但基本上不会这么做,因为评价比较多,多半会分页,所以正常的ajax请求即可
2、websocket在集群情况下不会重复推送,websocket其实也是基于http,但是请求的时候会多几个属性,upgrade:websocket,Connection:Upgrade,这样就告诉服务器解析数据时按照websocket的协议来解析,但是对于你集群的多台集群,只会连接上其中的一台,不是连接你集群中的每一台机器,你需要先了解一下集群的原理,大多数是有个前置路由机器(基本上是NGINX来分发请求),NGINX收到你发送的websocket连接请求时会在后面已注册的机器中根据策略选中一台进行连接
3、当你页面打开时,你需要建立一个websocket连接,成功之后,你就需要获取数据了,这个时候你将商品id传送过去,这个时候服务器就知道了这个连接(获取说这个会话)所对应的是哪个商品,只要连接不断开,服务器每次在这个连接推送给前端的数据都是有固定商品id的
websocket如何用就比较简单了,篇幅有点多,我就偷个懒啦,百度一下看下其他文章吧

分布式websocket,数据存储在redis,可以直接参照jeecg框架。
具体的业务处理其实和websocket关系不大,你http怎么处理,他也一样。

代码如下:

package org.jeecg.modules.message.websocket;

import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.CommonSendStatus;
import org.jeecg.common.modules.redis.listener.JeecgRedisListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 监听消息(采用redis发布订阅方式发送消息)
 * @author: jeecg-boot
 */
@Slf4j
@Component
public class SocketHandler implements JeecgRedisListener {

    @Autowired
    private WebSocket webSocket;

    @Override
    public void onMessage(BaseMap map) {
        log.info("【SocketHandler消息】Redis Listerer:" + map.toString());

        String userId = map.get("userId");
        String message = map.get("message");
        if (ObjectUtil.isNotEmpty(userId)) {
            webSocket.pushMessage(userId, message);
            //app端消息推送
            webSocket.pushMessage(userId+CommonSendStatus.APP_SESSION_SUFFIX, message);
        } else {
            webSocket.pushMessage(message);
        }

    }
}

package org.jeecg.common.modules.redis.listener;

import org.jeecg.common.base.BaseMap;

/**
 * @Description: 自定义消息监听
 * @author: scott
 * @date: 2020/01/01 16:02
 */
public interface JeecgRedisListener {
    /**
     * 接受消息
     *
     * @param message
     */
    void onMessage(BaseMap message);

}
package org.jeecg.modules.message.websocket;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

/**
 * @Author scott
 * @Date 2019/11/29 9:41
 * @Description: 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {

    private Session session;

    /**
     * 用户ID
     */
    private String userId;

    private static final String REDIS_TOPIC_NAME = "socketHandler";

    @Resource
    private JeecgRedisClient jeecgRedisClient;

    /**
     * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
     */
    private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
    /**
     * 线程安全Map
     */
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            //TODO 通过header中获取token,进行check
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }

    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }


    /**
     * 服务端推送消息
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                synchronized (session){
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getBasicRemote().sendText(message);
                }
                //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 服务器端推送消息
     */
    public void pushMessage(String message) {
        try {
            webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @OnMessage
    public void onMessage(String message) {
        //todo 现在有个定时任务刷,应该去掉
        log.debug("【websocket消息】收到客户端消息:" + message);
        JSONObject obj = new JSONObject();
        //业务类型
        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
        //消息内容
        obj.put(WebsocketConst.MSG_TXT, "心跳响应");
        //update-begin-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
        for (WebSocket webSocket : webSockets) {
            webSocket.pushMessage(obj.toJSONString());
        }
        //update-end-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
    }

    /**
     * 后台发送消息到redis
     *
     * @param message
     */
    public void sendMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", "");
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * 此为单点消息
     *
     * @param userId
     * @param message
     */
    public void sendMessage(String userId, String message) {
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", userId);
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * 此为单点消息(多人)
     *
     * @param userIds
     * @param message
     */
    public void sendMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            sendMessage(userId, message);
        }
    }

}


package org.jeecg.common.constant;

/**
 *     系统通告 - 发布状态
 * @Author LeeShaoQing
 *
 */
public interface CommonSendStatus {

    /**
     * 未发布
     */
    public static final String UNPUBLISHED_STATUS_0 = "0";

    /**
     * 已发布
     */
    public static final String PUBLISHED_STATUS_1 = "1";

    /**
     * 撤销
     */
    public static final String REVOKE_STATUS_2 = "2";

    /**
     * app端推送会话标识后缀
     */
    public static final String  APP_SESSION_SUFFIX = "_app";


    /**流程催办——系统通知消息模板*/
    public static final String TZMB_BPM_CUIBAN = "bpm_cuiban";
    /**标准模板—系统消息通知*/
    public static final String TZMB_SYS_TS_NOTE = "sys_ts_note";
    /**流程超时提醒——系统通知消息模板*/
    public static final String TZMB_BPM_CHAOSHI_TIP = "bpm_chaoshi_tip";
}

package org.jeecg.modules.message.websocket;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

/**
 * @Author scott
 * @Date 2019/11/29 9:41
 * @Description: 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {

    private Session session;

    /**
     * 用户ID
     */
    private String userId;

    private static final String REDIS_TOPIC_NAME = "socketHandler";

    @Resource
    private JeecgRedisClient jeecgRedisClient;

    /**
     * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
     */
    private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
    /**
     * 线程安全Map
     */
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            //TODO 通过header中获取token,进行check
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }

    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }


    /**
     * 服务端推送消息
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                synchronized (session){
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getBasicRemote().sendText(message);
                }
                //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 服务器端推送消息
     */
    public void pushMessage(String message) {
        try {
            webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @OnMessage
    public void onMessage(String message) {
        //todo 现在有个定时任务刷,应该去掉
        log.debug("【websocket消息】收到客户端消息:" + message);
        JSONObject obj = new JSONObject();
        //业务类型
        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
        //消息内容
        obj.put(WebsocketConst.MSG_TXT, "心跳响应");
        //update-begin-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
        for (WebSocket webSocket : webSockets) {
            webSocket.pushMessage(obj.toJSONString());
        }
        //update-end-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
    }

    /**
     * 后台发送消息到redis
     *
     * @param message
     */
    public void sendMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", "");
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * 此为单点消息
     *
     * @param userId
     * @param message
     */
    public void sendMessage(String userId, String message) {
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", userId);
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * 此为单点消息(多人)
     *
     * @param userIds
     * @param message
     */
    public void sendMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            sendMessage(userId, message);
        }
    }

}


package org.jeecg.common.modules.redis.client;

import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.GlobalConstants;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

import javax.annotation.Resource;

/**
* @Description: redis客户端
* @author: scott
* @date: 2020/01/01 16:01
*/
@Configuration
public class JeecgRedisClient {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;


    /**
     * 发送消息
     *
     * @param handlerName
     * @param params
     */
    public void sendMessage(String handlerName, BaseMap params) {
        params.put(GlobalConstants.HANDLER_NAME, handlerName);
        redisTemplate.convertAndSend(GlobalConstants.REDIS_TOPIC_NAME, params);
    }


}

package org.jeecg.common.constant;

/**
 * @Description: Websocket常量类
 * @author: taoyan
 * @date: 2020年03月23日
 */
public class WebsocketConst {


    /**
     * 消息json key:cmd
     */
    public static final String MSG_CMD = "cmd";

    /**
     * 消息json key:msgId
     */
    public static final String MSG_ID = "msgId";

    /**
     * 消息json key:msgTxt
     */
    public static final String MSG_TXT = "msgTxt";

    /**
     * 消息json key:userId
     */
    public static final String MSG_USER_ID = "userId";

    /**
     * 消息类型 heartcheck
     */
    public static final String CMD_CHECK = "heartcheck";

    /**
     * 消息类型 user 用户消息
     */
    public static final String CMD_USER = "user";

    /**
     * 消息类型 topic 系统通知
     */
    public static final String CMD_TOPIC = "topic";

    /**
     * 消息类型 email
     */
    public static final String CMD_EMAIL = "email";

    /**
     * 消息类型 meetingsign 会议签到
     */
    public static final String CMD_SIGN = "sign";

    /**
     * 消息类型 新闻发布/取消
     */
    public static final String NEWS_PUBLISH = "publish";

}
package org.jeecg.common.base;


import cn.hutool.core.util.ObjectUtil;

import org.apache.commons.beanutils.ConvertUtils;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* BaseMap
* 
* @author: scott
* @date: 2020/01/01 16:17
*/
public class BaseMap extends HashMap<String, Object> {

    private static final long serialVersionUID = 1L;


    public BaseMap() {

    }

    public BaseMap(Map<String, Object> map) {
        this.putAll(map);
    }


    @Override
    public BaseMap put(String key, Object value) {
        super.put(key, Optional.ofNullable(value).orElse(""));
        return this;
    }

    public BaseMap add(String key, Object value) {
        super.put(key, Optional.ofNullable(value).orElse(""));
        return this;
    }

    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        Object obj = super.get(key);
        if (ObjectUtil.isNotEmpty(obj)) {
            return (T) obj;
        } else {
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    public Boolean getBoolean(String key) {
        Object obj = super.get(key);
        if (ObjectUtil.isNotEmpty(obj)) {
            return Boolean.valueOf(obj.toString());
        } else {
            return false;
        }
    }

    public Long getLong(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return new Long(v.toString());
        }
        return null;
    }

    public Long[] getLongs(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return (Long[]) v;
        }
        return null;
    }

    public List<Long> getListLong(String key) {
        List<String> list = get(key);
        if (ObjectUtil.isNotEmpty(list)) {
            return list.stream().map(e -> new Long(e)).collect(Collectors.toList());
        } else {
            return null;
        }
    }

    public Long[] getLongIds(String key) {
        Object ids = get(key);
        if (ObjectUtil.isNotEmpty(ids)) {
            return (Long[]) ConvertUtils.convert(ids.toString().split(","), Long.class);
        } else {
            return null;
        }
    }


    public Integer getInt(String key, Integer def) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return Integer.parseInt(v.toString());
        } else {
            return def;
        }
    }

    public Integer getInt(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return Integer.parseInt(v.toString());
        } else {
            return 0;
        }
    }

    public BigDecimal getBigDecimal(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return new BigDecimal(v.toString());
        }
        return new BigDecimal("0");
    }


    @SuppressWarnings("unchecked")
    public <T> T get(String key, T def) {
        Object obj = super.get(key);
        if (ObjectUtil.isEmpty(obj)) {
            return def;
        }
        return (T) obj;
    }

    public static BaseMap toBaseMap(Map<String, Object> obj) {
        BaseMap map = new BaseMap();
        map.putAll(obj);
        return map;
    }


}