Java教程:Netty知识点详解(八)

开课吧开课吧锤锤2021-02-23 15:25

近年来,随着云计算、大数据等热门技术的发展和普及,Java在企业中越来越受到重视,公司对于Java人才需求也是每年都在增长,对于这样热门的行业,很多人都是心动的,那么对于如何学习,怎么提高Java的技术,那就来看看本篇文章吧。

NioEventLoopGroup 初始化代码分析

上面说了基本的了解内容,下面具体分析,从 NioEventLoopGroup 的初始化进入源码分析。

入口我们直接找 NioEventLoopGroup 的无参构造。

    public NioEventLoopGroup() {

        this(0);

    }

    public NioEventLoopGroup(int nThreads) {

        // 第二个参数是这个group所包含的executor

        this(nThreads, (Executor) null);

    }

    public NioEventLoopGroup(int nThreads, Executor executor) {

        // 第三个参数是provider,其用于提供selector及selectable的channel,

        // 这个provider是当前JVM中唯一的一个单例的provider

        this(nThreads, executor, SelectorProvider.provider());

    }

    public NioEventLoopGroup(

            int nThreads, Executor executor, final SelectorProvider selectorProvider) {

        // 第四个参数是一个选择策略工厂实例

        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);

    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,

                             final SelectStrategyFactory selectStrategyFactory) {

        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());

    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {

        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

    }

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {

        // 第三个参数是选择器工厂实例

        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);

    }

跟到此,可以发现无参构造的基本参数被初始化, nThreads :DEFAULT_EVENT_LOOP_THREADS//默认当前CPU逻辑核心数的两倍,selectorProvide:SelectorProvider.provider()//当前JVM中唯一的一个单例的provider,SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默认选择策略工厂实例,chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//选择器工厂实例。到这里只是基本的初始化参数,重点方法为MultithreadEventExecutorGroup 的构造方法。下面重点分析:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

                                            EventExecutorChooserFactory chooserFactory, Object... args) {

        if (nThreads <= 0) {

            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));

        }

        if (executor == null) {

            // 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程

            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {

            boolean success = false;

            try {

                // 创建eventLoop

                children[i] = newChild(executor, args);

                success = true;

            } catch (Exception e) {

                throw new IllegalStateException("failed to create a child event loop", e);

            } finally {

                // 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop

                if (!success) {

                    // 关闭之前所有已经创建好的eventLoop

                    for (int j = 0; j < i; j ++) {

                        children[j].shutdownGracefully();

                    }

                    // 终止所有eventLoop上所执行的任务

                    for (int j = 0; j < i; j ++) {

                        EventExecutor e = children[j];

                        try {

                            while (!e.isTerminated()) {

                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

                            }

                        } catch (InterruptedException interrupted) {

                            Thread.currentThread().interrupt();

                            break;

                        }

                    }

                }

            }

        }

        // 创建一个选择器

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {

            @Override

            public void operationComplete(Future<Object> future) throws Exception {

                if (terminatedChildren.incrementAndGet() == children.length) {

                    terminationFuture.setSuccess(null);

                }

            }

        };

        for (EventExecutor e: children) {

            e.terminationFuture().addListener(terminationListener);

        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);

        Collections.addAll(childrenSet, children);

 

        readonlyChildren = Collections.unmodifiableSet(childrenSet);

    }

根据无参构造直接往下跟,可以看到核心部分在最后一个父类的构造里。也就是 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)。

再这里完成整个 NioEventLoopGroup 的实例初始化,这里分析下,然后再画个图回顾下。

初始化构造参数中的 Executor 参数,当其为空时,将其初始化

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

首先 newDefaultThreadFactory()) 创建默认的线程工厂,有兴趣可以跟进去看看。然后再创建ThreadPerTaskExecutor线程 Executor 对象。(PS:这里创建的 Executor 就是 NioEventLoopGroup 内的 Executor 对象,并不是当前 NioEventLoopGroup 自身,可以称其为 总 Executor)。

