请教一个netty的问题,服务端注册了4个channel,2个InboundHandler,2个OutboundHandler,我在客户端的ChannelInboundHandler的channelActive方法里向服务端发了一条消息,发现这个OutboundHandler里的read会被调用两次,一次在InboundHandler的read方法之前,另一次在这个InboundHandler.channelReadComplete之后
服务端打印的结果:
==========Server console: SecondOutServerHandler.read
==========Server console: FirstOutServerHandler.read
==========Server console: FirstInServerHandler.channelRead ClientInHandler ->
==========Server console:SecondInServerHandler.channelRead FirstInServerHandler ->
==========Server console: FirstInServerHandler.channelReadComplete
==========Server console: SecondInServerHandler.channelReadComplete
==========Server console: SecondOutServerHandler.read
==========Server console: FirstOutServerHandler.read
Disconnected from the target VM, address: '127.0.0.1:56338', transport: 'socket'
我的问题是这个OutboundHandler不是拦截出站事件的么,怎么象拦截的这个InboundHandler,一次写入怎么会调用两次服务 的OutboundHandlerread()方法?netty 的ChannelOutboundHandler中的read()方法和write()方法什么时候调用?
package com.bugStack.testHandle;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientInHandler extends SimpleChannelInboundHandler<ByteBuf>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("==========Client Console ClientInHandler.channelRead0: " + msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("ClientInHandler -> ", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
package com.bugStack.testHandle;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
public class ClientOutHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("=========ClientOutHandler.write "+ in.toString(CharsetUtil.UTF_8));
/* ctx.writeAndFlush(Unpooled.copiedBuffer(in,
Unpooled.copiedBuffer("ClientOutHandler", CharsetUtil.UTF_8)));*/
super.write(ctx, msg, promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
package com.bugStack.testHandle;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class FirstInServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("==========Server console: FirstInServerHandler.channelRead " + in.toString(CharsetUtil.UTF_8));
ctx.fireChannelRead(Unpooled.copiedBuffer("FirstInServerHandler -> ",
CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("==========Server console: FirstInServerHandler.channelReadComplete ");
super.channelReadComplete(ctx);
}
}
package com.bugStack.testHandle;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
public class FirstOutServerHandler extends ChannelOutboundHandlerAdapter {
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
System.out.println("==========Server console: FirstOutServerHandler.read ");
super.read(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("==========Server console: FirstOutServerHandler.write " + in.toString(CharsetUtil.UTF_8));
/* ctx.writeAndFlush(Unpooled.copiedBuffer(in,
Unpooled.copiedBuffer("FirstOutServerHandler", CharsetUtil.UTF_8)))
.addListener(ChannelFutureListener.CLOSE);*/
super.write(ctx, msg, promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
package com.bugStack.testHandle;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class SecondInServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("==========Server console:SecondInServerHandler.channelRead " + in.toString(CharsetUtil.UTF_8));
/* ctx.write(Unpooled.copiedBuffer(in,
Unpooled.copiedBuffer("SecondInServerHandler -> ", CharsetUtil.UTF_8)));*/
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("==========Server console: SecondInServerHandler.channelReadComplete ");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
package com.bugStack.testHandle;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
public class SecondOutServerHandler extends ChannelOutboundHandlerAdapter {
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
System.out.println("==========Server console: SecondOutServerHandler.read ");
super.read(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("==========Server console: SecondOutServerHandler.write" + in.toString(CharsetUtil.UTF_8));
/* ctx.writeAndFlush(Unpooled.copiedBuffer(in,
Unpooled.copiedBuffer("SecondOutServerHandler -> ", CharsetUtil.UTF_8)));*/
super.write(ctx, msg, promise);
}
}
package com.bugStack.testHandle;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class HelloClientDemo {
public static void main(String[] args) {
HelloClientDemo client = new HelloClientDemo();
client.start("localhost", 20000);
}
public void start(String ip, int port) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ClientOutHandler());
pipeline.addLast(new ClientInHandler());
}
});
ChannelFuture future = bootstrap.connect(ip, port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
package com.bugStack.testHandle;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HelloServerDemo {
public static void main(String[] args) {
HelloServerDemo server = new HelloServerDemo();
server.start(20000);
}
public void start(int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FirstOutServerHandler());
pipeline.addLast(new SecondOutServerHandler());
pipeline.addLast(new FirstInServerHandler());
pipeline.addLast(new SecondInServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
https://blog.csdn.net/weixin_33995481/article/details/92057287