kafka SchemaProjector 源码

  • 2022-10-20
  • 浏览 (392)

kafka SchemaProjector 代码

文件路径:/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.data;

import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.errors.SchemaProjectorException;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * <p>
 *     SchemaProjector is utility to project a value between compatible schemas and throw exceptions
 *     when non compatible schemas are provided.
 * </p>
 */

public class SchemaProjector {

    private static Set<AbstractMap.SimpleImmutableEntry<Type, Type>> promotable = new HashSet<>();

    static {
        Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, Type.INT64, Type.FLOAT32, Type.FLOAT64};
        for (int i = 0; i < promotableTypes.length; ++i) {
            for (int j = i; j < promotableTypes.length; ++j) {
                promotable.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j]));
            }
        }
    }

    /**
     * This method project a value between compatible schemas and throw exceptions when non compatible schemas are provided
     * @param source the schema used to construct the record
     * @param record the value to project from source schema to target schema
     * @param target the schema to project the record to
     * @return the projected value with target schema
     * @throws SchemaProjectorException if the target schema is not optional and does not have a default value
     */
    public static Object project(Schema source, Object record, Schema target) throws SchemaProjectorException {
        checkMaybeCompatible(source, target);
        if (source.isOptional() && !target.isOptional()) {
            if (target.defaultValue() != null) {
                if (record != null) {
                    return projectRequiredSchema(source, record, target);
                } else {
                    return target.defaultValue();
                }
            } else {
                throw new SchemaProjectorException("Writer schema is optional, however, target schema does not provide a default value.");
            }
        } else {
            if (record != null) {
                return projectRequiredSchema(source, record, target);
            } else {
                return null;
            }
        }
    }

    private static Object projectRequiredSchema(Schema source, Object record, Schema target) throws SchemaProjectorException {
        switch (target.type()) {
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case FLOAT32:
            case FLOAT64:
            case BOOLEAN:
            case BYTES:
            case STRING:
                return projectPrimitive(source, record, target);
            case STRUCT:
                return projectStruct(source, (Struct) record, target);
            case ARRAY:
                return projectArray(source, record, target);
            case MAP:
                return projectMap(source, record, target);
        }
        return null;
    }

    private static Object projectStruct(Schema source, Struct sourceStruct, Schema target) throws SchemaProjectorException {
        Struct targetStruct = new Struct(target);
        for (Field targetField : target.fields()) {
            String fieldName = targetField.name();
            Field sourceField = source.field(fieldName);
            if (sourceField != null) {
                Object sourceFieldValue = sourceStruct.get(fieldName);
                try {
                    Object targetFieldValue = project(sourceField.schema(), sourceFieldValue, targetField.schema());
                    targetStruct.put(fieldName, targetFieldValue);
                } catch (SchemaProjectorException e) {
                    throw new SchemaProjectorException("Error projecting " + sourceField.name(), e);
                }
            } else if (targetField.schema().isOptional()) {
                // Ignore missing field
            } else if (targetField.schema().defaultValue() != null) {
                targetStruct.put(fieldName, targetField.schema().defaultValue());
            } else {
                throw new SchemaProjectorException("Required field `" +  fieldName + "` is missing from source schema: " + source);
            }
        }
        return targetStruct;
    }


    private static void checkMaybeCompatible(Schema source, Schema target) {
        if (source.type() != target.type() && !isPromotable(source.type(), target.type())) {
            throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type() + " and target type: " + target.type());
        } else if (!Objects.equals(source.name(), target.name())) {
            throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name() + " and target name: " + target.name());
        } else if (!Objects.equals(source.parameters(), target.parameters())) {
            throw new SchemaProjectorException("Schema parameters not equal. source parameters: " + source.parameters() + " and target parameters: " + target.parameters());
        }
    }

    private static Object projectArray(Schema source, Object record, Schema target) throws SchemaProjectorException {
        List<?> array = (List<?>) record;
        List<Object> retArray = new ArrayList<>();
        for (Object entry : array) {
            retArray.add(project(source.valueSchema(), entry, target.valueSchema()));
        }
        return retArray;
    }

    private static Object projectMap(Schema source, Object record, Schema target) throws SchemaProjectorException {
        Map<?, ?> map = (Map<?, ?>) record;
        Map<Object, Object> retMap = new HashMap<>();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object key = entry.getKey();
            Object value = entry.getValue();
            Object retKey = project(source.keySchema(), key, target.keySchema());
            Object retValue = project(source.valueSchema(), value, target.valueSchema());
            retMap.put(retKey, retValue);
        }
        return retMap;
    }

    private static Object projectPrimitive(Schema source, Object record, Schema target) throws SchemaProjectorException {
        assert source.type().isPrimitive();
        assert target.type().isPrimitive();
        Object result;
        if (isPromotable(source.type(), target.type()) && record instanceof Number) {
            Number numberRecord = (Number) record;
            switch (target.type()) {
                case INT8:
                    result = numberRecord.byteValue();
                    break;
                case INT16:
                    result = numberRecord.shortValue();
                    break;
                case INT32:
                    result = numberRecord.intValue();
                    break;
                case INT64:
                    result = numberRecord.longValue();
                    break;
                case FLOAT32:
                    result = numberRecord.floatValue();
                    break;
                case FLOAT64:
                    result = numberRecord.doubleValue();
                    break;
                default:
                    throw new SchemaProjectorException("Not promotable type.");
            }
        } else {
            result = record;
        }
        return result;
    }

    private static boolean isPromotable(Type sourceType, Type targetType) {
        return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
    }
}

相关信息

kafka 源码目录

相关文章

kafka ConnectSchema 源码

kafka Date 源码

kafka Decimal 源码

kafka Field 源码

kafka Schema 源码

kafka SchemaAndValue 源码

kafka SchemaBuilder 源码

kafka Struct 源码

kafka Time 源码

kafka Timestamp 源码

0  赞