开课吧开课吧锤锤2021-02-23 15:25
近年来,随着云计算、大数据等热门技术的发展和普及,Java在企业中越来越受到重视,公司对于Java人才需求也是每年都在增长,对于这样热门的行业,很多人都是心动的,那么对于如何学习,怎么提高Java的技术,那就来看看本篇文章吧。
NioEventLoopGroup 初始化代码分析
上面说了基本的了解内容,下面具体分析,从 NioEventLoopGroup 的初始化进入源码分析。
入口我们直接找 NioEventLoopGroup 的无参构造。
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
// 第二个参数是这个group所包含的executor
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// 第三个参数是provider,其用于提供selector及selectable的channel,
// 这个provider是当前JVM中唯一的一个单例的provider
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 第四个参数是一个选择策略工厂实例
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 第三个参数是选择器工厂实例
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
跟到此,可以发现无参构造的基本参数被初始化, nThreads :DEFAULT_EVENT_LOOP_THREADS//默认当前CPU逻辑核心数的两倍,selectorProvide:SelectorProvider.provider()//当前JVM中唯一的一个单例的provider,SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默认选择策略工厂实例,chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//选择器工厂实例。到这里只是基本的初始化参数,重点方法为MultithreadEventExecutorGroup 的构造方法。下面重点分析:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
if (!success) {
// 关闭之前所有已经创建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop上所执行的任务
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建一个选择器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
根据无参构造直接往下跟,可以看到核心部分在最后一个父类的构造里。也就是 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)。
再这里完成整个 NioEventLoopGroup 的实例初始化,这里分析下,然后再画个图回顾下。
初始化构造参数中的 Executor 参数,当其为空时,将其初始化
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
首先 newDefaultThreadFactory()) 创建默认的线程工厂,有兴趣可以跟进去看看。然后再创建ThreadPerTaskExecutor线程 Executor 对象。(PS:这里创建的 Executor 就是 NioEventLoopGroup 内的 Executor 对象,并不是当前 NioEventLoopGroup 自身,可以称其为 总 Executor)。
然后可以看到这里创建了一个 children 数组,根据需要创建的线程数创建对应数量的数组。
children = new EventExecutor[nThreads];
因为每个 NioEventLoopGroup 都是 NioEventLoop 的集合,所以这里的 children 数组就是当前 NioEventLoopGroup 的 NioEventLoop。所以 NioEventLoop 的创建的实在 NioEventLoopGroup 初始化的时候。下面看 NioEventLoop 的初始化:
// 逐个创建nioEventLoop实例
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
if (!success) {
// 闭之前所有已经创建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop上所执行的任务
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
先整体看这段 NioEventLoop 的创建代码,可以看到整个过程中存在一个成功标志,catch 每个 NioEventLoop 创建完成过程,如果发生异常则将所有已经创建的 NioEventLoop 关闭。重点的代码也就在 NioEventLoop 的创建了。所以我们继续跟:children[i] = newChild(executor, args);往下走,直接找到 io.netty.channel.nio.NioEventLoopGroup#newChild ,因为当前是 NioEventLoopGroup 的创建,所以知道找到子类的 newChild 实现。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
又将之前合并的 args 参数强转回来,继续跟进 NioEventLoop 构造:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 创建一个selector的二元组
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
这里我们先整体看下,将之前的默认参数初始化到 NioEventLoop 属性中。其中有两处:openSelector() 和 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)。这里先看父类构造:
往下跟,直接就是 SingleThreadEventLoop -> SingleThreadEventExecutor 的初始化,这些也可以在 NioEventLoop 的继承体系可以看到:
// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 创建一个收尾队列
tailTasks = newTaskQueue(maxPendingTasks);
}
// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 这是当前NioEventLoop所包含的executor
this.executor = ThreadExecutorMap.apply(executor, this);
// 创建一个任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
这里首先创建的是 SingleThreadEventExecutor ,这里重点需要关注的代码是:
this.executor = ThreadExecutorMap.apply(executor, this);
这里this 是 NioEventLoop ,所以this.executor就是前面说的 NioEventLoop 里的 Executor,这里我们先称为 子 Executor(子:对应的就是 NioEventLoop ,前面说的 总:对应的是 NioEventLoopGroup )。
而这里 子 Executor 的初始化是由一个 executor 参数的,这个就是前面 NioEventLoopGroup 构造方法一直带入的 总 Executor。那我们继续往下跟,看看这个子 Executor 是如何完成的初始化的。
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 这里创建的executor是子executor
return new Executor() {
// 这个execute()是子executor的execute()
@Override
public void execute(final Runnable command) {
// 这里调用了NioEventLoopGroup所包含的executor的execute()
// 即调用了“总的executor”的execute()
executor.execute(apply(command, eventExecutor));
}
};
}
这段代码细看就会明白,这里创建的 子 Executor的创建也就是一个线程的创建,但是重点却在这个线程 Executor 的 execute()方法实现,只做了一件事情:就是调用 传入的 总 Executor 的 execute()方法。所以这里 子 Executor 做的事情就是调用 总 Executor 的 execute()。不要觉得这里绕,因为这还只是初始化,后面这里执行会更绕。
其实这里的 apply(command, eventExecutor),这里再执行 总 Executor 的 execute() 时还是会记录当前正在执行的线程,并且再执行完成时将当前记录值删除。
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
这里再 NioEventLoop 的属性 Executor 创建完成时,又去创建了一个普通任务队列taskQueue = newTaskQueue(this.maxPendingTasks);并且还创建了一个收尾任务队列tailTasks = newTaskQueue(maxPendingTasks);。这几个队列后面会说到。这里继续跟 NioEventLoop 主流程初始化。
到这我们再回去看看 openSelector(),这里我们要先知道 SelectorTuple :
private static final class SelectorTuple {
final Selector unwrappedSelector; // NIO原生selector
final Selector selector; // 优化过的selector
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
SelectorTuple 只是一个包含两个 Selector 的内部类,用于封装优化前后的 Selector。而 openSelector() 方法就是为了返回 Selector 并且根据配置判断是否需要优化当前 Selector 。下面看具体代码:
而具体的优化过程有兴趣的可以自己去看看,这里只要知道,若是禁用了优化则 SelectorTuple 的优化后的 Selector 和为优化的 Selector 均为 Nio 原生的 Selector。
而这io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)后面还有在 NioEventLoop 数组创建完成后,还有选择器创建和关闭监听器绑定等,感兴趣可以自己看看,这里不再介绍。
到这一个 NioEventLoop 的创建过程的代码也全部看完了。我想如果只看这个肯定还是有点懵,源码这个东西需要自己跟进去去看,debug 一点点的跟,跟着运行的代码去想为何这么实现,不过这里我也画个图,让大家更直观的了解到 NioEventLoopGroup 的创建流程以及主要操作。
我想大家结合这个图,再结合上面的分析过程,最好可以自己找到源码,跟一遍,应该可以理解 NioEvnetLoopGroup 的创建。
以上内容由开课吧老师敖丙提供,更多Java教程尽在开课吧广场Java教程频道。更多免费课程可以关注公众号“码农集散地”

