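Netty series (3): EventLoop and EventLoopGroup

I. Threading model

Netty's threading model

Questions

1. By default, how many threads does a Netty server start, and when are they started?
2. How does Netty work around the JDK empty-polling bug?
3. How does Netty guarantee asynchronous, serialized, lock-free execution?

Process

1. NioEventLoop creation
2. NioEventLoop startup
3. NioEventLoop execution logic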

private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup();// handles I/O reads and writes, and runs tasks

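When the server starts, it creates two NioEventLoopGroup instances, which are in effect two independent Reactor thread pools. One accepts TCP connections from clients; the other handles I/O reads and writes and runs system tasks and scheduled tasks.
The thread pool that accepts client requests is responsible for:

  • accepting client TCP connections and initializing Channel parameters;
  • notifying the ChannelPipeline of link state change events.

The Reactor thread pool that handles I/O operations is responsible for:

  • asynchronously reading datagrams from the peer and firing read events into the ChannelPipeline;
  • asynchronously sending messages to the peer by calling the ChannelPipeline's send interface;
  • running system-call tasks;
  • running scheduled tasks, such as the idle-link detection timer.

To maximize performance, Netty uses lock-free designs in many places. For example, operations inside an I/O thread run serially, which avoids the performance loss caused by multi-thread contention. At first glance, serial execution seems to underuse the CPU and limit concurrency. However, by tuning the thread count of the NIO thread pool, several serialized threads can run in parallel, and this locally lock-free, serialized-thread design performs better than the one-queue, many-workers model.
After a NioEventLoop reads a message, it calls fireChannelRead(Object msg) on the ChannelPipeline directly. Unless the user switches threads explicitly, the NioEventLoop keeps invoking the user's handlers with no thread switch in between. This serialized processing avoids the lock contention that comes with multi-threaded operation, which makes it the best choice from a performance standpoint.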
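
The serialized, lock-free idea can be sketched with plain JDK classes. This is a hypothetical illustration, not Netty code: PinnedLoopsDemo and Connection are made-up names, and each single-thread executor stands in for one NioEventLoop. Because every connection is pinned to exactly one loop, its counter is a plain int that needs no lock.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration (not Netty code): every connection is pinned to one
// single-threaded loop, so its state is only ever touched by that thread.
final class PinnedLoopsDemo {

    static final class Connection {
        final ExecutorService loop;   // the one loop this connection belongs to
        int messagesHandled;          // plain int: no lock, no atomic

        Connection(ExecutorService loop) {
            this.loop = loop;
        }
    }

    // Returns how many messages each connection handled.
    static int[] run(int loops, int connections, int messagesPerConnection) {
        ExecutorService[] group = new ExecutorService[loops];
        for (int i = 0; i < loops; i++) {
            group[i] = Executors.newSingleThreadExecutor();
        }
        Connection[] conns = new Connection[connections];
        for (int i = 0; i < connections; i++) {
            conns[i] = new Connection(group[i % loops]);
        }
        try {
            for (int m = 0; m < messagesPerConnection; m++) {
                for (Connection c : conns) {
                    c.loop.execute(() -> c.messagesHandled++);
                }
            }
            // Wait for a final no-op on every loop; Future.get() also makes the
            // loop thread's earlier writes visible to the calling thread.
            for (ExecutorService loop : group) {
                loop.submit(() -> { }).get();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            for (ExecutorService loop : group) {
                loop.shutdown();
            }
        }
        int[] counts = new int[connections];
        for (int i = 0; i < connections; i++) {
            counts[i] = conns[i].messagesHandled;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(4, 10, 1000)));
    }
}
```

Several loops run in parallel, yet no counter is shared between threads, which is the "multiple serialized threads" arrangement described above.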
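Recommendations for using Netty threads:

(1) Create two NioEventLoopGroup instances to logically separate the NIO acceptor from the NIO I/O threads.
(2) Avoid starting user threads in a ChannelHandler (except to dispatch the decoded POJO message to back-end business threads).
(3) Decode in a decoder Handler that the NIO thread invokes; do not switch to a user thread to decode messages.
(4) If the business logic is very simple, with no heavy computation and nothing that may block the thread (disk, database, or network operations), run it directly on the NIO thread without switching threads.
(5) If the business logic is complex, do not run it on the NIO thread. Wrap the decoded POJO message in a Task and dispatch it to a business thread pool, so the NIO thread is released quickly to handle other I/O. Two formulas are recommended for sizing the thread count:
  • Formula 1: thread count = (total thread time / bottleneck resource time) × parallelism of the bottleneck resource.
  • Formula 2: QPS = 1000 / total thread time × thread count (with total thread time in milliseconds).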
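
As a worked example of the two formulas, the sketch below plugs in made-up numbers. ThreadSizing and its parameter names are hypothetical, times are assumed to be in milliseconds, and rounding the thread count up is an assumption of this sketch rather than part of the formula.

```java
// Hypothetical helper that evaluates the two sizing formulas above.
final class ThreadSizing {

    // Formula 1: (total thread time / bottleneck time) x bottleneck parallelism.
    // Rounding up is an assumption of this sketch.
    static int threads(double totalMillis, double bottleneckMillis, int bottleneckParallelism) {
        return (int) Math.ceil(totalMillis / bottleneckMillis * bottleneckParallelism);
    }

    // Formula 2: QPS = 1000 / total thread time (ms) x thread count.
    static double qps(double totalMillis, int threads) {
        return 1000.0 / totalMillis * threads;
    }

    public static void main(String[] args) {
        // A request takes 50 ms in total, 10 ms of it on a resource
        // that can serve 4 requests in parallel.
        int n = threads(50, 10, 4);
        System.out.println(n + " threads, " + qps(50, n) + " QPS");
    }
}
```

With these inputs formula 1 gives 50 / 10 × 4 = 20 threads, and formula 2 gives 1000 / 50 × 20 = 400 QPS.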

II. NioEventLoop source code analysis

[Figure: NioEventLoop class diagram]

1. NioEventLoop creation

NioEventLoop creation steps:

  • new ThreadPerTaskExecutor() [thread creator]: the thread executor creates the underlying threads for the NioEventLoopGroup
  • for(){newChild()} [constructing NioEventLoop]: creates the NioEventLoop array, then loops to create each NioEventLoop, calling newChild() to configure its core parameters
  • chooserFactory.newChooser() [thread chooser]: assigns a NioEventLoop thread to each new connection
    chooserFactory.newChooser

    PowerOfTwoEventExecutorChooser.next

(1) NioEventLoopGroup and the ThreadPerTaskExecutor thread creator
public NioEventLoopGroup() {
        this(0);
}

public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
}

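// The default thread count of the group is twice the number of available processors
// DEFAULT_EVENT_LOOP_THREADS = 2 * Runtime.getRuntime().availableProcessors() (can be overridden with the io.netty.eventLoopThreads system property)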
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

#MultithreadEventExecutorGroup class, core constructor
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            // Thread creator: creates the underlying threads for the NioEventLoopGroup
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];// the array that holds the NioEventLoop objects

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // Create each NioEventLoop in the loop; newChild() configures its core parameters
                children[i] = newChild(executor, args);//newChild
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // Thread chooser: assigns a NioEventLoop thread to each new connection
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
  }
    
#Creates threads through the threadFactory
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // Creates one thread per submitted task; newDefaultThreadFactory names the threads nioEventLoopGroup-N-M (pool id, then thread id)
        threadFactory.newThread(command).start();
    }
}

(2) newChild(): creating a NioEventLoop

  • Saves a reference to the thread executor, ThreadPerTaskExecutor;
  • Creates an MpscQueue: the taskQueue is used when an external thread submits a Netty task. If the caller is not running on the NioEventLoop's own thread, the task is put on the task queue and executed later by that thread.
    PlatformDependent.newMpscQueue(maxPendingTasks) creates the MpscQueue (multi-producer, single-consumer) that holds these asynchronous tasks;
  • Creates a selector: provider.openSelector() creates the selector that polls the connections registered with it.
#NioEventLoopGroup
@Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

    
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);// superclass constructor
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    
#SingleThreadEventExecutor class, superclass constructor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);// task queue: external threads put their tasks here
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    
//task queue
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return PlatformDependent.newMpscQueue(maxPendingTasks);
}
    
private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();// a Set backed by an array

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        // Install selectedKeySet into the selector through reflection
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
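
To make the "Set backed by an array" idea concrete, here is a minimal sketch under stated assumptions. ArrayBackedSet is a hypothetical class, not Netty's SelectedSelectionKeySet: it appends to an array, doubles the array when full, and does not check for duplicates, so it only suits a producer that never adds the same element twice between resets.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch of an append-only, array-backed Set.
// It does NOT deduplicate; the caller must not add an element twice.
final class ArrayBackedSet<E> extends AbstractSet<E> {
    private Object[] items;
    private int size;

    ArrayBackedSet(int initialCapacity) {
        items = new Object[Math.max(1, initialCapacity)];
    }

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == items.length) {
            items = Arrays.copyOf(items, size * 2); // grow by doubling
        }
        items[size++] = e;                          // plain array append
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    // Clears the set for the next round while keeping the backing array.
    void reset() {
        Arrays.fill(items, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) items[idx++];
            }
        };
    }
}
```

Iteration walks the array in insertion order, which is cheaper than walking hash buckets when the set is refilled on every loop iteration.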
    

(3) chooserFactory.newChooser() [thread chooser]: assigns a NioEventLoop thread to each new connection in a balanced, round-robin way

#DefaultEventExecutorChooserFactory class
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    // Checks whether the length is a power of two; if so, the more efficient PowerOfTwoEventExecutorChooser is used
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];// & is cheaper than modulo; the index wraps around
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];// modulo
        }
    }
}
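
The two index computations can be tried in isolation. The sketch below copies only the arithmetic from the listing above into a hypothetical RoundRobin helper (the class and method names are not Netty's). It also shows that both forms stay inside the array bounds after the int counter overflows to a negative value.

```java
// Hypothetical helper isolating the index arithmetic of the two choosers.
final class RoundRobin {

    // Same test as in the listing: true when val has exactly one bit set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Power-of-two case: mask with (length - 1). '-' binds tighter than '&',
    // so the listing's "x & length - 1" means the same thing.
    static int maskIndex(int counter, int length) {
        return counter & (length - 1);
    }

    // Generic case: modulo, with abs() to fold negative remainders back.
    static int modIndex(int counter, int length) {
        return Math.abs(counter % length);
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(8) + " " + isPowerOfTwo(6));
        System.out.println(maskIndex(9, 8));
        System.out.println(maskIndex(Integer.MIN_VALUE, 8));
        System.out.println(modIndex(Integer.MIN_VALUE, 3));
    }
}
```

Note that in the modulo variant the sequence is not perfectly round-robin at the moment the counter wraps, but the index is always valid.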

2. NioEventLoop startup flow

NioEventLoop startup steps:

  • bind->execute(task) [entry point]: calls NioEventLoop's execute() with a Task that wraps the port-binding operation
  • startThread()->doStartThread() [thread creation]: when execute() is called from a thread other than the NioEventLoop's own, startThread() creates and starts the thread
  • ThreadPerTaskExecutor.execute(): the thread executor runs the task, creating and starting a FastThreadLocalThread
  • NioEventLoop.run() [startup]
#AbstractBootstrap class, doBind method
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

#AbstractBootstrap class, doBind0 method
private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        // Calls the execute method of SingleThreadEventExecutor
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    
#execute method of SingleThreadEventExecutor
public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();// whether the caller is running on this event loop's thread
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();// create and start the thread if it has not been started yet
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
    
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

// Starts the thread
private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();// save the current thread; inEventLoop() compares against it
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();// enters the NioEventLoop run loop
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
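
The lazy startup described above (no thread until the first execute() from outside, a compare-and-set so only one caller starts it, and a saved Thread reference for inEventLoop()) can be sketched with JDK classes only. LazyLoop is a hypothetical, heavily simplified stand-in: it has no selector, uses a blocking queue instead of Netty's MPSC queue, and stops on a sentinel task.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for a lazily started event loop.
final class LazyLoop {
    private static final Runnable STOP = () -> { };      // sentinel that ends the loop
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;                      // set by the loop thread itself

    boolean isStarted() {
        return started.get();
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread();                               // first outside caller starts the thread
        }
        taskQueue.add(task);
    }

    void shutdown() {
        execute(STOP);
    }

    private void startThread() {
        if (started.compareAndSet(false, true)) {        // only one caller wins
            Thread t = new Thread(this::run, "lazy-loop");
            t.setDaemon(true);
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread();                 // remembered for inEventLoop()
        try {
            for (Runnable task = taskQueue.take(); task != STOP; task = taskQueue.take()) {
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the loop starts lazily and the task runs on the loop thread.
    static boolean firstTaskRunsOnLoopThread() {
        LazyLoop loop = new LazyLoop();
        boolean startedBefore = loop.isStarted();
        AtomicBoolean ranOnLoop = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            ranOnLoop.set(loop.inEventLoop());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            loop.shutdown();
        }
        return !startedBefore && loop.isStarted() && ranOnLoop.get() && !loop.inEventLoop();
    }
}
```

This also suggests an answer to the "when are the threads started?" question from the beginning: constructing the loop creates no thread, and the first task submitted from another thread does.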

3. NioEventLoop execution

NioEventLoop.run()->SingleThreadEventExecutor.this.run():
run()->for(;;)

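  • select() [check for I/O events]: polls the I/O events registered on the selector
  • processSelectedKeys() [handle I/O events]
  • runAllTasks() [run the asynchronous task queue]: runs the tasks that external threads put on the taskQueue
(1) The run method of NioEventLoop
#NioEventLoop run method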
protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));// getAndSet(false) returns the old value and resets wakenUp to false

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
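                    // ioRatio controls the time split between processSelectedKeys() (I/O) and runAllTasks();
                    // runAllTasks() runs the tasks external threads put on the taskQueue. The default is 50, i.e. a 1:1 split.
                    // In this branch (ioRatio == 100) all pending tasks run without a time budget.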
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
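
The task time budget in the else branch is plain integer arithmetic, so it is easy to check by hand. The sketch below lifts the expression ioTime * (100 - ioRatio) / ioRatio into a hypothetical IoRatioBudget helper. Rejecting values outside 1..99 is a choice of this sketch, because ioRatio == 100 takes the other branch and has no budget.

```java
// Hypothetical helper around the budget expression used in run().
final class IoRatioBudget {

    // How long runAllTasks may run, given the time just spent on I/O.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 1..99)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        System.out.println(taskBudgetNanos(2_000_000L, 50)); // default 50: same time as I/O
        System.out.println(taskBudgetNanos(8_000_000L, 80)); // I/O favored: a quarter of the I/O time
        System.out.println(taskBudgetNanos(1_000_000L, 20)); // tasks favored: four times the I/O time
    }
}
```

A higher ioRatio therefore shrinks the share given to queued tasks, and a lower one grows it.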
    

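(2) select() execution logic (detecting I/O events):

  • Deadline and interleaved-task handling: computes the deadline for this select (based on whether the NioEventLoop has a scheduled task pending) and checks whether tasks arrived that must be handled during the select
  • Blocking select: if the deadline has not been reached and the task queue is empty, performs one blocking select
  • Working around the JDK empty-polling bug: checks whether this select actually blocked for timeoutMillis. If it returned earlier with nothing selected, it counts as a possible JDK empty poll. Once the number of such returns reaches the threshold, rebuildSelector() replaces the selector with a new one so that the empty polling does not continue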
private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);// deadline
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {// the taskQueue has tasks: set wakenUp to true and select without blocking
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                // The deadline has not been reached and the taskQueue is empty: do a blocking select
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;// count this select attempt
                // Exit the loop if an I/O event was selected, the caller requested a wakeup, the selector was woken up, the task queue has work, or a scheduled task exists
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }
                // Guard against a busy loop
                long time = System.nanoTime();
                // time - currentTimeNanos >= timeout means one full blocking select has completed
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // Returned before the timeout, so this may be an empty poll; rebuild once such returns reach SELECTOR_AUTO_REBUILD_THRESHOLD
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();// create a new selector
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
    
    
    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }
    
    /** Works around the Java NIO empty-polling bug.
     * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
     * around the infamous epoll 100% CPU bug.
     */
    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        rebuildSelector0();
    }

    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;

        if (oldSelector == null) {
            return;
        }

        try {
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {// re-register each channel with the new Selector
            Object a = key.attachment();
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }

                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }

        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
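
To make the migration step in rebuildSelector0() concrete, here is a minimal sketch that uses only plain java.nio rather than Netty's own classes. The names SelectorMigrationSketch and migrate() are hypothetical and exist only for this illustration, and a Pipe stands in for a real socket channel. It assumes the caller is the only thread touching both selectors, which is what the inEventLoop() check above guarantees in Netty.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectorMigrationSketch {

    /** Moves every valid registration from oldSel to newSel, then closes oldSel. */
    static int migrate(Selector oldSel, Selector newSel) throws IOException {
        int moved = 0;
        for (SelectionKey key : oldSel.keys()) {
            // Skip keys that are no longer valid or already registered with the new selector.
            if (!key.isValid() || key.channel().keyFor(newSel) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // drop the registration on the old selector
            key.channel().register(newSel, interestOps, attachment);
            moved++;
        }
        oldSel.close();
        return moved;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);

        Selector oldSel = Selector.open();
        pipe.source().register(oldSel, SelectionKey.OP_READ, "demo-attachment");

        Selector newSel = Selector.open();
        int moved = migrate(oldSel, newSel);
        System.out.println("moved=" + moved + ", old selector open=" + oldSel.isOpen());

        newSel.close();
        pipe.source().close();
        pipe.sink().close();
    }
}
```

The interest ops and the attachment are read before cancel() so the new key can carry the same state, mirroring what the Netty code does before it updates AbstractNioChannel.selectionKey.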

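(3) I/O event handling: how processSelectedKeys() works:

  • Selected key set optimization: by default, every select() call adds the ready I/O events to a HashSet inside the JDK selector. HashSet.add() is O(1) on average, but it pays for hashing, can degrade to O(n) on collisions, and is slower to iterate than an array. Netty therefore uses reflection to replace that HashSet with the array-backed SelectedSelectionKeySet.
  • processSelectedKeysOptimized():
    walks the SelectionKey array held by SelectedSelectionKeySet (indexes 0 to size - 1);
    reads each SelectionKey's attachment, which is the Netty NioChannel;
    if the SelectionKey is still valid, reads its ready I/O events and handles them.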
private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) { // optimization disabled: return the plain JDK Selector (the optimization is on by default)
            return new SelectorTuple(unwrappedSelector);
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });
        
        if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        // Use reflection to swap in the array-backed key set.
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet); // assign via reflection
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
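
The reflective swap in openSelector() can be hard to follow because it targets the JDK-internal sun.nio.ch.SelectorImpl. The following is a minimal sketch of the same idea against a stand-in class, so it runs without touching JDK internals. FakeSelector, ArrayKeySet and instrument() are hypothetical names invented for this example and are not part of Netty or the JDK.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

final class ReflectiveSetSwapSketch {

    /** Stand-in for a selector implementation that keeps ready keys in a private HashSet. */
    static final class FakeSelector {
        private Set<String> selectedKeys = new HashSet<>();

        void markReady(String key) {
            selectedKeys.add(key);
        }
    }

    /** Append-only, array-backed set: only add() and size() are meaningful. */
    static final class ArrayKeySet extends AbstractSet<String> {
        String[] keys = new String[4];
        int size;

        @Override
        public boolean add(String key) {
            if (key == null) {
                return false;
            }
            if (size == keys.length) {
                keys = Arrays.copyOf(keys, keys.length << 1); // double the capacity
            }
            keys[size++] = key;
            return true;
        }

        @Override
        public int size() {
            return size;
        }

        @Override
        public Iterator<String> iterator() {
            throw new UnsupportedOperationException();
        }
    }

    /** Replaces the private field by reflection; returns the installed set, or null on failure. */
    static ArrayKeySet instrument(FakeSelector selector) {
        ArrayKeySet replacement = new ArrayKeySet();
        try {
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
            return replacement;
        } catch (ReflectiveOperationException | RuntimeException e) {
            return null; // fall back to the original set, as openSelector() does
        }
    }

    public static void main(String[] args) {
        FakeSelector selector = new FakeSelector();
        ArrayKeySet keys = instrument(selector);
        selector.markReady("ch-1");
        selector.markReady("ch-2");
        if (keys != null) {
            for (int i = 0; i < keys.size; i++) {
                System.out.println(keys.keys[i]);
            }
        }
    }
}
```

After the swap, the selector keeps calling add() as before, but the event loop can read the ready keys straight from the array by index.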
    
    
# SelectedSelectionKeySet replaces the HashSet; it is array-backed (see the add() method)
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o; // O(1) array append with no hashing; HashSet.add() is O(1) on average but O(n) in the worst case
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }

    void reset() {
        reset(0);
    }

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }
}


// NioEventLoop.run() calls processSelectedKeys()
private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

# Handle I/O events
private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) { // iterate over the SelectedSelectionKeySet array
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment(); // the attachment is normally the NioChannel

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a); // handle the I/O event
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }


private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }   
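
The order in which processSelectedKey() reacts to the readyOps bits matters: connect first, then write, then read/accept. The sketch below isolates only that bit-mask dispatch and returns the names of the actions it would trigger instead of calling into a channel. The class name ReadyOpsDispatchSketch and the dispatch() helper are hypothetical and only for illustration.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsDispatchSketch {

    /** Returns the actions, in order, that the given readyOps bit set would trigger. */
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        // Finish the connect before any read or write is attempted.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        // Write next, so queued buffers can be flushed and their memory freed.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // Read or accept; readyOps == 0 is also treated as a read to avoid a spin loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE | SelectionKey.OP_READ));
        System.out.println(dispatch(SelectionKey.OP_ACCEPT));
        System.out.println(dispatch(0));
    }
}
```

Note that OP_ACCEPT and OP_READ share the same branch: on a server channel the "read" is what accepts the new connection.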

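(4) Task execution: how runAllTasks() works:

  • Task types and how they are added: there are ordinary tasks and scheduled tasks.
    The ordinary task queue is an MpscQueue created when the NioEventLoop is constructed; external threads add tasks through execute(), which calls addTask().
    schedule() wraps the work in a ScheduledFutureTask and adds it to the scheduled task queue (a PriorityQueue). When the caller is an external thread, the add itself is submitted as an ordinary task so that only the event loop thread touches that queue.
  • Task aggregation -> fetchFromScheduledTaskQueue(): moves the scheduled tasks that are already due into the ordinary task queue.
  • Task execution: tasks are polled from the ordinary task queue and run through safeExecute(). After every 64 tasks the loop checks whether the current time has passed the deadline; if so, it stops and leaves the remaining tasks for the next iteration.
# The NioEventLoop constructor creates taskQueue (through the superclass constructor)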
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

# Superclass constructor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks); // create the task queue
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

#NioEventLoop
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return PlatformDependent.newMpscQueue(maxPendingTasks);
}

// Adding to the ordinary task queue
#SingleThreadEventExecutor
public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    
#SingleThreadEventExecutor
protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }
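
The execute()/addTask() path above is what gives Netty its lock-free, serial execution: any thread may submit, but only one thread ever runs the tasks. Here is a minimal sketch of that pattern. It is an assumption-laden simplification: SingleThreadLoopSketch and demo() are hypothetical names, a LinkedBlockingQueue stands in for Netty's MPSC queue, and a timed poll stands in for the selector wakeup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleThreadLoopSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread loopThread;
    private volatile boolean stopped;

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop()) {
            startThread(); // lazily start the loop thread on the first external submit
        }
        taskQueue.offer(task);
    }

    private void startThread() {
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runLoop, "loop-sketch");
            t.setDaemon(true);
            loopThread = t;
            t.start();
        }
    }

    private void runLoop() {
        while (!stopped) {
            try {
                Runnable task = taskQueue.poll(50, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run(); // every task runs on this one thread, in FIFO order
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    void shutdown() {
        stopped = true;
    }

    /** Submits three tasks externally and one from inside the loop; returns what ran where. */
    static List<String> demo() throws InterruptedException {
        SingleThreadLoopSketch loop = new SingleThreadLoopSketch();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(4);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            loop.execute(() -> {
                log.add(n + "@" + Thread.currentThread().getName());
                done.countDown();
            });
        }
        // Submitting from inside the loop only enqueues; no second thread is started.
        loop.execute(() -> loop.execute(() -> {
            log.add("nested@" + Thread.currentThread().getName());
            done.countDown();
        }));
        if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        loop.shutdown();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Because handlers only ever run on the loop thread, state that is confined to one loop needs no locking, which is the property the threading-model section relies on.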
    
// Scheduled task queue
#AbstractScheduledEventExecutor 
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(callable, "callable");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            delay = 0;
        }
        return schedule(new ScheduledFutureTask<V>(
                this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }
    
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) { // already on this EventLoop's thread; otherwise the caller is an external thread
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

// Not thread-safe: only accessed from the event loop thread
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
        }
        return scheduledTaskQueue;
    }
    
#runAllTasks
protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue(); // move every scheduled task that is already due into taskQueue (the scheduled queue is ordered by deadline)
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;
            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
    
// Fetch tasks from the scheduled task queue
private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask  = pollScheduledTask(nanoTime); // take the next task whose deadline is <= nanoTime
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime);
        }
        return true;
    }
    
#AbstractScheduledEventExecutor
protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();

        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        // Peek at the head (earliest deadline) without removing it.
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        if (scheduledTask == null) {
            return null;
        }

        if (scheduledTask.deadlineNanos() <= nanoTime) {
            scheduledTaskQueue.remove(); // the head is due (deadline <= nanoTime), so remove it
            return scheduledTask;
        }
        return null; // no task is due yet
    }
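
The aggregation step can be tried in isolation. The sketch below is a simplified model, not Netty's code: ScheduledFetchSketch, Timed, pollScheduled() and fetchDue() are hypothetical names, the current time is passed in explicitly, and an explicit capacity parameter plays the role of the bounded task queue whose offer() can fail.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

final class ScheduledFetchSketch {

    /** A task with an absolute deadline; ordered by deadline, earliest first. */
    static final class Timed implements Comparable<Timed> {
        final String name;
        final long deadlineNanos;

        Timed(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(Timed other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    /** Removes and returns the head only if it is already due; otherwise returns null. */
    static Timed pollScheduled(PriorityQueue<Timed> scheduled, long nowNanos) {
        Timed head = scheduled.peek();
        if (head == null || head.deadlineNanos > nowNanos) {
            return null;
        }
        return scheduled.remove();
    }

    /** Moves all due tasks into the task queue; returns false if the task queue filled up. */
    static boolean fetchDue(PriorityQueue<Timed> scheduled, Queue<Timed> tasks, int capacity, long nowNanos) {
        Timed task = pollScheduled(scheduled, nowNanos);
        while (task != null) {
            if (tasks.size() >= capacity) {
                scheduled.add(task); // no room: put it back and pick it up next time
                return false;
            }
            tasks.add(task);
            task = pollScheduled(scheduled, nowNanos);
        }
        return true;
    }

    public static void main(String[] args) {
        PriorityQueue<Timed> scheduled = new PriorityQueue<>();
        scheduled.add(new Timed("c", 30));
        scheduled.add(new Timed("a", 10));
        scheduled.add(new Timed("b", 20));

        Queue<Timed> tasks = new ArrayDeque<>();
        boolean allMoved = fetchDue(scheduled, tasks, 16, 20);
        System.out.println("allMoved=" + allMoved + ", queued=" + tasks.size() + ", stillScheduled=" + scheduled.size());
    }
}
```

Because the priority queue is ordered by deadline, the loop can stop at the first task that is not yet due; nothing behind it can be due either.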
    
    
// Executing tasks
#SingleThreadEventExecutor
protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask(); // poll the next task
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // compute the deadline
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            // If the deadline has passed at that check, stop and leave the remaining tasks for the next run.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) { // queue drained: exit the loop
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
    
    protected Runnable pollTask() {
        assert inEventLoop();
        return pollTaskFrom(taskQueue);
    }
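
One consequence of checking the clock only every 64 tasks is that the time budget is soft: a batch of fewer than 64 tasks always runs to completion, and an overrun is only noticed at a multiple of 64. The following sketch reproduces just that loop with an injectable clock so the effect is easy to observe. BudgetedRunSketch, runTasks() and the LongSupplier clock are hypothetical and for illustration only; Netty reads ScheduledFutureTask.nanoTime() directly.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

final class BudgetedRunSketch {

    /** Runs queued tasks until the queue is empty or the budget is seen to be exhausted. */
    static int runTasks(Queue<Runnable> queue, long timeoutNanos, LongSupplier clock) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        int ran = 0;
        for (;;) {
            task.run();
            ran++;
            // Read the clock only on every 64th task (0x3F masks the low six bits).
            if ((ran & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }

    public static void main(String[] args) {
        final long[] fakeNow = {0};
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            queue.add(() -> fakeNow[0]++); // each task "costs" one tick of the fake clock
        }
        int ran = runTasks(queue, 10, () -> fakeNow[0]);
        System.out.println("ran=" + ran + ", left=" + queue.size());
    }
}
```

With a budget of 10 ticks and tasks that cost one tick each, the loop still runs a full batch of 64 before it notices the overrun, which is why long-running work should not be placed on the event loop.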