dubbo AbstractServiceNameMapping 源码

  • 2022-10-20
  • 浏览 (328)

dubbo AbstractServiceNameMapping 代码

文件路径:/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.metadata;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Stream.of;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY;
import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
import static org.apache.dubbo.common.utils.CollectionUtils.toTreeSet;
import static org.apache.dubbo.common.utils.StringUtils.isBlank;

public abstract class AbstractServiceNameMapping implements ServiceNameMapping {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    protected ApplicationModel applicationModel;
    private final MappingCacheManager mappingCacheManager;
    private final Map<String, Set<MappingListener>> mappingListeners = new ConcurrentHashMap<>();
    // mapping lock is shared among registries of the same application.
    private final ConcurrentMap<String, ReentrantLock> mappingLocks = new ConcurrentHashMap<>();
    // TODO, check how should this be cleared once a reference or interface is destroyed to avoid key accumulation
    private final Map<String, Boolean> mappingInitStatus = new HashMap<>();

    public AbstractServiceNameMapping(ApplicationModel applicationModel) {
        this.applicationModel = applicationModel;
        boolean enableFileCache = true;
        Optional<ApplicationConfig> application = applicationModel.getApplicationConfigManager().getApplication();
        if(application.isPresent()) {
            enableFileCache = Boolean.TRUE.equals(application.get().getEnableFileCache()) ? true : false;
        }
        this.mappingCacheManager = new MappingCacheManager(enableFileCache,
            applicationModel.tryGetApplicationName(),
            applicationModel.getFrameworkModel().getBeanFactory()
            .getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
    }

    // just for test
    public void setApplicationModel(ApplicationModel applicationModel) {
        this.applicationModel = applicationModel;
    }

    /**
     * Get the service names from the specified Dubbo service interface, group, version and protocol
     *
     * @return
     */
    abstract public Set<String> get(URL url);

    /**
     * Get the service names from the specified Dubbo service interface, group, version and protocol
     *
     * @return
     */
    abstract public Set<String> getAndListen(URL url, MappingListener mappingListener);

    abstract protected void removeListener(URL url, MappingListener mappingListener);

    @Override
    public synchronized void initInterfaceAppMapping(URL subscribedURL) {
        String key = ServiceNameMapping.buildMappingKey(subscribedURL);
        if (hasInitiated(key)) {
            return;
        }
        mappingInitStatus.put(key, Boolean.TRUE);

        Set<String> subscribedServices = new TreeSet<>();
        String serviceNames = subscribedURL.getParameter(PROVIDED_BY);

        if (StringUtils.isNotEmpty(serviceNames)) {
            logger.info(key + " mapping to " + serviceNames + " instructed by provided-by set by user.");
            subscribedServices.addAll(parseServices(serviceNames));
        }

        if (isEmpty(subscribedServices)) {
            Set<String> cachedServices = this.getCachedMapping(key);
            if (!isEmpty(cachedServices)) {
                logger.info(key + " mapping to " + serviceNames + " instructed by local cache.");
                subscribedServices.addAll(cachedServices);
            }
        } else {
            this.putCachedMappingIfAbsent(key, subscribedServices);
        }
    }

    @Override
    public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
        String key = ServiceNameMapping.buildMappingKey(subscribedURL);
        // use previously cached services.
        Set<String> mappingServices = this.getCachedMapping(key);

        // Asynchronously register listener in case previous cache does not exist or cache expired.
        if (CollectionUtils.isEmpty(mappingServices)) {
            try {
                logger.info("Local cache mapping is empty");
                mappingServices = (new AsyncMappingTask(listener, subscribedURL, false)).call();
            } catch (Exception e) {
                // ignore
            }
            if (CollectionUtils.isEmpty(mappingServices)) {
                String registryServices = registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY);
                if (StringUtils.isNotEmpty(registryServices)) {
                    logger.info(subscribedURL.getServiceInterface() + " mapping to " + registryServices + " instructed by registry subscribed-services.");
                    mappingServices = parseServices(registryServices);
                }
            }
            if (CollectionUtils.isNotEmpty(mappingServices)) {
                this.putCachedMapping(key, mappingServices);
            }
        } else {
            ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
                .getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
            executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
        }

