kafka KafkaProducerMetrics 源码

  • 2022-10-20
  • 浏览 (537)

kafka KafkaProducerMetrics 代码

文件路径:/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

import java.util.Map;

public class KafkaProducerMetrics implements AutoCloseable {

    public static final String GROUP = "producer-metrics";
    private static final String FLUSH = "flush";
    private static final String TXN_INIT = "txn-init";
    private static final String TXN_BEGIN = "txn-begin";
    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
    private static final String TXN_COMMIT = "txn-commit";
    private static final String TXN_ABORT = "txn-abort";
    private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
    private static final String METADATA_WAIT = "metadata-wait";

    private final Map<String, String> tags;
    private final Metrics metrics;
    private final Sensor initTimeSensor;
    private final Sensor beginTxnTimeSensor;
    private final Sensor flushTimeSensor;
    private final Sensor sendOffsetsSensor;
    private final Sensor commitTxnSensor;
    private final Sensor abortTxnSensor;
    private final Sensor metadataWaitSensor;

    public KafkaProducerMetrics(Metrics metrics) {
        this.metrics = metrics;
        tags = this.metrics.config().tags();
        flushTimeSensor = newLatencySensor(
            FLUSH,
            "Total time producer has spent in flush in nanoseconds."
        );
        initTimeSensor = newLatencySensor(
            TXN_INIT,
            "Total time producer has spent in initTransactions in nanoseconds."
        );
        beginTxnTimeSensor = newLatencySensor(
            TXN_BEGIN,
            "Total time producer has spent in beginTransaction in nanoseconds."
        );
        sendOffsetsSensor = newLatencySensor(
            TXN_SEND_OFFSETS,
            "Total time producer has spent in sendOffsetsToTransaction in nanoseconds."
        );
        commitTxnSensor = newLatencySensor(
            TXN_COMMIT,
            "Total time producer has spent in commitTransaction in nanoseconds."
        );
        abortTxnSensor = newLatencySensor(
            TXN_ABORT,
            "Total time producer has spent in abortTransaction in nanoseconds."
        );
        metadataWaitSensor = newLatencySensor(
            METADATA_WAIT,
            "Total time producer has spent waiting on topic metadata in nanoseconds."
        );
    }

    @Override
    public void close() {
        removeMetric(FLUSH);
        removeMetric(TXN_INIT);
        removeMetric(TXN_BEGIN);
        removeMetric(TXN_SEND_OFFSETS);
        removeMetric(TXN_COMMIT);
        removeMetric(TXN_ABORT);
        removeMetric(METADATA_WAIT);
    }

    public void recordFlush(long duration) {
        flushTimeSensor.record(duration);
    }

    public void recordInit(long duration) {
        initTimeSensor.record(duration);
    }

    public void recordBeginTxn(long duration) {
        beginTxnTimeSensor.record(duration);
    }

    public void recordSendOffsets(long duration) {
        sendOffsetsSensor.record(duration);
    }

    public void recordCommitTxn(long duration) {
        commitTxnSensor.record(duration);
    }

    public void recordAbortTxn(long duration) {
        abortTxnSensor.record(duration);
    }

    public void recordMetadataWait(long duration) {
        metadataWaitSensor.record(duration);
    }

    private Sensor newLatencySensor(String name, String description) {
        Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
        sensor.add(metricName(name, description), new CumulativeSum());
        return sensor;
    }

    private MetricName metricName(final String name, final String description) {
        return metrics.metricName(name + TOTAL_TIME_SUFFIX, GROUP, description, tags);
    }

    private void removeMetric(final String name) {
        metrics.removeSensor(name + TOTAL_TIME_SUFFIX);
    }
}

相关信息

kafka 源码目录

相关文章

kafka BufferPool 源码

kafka BuiltInPartitioner 源码

kafka DefaultPartitioner 源码

kafka ErrorLoggingCallback 源码

kafka FutureRecordMetadata 源码

kafka IncompleteBatches 源码

kafka ProduceRequestResult 源码

kafka ProducerBatch 源码

kafka ProducerInterceptors 源码

kafka ProducerMetadata 源码

0  赞