kafka Decimal 源码

  • 2022-10-20
  • 浏览 (511)

kafka Decimal 代码

文件路径:/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.data;

import org.apache.kafka.connect.errors.DataException;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Map;

/**
 * <p>
 *     An arbitrary-precision signed decimal number. The value is unscaled * 10 ^ -scale where:
 *     <ul>
 *         <li>unscaled is an integer </li>
 *         <li>scale is an integer representing how many digits the decimal point should be shifted on the unscaled value</li>
 *     </ul>
 * </p>
 * <p>
 *     Decimal does not provide a fixed schema because it is parameterized by the scale, which is fixed on the schema
 *     rather than being part of the value.
 * </p>
 * <p>
 *     The underlying representation of this type is bytes containing the big-endian two's complement
 *     encoding of the unscaled integer value.
 * </p>
 */
public class Decimal {
    /** Schema name that identifies a bytes schema as carrying the Decimal logical type. */
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal";
    /** Name of the schema parameter under which the scale is stored as a decimal string. */
    public static final String SCALE_FIELD = "scale";

    /**
     * Returns a SchemaBuilder for a Decimal with the given scale factor. By returning a SchemaBuilder you can override
     * additional schema settings such as required/optional, default value, and documentation.
     * @param scale the scale factor to apply to unscaled values
     * @return a SchemaBuilder
     */
    public static SchemaBuilder builder(int scale) {
        return SchemaBuilder.bytes()
                .name(LOGICAL_NAME)
                .parameter(SCALE_FIELD, Integer.toString(scale))
                .version(1);
    }

    /**
     * Builds a Decimal schema with the given scale, using the settings produced by {@link #builder(int)}.
     * @param scale the scale factor to apply to unscaled values
     * @return the built schema
     */
    public static Schema schema(int scale) {
        return builder(scale).build();
    }

    /**
     * Convert a value from its logical format (BigDecimal) to its encoded format.
     * @param schema the Decimal schema whose scale parameter the value must match
     * @param value the logical value
     * @return the encoded value: the two's complement bytes of the value's unscaled integer
     * @throws DataException if the schema has no valid scale parameter, or if the value's scale differs from the
     *         schema's scale
     */
    public static byte[] fromLogical(Schema schema, BigDecimal value) {
        int schemaScale = scale(schema);
        // The scale lives on the schema only, so a value with a different scale cannot be
        // represented faithfully and is rejected rather than silently rescaled.
        if (value.scale() != schemaScale)
            throw new DataException(String.format(
                "Decimal value has mismatching scale for given Decimal schema. "
                    + "Schema has scale %d, value has scale %d.",
                schemaScale,
                value.scale()
            ));
        return value.unscaledValue().toByteArray();
    }

    /**
     * Convert a value from its encoded format (two's complement bytes of the unscaled integer) to its logical
     * format (BigDecimal), applying the scale stored on the schema.
     * @param schema the Decimal schema that supplies the scale
     * @param value the encoded value
     * @return the logical value
     * @throws DataException if the schema has no valid scale parameter
     */
    public static BigDecimal toLogical(Schema schema, byte[] value) {
        return new BigDecimal(new BigInteger(value), scale(schema));
    }

    /**
     * Reads the scale from the schema's parameters.
     * @param schema the schema expected to carry a {@link #SCALE_FIELD} parameter
     * @return the parsed scale
     * @throws DataException if the schema has no parameters, has no scale parameter, or the scale is not an integer
     */
    private static int scale(Schema schema) {
        // A schema without any parameters reports null rather than an empty map; treat that
        // the same as a missing scale instead of failing with a NullPointerException.
        Map<String, String> parameters = schema.parameters();
        String scaleString = parameters == null ? null : parameters.get(SCALE_FIELD);
        if (scaleString == null)
            throw new DataException("Invalid Decimal schema: scale parameter not found.");
        try {
            return Integer.parseInt(scaleString);
        } catch (NumberFormatException e) {
            // Include the offending value so the message is actionable; the cause is preserved.
            throw new DataException("Invalid scale parameter found in Decimal schema: " + scaleString, e);
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka ConnectSchema 源码

kafka Date 源码

kafka Field 源码

kafka Schema 源码

kafka SchemaAndValue 源码

kafka SchemaBuilder 源码

kafka SchemaProjector 源码

kafka Struct 源码

kafka Time 源码

kafka Timestamp 源码

0  赞