研0,导师要求做基于netty的多线程多客户通讯仿真系统,请问需要学些什么方面的知识呀
@ada; 请你帮助一下
package ProcessRequest;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URLDecoder;
import java.net.URLEncoder;
import org.apache.log4j.Logger;
import Utility.ExecutorGroup;
import Utility.FileHandlerFunction;
import Utility.HTTPDefineConstant;
import Utility.HTTPResponse;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.EventExecutorGroup;
public class DownloadOneHugeFileProcess extends ChannelInboundHandlerAdapter{
// 日志
private static final Logger logger = Logger.getLogger(DownloadOneHugeFileProcess.class);
private String userID = "";
private ChannelHandlerContext ctx = null;
// 执行耗时操作的线程池
private EventExecutorGroup group = ExecutorGroup.getInstance().getExecutorGroup();
private String fileName;
//客户端发送的需要返回的文件里面序号段,这里序号段从0开始传输文件
private long sequenceNumber = 0;
private int dataPackageNumber = 0;
// 设置为每次传输5m大小的数据,实现单个文件速度的快递提高,让客户端能马上获得文件
// 让客户端马上能播放文件,实现一边播放一边下载
private final int chunkSize = 5*1024*1024;
private RandomAccessFile downloadFile;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
this.userID = "" + ctx.channel().remoteAddress();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
this.userID = "" + ctx.channel().remoteAddress();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
// 获取请求方法名
String method = request.method().toString();
// 下载文件(2022-03-25)
if ("DownloadOneFileRequest".equals(method)) {
System.out.println("DownloadOneHugeFileRequest Process");
/*
* 服务器根据客户端发送的数据序号,从文件中截取指定长度的文件发送回客户端
* */
fileName =URLDecoder.decode(request.headers().get("oneFileName"), "UTF-8");
System.out.println("下载的文件名"+fileName);
sequenceNumber = Long.parseLong(request.headers().get("sequenceNumber"));
startSend(System.getProperty("user.dir") + "/AudioAndVideo/" + fileName, "668");
((FullHttpRequest) msg).release();
} else {
// 激活下一个Handler
ctx.fireChannelRead(msg);
ctx.channel().pipeline().remove(this);
}
} else {
ctx.fireChannelRead(msg);
}
}
public void startSend(String path, String statusCode) {
// 提交耗时任务给线程池
group.submit(new Runnable() {
@Override
public void run() {
try {
/*
*downloadFile的内置开始位置在getContent()中设置
*这个文件在等全部分包完成再关闭吧
* */
downloadFile = new RandomAccessFile(path, "r");
long fileLen = downloadFile.length();
// 文件不为空且文件长度小于0
if (fileLen < 0 && downloadFile != null) {
logger.error("读取待批改文件遇到异常!");
downloadFile.close();
}
else {
dataPackageNumber = (int) (fileLen/(chunkSize));
int tem = (int) (fileLen%(chunkSize) );
if(tem != 0) {
//有余数
dataPackageNumber++;
}
}
//生成一个临时文件
sendHugeFile(statusCode,sequenceNumber,createOnePartSmallFile());
} catch (IOException e) {
logger.error(e);
e.printStackTrace();
}
}
});
}
private void sendHugeFile( String statusCode ,long number, RandomAccessFile temFile) throws IOException {
HttpResponseStatus status = null;
switch (statusCode) {
case "668":
status = HTTPDefineConstant.DOWNLOAD_ONEFILE_SUCCESS;
break;
case "669":
status = HTTPDefineConstant.DOWNLOAD_ONEFILE_FAIL;
break;
default:
break;
}
if (null == ctx.pipeline().get(ChunkedWriteHandler.class)) {
ctx.pipeline().addBefore("downloadOneHugeFile", "chunked writer", new ChunkedWriteHandler());
}
ChannelFuture sendFileFuture;
HttpResponse response;
if(number != dataPackageNumber-1)
{
response = HTTPResponse.downloadOneHugeFileResponse(status, temFile.length() , URLEncoder.encode(fileName, "UTF-8") , String.valueOf(number) , "NO");
ctx.write(response);
}else {
logger.info("sequenceNumber 文件包序号为:"+ number + " 为最后一个文件包");
response = HTTPResponse.downloadOneHugeFileResponse(status, temFile.length() , URLEncoder.encode(fileName, "UTF-8") , String.valueOf(number) , "YES");
ctx.write(response);
}
sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(temFile,0, (long)temFile.length(),(int)temFile.length()) ), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
if (future.isSuccess()) {
try {
temFile.close();
logger.info("downloadFile1文件关闭成功");
} catch (IOException e) {
logger.error("downloadFile文件关闭出现异常:" + e);
}
// try {
// temFile.write(new byte[] {1, 2, 3});
//
// } catch (IOException e) {
// e.printStackTrace(); //如果流已经关闭,这里会报
// }
//进行删除文件的操作
String temFilePath = System.getProperty("user.dir") + "/AudioAndVideo/" + number + fileName;
if(FileHandlerFunction.delete(temFilePath))
{
System.out.println("成功删除:"+temFilePath);
}else {
System.out.println("删除失败:"+temFilePath);
}
if(number == dataPackageNumber-1)
{
try {
downloadFile.close();
logger.info("最后一个downloadFile文件关闭成功");
} catch (IOException e) {
logger.error("downloadFile文件关闭出现异常:" + e);
}
logger.info("下载文件 :" + temFile.getFD() + " 传输完毕\n接收用户:" + userID);
ctx.pipeline().remove("downloadOneHugeFile");
ctx.pipeline().remove("chunked writer");
}
}
else {//发送失败的情况,重新发送
sendHugeFile(statusCode,number,temFile);
}
}
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
throws Exception {
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error(cause);
}
private RandomAccessFile createOnePartSmallFile() throws IOException
{
long fileLen = downloadFile.length();
// logger.info("客户端请求传输大文件 fileLen:"+ fileLen);
// logger.info("客户端请求传输大文件 dataPackageNumber:"+ dataPackageNumber);
String temFilePath = System.getProperty("user.dir") + "/AudioAndVideo/" + sequenceNumber + fileName;
RandomAccessFile temFile = null;
try {
temFile = new RandomAccessFile(temFilePath, "rw");
} catch (IOException e) {
logger.error("文件打开出现异常:" + e);
}
temFile.seek(0);
if(dataPackageNumber-1 != sequenceNumber )
{
logger.info("生成下载文件,文件序号为:"+ sequenceNumber);
temFile.write(getContent(sequenceNumber*chunkSize,chunkSize, downloadFile));
}
else {
int tem1 = (int) (fileLen%(chunkSize) );
if(tem1 != 0) {
//有余数
logger.info("生成最后一个文件,有余数,文件序号为:"+sequenceNumber);
long temChunkSize = fileLen-(sequenceNumber*chunkSize);
temFile.write(getContent(sequenceNumber*temChunkSize,temChunkSize, downloadFile));
}else {
//无余数
logger.info("生成最后一个文件,无余数,文件序号为:"+sequenceNumber);
temFile.write(getContent(sequenceNumber*chunkSize,chunkSize, downloadFile));
}
}
return temFile;
}
private byte[] getContent(long off , long len, RandomAccessFile downloadFile) throws IOException {
downloadFile.seek(off);
byte[] bytes = new byte[(int) len];
downloadFile.read(bytes,(int)0, (int)len);
return bytes;
}
}
为成功完成基于Netty的多线程多客户通讯仿真系统的开发,你需要学习以下知识和技能:
Netty框架:了解Netty框架的基本原理和使用方法。Netty是一个高性能的网络编程框架,它提供了简单且强大的API,可以用于开发高性能的网络通讯应用。
多线程编程:学习多线程编程的基本概念和技巧,以实现多客户端的同时通讯。Netty本身就是基于多线程模型实现的,所以你需要熟悉多线程编程的基本原理和使用方法。
网络编程基础:了解基本的网络编程概念和协议,例如TCP/IP、HTTP等。网络编程是基于网络的通讯应用开发的基础,你需要了解网络编程的基本原理和相关协议。
自定义处理器:学习如何创建自定义的处理器,实现自己的业务逻辑。你可以参考参考资料中提供的示例代码,自定义处理器可以用来处理接收到的消息、发送消息等操作。
下面是一个示例代码,展示了如何使用Netty框架创建一个基于Netty的多线程多客户通讯仿真系统:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class NettyServer {
public static void main(String[] args) throws Exception {
// 创建两个EventLoopGroup,一个负责接收客户端的连接,一个负责处理客户端的请求
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建ServerBootstrap对象,用于启动服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定使用NIO方式的通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加编码器和解码器,用于处理字节流与字符串的相互转换
pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
// 添加自定义处理器,用于处理接收到的消息
pipeline.addLast("handler", new NettyServerHandler());
}
});
// 绑定端口,开始接收客户端连接
ChannelFuture future = serverBootstrap.bind(8888).sync();
System.out.println("Server started and listen on " + future.channel().localAddress());
// 等待服务器关闭
future.channel().closeFuture().sync();
} finally {
// 关闭EventLoopGroup,释放资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("Received message from client: " + buf.toString(CharsetUtil.UTF_8));
// TODO: 处理接收到的消息
// 向客户端发送响应消息
ByteBuf response = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
ctx.writeAndFlush(response);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("New client connected: " + ctx.channel().remoteAddress());
// TODO: 处理新的客户端连接
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
public class NettyClient {
public static void main(String[] args) throws Exception {
// 创建EventLoopGroup,用于处理客户端请求
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建Bootstrap对象,用于启动客户端
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class) // 指定使用NIO方式的通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加编码器和解码器,用于处理字节流与字符串的相互转换
pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
// 添加自定义处理器,用于处理接收到的消息
pipeline.addLast("handler", new NettyClientHandler());
}
});
// 连接服务器,并发送一条消息
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
System.out.println("Client connected to server");
// 发送消息给服务器
ByteBuf msg = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
future.channel().writeAndFlush(msg);
// 等待连接断开
future.channel().closeFuture().sync();
} finally {
// 关闭EventLoopGroup,释放资源
group.shutdownGracefully();
}
}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("Received message from server: " + buf.toString(CharsetUtil.UTF_8));
// TODO: 处理接收到的消息
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
以上示例代码展示了如何使用Netty框架创建一个基于Netty的多线程多客户通讯仿真系统。你可以根据自己的需求和业务逻辑进行进一步的扩展和开发。希望以上信息对你有所帮助,祝你顺利完成这个项目!
简单的说就是java,稍微复杂点java的springboot+netty+mysql(你自己看用哪个数据库),基于netty,你也不用考虑太多,就直接看netty在java中的用法就行,其他知识你基本应该都学过,跟socket那种似的