然后可以看到这里创建了一个 children 数组,根据需要创建的线程数创建对应数量的数组。

children = new EventExecutor[nThreads];

因为每个 NioEventLoopGroup 都是 NioEventLoop 的集合,所以这里的 children 数组就是当前 NioEventLoopGroup 的 NioEventLoop。所以 NioEventLoop 的创建的实在 NioEventLoopGroup 初始化的时候。下面看 NioEventLoop 的初始化:

// 逐个创建nioEventLoop实例

for (int i = 0; i < nThreads; i ++) {

    boolean success = false;

    try {

        // 创建eventLoop

        children[i] = newChild(executor, args);

        success = true;

    } catch (Exception e) {

        // TODO: Think about if this is a good exception type

        throw new IllegalStateException("failed to create a child event loop", e);

    } finally {

        // 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop

        if (!success) {

            // 闭之前所有已经创建好的eventLoop

            for (int j = 0; j < i; j ++) {

                children[j].shutdownGracefully();

            }

            // 终止所有eventLoop上所执行的任务

            for (int j = 0; j < i; j ++) {

                EventExecutor e = children[j];

                try {

                    while (!e.isTerminated()) {

                        e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

                    }

                } catch (InterruptedException interrupted) {

                    // Let the caller handle the interruption.

                    Thread.currentThread().interrupt();

                    break;

                }

            }

        }

    }

}

先整体看这段 NioEventLoop 的创建代码,可以看到整个过程中存在一个成功标志,catch 每个 NioEventLoop 创建完成过程,如果发生异常则将所有已经创建的 NioEventLoop 关闭。重点的代码也就在 NioEventLoop 的创建了。所以我们继续跟:children[i] = newChild(executor, args);往下走,直接找到 io.netty.channel.nio.NioEventLoopGroup#newChild ,因为当前是 NioEventLoopGroup 的创建,所以知道找到子类的 newChild 实现。

@Override

protected EventLoop newChild(Executor executor, Object... args) throws Exception {

    return new NioEventLoop(this, executor, (SelectorProvider) args[0],

            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);

}

又将之前合并的 args 参数强转回来,继续跟进 NioEventLoop 构造:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,

             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {

    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

    if (selectorProvider == null) {

        throw new NullPointerException("selectorProvider");

    }

    if (strategy == null) {

        throw new NullPointerException("selectStrategy");

    }

    provider = selectorProvider;

    // 创建一个selector的二元组

    final SelectorTuple selectorTuple = openSelector();

    selector = selectorTuple.selector;

    unwrappedSelector = selectorTuple.unwrappedSelector;

    selectStrategy = strategy;

}

这里我们先整体看下,将之前的默认参数初始化到 NioEventLoop 属性中。其中有两处:openSelector()super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)。这里先看父类构造:

往下跟,直接就是 SingleThreadEventLoop -> SingleThreadEventExecutor 的初始化,这些也可以在 NioEventLoop 的继承体系可以看到:

// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,

                                boolean addTaskWakesUp, int maxPendingTasks,

                                RejectedExecutionHandler rejectedExecutionHandler) {

    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

    // 创建一个收尾队列

    tailTasks = newTaskQueue(maxPendingTasks);

}

// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,

                                    boolean addTaskWakesUp, int maxPendingTasks,

                                    RejectedExecutionHandler rejectedHandler) {

    super(parent);

    this.addTaskWakesUp = addTaskWakesUp;

    this.maxPendingTasks = Math.max(16, maxPendingTasks);

    // 这是当前NioEventLoop所包含的executor

    this.executor = ThreadExecutorMap.apply(executor, this);

    // 创建一个任务队列

    taskQueue = newTaskQueue(this.maxPendingTasks);

    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");

}

这里首先创建的是 SingleThreadEventExecutor ,这里重点需要关注的代码是:

this.executor = ThreadExecutorMap.apply(executor, this);

