dubbo DefaultMetricsCollector 源码

  • 2022-10-20
  • 浏览 (429)

dubbo DefaultMetricsCollector 代码

文件路径:/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dubbo.common.metrics.collector;

import static org.apache.dubbo.common.metrics.model.MetricsCategory.REQUESTS;
import static org.apache.dubbo.common.metrics.model.MetricsCategory.RT;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.dubbo.common.metrics.collector.stat.MetricsStatComposite;
import org.apache.dubbo.common.metrics.collector.stat.MetricsStatHandler;
import org.apache.dubbo.common.metrics.event.RequestEvent;
import org.apache.dubbo.common.metrics.listener.MetricsListener;
import org.apache.dubbo.common.metrics.model.MetricsKey;
import org.apache.dubbo.common.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.common.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.model.ApplicationModel;

/**
 * Default implementation of {@link MetricsCollector}
 */
public class DefaultMetricsCollector implements MetricsCollector {

    private AtomicBoolean collectEnabled = new AtomicBoolean(false);
    private final List<MetricsListener> listeners = new ArrayList<>();
    private final ApplicationModel applicationModel;
    private final MetricsStatComposite stats;

    public DefaultMetricsCollector(ApplicationModel applicationModel) {
        this.applicationModel = applicationModel;
        this.stats = new MetricsStatComposite(applicationModel.getApplicationName(), this);
    }

    public void setCollectEnabled(Boolean collectEnabled) {
        this.collectEnabled.compareAndSet(isCollectEnabled(), collectEnabled);
    }

    public Boolean isCollectEnabled() {
        return collectEnabled.get();
    }

    public void addListener(MetricsListener listener) {
        listeners.add(listener);
    }

    public List<MetricsListener> getListener() {
        return this.listeners;
    }

    public void increaseTotalRequests(String interfaceName, String methodName, String group, String version) {
        doExecute(RequestEvent.Type.TOTAL,statHandler-> {
            statHandler.increase(interfaceName, methodName, group, version);
        });
    }

    public void increaseSucceedRequests(String interfaceName, String methodName, String group, String version) {
        doExecute(RequestEvent.Type.SUCCEED,statHandler->{
            statHandler.increase(interfaceName, methodName, group, version);
        });
    }

    public void increaseFailedRequests(String interfaceName,
                                       String methodName,
                                       String group,
                                       String version) {
        doExecute(RequestEvent.Type.FAILED,statHandler->{
            statHandler.increase(interfaceName, methodName, group, version);
        });
    }

    public void businessFailedRequests(String interfaceName, String methodName, String group, String version) {
        doExecute(RequestEvent.Type.BUSINESS_FAILED,statHandler->{
            statHandler.increase(interfaceName, methodName, group, version);
        });
    }

    public void increaseProcessingRequests(String interfaceName, String methodName, String group, String version) {
        doExecute(RequestEvent.Type.PROCESSING,statHandler-> {
            statHandler.increase(interfaceName, methodName, group, version);
        });
    }

    public void decreaseProcessingRequests(String interfaceName, String methodName, String group, String version) {
        doExecute(RequestEvent.Type.PROCESSING,statHandler-> {
            statHandler.decrease(interfaceName, methodName, group, version);
        });
    }

    public void addRT(String interfaceName, String methodName, String group, String version, Long responseTime) {
        stats.addRT(interfaceName, methodName, group, version, responseTime);
    }

    @Override
    public List<MetricSample> collect() {
        List<MetricSample> list = new ArrayList<>();
        collectRequests(list);
        collectRT(list);

        return list;
    }

    private void collectRequests(List<MetricSample> list) {
        doExecute(RequestEvent.Type.TOTAL, MetricsStatHandler::get).filter(e->!e.isEmpty())
            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TOTAL, k.getTags(), REQUESTS, v::get))));

        doExecute(RequestEvent.Type.SUCCEED, MetricsStatHandler::get).filter(e->!e.isEmpty())
            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED, k.getTags(), REQUESTS, v::get))));

        doExecute(RequestEvent.Type.FAILED, MetricsStatHandler::get).filter(e->!e.isEmpty())
            .ifPresent(map->{
                map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED, k.getTags(), REQUESTS, v::get)));
            });

        doExecute(RequestEvent.Type.PROCESSING, MetricsStatHandler::get).filter(e->!e.isEmpty())
            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_PROCESSING, k.getTags(), REQUESTS, v::get))));

        doExecute(RequestEvent.Type.BUSINESS_FAILED, MetricsStatHandler::get).filter(e->!e.isEmpty())
            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED, k.getTags(), REQUESTS, v::get))));
    }

    private void collectRT(List<MetricSample> list) {
        this.stats.getLastRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_LAST, k.getTags(), RT, v::get)));
        this.stats.getMinRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MIN, k.getTags(), RT, v::get)));
        this.stats.getMaxRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MAX, k.getTags(), RT, v::get)));

        this.stats.getTotalRT().forEach((k, v) -> {
            list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_TOTAL, k.getTags(), RT, v::get));

            AtomicLong avg = this.stats.getAvgRT().get(k);
            AtomicLong count = this.stats.getRtCount().get(k);
            avg.set(v.get() / count.get());
            list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_AVG, k.getTags(), RT, avg::get));
        });
    }
    private <T> Optional<T> doExecute(RequestEvent.Type requestType, Function<MetricsStatHandler,T> statExecutor) {
        if (isCollectEnabled()) {
            MetricsStatHandler handler = stats.getHandler(requestType);
            T result =  statExecutor.apply(handler);
            return Optional.ofNullable(result);
        }
        return Optional.empty();
    }

    private void doExecute(RequestEvent.Type requestType, Consumer<MetricsStatHandler> statExecutor) {
        if (isCollectEnabled()) {
            MetricsStatHandler handler = stats.getHandler(requestType);
             statExecutor.accept(handler);
        }
    }
}

相关信息

dubbo 源码目录

相关文章

dubbo MetricsCollector 源码

0  赞