最新文章

如何在大规模 Kubernetes 集群上实现高 SLO?
Pod创建成功率:这是一个非常重要的指标,蚂蚁集团一周的Pod创建量在百万级别,如果成功率波动会造成大量Pod失败,同时Pod成功率下跌也是集群异常的最直观反映;
2021-03-05 16:20:04

MySQL不建议delete删除数据的原因是什么?(二)
MySQL内部不会真正删除空间,而且做标记删除,即将delflag:N修改为delflag:Y,commit之后会会被purge进入删除链表,如果下一次insert更大的记录,delete之后的空间不会被重用,如果插入的记录小于等于delete的记录空会被重用,这块内容可以通过知数堂的innbloc
2021-03-05 11:45:44

MySQL不建议delete删除数据的原因是什么?(一)
物理上主要由系统用户数据文件,日志文件组成,数据文件主要存储MySQL字典数据和用户数据,日志文件记录的是datapage的变更记录,用于MySQLCrash时的恢复。
2021-03-05 11:36:22

Java教程:MySQL如何设计索引更高效?(五)
同时也介绍了如何更好做MySQL索引设计,包括前缀索引,复合索引的顺序问题以及MySQL8.0推出的索引跳跃扫描,我们都知道,索引可以加快数据的检索,减少IO开销,会占用磁盘空间,是一种用空间换时间的优化手段,同时更新操作会导致索引频繁的合并分裂,影响索引性能,在实际的业务开发中,如何根据业务场景去
2021-03-05 11:29:12

Java教程:MySQL如何设计索引更高效?(四)
在单列索引不能很好的过滤数据的时候,可以结合where条件中其他字段来创建复合索引,更好的去过滤数据,减少IO的扫描次数,举个例子:业务需要按照时间段来查询交易记录,有如下的SQL:
2021-03-05 11:25:02