今天下午在测试RPC模块的时候,发现一个BUG,已经折磨一下午了。
BUG主要是在自定义解码器的反序列化方法上。客户端发送请求消息时,经过编码器消息编码传送到服务端,服务端在自定义解码器正常解码获取到请求消息,接着在调用服务获取到结果,服务端把结果通过管道发送给客户端。消息首先会经过自定义编码器,客户端接收后在自定义解码器调用反序列化方法时抛出异常。令人奇怪的是,第一次服务端在解码器中能正常反序列化消息,但是客户端在接收结果时,调用反序列化方法却报错。佬们有啥思路吗?
1、更改序列化算法
2、更改netty版本
最后都没有解决。
package com.dai.rpc.netty.codec;
import com.dai.rpc.compress.Compress;
import com.dai.rpc.constant.CompressTypeEnum;
import com.dai.rpc.constant.MyRpcConstants;
import com.dai.rpc.constant.SerializationTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
public class MyRpcEncoder extends MessageToByteEncoder {
private static final AtomicInteger ATOMICINTEGER = new AtomicInteger(0);
/**
* 每一次发送数据都会经过编码器
* 编码器职责:按照协议自定义数据
* @param channelHandlerContext
* @param rpcMessage
* @param byteBuf
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage, ByteBuf byteBuf) throws Exception {
/**
* 1. 4B magic code(魔法数)
* 2. 1B version(版本)
* 3. 4B full length(整个报文消息长度)
* 4. 1B messageType(消息类型)
* 5. 1B codec(序列化类型)
* 6. 1B compress(压缩类型)
* 7. 4B requestId(请求的Id)
* 8. body(object类型数据)
*/
byte codecType = rpcMessage.getCodec();
byte compressType = rpcMessage.getCompress();
byte messageType = rpcMessage.getMessageType();
byteBuf.writeBytes(MyRpcConstants.RPC_MAGIC_TABLE);
byteBuf.writeByte(MyRpcConstants.RPC_VERSION);
// 报文消息长度预留
// 无参writerIndex作用:返回当前写下标。有参writerIndex作用:更改写下标
byteBuf.writerIndex(byteBuf.writerIndex() + 4);
byteBuf.writeByte(messageType);
byteBuf.writeByte(codecType);
byteBuf.writeByte(compressType);
// 使用原子类保证请求id唯一
int requestId = MyRpcEncoder.ATOMICINTEGER.getAndIncrement();
byteBuf.writeInt(requestId);
// 写入数据
int fullLength = MyRpcConstants.TOTAL_LENGTH;
byte[] bodyBuff = null;
// 先要序列化
Serializer serializer = loadSerializer(codecType);
bodyBuff = serializer.serialize(rpcMessage.getData());
// 压缩
Compress compress = loadCompress(compressType);
bodyBuff = compress.compress(bodyBuff);
fullLength += bodyBuff.length;
byteBuf.writeBytes(bodyBuff);
int writeIndex = byteBuf.writerIndex();
// 更改写下标将fullLength写入之前预留的位置
byteBuf.writerIndex(writeIndex - fullLength + MyRpcConstants.RPC_MAGIC_TABLE.length + 1);
byteBuf.writeInt(fullLength);
// 更改写下标
byteBuf.writerIndex(writeIndex);
}
private Serializer loadSerializer(byte codecType) {
// 根据spi技术找到需要加载的实现类
String serializerName = SerializationTypeEnum.getName(codecType);
ServiceLoader load = ServiceLoader.load(Serializer.class);
for (Serializer serializer : load) {
if(serializer.name().equals(serializerName)) return serializer;
}
throw new MyRpcException("未找到对应的序列化类型");
}
private Compress loadCompress(byte compressType) {
// 根据spi技术找到需要加载的实现类
String compressName = CompressTypeEnum.getName(compressType);
ServiceLoader load = ServiceLoader.load(Compress.class);
for (Compress compress : load) {
if(compress.name().equals(compressName)) return compress;
}
throw new MyRpcException("未找到对应的压缩类型");
}
}
package com.dai.rpc.netty.codec;
import com.dai.rpc.compress.Compress;
import com.dai.rpc.constant.CompressTypeEnum;
import com.dai.rpc.constant.MessageTypeEnum;
import com.dai.rpc.constant.MyRpcConstants;
import com.dai.rpc.constant.SerializationTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.message.RpcRequest;
import com.dai.rpc.message.RpcResponse;
import com.dai.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.ServiceLoader;
/**
* RPC框架解码器
* LengthFieldBasedFrameDecoder内部解决TCP粘包、拆包问题
*/
public class MyRpcDecoder extends LengthFieldBasedFrameDecoder {
/**
* 1. 4B magic code(魔法数)
* 2. 1B version(版本)
* 3. 4B full length(整个报文消息长度)
* 4. 1B messageType(消息类型)
* 5. 1B codec(序列化类型)
* 6. 1B compress(压缩类型)
* 7. 4B requestId(请求的Id)
* 8. body(object类型数据)
*/
public MyRpcDecoder(){
super(1024 * 1024 * 8, 5, 4, -9, 0);
}
/**
*
* @param maxFrameLength 最大帧长度。它决定可以接收的数据的最大长度。如果超过,数据将被丢弃,根据实际环境定义
* @param lengthFieldOffset 数据长度字段开始的偏移量, magic code+version=长度为5
* @param lengthFieldLength 消息长度的大小 full length(消息长度) 长度为4
* @param lengthAdjustment 补偿值 lengthAdjustment+数据长度取值=长度字段之后剩下包的字节数(x + 16=7 so x = -9)
* @param initialBytesToStrip 忽略的字节长度,如果要接收所有的header+body 则为0,如果只接收body 则为header的长度 ,我们这为0
*/
public MyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object decode = super.decode(ctx, in);
// 解码
if(decode instanceof ByteBuf){
ByteBuf frame = (ByteBuf) decode;
if(frame.readableBytes() < MyRpcConstants.TOTAL_LENGTH){
throw new MyRpcException("消息长度不符,格式有误");
}
return decodeFrame(frame);
}
return decode;
}
private Object decodeFrame(ByteBuf frame) {
// 1.检查魔法数
checkMagicCode(frame);
// 2.检查版本号是否在定义范围内
checkVersion(frame);
// 3.读取消息长度
int fullLength = frame.readInt();
// 4.读取消息类型
byte messageType = frame.readByte();
// 5.读取序列化类型
byte codecType = frame.readByte();
// 6.读取压缩类型
byte compressType = frame.readByte();
// 7.读取请求id
int requestId = frame.readInt();
// 8.读取具体数据
int bodyLength = fullLength - MyRpcConstants.TOTAL_LENGTH;
RpcMessage message = RpcMessage.builder().codec(codecType)
.messageType(messageType)
.compress(compressType)
.requestId(requestId)
.build();
if(bodyLength > 0){
byte[] bodyData = new byte[bodyLength];
frame.readBytes(bodyData);
// 发送方发过来时,先是序列化,再压缩
// 接收方接收时,应该先解压缩再反序列化
// 解压缩
Compress compress = loadCompress(compressType);
bodyData = compress.decompress(bodyData);
// 反序列化
Serializer serializer = loadSerializer(codecType);
// 根据消息类型把数据反序列化并设置到message中
if(MessageTypeEnum.REQUEST.getCode() == messageType){
RpcRequest rpcRequest = (RpcRequest) serializer.deserialize(bodyData, RpcRequest.class);
message.setData(rpcRequest);
}
if(MessageTypeEnum.RESPONSE.getCode() == messageType){
RpcResponse rpcResponse = (RpcResponse) serializer.deserialize(bodyData, RpcResponse.class);
message.setData(rpcResponse);
}
}
return message;
}
private Serializer loadSerializer(byte codecType) {
// 根据spi技术找到需要加载的实现类
String serializerName = SerializationTypeEnum.getName(codecType);
ServiceLoader load = ServiceLoader.load(Serializer.class);
for (Serializer serializer : load) {
if(serializer.name().equals(serializerName)) return serializer;
}
throw new MyRpcException("未找到对应的序列化类型");
}
private Compress loadCompress(byte compressType) {
// 根据spi技术找到需要加载的实现类
String compressName = CompressTypeEnum.getName(compressType);
ServiceLoader load = ServiceLoader.load(Compress.class);
for (Compress compress : load) {
if(compress.name().equals(compressName)) return compress;
}
throw new MyRpcException("未找到对应的压缩类型");
}
private void checkVersion(ByteBuf frame) {
byte version = frame.readByte();
if(version != MyRpcConstants.RPC_VERSION){
throw new MyRpcException("版本号不符,格式有误");
}
}
private void checkMagicCode(ByteBuf frame) {
// 检查魔法值,不对就抛出异常
byte[] magics = new byte[MyRpcConstants.RPC_MAGIC_TABLE.length];
frame.readBytes(magics);
for(int i = 0;i < magics.length;i ++){
if(magics[i] != MyRpcConstants.RPC_MAGIC_TABLE[i]){
throw new MyRpcException("魔法值不符,格式有误");
}
}
}
}
package com.dai.rpc.netty.handler;
import com.dai.rpc.constant.MessageTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.factory.SingletonFactory;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.message.RpcRequest;
import com.dai.rpc.message.RpcResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {
private MyRequestHandler myRequestHandler;
public MyNettyServerHandler() {
this.myRequestHandler = SingletonFactory.getInstance(MyRequestHandler.class);
}
/**
* 编解码之后的消息经由处理器处理
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 服务端接收消息
// 接收客户端发来的数据,数据肯定包括要调用的服务提供者的接口和方法
// 解析消息,去找到对应的服务提供者,然后调用得到结果,发消息给客户端即可
try{
if(msg instanceof RpcMessage){
// 如果接收到的消息是RpcMessage,那么进行处理
RpcMessage message = (RpcMessage) msg;
byte messageType = message.getMessageType();
if(MessageTypeEnum.REQUEST.getCode() == messageType){
// 如果是请求消息,那么请求调用服务获取结果并返回
RpcRequest data = (RpcRequest)message.getData();
Object result = myRequestHandler.handler(data);
// 装入结果
if(ctx.channel().isActive() && ctx.channel().isWritable()){
RpcResponse
不知道你这个问题是否已经解决, 如果还没有解决的话: