websocket实时推送数据

有偿援助!各位大铑好,目前有个需求是需要实时向前端推送数据,有考虑说用websocket建立长连接,这样的话前端定时提交数据,我后端这边也定时推送数据,但是这方面不是很懂,所以想请教下各位大铑,有偿援助,愿意指点的话,请私信下我

已私信

后端如何使用websocket的代码网上一查一大堆,技术已经很成熟了,代码例子也很多的

给你一篇案例源码,你参考用吧

很简单啊,代码完全可以提供


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.text.ParseException;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
 
@Slf4j  //用于日志
@ServerEndpoint(value = "/websocket/xxx/xxx")   //将该类定义为一个webSocket的服务端
@Component  //实例化到spring容器,泛指各种组件,不需要归类的时候,需要加上。在websocket必加
public class testWebSocket{
 
 
    //开始
    public static TestService testService ;  //前文有讲过,需要注入【跳转在上面】
 
    @Autowired
    public void setSenderService(TestService testService){
        testWebSocket.testService= testService;
    }
    //结束
 
   
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
 
 
    /** 记录当前在线连接数 */
    private static AtomicInteger onlineCount = new AtomicInteger(0);
 
    /** 存放所有在线的客户端 */
    private static Map<String, Session> clients = new ConcurrentHashMap<>();
    private static Map<String, String> clientParmas = new ConcurrentHashMap<>();
 
    @PostConstruct
    public  void init(){
        //新建定时线程池
        Task task = new Task();
        //用于定时发送
        scheduledExecutorService.scheduleAtFixedRate(task,1,10, TimeUnit.SECONDS);
    }
 
 
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        onlineCount.incrementAndGet(); // 在线数加1
        clients.put(session.getId(), session);
    }
 
 
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        onlineCount.decrementAndGet(); // 在线数减1
        clients.remove(session.getId());
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }
 
 
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }
 
 
     /**
      * 服务端发送消息给客户端
      */
    private void sendMessage(String message, Session toSession) {
        try {
            log.info("服务端给客户端[{}]发送消息[{}]", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败:{}", e);
        }
    }
 
 
 
 
 
     /**
      * 收到客户端消息后调用的方法
      *
      * @param message
      * 客户端发送过来的消息
      */
    @OnMessage
    public void onMessage(String message, Session session) throws ParseException {
        log.info("服务端收到客户端[{}]的消息[{}]", session.getId(), message);
        this.params = message;
        clientParmas.put(session.getId(),message);
        if(!StringUtils.isEmpty(message)){
            //前端传输过来是一个base64的字符,转换成一个map
           String deStr = new String(Base64.getDecoder().decode(parmas));
           Map<Object,Object> map = JSON.parseObject(deStr,Map.class);
 
            //具体业务编写。
            List<String> list = ....;
            //然后推送回前端
           sendMessage(JSON.toJSONString(list), session);
            
        }
    }
 
 
    //定时自动推送数据
    class Task implements Runnable {
        @Override
        public void run() {
            clients.keySet().forEach(key -> {
                Session toSession = clients.get(key);
                if (toSession != null) {
                    String parmas = clientParmas.get(toSession.getId());
                    if (!StringUtils.isEmpty(parmas)) {
                        String deStr = new String(Base64.getDecoder().decode(parmas));
                        Map<Object,Object> map = JSON.parseObject(deStr,Map.class);
                       //具体业务编写。
                        List<String> list = ....;
                        //然后推送回前端
                        sendMessage(JSON.toJSONString(list), toSession);
                    }
                }
            });
 
        }
    }
 
}


websocket工具类

 
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
 
/**
 * @Author: liury
 * @Description: webSocket
 * @DateTime: 2022/6/19 20:54
 **/
@Slf4j
@Component
@ServerEndpoint("/webSocket")
public class WebSocket {
    private static int onlineCount = 0;
 
    private static CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<Session>();
 
    private Session session;
 
    private String username;
 
 
    @OnOpen
    public void onOpen( Session session) throws IOException {
 
 
        this.session = session;
//        this.username = userId;
        addOnlineCount();
        sessions.add(session);
        log.info("已连接--------------: "+session);
 
    }
 
 
 
    @OnClose
 
    public void onClose(Session session) throws IOException {
 
        sessions.remove(session);
 
        subOnlineCount();
        log.info("关闭连接--------------: "+session);
 
    }
 
 
 
    @OnMessage
 
    public void onMessage(String message) throws IOException {
 
        sendMessageAll("给所有人");
 
    }
 
 
 
    @OnError
 
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
 
    }
 
 
 
    public void sendMessageTo(String message, String To) throws IOException {
 
        // session.getBasicRemote().sendText(message);
 
        //session.getAsyncRemote().sendText(message);
 
        for (Session session : sessions) {
 
            log.info("--------------------------------------------------webSocket发送消息{}:" + System.currentTimeMillis(), To);
            session.getAsyncRemote().sendText(message);
 
        }
 
    }
 
 
 
    public static void sendMessageAll(String message) throws IOException {
 
        if (sessions.size() != 0) {
            for (Session s : sessions) {
                if (s != null) {
                    s.getAsyncRemote().sendText(message);
                }
            }
        }
 
 
    }
 
 
 
 
 
 
    public static synchronized void addOnlineCount() {
 
        WebSocket.onlineCount++;
 
    }
 
 
 
    public static synchronized void subOnlineCount() {
 
        WebSocket.onlineCount--;
 
    }
 
 
}

开启websocket支持
代码如下(示例):


package com.yzd.web.config;
 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 
/**
 * @Author: liury
 * @Description: 开启webSocket支持
 * @DateTime: 2022/6/18 17:02
 **/
@Configuration
public class WebSocketConfig {
 
    /**
     * 服务器节点
     *
     * 如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

注意http 用的是 ws ,https用的是wss

 //WebSocket发送消息给前端
            ArrayList<Object> wsList = new ArrayList<>();
            wsList.add(eventAddREQ);
            String s = JSONObject.toJSONString(wsList);
 
            try {
                WebSocket.sendMessageAll(s);
            } catch (IOException e) {
                e.printStackTrace();
            }

只能帮你到这里了