springboot数据实时更新

各位大铑好,最近有些困扰,球各位大铑指点下,目前有个业务需求是:假设有1000个用户,这1000个用户所产生的数据(不同时产生),每间隔20秒就要上传一次。然后要将这1000个用户最新数据,返回给每个用户,用于前端的动态展示,相当于实时更新数据。有建议用websocket的,但是不是很懂,如果有这方面的大铑,请指教下,我这边有些图片暂时不方便放,如果有大铑指点,请思聊下我哦,跪蟹

这个主要涉及到一个上传数据和一个下载数据。
上传数据如果简单点,前端可以用定时器,每20秒上传一次,如果要稳定点的话,就前端创建一个websocket,后端自己new 一个socket绑定一个接口,接收前端的通信数据,或者直接用netty也可以。

将这1000个用户最新数据,返回给每个用户。 这个的话就直接前端在刷新页面的时候,请求一次接口拿最新数据就行。

实时展示有很多实现方式,就拿你这个例子来说,就算实时把信息以消息的形式推送给前端,也是20s才会变化一次,而因此建立了长连接或者消息队列,就有点大材小用了。
这种情况可以用非实时的方式实现实时。前端先发一次请求,获取到后端上一次同步数据的时间,然后以此时间为基准,每隔20s请求一次数据就好了,这样数据能保证一直是最新的,而且实现方式比较简单。
当然,后端的数据也可能会延迟,所以20s这个时间可以往后推几百毫秒或者一秒,这个可以看一下后端的历史数据分析一下。反正只要拿到后端最新的数据就是实时了。

1.如果不需要实时刷新,可以不需要websocket,如果确实有实时性要求才需要websocket。

2.如果你是springboot一定会做集群,集群的原理是,客户端A,B,C,D分别与集群环境下的主机E,F,建立websocket连接(A->E,B->E,C->E,D->F),集群环境下的E,F共同监听消息队列或者缓存数据库(比如redis)的频道,
以redis举例:
当用户A想与D通信时,
第一步A发送消息到主机E。
第二步主机E通过redis进行广播,对所有监听频道的主机E,F同时广播。
第三步主机F收到广播,发现自己与D没有建立链接,消息停止发送,主机E收到广播发现自己与D有链接,发送消息成功。
由于客户端只能与集群环境的唯一一台主机建立连接,所以不会针对客户端发送多条消息。

3.websocket消息分为一对一,一对多模式。
用户在客户端接收消息是属于一对多模式,由系统群发商品被评论的消息,不需要根据页面进行分别推送。
第一,用户是客户端,一定与集群环境的一台主机建立的websocket链接。
第二,当用户C打开商品A,B页面,如果用户D评论A,D通过他的websocket连接将信息发送至服务端,服务端通过群发消息,即向C,D群发A被评论的信息,
如果还有用户E也打开了商品A页面,他也能收到
第三,消息内容可以定义(D评论了A),所以D客户端可以选择性过滤,不接收此消息。

推荐一款可做websocket集成的框架,在gitee上star已达5.1k,bug较少,稳定性较高。并且具有强制下线客户端,客户端登录唯一性控制等功能
CIM是一套基于netty框架下的推送系统,或许有一些企业有着自己一套即时通讯系统的需求,那么CIM为您提供了一个解决方案,目前CIM支持websocket,android,ios,桌面应用,系统应用等多端接入支持,可应用于移动应用,物联网,智能家居,嵌入式开发,桌面应用,WEB应用以及后台系统之间的即时消服务

您的采纳就是对我最大的动力,谢谢!!!

img

就相当于是说每个用户登陆后看到的各自的数据永远是最新的,那可以用定时任务,每隔 20s 拉去一次最新数据,然后返回给前端就好了

最简单的前端没隔20秒请求一次,循环请求就好,很好实现。

定时任务每隔20秒拉取一次数据,通过WebSocket返回给前端进行展示

你定时任务把数据拿到,前端写个定时器,定时请求接口就行。

