导师要求做基于netty的多线程多客户通讯仿真系统,请问需要学些什么方面的知识呀

研0,导师要求做基于netty的多线程多客户通讯仿真系统,请问需要学些什么方面的知识呀

@ada; 请你帮助一下

package ProcessRequest;


import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URLDecoder;
import java.net.URLEncoder;

import org.apache.log4j.Logger;
import Utility.ExecutorGroup;
import Utility.FileHandlerFunction;
import Utility.HTTPDefineConstant;
import Utility.HTTPResponse;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.EventExecutorGroup;

public class DownloadOneHugeFileProcess extends ChannelInboundHandlerAdapter{
	// 日志
	private static final Logger logger = Logger.getLogger(DownloadOneHugeFileProcess.class);

	private String userID = "";
	
	private ChannelHandlerContext ctx = null;

	// 执行耗时操作的线程池
	private EventExecutorGroup group = ExecutorGroup.getInstance().getExecutorGroup();
	
	private String fileName;
	
	//客户端发送的需要返回的文件里面序号段,这里序号段从0开始传输文件
	private long sequenceNumber = 0;
	
	private int dataPackageNumber = 0;
	
	// 设置为每次传输5m大小的数据,实现单个文件速度的快递提高,让客户端能马上获得文件
	// 让客户端马上能播放文件,实现一边播放一边下载
	private final int chunkSize = 5*1024*1024;
	
