channelFuture.awaitUninterruptibly

程序在调用下面一段代码时发生死锁,可能有哪些原因导致死锁

执行代码:
channelFuture = cb.connect(new InetSocketAddress(this.host, this.port));
channelFuture.awaitUninterruptibly(connTimeout, TimeUnit.MILLISECONDS);

异常内容:
2018-03-08 14:37:19[ERROR][New I/O worker #28][com.cmbchina.ocean.transport.netty.TcpLongConnectionServerHandler:124] 异常:await*() in I/O thread causes a dead lock or sudden performance drop. Use addListener() instead or call await*() from a different thread.
java.lang.IllegalStateException: await*() in I/O thread causes a dead lock or sudden performance drop. Use addListener() instead or call await*() from a different thread.
at org.jboss.netty.channel.DefaultChannelFuture.checkDeadLock(DefaultChannelFuture.java:342)
at org.jboss.netty.channel.DefaultChannelFuture.await0(DefaultChannelFuture.java:306)
at org.jboss.netty.channel.DefaultChannelFuture.awaitUninterruptibly(DefaultChannelFuture.java:277)
at com.cmbchina.ocean.transport.tcp.TCPClient.connect(TCPClient.java:351)

地址端口是否正确,防火墙是否阻止,网络连接是否正常,超时设置是过短。

https://www.cnblogs.com/sorheart/p/3195099.html

https://www.cnblogs.com/sorheart/p/3195099.html
详细原因可参考上面链接,大概原因是在handler里面调用await方法导致死锁异常。
上一篇文件浅析了Netty中的事件驱动过程,这篇主要写一下异步相关的东东。

转载内容:

首先,什么是异步了?

异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

异步的好处是不会造成阻塞,在高并发情形下会更稳定和更高的吞吐量。

说到Netty中的异步,就不得不提ChannelFuture。Netty中的IO操作是异步的,包括bind、write、connect等操作会简单的返回一个ChannelFuture,调用者并不能立刻获得结果。

当future对象刚刚创建时,处于非完成状态。可以通过isDone()方法来判断当前操作是否完成。通过isSuccess()判断已完成的当前操作是否成功,getCause()来获取已完成的当前操作失败的原因,isCancelled()来判断已完成的当前操作是否被取消。

调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

其实同步的阻塞和异步的非阻塞可以直接通过代码看出:

这是一段阻塞的代码:

复制代码
printTime("开始connect: ");
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

    // Wait until the connection is closed or the connection attempt fails.
    future.getChannel().getCloseFuture().awaitUninterruptibly();

    printTime("connect结束: ");
    // Shut down thread pools to exit.
    bootstrap.releaseExternalResources();

复制代码
这段代码的输出结果是:

开始connect: 2013-07-17 14:45:28

connect结束: 2013-07-17 14:45:29
很明显的可以看出,connect操作导致整段代码阻塞了大概1秒。

以下这段是异步非阻塞的代码:

复制代码
printTime("开始connect: ");
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

    future.addListener(new ChannelFutureListener()
    {
        public void operationComplete(final ChannelFuture future)
            throws Exception
        {
            printTime("connect结束: ");
        }
    });

    printTime("异步时间: ");

    // Shut down thread pools to exit.
    bootstrap.releaseExternalResources();

复制代码
输出结果是:

开始connect: 2013-07-17 14:50:09
异步时间: 2013-07-17 14:50:09
connect结束: 2013-07-17 14:50:09
可以明显的看出,在异步模式下,上面这段代码没有阻塞,在执行connect操作后直接执行到printTime("异步时间: "),随后connect完成,future的监听函数输出connect操作完成。

关于同步的阻塞和异步的非阻塞可以打一个很简单的比方,A向B打电话,通知B做一件事。

在同步模式下,A告诉B做什么什么事,然后A依然拿着电话,等待B做完,才可以做下一件事;

在异步模式下,A告诉B做什么什么事,A挂电话,做自己的事。B做完后,打电话通知A做完了。

如上面代码所显示的,ChannelFuture同时提供了阻塞和非阻塞方法,接下来就简单的分析一下各自是怎么实现的。

阻塞方法是await系列,这些方法要小心翼翼的使用,不可以在handler内调用这些方法,否则会造成死锁。

复制代码
public ChannelFuture awaitUninterruptibly() {
boolean interrupted = false;
synchronized (this) {
//循环等待到完成
while (!done) {
checkDeadLock();
waiters++;
try {
wait();
} catch (InterruptedException e) {
//不允许中断
interrupted = true;
} finally {
waiters--;
}
}
}

    if (interrupted) {
        Thread.currentThread().interrupt();
    }

    return this;
}

复制代码
一个标志位,一个while循环,代码简洁明了。

非阻塞则是添加监听类ChannelFutureListener,通过覆盖ChannelFutureListener的operationComplete执行业务逻辑。

复制代码
public void addListener(final ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}

    boolean notifyNow = false;
    synchronized (this) {
        if (done) {
            notifyNow = true;
        } else {
            if (firstListener == null) {
                //listener链表头
                firstListener = listener;
            } else {
                if (otherListeners == null) {
                    otherListeners = new ArrayList<ChannelFutureListener>(1);
                }
                //添加到listener链表中,以便操作完成后遍历操作
                otherListeners.add(listener);
            }

           ......

    if (notifyNow) {
        //通知listener进行处理
        notifyListener(listener);
    }
}

复制代码
然后当操作完成后直接遍历listener链表,把每个listener取出来执行。以setSuccess为例,如下:

复制代码
public boolean setSuccess() {
synchronized (this) {
// Allow only once.
if (done) {
return false;
}

        done = true;
        //唤醒所有等待
        if (waiters > 0) {
            notifyAll();
        }
    }

    //通知所有listener
    notifyListeners();
    return true;
}

复制代码
复制代码
private void notifyListeners() {
if (firstListener != null) {
//执行listener表头
notifyListener(firstListener);
firstListener = null;

        //挨个执行其余的listener
        if (otherListeners != null) {
            for (ChannelFutureListener l: otherListeners) {
                notifyListener(l);
            }
            otherListeners = null;
        }
    }
}

复制代码

其实这部分代码的逻辑很简单,就是注册回调函数,当操作完成后自动调用回调函数,就达到了异步的效果。