这里this 是 NioEventLoop ,所以this.executor就是前面说的 NioEventLoop 里的 Executor,这里我们先称为 子 Executor(子:对应的就是 NioEventLoop ,前面说的 总:对应的是 NioEventLoopGroup )。

而这里 子 Executor 初始化是由一个 executor 参数的,这个就是前面 NioEventLoopGroup 构造方法一直带入总 Executor。那我们继续往下跟,看看这个子 Executor 是如何完成的初始化的。

    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {

        ObjectUtil.checkNotNull(executor, "executor");

        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");

        // 这里创建的executor是子executor

        return new Executor() {

            // 这个execute()是子executor的execute()

            @Override

            public void execute(final Runnable command) {

                // 这里调用了NioEventLoopGroup所包含的executor的execute()

                // 即调用了“总的executor”的execute()

                executor.execute(apply(command, eventExecutor));

            }

        };

    }

这段代码细看就会明白,这里创建的 子 Executor的创建也就是一个线程的创建,但是重点却在这个线程 Executor 的 execute()方法实现,只做了一件事情:就是调用 传入的 总 Executorexecute()方法。所以这里 子 Executor 做的事情就是调用 总 Executor execute()。不要觉得这里绕,因为这还只是初始化,后面这里执行会更绕。

其实这里的 apply(command, eventExecutor),这里再执行 总 Executor execute() 时还是会记录当前正在执行的线程,并且再执行完成时将当前记录值删除。

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {

    ObjectUtil.checkNotNull(command, "command");

    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");

    return new Runnable() {

        @Override

        public void run() {

            setCurrentEventExecutor(eventExecutor);

            try {

                command.run();

            } finally {

                setCurrentEventExecutor(null);

            }

        }

    };

}

这里再 NioEventLoop 的属性 Executor 创建完成时,又去创建了一个普通任务队列taskQueue = newTaskQueue(this.maxPendingTasks);并且还创建了一个收尾任务队列tailTasks = newTaskQueue(maxPendingTasks);。这几个队列后面会说到。这里继续跟 NioEventLoop 主流程初始化。

到这我们再回去看看 openSelector(),这里我们要先知道 SelectorTuple :

private static final class SelectorTuple {

    final Selector unwrappedSelector;  // NIO原生selector

    final Selector selector;  // 优化过的selector

    SelectorTuple(Selector unwrappedSelector) {

        this.unwrappedSelector = unwrappedSelector;

        this.selector = unwrappedSelector;

    }

    SelectorTuple(Selector unwrappedSelector, Selector selector) {

        this.unwrappedSelector = unwrappedSelector;

        this.selector = selector;

    }

}

SelectorTuple 只是一个包含两个 Selector 的内部类,用于封装优化前后的 Selector。而 openSelector() 方法就是为了返回 Selector 并且根据配置判断是否需要优化当前 Selector 。下面看具体代码:

Java

而具体的优化过程有兴趣的可以自己去看看,这里只要知道,若是禁用了优化则 SelectorTuple 的优化后的 Selector 和为优化的 Selector 均为 Nio 原生的 Selector。

而这io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)后面还有在 NioEventLoop 数组创建完成后,还有选择器创建和关闭监听器绑定等,感兴趣可以自己看看,这里不再介绍。

Java

到这一个 NioEventLoop 的创建过程的代码也全部看完了。我想如果只看这个肯定还是有点懵,源码这个东西需要自己跟进去去看,debug 一点点的跟,跟着运行的代码去想为何这么实现,不过这里我也画个图,让大家更直观的了解到 NioEventLoopGroup 的创建流程以及主要操作。

Java

我想大家结合这个图,再结合上面的分析过程,最好可以自己找到源码,跟一遍,应该可以理解 NioEvnetLoopGroup 的创建。

以上内容由开课吧老师敖丙提供,更多Java教程尽在开课吧广场Java教程频道。更多免费课程可以关注公众号码农集散地”

有用
分享