Java教程:Netty知识点详解(十三)

开课吧开课吧锤锤2021-02-24 11:45

近年来,随着云计算、大数据等热门技术的发展和普及,Java在企业中越来越受到重视,公司对于Java人才需求也是每年都在增长,对于这样热门的行业,很多人都是心动的,那么对于如何学习,怎么提高Java的技术,那就来看看本篇文章吧。

Tag 1.3 config().group().register(channel) 将channel注册到selector

channle 注册到 Selector 的代码分析:

// 

config().group().register(channel);

config().group() :这里就是 Bootstrap 的 EventLoopGroup,而这里是 Server 端的 ServerBootstrap 所以这个其实就是 parentGroup。那这里我们需要找到 register 的方法实现:

Java

这里因为 group 是 NioEventLoopGroup,根据 NioEventLoopGroup 的继承体系就可以直接找到 实现 io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)。因为只有 MultithreadEventLoopGroup 在其继承体系中。所以找到代码我们继续:

public ChannelFuture register(Channel channel) {

    // next() 从eventLoop数组中选择一个eventLoop

    return next().register(channel);

}

这里需要了解下 next() 方法,因为我们现在是 eventLoopGroup next() 就是从当前 group 中获取一个 EventLoop,然后这里在继续跟进需要找 EventLoop 继承体系中实现 register 方法的类:SingleThreadEventLoop

Java

public ChannelFuture register(Channel channel) {

    // 创建一个 ChannelPromise 然后注册

    return register(new DefaultChannelPromise(channel, this));

}

// ----> 这里继续调用 unsafe 的 register

public ChannelFuture register(final ChannelPromise promise) {

    ObjectUtil.checkNotNull(promise, "promise");

    promise.channel().unsafe().register(this, promise);

    return promise;

}

这里调用的 unsafe 的 register 方法,在初始化 eventLoop 的时候说过这个 unsafe 的初始化。是我们直接跟进:

io.netty.channel.AbstractChannel.AbstractUnsafe#register

@Override

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

    // 对异常情况的处理

    if (eventLoop == null) {

        throw new NullPointerException("eventLoop");

    }

    if (isRegistered()) {

        promise.setFailure(new IllegalStateException("registered to an event loop already"));

        return;

    }

    if (!isCompatible(eventLoop)) {

        promise.setFailure(

            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));

        return;

    }

    // channel与eventLoop的绑定就发生在这里,

    // 需要注意,这里的eventLoop还没有绑定线程,因为这个线程还没有创建

    AbstractChannel.this.eventLoop = eventLoop;

    // 判断当前线程与eventLoop所绑定线程是否是同一个线程

    if (eventLoop.inEventLoop()) {

        register0(promise);

    } else {

        try {

            // 执行当前线程所绑定的eventLoop的execute(), 这个execute()会将参数任务写入到任务队列,并创建启动新的线程

            eventLoop.execute(new Runnable() {

                @Override

                public void run() {

                    register0(promise);

                }

            });

        } catch (Throwable t) {

            logger.warn(

                "Force-closing a channel whose registration task was not accepted by an event loop: {}",

                AbstractChannel.this, t);

            closeForcibly();

            closeFuture.setClosed();

            safeSetFailure(promise, t);

        }

    }

}

AbstractUnsafe#register:

1、首先判断当前操作是否存在异常情况。

2、将 eventLoop 保存到 channel 的 eventLoop 的属性中(channel 与 eventLoop 的绑定),注意:这里的 eventLoop 里面还没有绑定 thread。

3、判断 EventLoop 的 thread 是否是当前线程:eventLoop.inEventLoop()。这里断点看一下,初始化的时候这里 eventLoop 中的 thread = null。所以这里返回 false。

4、执行当前线程绑定的 eventLoop 的 excute() 方法。执行传入的 runnable ,主要是做的是将参数任务写入到任务队列,并创建启动新的线程

5、runnable 中的 run 方法实现: register0(promise);

这里标记三个 Tag

Tag 1.3.1 register0(promise) 也就是上面的第 5 步。

Tag 1.3.2 eventLoop.excute() 执行分析 也就是上面的第 4 步。

**Tag 1.3.3 eventLoop.excute() 的 run 方法执行分析 ** 也就是 Tag 1.3.2 最后执行起来的 run 方法