可以定义一个定时器任务,每隔多长时间推送一次数据返回给前端

首先这个数据整理肯定是服务端来搞,可以用定时任务,20s整理一次放到缓存中;
至于你前端怎么查那就看你心情了,使用websocket肯定是有优点的,节省每次创建连接的资源浪费,刷新频率修改也可以被服务端控制,当这个刷新周期越短,优点就越明显;
如果你比较懒那就也用定时器去定时查询服务端,没啥优点,就是开发简单。。。

《一个websocket协议通知触发的自动刷新》
没有多少内容,自己哈哈就看完了
https://juejin.cn/post/6966868210754781197

springboot 数据实时更新,需要结合ws长链接实现。
1.推送的信息全部都是模拟的所以不存在ORM的操作

要模拟推送的实体类

思路是创建连接时就创建线程,并使用定时线程池不断像session中进行推送信息

package com.hua.queerdemo.domain.response;

public class UserInfoSendResponse {
    private Long userId;
    private String name;
    private String age;
    private String address;
    private Integer fraction;

    public Integer getFraction() {
        return fraction;
    }

    public void setFraction(Integer fraction) {
        this.fraction = fraction;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}


2、websocket配置类


package com.hua.queerdemo.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    //这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket  ,如果你使用外置的tomcat就不需要该配置文件
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3、websocket类


package com.hua.queerdemo.utils;


import com.hua.queerdemo.service.WebSocketService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

@Slf4j
@ServerEndpoint(value = "/websocket/{id}/{modId}")
@Component
public class WebSocketUtils {

    private static WebSocketService webSocketService;

    @Autowired
    public void setWebSocketService(WebSocketService webSocketService) {
        WebSocketUtils.webSocketService = webSocketService;
    }

    //连接用户
    private Session session;

    //用户ID
    private Long id;

    //存放每个用户的websocketUtils
    private static ConcurrentHashMap<Long,WebSocketUtils> websocketHashMap = new ConcurrentHashMap<Long,WebSocketUtils>();

    @OnOpen
    public synchronized void onOpen(Session session, @PathParam("id") Long id,@PathParam("modId") Long modId){
        log.debug("创建连接");
        System.out.println("创建连接");
        this.session=session;
        this.id=id;
        websocketHashMap.put(this.id,this);
        log.debug(session.getId()+"连接成功");
        //线程执行
        webSocketService.get(session,this.id,modId);
    }


    @OnClose
    public void onClose(@PathParam("id") Long id){
        System.out.println("清除线程");
        ConcurrentHashMap<Long, ScheduledFuture> scheduledFutureConcurrentHashMap = webSocketService.getScheduledFutureConcurrentHashMap();
        //清除用户的登录记录
        if (null!=websocketHashMap.get(id)){
            websocketHashMap.remove(id);
        }
        //结束线程 GC回收机制会自动清理资源
        if (null!=scheduledFutureConcurrentHashMap.get(this.id)){
            scheduledFutureConcurrentHashMap.get(this.id).cancel(false);
            scheduledFutureConcurrentHashMap.remove(this.id);
        }
    }

    @OnMessage
    public void onMessage(String message,Session session){
        System.out.println("收到的消息");
    }

    @OnError
    public void onError(Session session, Throwable error){
        System.out.println("发生错误");
        System.out.println("清除线程");
        ConcurrentHashMap<Long, ScheduledFuture> scheduledFutureConcurrentHashMap = webSocketService.getScheduledFutureConcurrentHashMap();
        //清除用户的登录记录
        if (null!=websocketHashMap.get(id)){
            websocketHashMap.remove(id);
        }
        //结束线程 GC回收机制会自动清理资源
        if (null!=scheduledFutureConcurrentHashMap.get(this.id)){
            scheduledFutureConcurrentHashMap.get(this.id).cancel(false);
            scheduledFutureConcurrentHashMap.remove(this.id);
        }
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
}

4、webSocket业务类

package com.hua.queerdemo.service.impl;

import com.alibaba.fastjson.JSON;
import com.hua.queerdemo.domain.response.UserInfoSendResponse;
import com.hua.queerdemo.service.WebSocketService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.*;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import javax.websocket.Session;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

@Slf4j
@EnableScheduling
@EnableAsync(proxyTargetClass = true)
@Service
public class  WebSocketServiceImpl implements WebSocketService, ApplicationRunner {
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Autowired
    Scheduler scheduled;

