在学习AIO的时候,使用CompletionHander来进行异步调用,当处理完读取的数据后 是使用ByteBuffer来进行传输数据的,但是如果传输的数据大小比ByteBuffer的容量大的话,怎么判断该次回调是同一个请求 还是两个请求。
如果在 Client 端输入超过4个字符的内容,就会出现服务器端的read方法被回调了两次或以上 怎么判断用户输入的内容是属于同一次的相应呢,或者说 怎么把这几次响应的内容连在一起
public class AIOServer {
static final int PORT = 30000;
static final String UTF_8 = "utf-8";
static List channelList = new ArrayList<>();
public static void main(String[] args) throws IOException {
AIOServer aios = new AIOServer();
aios.startListen();
System.in.read();
}
public void startListen() throws IOException {
// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(20);
// 以指定的线程池来创建一个AsynchronousChannelGroup
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
// 以指定线程池创建一个AsynchronousServerSocketChannel
AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(channelGroup)
.bind(new InetSocketAddress(PORT));
// 使用CompletionHandler接收来自客户端的请求
serverChannel.accept(null, new AcceptHandler(serverChannel));
}
class AcceptHandler implements CompletionHandler {
private AsynchronousServerSocketChannel serverChannel = null;
// 定义一个ByteBuffer用于读取数据
ByteBuffer buff = ByteBuffer.allocate(4);
public AcceptHandler(AsynchronousServerSocketChannel sc) {
this.serverChannel = sc;
}
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
AIOServer.channelList.add(sc);
// 准备客户端的下一次连接
serverChannel.accept(null, this);
System.out.println("completed");
// 如果传输的数据大于buff的大小 会重复调用该read方法
sc.read(buff, null, new CompletionHandler() {
int i = 0;
/**
* 主要就是下面这个回调方法 ⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇
* 主要就是下面这个回调方法 ⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇
* 主要就是下面这个回调方法 ⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇
* 主要就是下面这个回调方法 ⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇
* 这里想实现的是,任意一客户端发送数据,服务器端都可以转发给所有人,多人聊天室的效果
* 如果在 Client 端输入超过4个字符的内容,这个方法就会被回调了两次或以上
*/
@Override
public void completed(Integer result, Object attachment) {
System.out.println("attachment:" + attachment);
System.out.println("result" + result);
buff.flip();
System.out.println(sc);
// 将buff中的内容转换成字符串
String content = StandardCharsets.UTF_8.decode(buff).toString();
System.out.println(content);
for (AsynchronousSocketChannel asc : AIOServer.channelList) {
try {
// write返回的是future对象(有返回值的线程) 通过调用该对象的方法 可以确定线程已经被执行完毕了
asc.write(ByteBuffer.wrap(content.getBytes(AIOServer.UTF_8))).get();
} catch (UnsupportedEncodingException | InterruptedException | ExecutionException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
buff.clear();
// 准备下一次的读取
sc.read(buff, null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读取数据失败:" + exc);
// 读取失败的话就将该channelList从集合中删除
AIOServer.channelList.remove(serverChannel);
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读取数据失败:" + exc);
// 读取失败的话就将该channelList从集合中删除
AIOServer.channelList.remove(serverChannel);
// 异常关闭也需要准备客户端下一次连接
serverChannel.accept(null, this);
}
}
}
//--------------------------下面是示例的Client类--------------
public class AIOClient {
final static int PORT = 30000;
// 与服务器异步通信的Channel
AsynchronousSocketChannel clientChannel;
public void init() {
Scanner scanner = new Scanner(System.in);
String content = "";
while (true) {
System.out.println("请输入内容:");
content = scanner.nextLine();
try {
// clientChannel.write(ByteBuffer.wrap(content.getBytes("utf-8"))).get();
clientChannel.write(ByteBuffer.wrap(content.getBytes("utf-8")), content.length(),
new CompletionHandler() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println(result+"----------"+attachment);
}
@Override
public void failed(Throwable exc, Object attachment) {
// TODO 自动生成的方法存根
}
});
} catch (UnsupportedEncodingException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
}
public void connect() throws IOException, InterruptedException, ExecutionException {
final ByteBuffer buff = ByteBuffer.allocate(1024);
// 定义一个线程池 容量为80
ExecutorService executor = Executors.newFixedThreadPool(80);
// 创建一个AsynchronousChannelGroup 分组管理器 绑定对应的线程池 通过该线程池来处理IO事件和触发CompletionHandler
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
// 通过open方法创建对应异步通信的管道 并且绑定对应的ChannelGroup
clientChannel = AsynchronousSocketChannel.open(channelGroup);
// 绑定需要访问的网址和对应的端口 Client方 用connect方法获取连接 Server方使用bind方法监听连接
clientChannel.connect(new InetSocketAddress("127.0.0.1", PORT)).get();
// 准备读取服务器的内容 将position设置为0 limit为Buffer的总长度
buff.clear();
clientChannel.read(buff, null, new CompletionHandler() {
@Override
public void completed(Integer result, Object attachment) {
buff.flip();
String content = StandardCharsets.UTF_8.decode(buff).toString();
System.out.println("有人说:" + content);
buff.clear();
clientChannel.read(buff, content.length(), this);
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读取数据失败" + exc);
}
});
}
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
AIOClient aioc = new AIOClient();
aioc.connect();
aioc.init();
System.in.read();
}
}
当使用AIO进行异步通信时,每次读取数据时都会创建一个新的CompletionHandler实例。如果希望将多次回调视为单个求的一部分,就可以在CompletionHandler的实例中存储状态信息,以便在每次回调时更新它。还可以在CompletionHandler中使用ByteBuffer数组来存储多次读取的数据,并在读取完所有数据后将它们合并在一起。
例如可以在CompletionHandler的实例中定义一个ByteBuffer数组和一个索引变量,该索引变量表示下一个要写入的ByteBuffer的索引。在每次回调时,可以将数据写入该索引指向的ByteBuffer中,然后将索引值加一。当读取完所有数据后,可以将这些ByteBuffer合并在一起,然后对其进行处理。
仅供参考,望采纳,谢谢。