dubbo SerializingExecutor 源码

  • 2022-10-20
  • 浏览 (479)

dubbo SerializingExecutor 代码

文件路径:/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dubbo.common.threadpool.serial;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadlocal.InternalThreadLocal;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Executor ensuring that all {@link Runnable} tasks submitted are executed in order
 * using the provided {@link Executor}, and serially such that no two will ever be
 * running at the same time.
 */
public final class SerializingExecutor implements Executor, Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SerializingExecutor.class);

    /**
     * Use false to stop and true to run
     */
    private final AtomicBoolean atomicBoolean = new AtomicBoolean();

    private final Executor executor;

    private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();

    /**
     * Creates a SerializingExecutor, running tasks using {@code executor}.
     *
     * @param executor Executor in which tasks should be run. Must not be null.
     */
    public SerializingExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Runs the given runnable strictly after all Runnables that were submitted
     * before it, and using the {@code executor} passed to the constructor.     .
     */
    @Override
    public void execute(Runnable r) {
        runQueue.add(r);
        schedule(r);
    }

    private void schedule(Runnable removable) {
        if (atomicBoolean.compareAndSet(false, true)) {
            boolean success = false;
            try {
                executor.execute(this);
                success = true;
            } finally {
                // It is possible that at this point that there are still tasks in
                // the queue, it would be nice to keep trying but the error may not
                // be recoverable.  So we update our state and propagate so that if
                // our caller deems it recoverable we won't be stuck.
                if (!success) {
                    if (removable != null) {
                        // This case can only be reached if 'this' was not currently running, and we failed to
                        // reschedule.  The item should still be in the queue for removal.
                        // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
                        // throw if the item to remove is null.  If removable is present in the queue twice,
                        // the wrong one may be removed.  It doesn't seem possible for this case to exist today.
                        // This is important to run in case of RejectedExecutionException, so that future calls
                        // to execute don't succeed and accidentally run a previous runnable.
                        runQueue.remove(removable);
                    }
                    atomicBoolean.set(false);
                }
            }
        }
    }

    @Override
    public void run() {
        Runnable r;
        try {
            while ((r = runQueue.poll()) != null) {
                try {
                    InternalThreadLocal.removeAll();
                    r.run();
                } catch (RuntimeException e) {
                    LOGGER.error("Exception while executing runnable " + r, e);
                } finally {
                    InternalThreadLocal.removeAll();
                }
            }
        } finally {
            atomicBoolean.set(false);
        }
        if (!runQueue.isEmpty()) {
            // we didn't enqueue anything but someone else did.
            schedule(null);
        }
    }
}

相关信息

dubbo 源码目录

相关文章

dubbo AddressListener 源码

dubbo CacheableRouterFactory 源码

dubbo Cluster 源码

dubbo ClusterInvoker 源码

dubbo ClusterScopeModelInitializer 源码

dubbo Configurator 源码

dubbo ConfiguratorFactory 源码

dubbo Constants 源码

dubbo Directory 源码

dubbo LoadBalance 源码

0  赞