    UserInfoSendResponse userInfoOne = new UserInfoSendResponse();

    UserInfoSendResponse userInfoTwo = new UserInfoSendResponse();

    UserInfoSendResponse userInfoThree = new UserInfoSendResponse();

    //初始化三条模拟数据
    @Override
    public void run(ApplicationArguments args) throws Exception {
        userInfoOne.setUserId(1L);
        userInfoOne.setName("张山");
        userInfoOne.setAge("18");
        userInfoOne.setAddress("北京西安");
        userInfoOne.setFraction(20);

        userInfoTwo.setUserId(2L);
        userInfoTwo.setName("小明");
        userInfoTwo.setAge("18");
        userInfoTwo.setAddress("火葬场");
        userInfoOne.setFraction(30);

        userInfoThree.setUserId(3L);
        userInfoThree.setName("马鞍山");
        userInfoThree.setAge("18");
        userInfoThree.setAddress("龙归");
        userInfoOne.setFraction(40);
    }

    //模拟三条数据随机更新数值
    @Scheduled(cron = "0/1 * * * * ?")
    public void updateInfo(){
        userInfoOne.setFraction((int) (Math.random()*100+1));
        userInfoTwo.setFraction((int) (Math.random()*100+1));
        userInfoThree.setFraction((int) (Math.random()*100+1));
    }


    private static ConcurrentHashMap<Long, ScheduledFuture> scheduledFutureConcurrentHashMap = new ConcurrentHashMap<Long,ScheduledFuture>();

    Integer test = 0 ;

    public ConcurrentHashMap<Long, ScheduledFuture> getScheduledFutureConcurrentHashMap() {
        return scheduledFutureConcurrentHashMap;
    }



    @Async("asyncServiceExecutor")
    public void get(Session session, Long id,Long modId){
        System.out.println("开启线程"+Thread.currentThread().getName());
        String cron = "0/1 * * * * ?";
        //在该线程下执行定时任务
        threadPoolTaskScheduler.setPoolSize(1);
        //判断之前是否是相同的用户,是则清除之前的线程
        if (null!=scheduledFutureConcurrentHashMap.get(id)) {
            System.out.println("清理之前存在的线程");
            scheduledFutureConcurrentHashMap.get(id).cancel(false);
            scheduledFutureConcurrentHashMap.remove(id);
        }
        Thread thread = new Thread(() -> {
            System.out.println("id:" + id + "的线程");
            System.out.println("查询对应数据" + Thread.currentThread().getName());
            String userInfoJson = null;
            switch (modId.intValue()){
                case 1:  userInfoJson = JSON.toJSONString(userInfoOne); break;
                case 2:  userInfoJson = JSON.toJSONString(userInfoTwo); break;
                case 3:  userInfoJson = JSON.toJSONString(userInfoThree); break;
            }
            try {
                session.getBasicRemote().sendText(userInfoJson);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(thread, new CronTrigger(cron));
        scheduledFutureConcurrentHashMap.put(id,schedule);
    }
}

5、线程池配置信息

package com.hua.queerdemo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "taskScheduler")
    public ThreadPoolTaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("task-");
        scheduler.setAwaitTerminationSeconds(600);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        return scheduler;
    }


    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

非实时性要求,20秒延迟完全不用websocket实现,建议通过20s定时器获取数据更新前端。

websocket只是用于后台和前端之间做通信的,你不清楚那部分?

Spring提供了@Scheduled注解用于定时任务你可以直接设置成20S

这种需求为啥要用websocket,前端一个轮询不香吗

出于性能考虑,可以使用websocket;
websocke使用可以参考以下代码;
功能实现也很简单,只需要把最新的数据缓存起来,单体架构的话可以放在进程变量中,分布式不熟的话可以放在redis.

若有帮助,谢谢采纳~

package org.jeecg.modules.message.websocket;
 
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.CommonSendStatus;
import org.jeecg.common.modules.redis.listener.JeecgRedisListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
/**
 * 监听消息(采用redis发布订阅方式发送消息)
 * @author: jeecg-boot
 */
@Slf4j
@Component
public class SocketHandler implements JeecgRedisListener {
 
