有偿援助!各位大铑好,目前有个需求是需要实时向前端推送数据,有考虑说用websocket建立长连接,这样的话前端定时提交数据,我后端这边也定时推送数据,但是这方面不是很懂,所以想请教下各位大铑,有偿援助,愿意指点的话,请私信下我
已私信
后端如何使用websocket的代码网上一查一大堆,技术已经很成熟了,代码例子也很多的
给你一篇案例源码,你参考用吧
很简单啊,代码完全可以提供
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.text.ParseException;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j //用于日志
@ServerEndpoint(value = "/websocket/xxx/xxx") //将该类定义为一个webSocket的服务端
@Component //实例化到spring容器,泛指各种组件,不需要归类的时候,需要加上。在websocket必加
public class testWebSocket{
//开始
public static TestService testService ; //前文有讲过,需要注入【跳转在上面】
@Autowired
public void setSenderService(TestService testService){
testWebSocket.testService= testService;
}
//结束
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
/** 记录当前在线连接数 */
private static AtomicInteger onlineCount = new AtomicInteger(0);
/** 存放所有在线的客户端 */
private static Map<String, Session> clients = new ConcurrentHashMap<>();
private static Map<String, String> clientParmas = new ConcurrentHashMap<>();
@PostConstruct
public void init(){
//新建定时线程池
Task task = new Task();
//用于定时发送
scheduledExecutorService.scheduleAtFixedRate(task,1,10, TimeUnit.SECONDS);
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
onlineCount.incrementAndGet(); // 在线数加1
clients.put(session.getId(), session);
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
onlineCount.decrementAndGet(); // 在线数减1
clients.remove(session.getId());
log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
}
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 服务端发送消息给客户端
*/
private void sendMessage(String message, Session toSession) {
try {
log.info("服务端给客户端[{}]发送消息[{}]", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("服务端发送消息给客户端失败:{}", e);
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message
* 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws ParseException {
log.info("服务端收到客户端[{}]的消息[{}]", session.getId(), message);
this.params = message;
clientParmas.put(session.getId(),message);
if(!StringUtils.isEmpty(message)){
//前端传输过来是一个base64的字符,转换成一个map
String deStr = new String(Base64.getDecoder().decode(parmas));
Map<Object,Object> map = JSON.parseObject(deStr,Map.class);
//具体业务编写。
List<String> list = ....;
//然后推送回前端
sendMessage(JSON.toJSONString(list), session);
}
}
//定时自动推送数据
class Task implements Runnable {
@Override
public void run() {
clients.keySet().forEach(key -> {
Session toSession = clients.get(key);
if (toSession != null) {
String parmas = clientParmas.get(toSession.getId());
if (!StringUtils.isEmpty(parmas)) {
String deStr = new String(Base64.getDecoder().decode(parmas));
Map<Object,Object> map = JSON.parseObject(deStr,Map.class);
//具体业务编写。
List<String> list = ....;
//然后推送回前端
sendMessage(JSON.toJSONString(list), toSession);
}
}
});
}
}
}
websocket工具类
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @Author: liury
* @Description: webSocket
* @DateTime: 2022/6/19 20:54
**/
@Slf4j
@Component
@ServerEndpoint("/webSocket")
public class WebSocket {
private static int onlineCount = 0;
private static CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<Session>();
private Session session;
private String username;
@OnOpen
public void onOpen( Session session) throws IOException {
this.session = session;
// this.username = userId;
addOnlineCount();
sessions.add(session);
log.info("已连接--------------: "+session);
}
@OnClose
public void onClose(Session session) throws IOException {
sessions.remove(session);
subOnlineCount();
log.info("关闭连接--------------: "+session);
}
@OnMessage
public void onMessage(String message) throws IOException {
sendMessageAll("给所有人");
}
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
public void sendMessageTo(String message, String To) throws IOException {
// session.getBasicRemote().sendText(message);
//session.getAsyncRemote().sendText(message);
for (Session session : sessions) {
log.info("--------------------------------------------------webSocket发送消息{}:" + System.currentTimeMillis(), To);
session.getAsyncRemote().sendText(message);
}
}
public static void sendMessageAll(String message) throws IOException {
if (sessions.size() != 0) {
for (Session s : sessions) {
if (s != null) {
s.getAsyncRemote().sendText(message);
}
}
}
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
}
开启websocket支持
代码如下(示例):
package com.yzd.web.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @Author: liury
* @Description: 开启webSocket支持
* @DateTime: 2022/6/18 17:02
**/
@Configuration
public class WebSocketConfig {
/**
* 服务器节点
*
* 如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
注意http 用的是 ws ,https用的是wss
//WebSocket发送消息给前端
ArrayList<Object> wsList = new ArrayList<>();
wsList.add(eventAddREQ);
String s = JSONObject.toJSONString(wsList);
try {
WebSocket.sendMessageAll(s);
} catch (IOException e) {
e.printStackTrace();
}
只能帮你到这里了