        return mappingServices;
    }

    @Override
    public MappingListener stopListen(URL subscribeURL, MappingListener listener) {
        synchronized (mappingListeners) {
            String mappingKey = ServiceNameMapping.buildMappingKey(subscribeURL);
            Set<MappingListener> listeners = mappingListeners.get(mappingKey);
            //todo, remove listener from remote metadata center
            if (CollectionUtils.isNotEmpty(listeners)) {
                listeners.remove(listener);
                listener.stop();
                removeListener(subscribeURL, listener);
            }
            if (CollectionUtils.isEmpty(listeners)) {
                mappingListeners.remove(mappingKey);
                removeCachedMapping(mappingKey);
                removeMappingLock(mappingKey);
            }
            return listener;
        }
    }

    static Set<String> parseServices(String literalServices) {
        return isBlank(literalServices) ? emptySet() :
            unmodifiableSet(new TreeSet<>(of(literalServices.split(","))
                .map(String::trim)
                .filter(StringUtils::isNotEmpty)
                .collect(toSet())));
    }

    @Override
    public void putCachedMapping(String serviceKey, Set<String> apps) {
        mappingCacheManager.put(serviceKey, toTreeSet(apps));
    }

    protected void putCachedMappingIfAbsent(String serviceKey, Set<String> apps) {
        Lock lock = getMappingLock(serviceKey);
        try {
            lock.lock();
            if (CollectionUtils.isEmpty(mappingCacheManager.get(serviceKey))) {
                mappingCacheManager.put(serviceKey, toTreeSet(apps));
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Set<String> getCachedMapping(String mappingKey) {
        return mappingCacheManager.get(mappingKey);
    }

    @Override
    public Set<String> getCachedMapping(URL consumerURL) {
        return getCachedMapping(ServiceNameMapping.buildMappingKey(consumerURL));
    }

    @Override
    public Set<String> getRemoteMapping(URL consumerURL) {
        return get(consumerURL);
    }

    @Override
    public Set<String> removeCachedMapping(String serviceKey) {
        return mappingCacheManager.remove(serviceKey);
    }

    @Override
    public Map<String, Set<String>> getCachedMapping() {
        return Collections.unmodifiableMap(mappingCacheManager.getAll());
    }

    public Lock getMappingLock(String key) {
        return mappingLocks.computeIfAbsent(key, _k -> new ReentrantLock());
    }

    protected void removeMappingLock(String key) {
        Lock lock = mappingLocks.get(key);
        if (lock != null) {
            try {
                lock.lock();
                mappingLocks.remove(key);
            } finally {
                lock.unlock();
            }
        }
    }

    private boolean hasInitiated(String key) {
        Lock lock = getMappingLock(key);
        try {
            lock.lock();
            return mappingInitStatus.computeIfAbsent(key, _k -> Boolean.FALSE);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void $destroy() {
        mappingCacheManager.destroy();
        mappingListeners.clear();
        mappingLocks.clear();
        mappingInitStatus.clear();

    }

    private class AsyncMappingTask implements Callable<Set<String>> {
        private final MappingListener listener;
        private final URL subscribedURL;
        private final boolean notifyAtFirstTime;

        public AsyncMappingTask(MappingListener listener, URL subscribedURL, boolean notifyAtFirstTime) {
            this.listener = listener;
            this.subscribedURL = subscribedURL;
            this.notifyAtFirstTime = notifyAtFirstTime;
        }

        @Override
        public Set<String> call() throws Exception {
            synchronized (mappingListeners) {
                Set<String> mappedServices = emptySet();
                try {
                    String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
                    if (listener != null) {
                        mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
                        Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
                        listeners.add(listener);
                        if (CollectionUtils.isNotEmpty(mappedServices)) {
                            if (notifyAtFirstTime) {
                                // guarantee at-least-once notification no matter what kind of underlying meta server is used.
                                // listener notification will also cause updating of mapping cache.
                                listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
                            }
                        }
                    } else {
                        mappedServices = get(subscribedURL);
                        if (CollectionUtils.isNotEmpty(mappedServices)) {
                            AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
                        }
                    }
                } catch (Exception e) {
                    logger.error("Failed getting mapping info from remote center. ", e);
                }
                return mappedServices;
            }
        }
    }
}

相关信息

dubbo 源码目录

相关文章

dubbo AbstractCacheManager 源码

dubbo DefaultMetadataParamsFilter 源码

dubbo InstanceMetadataChangedListener 源码

dubbo MappingCacheManager 源码

dubbo MappingChangedEvent 源码

dubbo MappingListener 源码

dubbo MetadataConstants 源码

dubbo MetadataInfo 源码

dubbo MetadataParamsFilter 源码

dubbo MetadataService 源码

0  赞