    @Autowired
    private WebSocket webSocket;
 
    @Override
    public void onMessage(BaseMap map) {
        log.info("【SocketHandler消息】Redis Listerer:" + map.toString());
 
        String userId = map.get("userId");
        String message = map.get("message");
        if (ObjectUtil.isNotEmpty(userId)) {
            webSocket.pushMessage(userId, message);
            //app端消息推送
            webSocket.pushMessage(userId+CommonSendStatus.APP_SESSION_SUFFIX, message);
        } else {
            webSocket.pushMessage(message);
        }
 
    }
}
 
package org.jeecg.common.modules.redis.listener;
 
import org.jeecg.common.base.BaseMap;
 
/**
 * @Description: 自定义消息监听
 * @author: scott
 * @date: 2020/01/01 16:02
 */
public interface JeecgRedisListener {
    /**
     * 接受消息
     *
     * @param message
     */
    void onMessage(BaseMap message);
 
}
package org.jeecg.modules.message.websocket;
 
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
 
import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
 
import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;
 
import com.alibaba.fastjson.JSONObject;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * @Author scott
 * @Date 2019/11/29 9:41
 * @Description: 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {
 
    private Session session;
 
    /**
     * 用户ID
     */
    private String userId;
 
    private static final String REDIS_TOPIC_NAME = "socketHandler";
 
    @Resource
    private JeecgRedisClient jeecgRedisClient;
 
    /**
     * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
     */
    private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
    /**
     * 线程安全Map
     */
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
 
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            //TODO 通过header中获取token,进行check
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }
 
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }
 
 
    /**
     * 服务端推送消息
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                synchronized (session){
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getBasicRemote().sendText(message);
                }
                //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
    /**
     * 服务器端推送消息
     */
    public void pushMessage(String message) {
        try {
            webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
 
    @OnMessage
    public void onMessage(String message) {
        //todo 现在有个定时任务刷,应该去掉
        log.debug("【websocket消息】收到客户端消息:" + message);
        JSONObject obj = new JSONObject();
        //业务类型
        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
        //消息内容
        obj.put(WebsocketConst.MSG_TXT, "心跳响应");
        //update-begin-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
        for (WebSocket webSocket : webSockets) {
            webSocket.pushMessage(obj.toJSONString());
        }
        //update-end-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
    }
 
    /**
     * 后台发送消息到redis
     *
     * @param message
     */
    public void sendMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", "");
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }
 
    /**
     * 此为单点消息
     *
     * @param userId
     * @param message
     */
    public void sendMessage(String userId, String message) {
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", userId);
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }
 
    /**
     * 此为单点消息(多人)
     *
     * @param userIds
     * @param message
     */
    public void sendMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            sendMessage(userId, message);
        }
    }
 
}
 
 
package org.jeecg.common.constant;
 
/**
 *     系统通告 - 发布状态
 * @Author LeeShaoQing
 *
 */
public interface CommonSendStatus {
 
    /**
     * 未发布
     */
    public static final String UNPUBLISHED_STATUS_0 = "0";
 
    /**
     * 已发布
     */
    public static final String PUBLISHED_STATUS_1 = "1";
 
    /**
     * 撤销
     */
    public static final String REVOKE_STATUS_2 = "2";
 
