Java教程:Netty知识点详解(十七)

开课吧开课吧锤锤2021-02-24 14:56

近年来,随着云计算、大数据等热门技术的发展和普及,Java在企业中越来越受到重视,公司对于Java人才需求也是每年都在增长,对于这样热门的行业,很多人都是心动的,那么对于如何学习,怎么提高Java的技术,那就来看看本篇文章吧。

Tag 2:doResolveAndConnect0 分析

上面 Client 端的 channel 初始化与注册也看了一遍。下面我们继续看 doResolveAndConnect0 方法:io.netty.bootstrap.Bootstrap#doResolveAndConnect0

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,

                                           final SocketAddress localAddress, final ChannelPromise promise) {

    try {

        final EventLoop eventLoop = channel.eventLoop();

        // 创建一个地址解析器,其中包含一个地址格式匹配器

        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        // 若解析器不支持该地址 或 该地址已经解析过了,则直接对该地址进行连接,

        // 返回可修改的promise,即成功了就成功,失败了则promise中有失败信息

        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {

            doConnect(remoteAddress, localAddress, promise);

            return promise;

        }

        // 以异步方式解析server地址

        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {  // 处理解析完成的情况(成功或异常)

            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {  // 若异步解析中出现了问题,则直接关闭channel

                channel.close();

                promise.setFailure(resolveFailureCause);

            } else {  // 处理异步解析成功的情况

                // resolveFuture.getNow() 从异步对象中获取解析结果,即解析过的地址

                doConnect(resolveFuture.getNow(), localAddress, promise);

            }

            return promise;

        }

        resolveFuture.addListener(new FutureListener<SocketAddress>() {

            @Override

            public void operationComplete(Future<SocketAddress> future) throws Exception {

                if (future.cause() != null) {

                    channel.close();

                    promise.setFailure(future.cause());

                } else {

                    doConnect(future.getNow(), localAddress, promise);

                }

            }

        });

    } catch (Throwable cause) {

        promise.tryFailure(cause);

    }

    return promise;

}

 

doResolveAndConnect0 方法我们直接画个图看下逻辑流程,具体的需要我们细看的时 doConnect 的连接方法。

Java

下面我们继续跟 doConnect 方法:

private static void doConnect(

    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    final Channel channel = connectPromise.channel();

    channel.eventLoop().execute(new Runnable() {

        @Override

        public void run() {

            if (localAddress == null) {

                channel.connect(remoteAddress, connectPromise);

            } else {

                channel.connect(remoteAddress, localAddress, connectPromise);

            }

            // 为promise添加一个异常监听器。连接过程发生异常,则关闭channel

            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

        }

    });

}

这里获取 channel 中绑定的 EventLoop 调用 execute 方法,上面关于 channel 的注册绑定流程大家还有印象没?这里调用的代码就是下面这段:SingleThreadEventExecutor#execute

Java

做的事情就是添加任务,因为当前的线程就是 eventLoop ,所以 !inEventLoop == false。在这里的逻辑只是添加任务。上面在介绍的 channel 绑定注册时候走的逻辑还有启动线程,在这里连接的时候相当于线程已经启动,这里只是添加任务,最后任务会在前面介绍的 run 方法里面执行(Tag 1.3.3 eventLoop.excute() 的 run 方法执行分析)。

这里就直接看这个匿名内部内的方法,继续跟 connect 方法:io.netty.channel.AbstractChannel#connect(java.net.SocketAddress, java.net.SocketAddress, io.netty.channel.ChannelPromise)

public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {

    return pipeline.connect(remoteAddress, localAddress, promise);

}

通过 pipeline 进行连接:

public final ChannelFuture connect(

    SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {

    return tail.connect(remoteAddress, localAddress, promise);

}

pipeline 获取为节点进行调用连接:

public ChannelFuture connect(

    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {

        throw new NullPointerException("remoteAddress");

    }

    if (isNotValidPromise(promise, false)) {

        // cancelled

        return promise;

    }

    // 查找要处理该请求的处理器节点

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);

    // 获取处理器节点的executor

    EventExecutor executor = next.executor();

    if (executor.inEventLoop()) {

        next.invokeConnect(remoteAddress, localAddress, promise);

    } else {

        safeExecute(executor, new Runnable() {

            @Override

            public void run() {

                next.invokeConnect(remoteAddress, localAddress, promise);

            }

        }, promise, null);

    }

    return promise;

}

这里首先 findContextOutbound 找到处理器节点,后面说。然后获取处理器的 EventExecutor。执行 invokeConnect。这里主要看 invokeConnect 连接处理:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {

    if (invokeHandler()) {  // 判断该处理器节点中对应的处理器是否已经添加

        try {

            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);

        } catch (Throwable t) {

            notifyOutboundHandlerException(t, promise);

        }

    } else {

        connect(remoteAddress, localAddress, promise);

    }

}

直接继续跟connect 方法,这里找到的是匿名内部内 HeadContext: io.netty.channel.DefaultChannelPipeline.HeadContext#connect

public void connect(

    ChannelHandlerContext ctx,

    SocketAddress remoteAddress, SocketAddress localAddress,

    ChannelPromise promise) {

    // 连接

    unsafe.connect(remoteAddress, localAddress, promise);

}

获取到底层 unsafe 对象进行连接:io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

public final void connect(

    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (!promise.setUncancellable() || !ensureOpen(promise)) {

        return;

    }

    try {

        if (connectPromise != null) {

            // Already a connect in process.

            throw new ConnectionPendingException();

        }

        boolean wasActive = isActive();

        if (doConnect(remoteAddress, localAddress)) {  // 连接            

            // 省略+......

这里进行连接,找到 doConnect 连接方法:io.netty.channel.socket.nio.NioSocketChannel#doConnect

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {

    if (localAddress != null) {

        doBind0(localAddress);  // 将localAddress绑定到channel

    }

    boolean success = false;

    try {

        // 连接server地址,若本次连接成功,则成功;若不成功,则当前channel的连接就绪

        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);

        if (!connected) {

            // 指定其关注的事件为  连接就绪

            selectionKey().interestOps(SelectionKey.OP_CONNECT);

        }

        success = true;

        return connected;

    } finally {

        if (!success) {

            doClose();

        }

    }

}

doBind0(localAddress) 将Client 端指定的端口号绑定到 channel,localAddress 为配置类设置的 Client 端口号。

然后进行连接,这里首先在执行时直接进行连接,如果第一次连接成功则直接返回成功,如果失败,注册 Selector 事件 OP_CONNECT ,即将当前 channel 修改为连接就绪,后续执行到 run 方法时就会再次执行连接,直到连接成功,结束当前连接就绪。

到这就是整个 Client 的启动。整体看下来可以类比 Server 端,大体流程还是差不多的。学就完了。

以上内容由开课吧老师敖丙提供,更多Java教程尽在开课吧广场Java教程频道。更多免费课程可以关注公众号“码农集散地”

有用
分享