Tag 1.3.1 register0(promise)

直接跟进 register0(promise);

private void register0(ChannelPromise promise) {

    try {

        if (!promise.setUncancellable() || !ensureOpen(promise)) {

            return;

        }

        boolean firstRegistration = neverRegistered;

        doRegister();  // 绑定

        neverRegistered = false;

        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);

        pipeline.fireChannelRegistered();

        if (isActive()) {

            if (firstRegistration) {

                pipeline.fireChannelActive();

            } else if (config().isAutoRead()) {

                beginRead();

            }

        }

    } catch (Throwable t) {

        closeForcibly();

        closeFuture.setClosed();

        safeSetFailure(promise, t);

    }

}

这里其他操作感兴趣的自己可以进去看看,这边我们主要看 register 流程,直接找 doRegister(); 的绑定代码:

Java

protected void doRegister() throws Exception {

    boolean selected = false;

    for (;;) {

        try {

            // 在这里进行了注册,将NIO原生channel注册到了NIO原生selector

            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

            return;

        } catch (CancelledKeyException e) {

            if (!selected) {

                eventLoop().selectNow();

                selected = true;

            } else {

                throw e;

            }

        }

    }

}

这里就是 channel 注册 Selector 的代码:

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this)

1、javaChannel() :这里获取原生的 Nio Channel,跟进去可以找到这里返回的是 AbstractNioChannel#ch 的 channel。在前面 NioEventGroup 初始化的时候说过这个 NIO Channel 的初始化的过程。

2、然后调用 NIO Channel 的 Regsiter 方法

3、Regsiter 方法中首先传入的是 unwrappedSelector 前面初始化的 selector 数组。第二个参数 0 ,就是当前监听的的事件, 0 表示不关注任何事件。为什么这里子 Channel 注册的是不关注任何事件? 在前面看到的 Channel 注册一个指定的关注事件:SelectionKey.OP_ACCEPT 连接事件,那个 channel 是 Netty 封装的 channel,哪里监听了连接事件之后,只要关注客户端的连接,当 netty 封装的 channel 获取到连接就绪的 channel 的时候就可以拿到当前 channel 需要注册事件了,然后这个时候就可以指定 原生 NIO channel 的需要关注的事件。所以这里默认不关注任何事件就是为后续修改其需要关注指定类型的就绪事件。

到这里 register0 的方法说完。前面还有 EventLoop 的线程 thread 的事情没有说明白,也就是 eventLoop 的 excute() 方法执行过程做了什么,返回去找到代码:io.netty.channel.AbstractChannel.AbstractUnsafe#register

Tag 1.3.2 eventLoop.excute() 执行分析

前面还有 EventLoop 的线程 thread 的事情没有说明白,也就是 eventLoop 的 excute() 方法执行过程做了什么,返回去找到代码:io.netty.channel.AbstractChannel.AbstractUnsafe#register

// 刚刚往里面跟的是 register0 现在再说一下 execute 

eventLoop.execute(new Runnable() {

    @Override

    public void run() {

        register0(promise);

    }

});

往下跟,找到 io.netty.util.concurrent.SingleThreadEventExecutor#execute, eventLoop 的父类:

public void execute(Runnable task) {

    if (task == null) {

        throw new NullPointerException("task");

    }

    // 判断当前线程与eventLoop所绑定线程是否是同一个

    boolean inEventLoop = inEventLoop();

    // 将任务添加到任务队列

    addTask(task);

    if (!inEventLoop) {

        // 创建并启动一个线程

        startThread();

        if (isShutdown()) {

            boolean reject = false;

            try {

                if (removeTask(task)) {

                    reject = true;

                }

            } catch (UnsupportedOperationException e) {

            }

            if (reject) {

                reject();

            }

        }

    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {

        wakeup(inEventLoop);

    }

}

首先这个参数: **Runnable task ** 其实就是刚刚我们跟过的方法 register0.

1、首先判断当前 inEventLoop(); 当前线程是否是 EventLoop 中的 thtrad。这里还是 false。

2、将任务添加到任务队列。跟下去可以找到 taskQueue.offer(task) ,这里的 taskQueue 任务队列就在跟创建 eventLoop 时 newChild 中初始化创建的.

 

Java

inEventLoop = false。首先做的是:startThread.

继续跟进:startThread()

private void startThread() {

    // 若当前eventLoop所绑定线程尚未启动

    if (state == ST_NOT_STARTED) {

        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {

            try {

                // 创建并启动一个线程

                doStartThread();

            } catch (Throwable cause) {

                STATE_UPDATER.set(this, ST_NOT_STARTED);

                PlatformDependent.throwException(cause);

            }

        }

    }

}

首先判断当前 eventLoop 所绑定线程尚未启动,然后使用 CAS 修改当前线程的启动状态 ,修改成功则执行 doStartThread()创建并启动一个线程,继续跟:

private void doStartThread() {

    assert thread == null;

    // 调用NioEventLoop所包含的executor的execute()

    // 这个execute()会创建并启动一个线程

    executor.execute(new Runnable() {

        @Override

        public void run() {

            thread = Thread.currentThread();

            if (interrupted) {

                thread.interrupt();

            }

            boolean success = false;

            updateLastExecutionTime();

            try {

                // 执行了一个不会停止的for,用于完成任务队列中的任务

                SingleThreadEventExecutor.this.run();

                success = true;

            } catch (Throwable t) {

                logger.warn("Unexpected exception from an event executor: ", t);

            } finally {

                // 省略......

        }

    });

}

这里调用了 NioEventLoop 所包含的 executor 的 execute() 方法,也就是创建线程的逻辑,后面的具体执行逻辑,下一步部分具体看。传入了一个 Runnable。主要是执行了一个 SingleThreadEventExecutor.this.run(); 线程,用于完成任务队列的任务。后面说。这里主要说一下这个 executor.execute()执行的过程。

这里跟进下面代码可以找到之前 子 Executor 的初始化创建的匿名内部类:io.netty.util.internal.ThreadExecutorMap#apply

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {

    ObjectUtil.checkNotNull(executor, "executor");

    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");

    return new Executor() {

        @Override

        public void execute(final Runnable command) {

            // 这里调用了NioEventLoopGroup所包含的executor的execute()

            executor.execute(apply(command, eventExecutor));

        }

    };

}

所以 execute 方法执行的是这里的 execute方法:executor.execute(apply(command, eventExecutor));

而在 ThreadExecutorMap 这里的 executor 之前在 NioEventLoopGroup 初始化的时候说了,这个 executor 是 NioEventLoopGroup 初始化过成功构造方法创建的 总 executor。然后 apply 方法又将传入的 runnable 包装成了一个新的 Runnable 。

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {

    ObjectUtil.checkNotNull(command, "command");

    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");

    return new Runnable() {

        @Override

        public void run() {

            // 做了线程隔离

            setCurrentEventExecutor(eventExecutor);

            try {

                command.run();

            } finally {

                setCurrentEventExecutor(null);

            }

        }

    };

}

只是在执行 command 之前做了异步线程隔离的操作。所以到这里就是 总 executor 执行了传入了新包装的 runnable。 然后我们继续跟进这里的 executor.execute(apply(command, eventExecutor)); execute 方法。这里需要找到实现方法在这里:io.netty.util.concurrent.ThreadPerTaskExecutor#execute

public void execute(Runnable command) {

    // 创建并启动一个线程

    threadFactory.newThread(command).start();

}

threadFactory 也是之前 NioEventLoopGroup 初始化的线程工厂。这里主要用到这个 总 executor面的线程工厂来创建线程来着。而这里的 command 就是 apply() 返回的 runnable,也就是包装后的 doStartThread 中的匿名内部类 runnable。所以这里的线程 newThread(command).start() 的 start 就执行了commnnd 的 run 方法。最后就执行到 doStartThread 的里面的 run 方法。

所以到这里 EventLoop 中的 thread 的创建并且启动就都这里处理完成了。

这里也画个图说下这个调用过程:

Java

到这里我们的 eventLoop.excute() 中的创建线程并启动的流程看完了,那下面我们要单独说一下这个线程启动之后执行的 run 方法做了什么。主要就是详细说下 threadFactory.newThread(command).start(); 这个线程启动执行执行的 run 代码的解析。

Tag 1.3.3 eventLoop.excute() 的 run 方法执行分析

入口还是从 eventLoop.excute() 中进去,也就是 eventLoop 的 run 方法执行分析,找到里面的匿名内部类的 runnable 的实现:

//  对应的代码位置 io.netty.channel.AbstractChannel.AbstractUnsafe#register

eventLoop.execute(new Runnable() {

     @Override

     public void run() {

         register0(promise);

     }

 });

跟进 execute() 找到 startThread(); 在直接跟进 doStartThread()找到下面代码:

 private void doStartThread() {

        assert thread == null;

        // 调用NioEventLoop所包含的executor的execute() 这个execute()会创建并启动一个线程

        executor.execute(new Runnable() {

            @Override

            public void run() {

                thread = Thread.currentThread();

                if (interrupted) {

                    thread.interrupt();

                }

                boolean success = false;

                updateLastExecutionTime();

                try {

                    // 执行了一个不会停止的for,用于完成任务队列中的任务

                    SingleThreadEventExecutor.this.run();

                    success = true;

                } catch (Throwable t) {

                    logger.warn("Unexpected exception from an event executor: ", t);

                } finally {

               // 省略。。。

这段代码上面都跟过,这里就跳过直接找我们要看的代码:SingleThreadEventExecutor.this.run(); 这里执行了一个无限循环的代码,用来一直完成任务队列中的任务:这找到实现代码 eventLoop 中的 run() :

protected void run() {

    for (;;) {

        try {

            try {

                // 选择就绪的channel

                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

                    case SelectStrategy.CONTINUE:    // NioEventLoop不支持

                        continue;

                    case SelectStrategy.BUSY_WAIT:  // NioEventLoop不支持

                    case SelectStrategy.SELECT:  // SELECT = -1 能走到这里,说明当前任务队列中没有任务

                        // 进行阻塞式选择

                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {

                            selector.wakeup();

                        }

                    default:

                }

            } catch (IOException e) {

                rebuildSelector0();

                handleLoopException(e);

                continue;

            }

            cancelledKeys = 0;

            needsToSelectAgain = false;

            // 该变量用于设置“处理就绪channel的IO所使用的时间”与“处理任务队列中任务使用时间”的比例 该值为整型,不大于100

            final int ioRatio = this.ioRatio;

            if (ioRatio == 100) {

                try {

                    processSelectedKeys();

                } finally {

                    runAllTasks();

                }

            } else {

                // 记录处理就绪channel的IO开始执行的时间点

                final long ioStartTime = System.nanoTime();

                try {

                    // 处理就绪channel的IO

                    processSelectedKeys();

                } finally {

                    // 计算出处理就绪channel的IO所使用的时长

                    final long ioTime = System.nanoTime() - ioStartTime;

                    // 执行任务队列中的任务

                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                }

            }

            // 省略。。。

    }

}

首先我们整体上看一下这个方法。

1、selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())

2、switch - case

3、processSelectedKeys()

4、unAllTasks()

Tag 1.3.3.1: selectStrategy.calculateStrategy

Tag 1.3.3.2 :switch - case

Tag 1.3.3.3 :processSelectedKeys()

Tag 1.3.3.4 :runAllTasks()

Tag 1.3.3.1: selectStrategy.calculateStrategy

首先找到代码:

selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

// hasTasks  tailTasks 收尾任务队列

protected boolean hasTasks() {

    return super.hasTasks() || !tailTasks.isEmpty();

}

// super.hasTasks()   taskQueue 普通任务队列

protected boolean hasTasks() {

    assert inEventLoop();

    return !taskQueue.isEmpty();

}

首先看到 hasTask : 返回当前任务队列和收尾队列是否有任务。selectNowSupplier : 匿名内部类io.netty.util.IntSupplier

继续跟进: calculateStrategy:io.netty.channel.DefaultSelectStrategy#calculateStrategy 初始化的默认选择器

// SelectStrategy.SELECT = -1

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {

    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;

}

这里就是如果存在任务则走选择器 selectSupplier.get() 否则直接返回 -1:SELECT 。

继续跟 get 的任务选择逻辑:selectSupplier : NioEventLoop 中的内部类 IntSupplier

public int get() throws Exception {

    return selectNow();

}

// io.netty.channel.nio.NioEventLoop#selectNow

int selectNow() throws IOException {

    try {

        return selector.selectNow();

    } finally {

        // restore wakeup state if needed

        if (wakenUp.get()) {

            selector.wakeup();

        }

    }

}

1、selector.selectNow() : 方法为 NIO 的非阻塞选择,返回就绪的 channel 的数量,可以为 0。

补充:Selector 的阻塞选择和非阻塞选择的区别就是,非阻塞选则在当前 select 方法执行时判断循环判断所有的 channel 是否就绪并返回所有的就绪数量,而阻塞式选择则是阻塞指定时间直至阻塞时间内获取到就绪 channel 或者阻塞时间超时时立刻返回。

2、wakenUp.get() : 返回当前线程是否被阻塞,没有被阻塞时返回 true,当前线程被阻塞返回 false。

3、selector.wakeup() :当前线程如果被阻塞,则立刻返回 selector 结果,即唤醒当前线程。

这是 selectNow() 方法执行的结果,是一个必然大于等于 0 的结果。

所以返回 calculateStrategy 方法:如果任务队列存在任务,则通过 Selector 执行非阻塞选择返回就绪的 channel 数量,如果不存在任务,则直接返回 -1。

Tag 1.3.3.2 :switch - case

现在在返回去看 switch - case 的代码:

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

    case SelectStrategy.CONTINUE:    // -2 NioEventLoop不支持 

        continue;

    case SelectStrategy.BUSY_WAIT:  // -3 NioEventLoop不支持

    case SelectStrategy.SELECT:  // -1 能走到这里,说明当前任务队列中没有任务

        // 进行阻塞式选择

        select(wakenUp.getAndSet(false));

        if (wakenUp.get()) {

            selector.wakeup();

        }

    default:

}

因为再 selectStrategy.calculateStrategy 方法中,不可能返回 -2 和 -3。所以 case 的结果只可能走到 SelectStrategy.SELECT 或者直接 default。而只有当所有任务队列中都没有任务的时候才会返回 -1。也就意味着当任务队列中没有任务时也会景行一次阻塞式选择,通过 wakenUp.getAndSet(false) 方法将当前线程设置为阻塞状态。然后就阻塞式 select。

这里我们具体去看看这 select 阻塞选择的逻辑:

private void select(boolean oldWakenUp) throws IOException {

    Selector selector = this.selector;

    try {

        // 计数器:用于记录空轮询导致CPU占用率飙升,select()提前结束的次数(其值大于1时)

        int selectCnt = 0;

        // 获取当前时间,也就是for循环第一次开始执行的时间点

        long currentTimeNanos = System.nanoTime();

        // delayNanos() 表示定时任务队列中第一个定时任务还有多久就到开始执行的时间了

        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {

            // 处理小于0.5毫秒的任务

            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

            if (timeoutMillis <= 0) {  // 该条件为true,表示具有立即需要执行的定时任务

                if (selectCnt == 0) {  // 只有第一次for循环才会执行下面的“非阻塞选择”

                    selector.selectNow();

                    selectCnt = 1;

                }

                break;

            }

            if (hasTasks() && wakenUp.compareAndSet(false, true)) {

                selector.selectNow();

                selectCnt = 1;

                break;

            }

            

            int selectedKeys = selector.select(timeoutMillis);

            selectCnt ++;

            // 若有就绪的channel了,则直接结束

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {

                break;

            }

            // 若当前线程被中断

            if (Thread.interrupted()) {

                selectCnt = 1;

                break;

            }

            // 获取当前时间

            long time = System.nanoTime();

            // 下面的式子等价于:  time - currentTimeNanos >= timeoutMillis

            // 若下面的条件成立,则说明select()是在指定的阻塞时间过期后才跳出的,即正常结束的

            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {

                // timeoutMillis elapsed without anything selected.

                selectCnt = 1;

            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&

                       selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

                selector = selectRebuildSelector(selectCnt);  // 重构selector

                selectCnt = 1;

                break;

            }

            // 本轮for循环结束时间点,同时也是下一轮for循环的开始时间点

            currentTimeNanos = time;

        }  

    } catch (CancelledKeyException e) {

    }

}

这个方法直接画图解释把:

Java

而在 switch-case 唯一的代码逻辑也就是在任务队列中没有任务时执行的阻塞 select,而在其他的任何情况下或者阻塞选择存在就绪 channel 或者任务队列新增任务之后都会跳出 switch - case,执行后续逻辑。

Tag 1.3.3.3 :processSelectedKeys()

首先我要说的时这个 processSelectedKeys 方法时处理就绪的 channel 的 IO,而代码逻辑走到这里其实并不一定就有已经就绪的 channel,因为看了上面的逻辑会发现代码任务处理为先,而存在任务就会走到这里逻辑,虽然在走到这里之前也执行了 select 的 channel 但是也都是去查看一遍是否存在就绪 channel,所以这里看下面的逻辑需要先有这个理解,最后我们再具体看 processSelectedKeys 代码:

final int ioRatio = this.ioRatio; // 默认值 50

if (ioRatio == 100) {

    try {

        processSelectedKeys();

    } finally {

        // Ensure we always run tasks.

        runAllTasks();

    }

} else {

    // 记录处理就绪channel的IO开始执行的时间点

    final long ioStartTime = System.nanoTime();

    try {

        // 处理就绪channel的IO

        processSelectedKeys();

    } finally {

        // 计算出处理就绪channel的IO所使用的时长

        final long ioTime = System.nanoTime() - ioStartTime;

        // 执行任务队列中的任务

        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

    }

}

这段代码的整体逻辑再我们看完 runAllTasks 之后再分析,这里存在一个 io 处理与 task 处理的时间分配逻辑。后面再看,这里继续跟进 processSelectedKeys

private void processSelectedKeys() {

    // 判断channel的selectedKeys是否是优化过的

    if (selectedKeys != null) {

        processSelectedKeysOptimized();  // 优化处理方式

    } else {

        processSelectedKeysPlain(selector.selectedKeys());  // 普通处理方式

    }

}

到这里,代码有限判断了当前是否启用的 selectedKey 的优化,再 NioEventLoopGroup 的时候说过,优化就是将selectedKeys 的 set 集合转换成了数组,而再这里也可以得到验证,selectedKeys直接产看这个属性就可以看到,这里不进去看了,感兴趣进去看看。然后针对优化和非优化的处理唯一的区别就是处理的 selectedKeys对象是数组还是集合。这里直接分析 processSelectedKeysOptimized 方法,processSelectedKeysPlain 方法可以自己看,一样的处理。

private void processSelectedKeysOptimized() {

    for (int i = 0; i < selectedKeys.size; ++i) {

        // 从数组中取出一个元素

        final SelectionKey k = selectedKeys.keys[i];

    // 移除已经取出的 SelectionKey,使 GC 可以处理到已经关闭的 channel

        selectedKeys.keys[i] = null;

        // 获取selectionKey的附件,该附件中可以存放任意数据,不过这里存放的是NIO原生channel

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {

            processSelectedKey(k, (AbstractNioChannel) a);  // 处理就绪事件

        } else {

            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

            processSelectedKey(k, task); // 这里是测试代码。跟进去可以看到实现方法是测试类

        }

// 省略......

这里就是直接轮询 selectedKeys 的集合,每取出一个 selectKey 都会在原数组中移除当前元素,就是为了当 channel 关闭后, GC 可以释放当前 channel 占用的内存。

然后获取 selectKey 中保存的 Nio 原生的 channel,处理就绪后逻辑:processSelectedKey 继续跟进:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

    // 处理selectionKey失效的情况

    if (!k.isValid()) {

        final EventLoop eventLoop;

        try {

            eventLoop = ch.eventLoop();

        } catch (Throwable ignored) {

            return;

        }

        if (eventLoop != this || eventLoop == null) {

            return;

        }

        unsafe.close(unsafe.voidPromise());

        return;

    }

    try {

        int readyOps = k.readyOps();

// 判断当前 channnel 就绪的事件类型

        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {

            // 获取当前selectionKey的interestOps

            int ops = k.interestOps();

            // 先将SelectionKey.OP_CONNECT按位取或,再与ops进行按位与

            ops &= ~SelectionKey.OP_CONNECT;

            // 将修改过的ops再写入到selectionsKey中

            k.interestOps(ops);

            // 连接server

            unsafe.finishConnect();

        }

        // 处理写就绪的情况

        if ((readyOps & SelectionKey.OP_WRITE) != 0) {

            // 强制刷新(将user buffer中的数据写入到网关缓存)

            ch.unsafe().forceFlush();

        }

        // readyOps为0表示当前没有任何channel就绪

        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

            // 将网卡缓存中的数据写入到user buffer

            unsafe.read();

        }

    } catch (CancelledKeyException ignored) {

        unsafe.close(unsafe.voidPromise());

    }

}

这段逻辑就是处理就绪 channel 的 IO 事件的逻辑。

1、判断当前 SelectionKey 是否有效。失效结束处理并关闭资源。

2、判断当前 channel 的关注事件,针对处理:获取 SelectionKey 的 readyOps。这里的判断逻辑都是使用高效的位运算。readyOps 为当前 SelectionKey 的就绪的事件类型。

3、(readyOps & SelectionKey.OP_CONNECT) != 0 :连接就绪事件

这个事件在 server 端不会关注,只有 client 用来连接 server 时才会关注连接就绪事件。

连接就绪后,获取当前 SelectionKey 的 interestOps 值,将当前 interestOps 值修改后,调用底层 unsafe 连接server

4、(readyOps & SelectionKey.OP_WRITE) != 0 :写就绪事件

当前 channel 关注的是写就绪事件,此时写操作已经就绪,所以直接调用unsafe将数据写入的网卡缓存。

5、(readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 : 当前channel 关注的是读就绪事件,或者前面因为有新增任务而触发的就绪 channel 处理逻辑,只有因为任务触发的情况下 readyOps 才可能会是 0 ,readyOps = 0 意味着没有就绪 channel。

直接调用 unsafe 继续读操作,将网卡缓存的数据读取到用户空间。如果是 readyOps = 0 的情况相当于网卡缓存并没有就绪数据,则时进行的读操作不会读取到数据。

这就是完整的 IO 处理逻辑,主要根据当前 channel 关注的事件进行相应的 unsafe 操作。

Tag 1.3.3.4 :runAllTasks()

下面我们在看下 runAllTask 方法。

runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

protected boolean runAllTasks(long timeoutNanos) {

    // 从定时任务队列中取出所有当前马上就要到期的定时任务放入到任务队列

    fetchFromScheduledTaskQueue();

    // 从任务队列中取出一个任务

    Runnable task = pollTask();

    // 若该任务为空,则说明任务队列中已经没有任务了,此时就可以执行收尾任务了

    if (task == null) {

        // 执行收尾队列中的收尾任务

        afterRunningAllTasks();

        return false;

    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;

    // 计数器

    long runTasks = 0;

    long lastExecutionTime;

    for (;;) {

        // 执行任务

        safeExecute(task);

        runTasks ++;

        // 每64个任务查看一次超时

        if ((runTasks & 0x3F) == 0) {

            lastExecutionTime = ScheduledFutureTask.nanoTime();

            if (lastExecutionTime >= deadline) {

                break;

            }

        }

        // 从任务队列中再取出一个任务

        task = pollTask();

        if (task == null) {

            lastExecutionTime = ScheduledFutureTask.nanoTime();

            break;

        }

    } // end-for

    // 处理收尾队列中的任务

    afterRunningAllTasks();

    this.lastExecutionTime = lastExecutionTime;

    return true;

}

画图理解一下这个方法:

Java

这里面有几个方法大家感兴趣可以进去看看:这部分逻辑不复杂,大家可以自己研究下。

1、fetchFromScheduledTaskQueue()

io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue

从定时任务队列中取出所有当前马上就要到期的定时任务放入到任务队列

2、pollTask()

io.netty.util.concurrent.SingleThreadEventExecutor#pollTask

从任务队列中取出一个任务

3、afterRunningAllTasks()

io.netty.util.concurrent.SingleThreadEventExecutor#afterRunningAllTasks

执行收尾任务队列中的所有收尾任务

4、safeExecute(task)

io.netty.util.concurrent.AbstractEventExecutor#safeExecute

执行任务

到此我们的 channel 的 initAndRegister 介绍完成,并且介绍了 channel 就绪后的执行方法 eventLoop 的 execute 调用的 run 方法的逻辑。其实 run 方法不是说注册初始化的时候就调用的,而是通过任务或者就绪 channel 触发的,只是注册时候说到这个代码就直接跟完这个逻辑,让大家也好理解一点。

以上内容由开课吧老师敖丙提供,更多Java教程尽在开课吧广场Java教程频道。更多免费课程可以关注公众号“码农集散地”

有用
分享