最近做的项目学习netty,时间紧来不及学,希望请教大家一些问题,
1.netty的客户端与服务端建立长连接后,客户端如何与服务端通信?要随心所欲发送自定义的消息?
2.netty的自定义编码器,解码器,希望也能得到帮助,
3.第一个问题为主要问题。
数据包结构:
包头:2字节 0ddd 0fff。
消息体:全部为文本(ASCII码),汉字是GB2312编码。
包尾:2字节 0xxx 0aaa。
最后,先感谢每一位浏览解答的朋友,我会多多赠送C币给那些乐于帮助的人。
补充一些:
我是客户端,需要与服务端长连接;接通后需要发送登陆消息数据包;
每30秒发送一条链路数据
这些我熬夜两天,时间太紧张,希望大家不吝指教。
客户端代码:
public class TCPClient {
public void connect(int port,String host)throws Exception{
//网络事件处理线程组
EventLoopGroup group=new NioEventLoopGroup();
try{
//配置客户端启动类
Bootstrap b=new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)//设置封包 使用一次大数据的写操作,而不是多次小数据的写操作
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DealMsg());//设置自定义解码器
ch.pipeline().addLast(new TCPClientHandler());//设置客户端网络IO处理器
}
});
//连接服务器 同步等待成功
ChannelFuture f=b.connect(host,port).sync();
//同步等待客户端通道关闭
f.channel().closeFuture().sync();
}finally{
//释放线程组资源
group.shutdownGracefully();
}
}
}
客户端handler
public class TCPClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("通信异常!!");
cause.printStackTrace();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("链接服务端成功!");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("退出链接!!");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接受服务器数据:=="+msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush("数据读取完成!");
}
}
Netty是建立在NIO基础之上,Netty在NIO之上又提供了更高层次的抽象。
在Netty里面,Accept连接可以使用单独的线程池去处理,读写操作又是另外的线程池来处理。
Accept连接和读写操作也可以使用同一个线程池来进行处理。而请求处理逻辑既可以使用单独的线程池进行处理,也可以跟放在读写线程一块处理。线程池中的每一个线程都是NIO线程。用户可以根据实际情况进行组装,构造出满足系统需求的并发模型。
Netty提供了内置的常用编解码器,包括行编解码器[一行一个请求],前缀长度编解码器[前N个字节定义请求的字节长度],可重放解码器[记录半包消息的状态],HTTP编解码器,WebSocket消息编解码器等等
Netty提供了一些列生命周期回调接口,当一个完整的请求到达时,当一个连接关闭时,当一个连接建立时,用户都会收到回调事件,然后进行逻辑处理。
Netty可以同时管理多个端口,可以使用NIO客户端模型,这些对于RPC服务是很有必要的。
Netty除了可以处理TCP Socket之外,还可以处理UDP Socket。
在消息读写过程中,需要大量使用ByteBuffer,Netty对ByteBuffer在性能和使用的便捷性上都进行了优化和抽象。
代码:
服务端:
复制代码
package com.kinson.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@since:
/
public class NettyServer {
/*
public NettyServer(int port) {
this.port = port;
}
public void run() {
//EventLoopGroup是用来处理IO操作的多线程事件循环器
//负责接收客户端连接线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
//负责处理客户端i/o事件、task任务、监听任务组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//启动 NIO 服务的辅助启动类
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
//配置 Channel
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ServerIniterHandler());
//BACKLOG用于构造服务端套接字ServerSocket对象,
// 标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//是否启用心跳保活机制
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
//绑定服务端口监听
Channel channel = bootstrap.bind(port).sync().channel();
System.out.println("server run in port " + port);
//服务器关闭监听
/*channel.closeFuture().sync()实际是如何工作:
channel.closeFuture()不做任何操作,只是简单的返回channel对象中的closeFuture对象,对于每个Channel对象,都会有唯一的一个CloseFuture,用来表示关闭的Future,
所有执行channel.closeFuture().sync()就是执行的CloseFuturn的sync方法,从上面的解释可以知道,这步是会将当前线程阻塞在CloseFuture上*/
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//关闭事件流组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new NettyServer(8899).run();
}
}
复制代码
服务端业务逻辑处理:
复制代码
package com.kinson.netty.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
@since:
*/
public class ServerHandler extends SimpleChannelInboundHandler {
/**
/**
/**
/**
/**
/**
/**
}
复制代码
服务端处理器注册:
复制代码
package com.kinson.netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
复制代码
package com.kinson.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
@since:
*/
public class NettyClient {
private String ip;
private int port;
private boolean stop = false;
public NettyClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void run() throws IOException {
//设置一个多线程循环器
EventLoopGroup workerGroup = new NioEventLoopGroup();
//启动附注类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
//指定所使用的NIO传输channel
bootstrap.channel(NioSocketChannel.class);
//指定客户端初始化处理
bootstrap.handler(new ClientIniterHandler());
try {
//连接服务
Channel channel = bootstrap.connect(ip, port).sync().channel();
while (true) {
//向服务端发送内容
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String content = reader.readLine();
if (StringUtils.isNotEmpty(content)) {
if (StringUtils.equalsIgnoreCase(content, "q")) {
System.exit(1);
}
channel.writeAndFlush(content);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(1);
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new NettyClient("127.0.0.1", 8899).run();
}
}
复制代码
客户端逻辑处理:
复制代码
package com.kinson.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
复制代码
package com.kinson.netty.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
首先,我们定义消息类型:
[java] view plain copy
/**
* 消息类型
* @author 李熠
*
*/
public enum MsgType {
PING,SEND,LOGIN,NO_TARGET
}
分别是心跳、发送、登录、找不到目标
当客户端和服务端连接后,需要向服务端发送登录请求,也就是消息类型:LOGIN,服务端接收到LOGIN请求后,会将客户端加入到队列中,
[java] view plain copy
import java.io.Serializable;
public class Message implements Serializable {
private static final long serialVersionUID = -5756901646411393269L;
private String clientId;//发送者客户端ID
private MsgType type;//消息类型
private String data;//数据
private String targetId;//目标客户端ID
public String getTargetId() {
return targetId;
}
public void setTargetId(String targetId) {
this.targetId = targetId;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public MsgType getType() {
return type;
}
public void setType(MsgType type) {
this.type = type;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public Message(){
}
public Message(MsgType type){
this.type = type;
}
}
这类是定义的消息Bean,想服务端发送消息就是发送的这个对象的数据。
接下来,实现客户端队列代码:
[java] view plain copy
import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class NettyChannelMap {
private static Map<String , SocketChannel> map = new ConcurrentHashMap<>();
public static void add(String clientId,SocketChannel channel){
map.put(clientId, channel);
}
public static Channel get(String clientId){
return map.get(clientId);
}
public static void remove(SocketChannel channel){
for (Map.Entry<String,SocketChannel> entry:map.entrySet()){
if (entry.getValue()==channel){
map.remove(entry.getKey());
}
}
}
}
服务端:
[java] view plain copy
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
public class NettyServer {
private int port;
public SocketChannel socketChannel;
public NettyServer(int port) throws InterruptedException {
this.port = port;
bind();
}
private void bind() throws InterruptedException {
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
//字符串类解析
[java] view plain copy
<span style="white-space:pre;"> </span>//这里只能添加字符串的编码和解码器,
[java] view plain copy
<span style="white-space:pre;"> </span>//网上有很多例子是这样写的:
[java] view plain copy
<span style="white-space:pre;"> </span>//这种写法只能所有客户端都用netty写,否则其他框架实现的客户端无法发送消息到服务端,因为他是转换的netty自己的Object
[java] view plain copy
<span style="white-space:pre;"> </span>//p.addLast(new ObjectEncoder());
//p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
[java] view plain copy
p.addLast(new StringEncoder(Charset.forName("UTF-8")));
p.addLast(new StringDecoder(Charset.forName("UTF-8")));
p.addLast(new NettyServerHandler());
}
});
ChannelFuture f= bootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("server start---------------");
}
}
public static void main(String []args) throws InterruptedException {
if(args.length == 0){
new NettyServer(9999);
}else{
new NettyServer(Integer.parseInt(args[0]));
}
}
[java] view plain copy
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import cn.sunsharp.netty.bean.Message;
import cn.sunsharp.netty.bean.MsgType;
import cn.sunsharp.netty.bean.NettyChannelMap;
import com.alibaba.fastjson.JSON;
[java] view plain copy
//最好继承
[java] view plain copy
SimpleChannelInboundHandler<String>表示传递字符串消息,handler会把json格式的字符串转换为Message对象
public class NettyServerHandler extends SimpleChannelInboundHandler<String>{@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //channel失效,从Map中移除 NettyChannelMap.remove((SocketChannel)ctx.channel()); }@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {//cause.printStackTrace();System.out.println("出现异常!");}@Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg)throws Exception {System.out.println(msg);Message message = JSON.parseObject(msg+"", Message.class);System.out.println("接收到消息:"+message);String clientId = message.getClientId();if(MsgType.LOGIN.equals(message.getType())){System.out.printf("将%s添加到队列\n",clientId); NettyChannelMap.add(clientId,(SocketChannel)ctx.channel()); }else{ if(NettyChannelMap.get(clientId)==null){ System.out.printf("登录失败,请重新登录!",clientId); //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录 message = new Message(MsgType.LOGIN); ctx.channel().writeAndFlush(JSON.toJSONString(message)); } } switch (message.getType()){ case PING:{ message = new Message(MsgType.PING); NettyChannelMap.get(clientId).writeAndFlush(JSON.toJSONString(message)); }break; case SEND:{ //收到客户端的请求,发送给targetId System.out.println("发送消息:"+message); if(NettyChannelMap.get(message.getTargetId()) != null){ NettyChannelMap.get(message.getTargetId()).writeAndFlush(JSON.toJSONString(message)); }else{ message.setType(MsgType.NO_TARGET); NettyChannelMap.get(clientId).writeAndFlush(JSON.toJSONString(message)); } }break; default:break; }}}客户端可以使用任何框架任何语言的Socket来连接并发送消息,为了方便,这里依然用Netty来实现客户端:
[java] view plain copy
import java.nio.charset.Charset;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import cn.sunsharp.regulation.bean.Message;
import cn.sunsharp.regulation.bean.MsgType;
import com.alibaba.fastjson.JSON;
public class NettyClient {
private int port;
private String host;
public SocketChannel socketChannel;
private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
public NettyClient(int port, String host) {
this.port = port;
this.host = host;
start();
}
private void start(){
ChannelFuture future = null;
try {
EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host,port);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
socketChannel.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
socketChannel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
future =bootstrap.connect(host,port).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel)future.channel();
System.out.println("connect server 成功---------");
}else{
System.out.println("连接失败!");
System.out.println("准备重连!");
start();
}
} catch (Exception e) {
}finally{
// if(null != future){
// if(null != future.channel() && future.channel().isOpen()){
// future.channel().close();
// }
// }
// System.out.println("准备重连!");
// start();
}
}
public static void main(String[]args) throws InterruptedException {
NettyClient bootstrap=new NettyClient(9999,"192.168.1.38");
System.out.println(11111);
Message loginMsg=new Message(MsgType.LOGIN);
loginMsg.setClientId("001");
loginMsg.setTargetId("192.168.1.38");
loginMsg.setType(MsgType.LOGIN);
bootstrap.socketChannel.writeAndFlush(JSON.toJSON(loginMsg));
}
}
[java] view plain copy
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import cn.sunsharp.regulation.bean.Message;
import cn.sunsharp.regulation.bean.MsgType;
import com.alibaba.fastjson.JSON;
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
public static ChannelHandlerContext context = null;
//利用写空闲发送心跳检测消息
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case WRITER_IDLE:
Message pingMsg=new Message(MsgType.PING);
ctx.writeAndFlush(JSON.toJSON(pingMsg));
System.out.println("send ping to server----------");
break;
default:
break;
}
}
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg)
throws Exception {
Message message = JSON.parseObject(msg+"", Message.class);
MsgType msgType=message.getType();
switch (msgType){
case LOGIN:{
//向服务器发起登录
message = new Message(MsgType.LOGIN);
ctx.writeAndFlush(JSON.toJSONString(message));
}break;
case PING:{
System.out.println("receive ping from server----------");
}break;
case SEND:{
//收到服务端消息
System.out.println("收到服务端消息:"+message.getData());
}break;
case NO_TARGET:{
//收到服务端消息
System.out.println("找不到targetId:"+message.getTargetId());
}break;
default:break;
}
}
}
非常感谢上面的回复,我马上验证。最迟明天。也希望其他大佬多指点后生,学习的路上永无止境,我也会把这种精神传递下去
再详细补充一些:
我在做的是TCP通信。
我需要链接服务端后,发送登录请求,请求的数据必须上面数据包格式,希望大家帮忙。。。。。目前并不能实现
Netty提供了一些列生命周期回调接口,当一个完整的请求到达时,当一个连接关闭时,当一个连接建立时,用户都会收到回调事件,然后进行逻辑处理。
试试这个https://blog.csdn.net/qq_27903845/article/details/79453412 应该符合你的要求
1.连接后,你要发送登录请求, 2 自定义协议,自己开发个编解码就行,