    /**
     * app端推送会话标识后缀
     */
    public static final String  APP_SESSION_SUFFIX = "_app";
 
 
    /**流程催办——系统通知消息模板*/
    public static final String TZMB_BPM_CUIBAN = "bpm_cuiban";
    /**标准模板—系统消息通知*/
    public static final String TZMB_SYS_TS_NOTE = "sys_ts_note";
    /**流程超时提醒——系统通知消息模板*/
    public static final String TZMB_BPM_CHAOSHI_TIP = "bpm_chaoshi_tip";
}
 
package org.jeecg.modules.message.websocket;
 
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
 
import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
 
import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;
 
import com.alibaba.fastjson.JSONObject;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * @Author scott
 * @Date 2019/11/29 9:41
 * @Description: 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {
 
    private Session session;
 
    /**
     * 用户ID
     */
    private String userId;
 
    private static final String REDIS_TOPIC_NAME = "socketHandler";
 
    @Resource
    private JeecgRedisClient jeecgRedisClient;
 
    /**
     * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
     */
    private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
    /**
     * 线程安全Map
     */
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
 
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            //TODO 通过header中获取token,进行check
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }
 
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }
 
 
    /**
     * 服务端推送消息
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                synchronized (session){
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getBasicRemote().sendText(message);
                }
                //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
    /**
     * 服务器端推送消息
     */
    public void pushMessage(String message) {
        try {
            webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
 
    @OnMessage
    public void onMessage(String message) {
        //todo 现在有个定时任务刷,应该去掉
        log.debug("【websocket消息】收到客户端消息:" + message);
        JSONObject obj = new JSONObject();
        //业务类型
        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
        //消息内容
        obj.put(WebsocketConst.MSG_TXT, "心跳响应");
        //update-begin-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
        for (WebSocket webSocket : webSockets) {
            webSocket.pushMessage(obj.toJSONString());
        }
        //update-end-author:taoyan date:20220308 for: 消息通知长连接启动心跳机制,后端代码小bug #3473
    }
 
    /**
     * 后台发送消息到redis
     *
     * @param message
     */
    public void sendMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", "");
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }
 
    /**
     * 此为单点消息
     *
     * @param userId
     * @param message
     */
    public void sendMessage(String userId, String message) {
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", userId);
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }
 
    /**
     * 此为单点消息(多人)
     *
     * @param userIds
     * @param message
     */
    public void sendMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            sendMessage(userId, message);
        }
    }
 
}
 
 
package org.jeecg.common.modules.redis.client;
 
import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.GlobalConstants;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
 
import javax.annotation.Resource;
 
/**
* @Description: redis客户端
* @author: scott
* @date: 2020/01/01 16:01
*/
@Configuration
public class JeecgRedisClient {
 
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
 
 
    /**
     * 发送消息
     *
     * @param handlerName
     * @param params
     */
    public void sendMessage(String handlerName, BaseMap params) {
        params.put(GlobalConstants.HANDLER_NAME, handlerName);
        redisTemplate.convertAndSend(GlobalConstants.REDIS_TOPIC_NAME, params);
    }
 
 
}
 
package org.jeecg.common.constant;
 
/**
 * @Description: Websocket常量类
 * @author: taoyan
 * @date: 2020年03月23日
 */
public class WebsocketConst {
 
 
    /**
     * 消息json key:cmd
     */
    public static final String MSG_CMD = "cmd";
 
    /**
     * 消息json key:msgId
     */
    public static final String MSG_ID = "msgId";
 
    /**
     * 消息json key:msgTxt
     */
    public static final String MSG_TXT = "msgTxt";
 
    /**
     * 消息json key:userId
     */
    public static final String MSG_USER_ID = "userId";
 
    /**
     * 消息类型 heartcheck
     */
    public static final String CMD_CHECK = "heartcheck";
 
    /**
     * 消息类型 user 用户消息
     */
    public static final String CMD_USER = "user";
 