	private RandomAccessFile downloadFile;
	
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		this.ctx = ctx;
		this.userID = "" + ctx.channel().remoteAddress();
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		this.ctx = ctx;
		this.userID = "" + ctx.channel().remoteAddress();
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof FullHttpRequest) {
			FullHttpRequest request = (FullHttpRequest) msg;
			// 获取请求方法名
			String method = request.method().toString();
			// 下载文件(2022-03-25)
			if ("DownloadOneFileRequest".equals(method)) {
				System.out.println("DownloadOneHugeFileRequest Process");
				
				/*
				 * 服务器根据客户端发送的数据序号,从文件中截取指定长度的文件发送回客户端
				 * */
				fileName =URLDecoder.decode(request.headers().get("oneFileName"), "UTF-8");
				System.out.println("下载的文件名"+fileName);
				sequenceNumber = Long.parseLong(request.headers().get("sequenceNumber"));
				startSend(System.getProperty("user.dir") + "/AudioAndVideo/" + fileName, "668");

				((FullHttpRequest) msg).release();
			} else {
				// 激活下一个Handler
				ctx.fireChannelRead(msg);
				ctx.channel().pipeline().remove(this);
			}
	
		} else {
			ctx.fireChannelRead(msg);
		}
	}

	
	
	public void startSend(String path, String statusCode) {
		// 提交耗时任务给线程池
		group.submit(new Runnable() {
			@Override
			public void run() {
				try {
					
					/*
					 *downloadFile的内置开始位置在getContent()中设置 
					 *这个文件在等全部分包完成再关闭吧
					 * */
					downloadFile = new RandomAccessFile(path, "r");

					long fileLen = downloadFile.length();
					
					// 文件不为空且文件长度小于0
					if (fileLen < 0 && downloadFile != null) {
						logger.error("读取待批改文件遇到异常!");
						downloadFile.close();
					} 
					else {

						dataPackageNumber = (int) (fileLen/(chunkSize));
						int tem = (int) (fileLen%(chunkSize) );
						if(tem != 0) {
							//有余数
							dataPackageNumber++;		
						}
					}
					//生成一个临时文件
					sendHugeFile(statusCode,sequenceNumber,createOnePartSmallFile());	
					
				} catch (IOException e) {
					logger.error(e);
					e.printStackTrace();
				}
			}
		});
	}
	
	private void sendHugeFile( String statusCode ,long number, RandomAccessFile temFile) throws IOException {
		
		HttpResponseStatus status = null;
		switch (statusCode) {
		case "668":
			status = HTTPDefineConstant.DOWNLOAD_ONEFILE_SUCCESS;
			break;
		case "669":
			status = HTTPDefineConstant.DOWNLOAD_ONEFILE_FAIL;
			break;
		default:
			break;
		}		
		
		if (null == ctx.pipeline().get(ChunkedWriteHandler.class)) {
			ctx.pipeline().addBefore("downloadOneHugeFile", "chunked writer", new ChunkedWriteHandler()); 
		}
		
		ChannelFuture sendFileFuture;
		HttpResponse response;
		if(number != dataPackageNumber-1)
		{
			response = HTTPResponse.downloadOneHugeFileResponse(status, temFile.length() , URLEncoder.encode(fileName, "UTF-8") , String.valueOf(number) , "NO");
			ctx.write(response);
		}else {
			logger.info("sequenceNumber 文件包序号为:"+ number + " 为最后一个文件包");
			response = HTTPResponse.downloadOneHugeFileResponse(status, temFile.length() , URLEncoder.encode(fileName, "UTF-8") , String.valueOf(number) , "YES");
			ctx.write(response);
		}
		sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(temFile,0, (long)temFile.length(),(int)temFile.length()) ), ctx.newProgressivePromise());
	
		sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
			@Override
			public void operationComplete(ChannelProgressiveFuture future) throws Exception {
				if (future.isSuccess()) {
					try {
						temFile.close();
						logger.info("downloadFile1文件关闭成功");
					} catch (IOException e) {
						logger.error("downloadFile文件关闭出现异常:" + e);
					}
//					try {
//						temFile.write(new byte[] {1, 2, 3});
//		                
//		            } catch (IOException e) {
//		                e.printStackTrace();    //如果流已经关闭,这里会报
//		            }
					//进行删除文件的操作
					String temFilePath = System.getProperty("user.dir") + "/AudioAndVideo/" + number + fileName;
					if(FileHandlerFunction.delete(temFilePath))
					{
						System.out.println("成功删除:"+temFilePath);
					}else {
						System.out.println("删除失败:"+temFilePath);
						
					}	
					
					if(number == dataPackageNumber-1)
					{
						try {
							downloadFile.close();
							logger.info("最后一个downloadFile文件关闭成功");
						} catch (IOException e) {
							logger.error("downloadFile文件关闭出现异常:" + e);
						}
						
						logger.info("下载文件 :" + temFile.getFD() +  " 传输完毕\n接收用户:" + userID);
						ctx.pipeline().remove("downloadOneHugeFile");
						ctx.pipeline().remove("chunked writer");
					}
				}
				else {//发送失败的情况,重新发送
					sendHugeFile(statusCode,number,temFile);
				}
			}
			@Override
			public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
					throws Exception {
			}
		});
		
		
	}
	
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		logger.error(cause);
	}

	private RandomAccessFile createOnePartSmallFile() throws IOException
	{
		long fileLen = downloadFile.length();
//		logger.info("客户端请求传输大文件 fileLen:"+ fileLen);		
//		logger.info("客户端请求传输大文件 dataPackageNumber:"+ dataPackageNumber);	
			
		String temFilePath = System.getProperty("user.dir") + "/AudioAndVideo/" + sequenceNumber + fileName;
		RandomAccessFile temFile = null;
		try {
			temFile = new RandomAccessFile(temFilePath, "rw");
		} catch (IOException e) {
			logger.error("文件打开出现异常:" + e);
		}	
		temFile.seek(0);
			
		if(dataPackageNumber-1 != sequenceNumber )
		{
			logger.info("生成下载文件,文件序号为:"+ sequenceNumber);
			temFile.write(getContent(sequenceNumber*chunkSize,chunkSize, downloadFile));
		}
		else {
			
			int tem1 = (int) (fileLen%(chunkSize) );
			if(tem1 != 0) {
				//有余数
				logger.info("生成最后一个文件,有余数,文件序号为:"+sequenceNumber);
				long temChunkSize  = fileLen-(sequenceNumber*chunkSize);
				temFile.write(getContent(sequenceNumber*temChunkSize,temChunkSize, downloadFile));
				
			}else {
				//无余数
				logger.info("生成最后一个文件,无余数,文件序号为:"+sequenceNumber);
				temFile.write(getContent(sequenceNumber*chunkSize,chunkSize, downloadFile));
				
			}
		}
		
		return temFile;
	}

	
	private byte[] getContent(long off , long len, RandomAccessFile downloadFile) throws IOException {
		downloadFile.seek(off);
		byte[] bytes = new byte[(int) len];	
		downloadFile.read(bytes,(int)0, (int)len);
		return bytes;
	}
	
}
  • 您还可以看一下 黄桂钊老师的基于Netty解决粘包拆包的问题课程中的 基于Netty演示粘包拆包问题小节, 巩固相关知识点
  • 以下回答由chatgpt基于相关博客总结生成:

    基于Netty的多线程多客户通讯仿真系统所需的学习要点

    为成功完成基于Netty的多线程多客户通讯仿真系统的开发,你需要学习以下知识和技能:

    1. Netty框架:了解Netty框架的基本原理和使用方法。Netty是一个高性能的网络编程框架,它提供了简单且强大的API,可以用于开发高性能的网络通讯应用。

    2. 多线程编程:学习多线程编程的基本概念和技巧,以实现多客户端的同时通讯。Netty本身就是基于多线程模型实现的,所以你需要熟悉多线程编程的基本原理和使用方法。

    3. 网络编程基础:了解基本的网络编程概念和协议,例如TCP/IP、HTTP等。网络编程是基于网络的通讯应用开发的基础,你需要了解网络编程的基本原理和相关协议。

    4. 自定义处理器:学习如何创建自定义的处理器,实现自己的业务逻辑。你可以参考参考资料中提供的示例代码,自定义处理器可以用来处理接收到的消息、发送消息等操作。

    下面是一个示例代码,展示了如何使用Netty框架创建一个基于Netty的多线程多客户通讯仿真系统:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.*;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    public class NettyServer {
    
        public static void main(String[] args) throws Exception {
            // 创建两个EventLoopGroup,一个负责接收客户端的连接,一个负责处理客户端的请求
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                // 创建ServerBootstrap对象,用于启动服务器
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class) // 指定使用NIO方式的通道
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // 添加编码器和解码器,用于处理字节流与字符串的相互转换
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                // 添加自定义处理器,用于处理接收到的消息
                                pipeline.addLast("handler", new NettyServerHandler());
                            }
                        });
    
                // 绑定端口,开始接收客户端连接
                ChannelFuture future = serverBootstrap.bind(8888).sync();
                System.out.println("Server started and listen on " + future.channel().localAddress());
    
                // 等待服务器关闭
                future.channel().closeFuture().sync();
            } finally {
                // 关闭EventLoopGroup,释放资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("Received message from client: " + buf.toString(CharsetUtil.UTF_8));
            // TODO: 处理接收到的消息
    
            // 向客户端发送响应消息
            ByteBuf response = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("New client connected: " + ctx.channel().remoteAddress());
            // TODO: 处理新的客户端连接
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    public class NettyClient {
    
        public static void main(String[] args) throws Exception {
            // 创建EventLoopGroup,用于处理客户端请求
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
                // 创建Bootstrap对象,用于启动客户端
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class) // 指定使用NIO方式的通道
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // 添加编码器和解码器,用于处理字节流与字符串的相互转换
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                // 添加自定义处理器,用于处理接收到的消息
                                pipeline.addLast("handler", new NettyClientHandler());
                            }
                        });
    
                // 连接服务器,并发送一条消息
                ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
                System.out.println("Client connected to server");
    
                // 发送消息给服务器
                ByteBuf msg = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
                future.channel().writeAndFlush(msg);
    
                // 等待连接断开
                future.channel().closeFuture().sync();
            } finally {
                // 关闭EventLoopGroup,释放资源
                group.shutdownGracefully();
            }
        }
    }
    
    class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("Received message from server: " + buf.toString(CharsetUtil.UTF_8));
            // TODO: 处理接收到的消息
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf msg = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
            ctx.writeAndFlush(msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    以上示例代码展示了如何使用Netty框架创建一个基于Netty的多线程多客户通讯仿真系统。你可以根据自己的需求和业务逻辑进行进一步的扩展和开发。希望以上信息对你有所帮助,祝你顺利完成这个项目!

简单的说就是java,稍微复杂点java的springboot+netty+mysql(你自己看用哪个数据库),基于netty,你也不用考虑太多,就直接看netty在java中的用法就行,其他知识你基本应该都学过,跟socket那种似的