    /**
     * 消息类型 topic 系统通知
     */
    public static final String CMD_TOPIC = "topic";
 
    /**
     * 消息类型 email
     */
    public static final String CMD_EMAIL = "email";
 
    /**
     * 消息类型 meetingsign 会议签到
     */
    public static final String CMD_SIGN = "sign";
 
    /**
     * 消息类型 新闻发布/取消
     */
    public static final String NEWS_PUBLISH = "publish";
 
}
package org.jeecg.common.base;
 
 
import cn.hutool.core.util.ObjectUtil;
 
import org.apache.commons.beanutils.ConvertUtils;
 
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
 
/**
* BaseMap
* 
* @author: scott
* @date: 2020/01/01 16:17
*/
public class BaseMap extends HashMap<String, Object> {
 
    private static final long serialVersionUID = 1L;
 
 
    public BaseMap() {
 
    }
 
    public BaseMap(Map<String, Object> map) {
        this.putAll(map);
    }
 
 
    @Override
    public BaseMap put(String key, Object value) {
        super.put(key, Optional.ofNullable(value).orElse(""));
        return this;
    }
 
    public BaseMap add(String key, Object value) {
        super.put(key, Optional.ofNullable(value).orElse(""));
        return this;
    }
 
    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        Object obj = super.get(key);
        if (ObjectUtil.isNotEmpty(obj)) {
            return (T) obj;
        } else {
            return null;
        }
    }
 
    @SuppressWarnings("unchecked")
    public Boolean getBoolean(String key) {
        Object obj = super.get(key);
        if (ObjectUtil.isNotEmpty(obj)) {
            return Boolean.valueOf(obj.toString());
        } else {
            return false;
        }
    }
 
    public Long getLong(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return new Long(v.toString());
        }
        return null;
    }
 
    public Long[] getLongs(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return (Long[]) v;
        }
        return null;
    }
 
    public List<Long> getListLong(String key) {
        List<String> list = get(key);
        if (ObjectUtil.isNotEmpty(list)) {
            return list.stream().map(e -> new Long(e)).collect(Collectors.toList());
        } else {
            return null;
        }
    }
 
    public Long[] getLongIds(String key) {
        Object ids = get(key);
        if (ObjectUtil.isNotEmpty(ids)) {
            return (Long[]) ConvertUtils.convert(ids.toString().split(","), Long.class);
        } else {
            return null;
        }
    }
 
 
    public Integer getInt(String key, Integer def) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return Integer.parseInt(v.toString());
        } else {
            return def;
        }
    }
 
    public Integer getInt(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return Integer.parseInt(v.toString());
        } else {
            return 0;
        }
    }
 
    public BigDecimal getBigDecimal(String key) {
        Object v = get(key);
        if (ObjectUtil.isNotEmpty(v)) {
            return new BigDecimal(v.toString());
        }
        return new BigDecimal("0");
    }
 
 
    @SuppressWarnings("unchecked")
    public <T> T get(String key, T def) {
        Object obj = super.get(key);
        if (ObjectUtil.isEmpty(obj)) {
            return def;
        }
        return (T) obj;
    }
 
    public static BaseMap toBaseMap(Map<String, Object> obj) {
        BaseMap map = new BaseMap();
        map.putAll(obj);
        return map;
    }
 
 
}

后端数据更新时更新缓存,前端定时调用接口就可以

  • 使用http协议的long pulling可以实现
  • 也可以使用websocket长连接实现
  • 每个用户连接后端时,后端把所有的长连接都记录到Map里面
  • 当有数据更新时,需要通知哪个用户,就从Map取出这个用户的连接然后把数据写到前端

如有帮助,请采纳,十分感谢!

你好我建议你使用Redis存储用户信息,key为用户id,value为用户数据,当用户生产出新的数据是直接根据当前用户id通过redis的set方法进行数据覆盖即可,每次前台请求数据都从redis中拿出来给他!!!