spark ParquetRowConverter 源码

  • 2022-10-20
spark ParquetRowConverter 代码


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder
import java.time.{ZoneId, ZoneOffset}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.ColumnIOFactory
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, Type, Types}
import org.apache.parquet.schema.LogicalTypeAnnotation._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.collection.Utils

/**
 * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
 * corresponding parent container. For example, a converter for a `StructType` field may set
 * converted values to an [[InternalRow]]; or a converter for array elements may append converted
 * values to an [[ArrayBuffer]].
 */
private[parquet] trait ParentContainerUpdater {
  /** Called before a record field is converted. */
  def start(): Unit = ()

  /** Called after a record field has been converted. */
  def end(): Unit = ()

  /**
   * Stores a converted value in the parent container. The default implementation does nothing;
   * concrete updaters override it.
   */
  def set(value: Any): Unit = ()

  // By default every primitive setter forwards to `set`, which boxes the argument because `set`
  // takes `Any`. Implementations may override individual setters.
  def setBoolean(value: Boolean): Unit = set(value)
  def setByte(value: Byte): Unit = set(value)
  def setShort(value: Short): Unit = set(value)
  def setInt(value: Int): Unit = set(value)
  def setLong(value: Long): Unit = set(value)
  def setFloat(value: Float): Unit = set(value)
  def setDouble(value: Double): Unit = set(value)
}

/**
 * A no-op updater used for the root converter (which doesn't have a parent). It overrides nothing
 * from [[ParentContainerUpdater]], so every callback keeps that trait's default behavior.
 */
private[parquet] object NoopUpdater extends ParentContainerUpdater

private[parquet] trait HasParentContainerUpdater {
  /** The updater through which converted values are handed to the parent container. */
  def updater: ParentContainerUpdater
}

/**
 * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]].
 */
// The `val updater` constructor parameter supplies the `updater` member declared by
// `HasParentContainerUpdater`; this class declares no members of its own.
private[parquet] abstract class ParquetGroupConverter(val updater: ParentContainerUpdater)
  extends GroupConverter with HasParentContainerUpdater

/**
 * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
 * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
 * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
 */
private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpdater)
  extends PrimitiveConverter with HasParentContainerUpdater {

  // Each primitive callback forwards its value unchanged to the matching setter on `updater`.
  override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
  override def addInt(value: Int): Unit = updater.setInt(value)
  override def addLong(value: Long): Unit = updater.setLong(value)
  override def addFloat(value: Float): Unit = updater.setFloat(value)
  override def addDouble(value: Double): Unit = updater.setDouble(value)

  // Binary values are passed on as the byte array returned by `Binary.getBytes`.
  override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
}

/**
 * A [[ParquetRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
 * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
 * converter.  Take the following Parquet type as an example:
 * {{{
 *   message root {
 *     required int32 f1;
 *     optional group f2 {
 *       required double f21;
 *       optional binary f22 (utf8);
 *     }
 *   }
 * }}}
 * 5 converters will be created:
 *
 * - a root [[ParquetRowConverter]] for [[org.apache.parquet.schema.MessageType]] `root`,
 * which contains:
 *   - a [[ParquetPrimitiveConverter]] for required
 *   [[org.apache.parquet.schema.LogicalTypeAnnotation.intType(32, true)]] field `f1`, and
 *   - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains:
 *     - a [[ParquetPrimitiveConverter]] for required
 *     [[org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE]] field `f21`, and
 *     - a [[ParquetStringConverter]] for optional
 *     [[org.apache.parquet.schema.LogicalTypeAnnotation.stringType()]] string field `f22`
 *
 * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
 * any "parent" container.
 *
 * @param schemaConverter A utility converter used to convert Parquet types to Catalyst types.
 * @param parquetType Parquet schema of Parquet records
 * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
 *        types should have been expanded.
 * @param convertTz the optional time zone to convert to int96 data
 * @param datetimeRebaseSpec the specification of rebasing date/timestamp from Julian to Proleptic
 *                           Gregorian calendar: mode + optional original time zone
 * @param int96RebaseSpec the specification of rebasing INT96 timestamp from Julian to Proleptic
 *                        Gregorian calendar
 * @param updater An updater which propagates converted field values to the parent container
 */
private[parquet] class ParquetRowConverter(
    schemaConverter: ParquetToSparkSchemaConverter,
    parquetType: GroupType,
    catalystType: StructType,
    convertTz: Option[ZoneId],
    datetimeRebaseSpec: RebaseSpec,
    int96RebaseSpec: RebaseSpec,
    updater: ParentContainerUpdater)
  extends ParquetGroupConverter(updater) with Logging {

    parquetType.getFieldCount <= catalystType.length,
    s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
       |Parquet schema:
       |Catalyst schema:

    s"""User-defined types in Catalyst schema should have already been expanded:

    s"""Building row converter for the following schema:
       |Parquet form:
       |Catalyst form:

  /**
   * Updater used together with field converters within a [[ParquetRowConverter]].  It propagates
   * converted field values to the `ordinal`-th cell in `currentRow`.
   */
  private class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
    // Generic path: `row(ordinal) = value` is sugar for `row.update(ordinal, value)`.
    override def set(value: Any): Unit = row(ordinal) = value

    // Primitive paths write through the row's typed setters instead of going through `set`.
    override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
    override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
    override def setShort(value: Short): Unit = row.setShort(ordinal, value)
    override def setInt(value: Int): Unit = row.setInt(ordinal, value)
    override def setLong(value: Long): Unit = row.setLong(ordinal, value)
    override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
    override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
  }

  private[this] val currentRow = new SpecificInternalRow(

   * The [[InternalRow]] converted from an entire Parquet record.
  def currentRecord: InternalRow = {
    applyExistenceDefaultValuesToRow(catalystType, currentRow)

  // Date rebase function for reads, built from the rebase mode of `datetimeRebaseSpec`.
  // "Parquet" is the format label handed to the factory.
  private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInRead(
    datetimeRebaseSpec.mode, "Parquet")

  // Timestamp rebase function for reads, built from the full `datetimeRebaseSpec`.
  private val timestampRebaseFunc = DataSourceUtils.createTimestampRebaseFuncInRead(
    datetimeRebaseSpec, "Parquet")

  // Timestamp rebase function built from the separate `int96RebaseSpec`; the INT96 timestamp
  // converter applies it to the value decoded from the 12-byte binary.
  private val int96RebaseFunc = DataSourceUtils.createTimestampRebaseFuncInRead(
    int96RebaseSpec, "Parquet INT96")

  // Converters for each field.
  private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
    // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
    // to prevent throwing IllegalArgumentException when searching catalyst type's field index
    def nameToIndex: Map[String, Int] = Utils.toMapWithIndex(catalystType.fieldNames)

    val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
    } else {

    // (SPARK-38094) parquet field ids, if exist, should be prioritized for matching
    val catalystFieldIdxByFieldId =
      if (SQLConf.get.parquetFieldIdReadEnabled && ParquetUtils.hasFieldIds(catalystType)) {
          .filter { case (f, _) => ParquetUtils.hasFieldId(f) }
          .map { case (f, idx) => (ParquetUtils.getFieldId(f), idx) }
      } else {
        Map.empty[Int, Int]
    // If any fields in the Catalyst result schema have associated existence default values,
    // maintain a boolean array to track which fields have been explicitly assigned for each row.
    if (catalystType.hasExistenceDefaultValues) {
      for (i <- 0 until catalystType.existenceDefaultValues.size) {
        catalystType.existenceDefaultsBitmask(i) =
          // Assume the schema for a Parquet file-based table contains N fields. Then if we later
          // run a command "ALTER TABLE t ADD COLUMN c DEFAULT <value>" on the Parquet table, this
          // adds one field to the Catalyst schema. Then if we query the old files with the new
          // Catalyst schema, we should only apply the existence default value to all columns >= N.
          if (i < parquetType.getFieldCount) {
          } else {
            catalystType.existenceDefaultValues(i) != null
    } { parquetField =>
      val catalystFieldIndex = Option(parquetField.getId).flatMap { fieldId =>
        // field has id, try to match by id first before falling back to match by name
      }.getOrElse {
        // field doesn't have id, just match by name
      val catalystField = catalystType(catalystFieldIndex)
      // Create a RowUpdater instance for converting Parquet objects to Catalyst rows.
      val rowUpdater: RowUpdater = new RowUpdater(currentRow, catalystFieldIndex)
      // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
      newConverter(parquetField, catalystField.dataType, rowUpdater)

  // Updaters for each field.
  private[this] val fieldUpdaters: Array[ParentContainerUpdater] =

  // Returns the converter stored at position `fieldIndex` of `fieldConverters`.
  override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

  override def end(): Unit = {
    var i = 0
    while (i < fieldUpdaters.length) {
      i += 1

  override def start(): Unit = {
    var i = 0
    val numFields = currentRow.numFields
    while (i < numFields) {
      i += 1
    i = 0
    while (i < fieldUpdaters.length) {
      i += 1

   * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
   * `catalystType`. Converted values are handled by `updater`.
  private def newConverter(
      parquetType: Type,
      catalystType: DataType,
      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {

    // True only when the Parquet type carries an unsigned integer logical annotation whose bit
    // width equals `bitWidth`; any other annotation (or none) yields false.
    def isUnsignedIntTypeMatched(bitWidth: Int): Boolean = {
      parquetType.getLogicalTypeAnnotation match {
        case i: IntLogicalTypeAnnotation if !i.isSigned => i.getBitWidth == bitWidth
        case _ => false
      }
    }

    catalystType match {
      case LongType if isUnsignedIntTypeMatched(32) =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType |
        _: AnsiIntervalType =>
        new ParquetPrimitiveConverter(updater)

      case ByteType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =

      case ShortType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =

      // For INT32 backed decimals
      case _: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetIntDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            // If the column is a plain INT32, we should pick the precision that can host the
            // largest INT32 value.
            new ParquetIntDictionaryAwareDecimalConverter(
              DecimalType.IntDecimal.precision, 0, updater)

      // For unsigned int64
      case _: DecimalType if isUnsignedIntTypeMatched(64) =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {

      // For INT64 backed decimals
      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetLongDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            // If the column is a plain INT64, we should pick the precision that can host the
            // largest INT64 value.
            new ParquetLongDictionaryAwareDecimalConverter(
              DecimalType.LongDecimal.precision, 0, updater)

      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
      case t: DecimalType
        if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
           parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetBinaryDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            throw QueryExecutionErrors.cannotCreateParquetConverterForTypeError(
              t, parquetType.toString)

      case t: DecimalType =>
        throw QueryExecutionErrors.cannotCreateParquetConverterForDecimalTypeError(
          t, parquetType.toString)

      case StringType =>
        new ParquetStringConverter(updater)

      // As long as the parquet type is INT64 timestamp, whether logical annotation
      // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
      case TimestampType
        if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
             .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {

      // As long as the parquet type is INT64 timestamp, whether logical annotation
      // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
      case TimestampType
        if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
            .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)

      // INT96 timestamp doesn't have a logical type, here we check the physical type instead.
      case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
        new ParquetPrimitiveConverter(updater) {
          // Converts nanosecond timestamps stored as INT96
          override def addBinary(value: Binary): Unit = {
            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
            val gregorianMicros = int96RebaseFunc(julianMicros)
            val adjTime =, _, ZoneOffset.UTC))

      case TimestampNTZType
        if canReadAsTimestampNTZ(parquetType) &&
            .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater)

      case TimestampNTZType
        if canReadAsTimestampNTZ(parquetType) &&
            .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)

      case DateType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit = {

      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
      // elements where the element type is the type of the field.
      case t: ArrayType
        if !parquetType.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation] =>
        if (parquetType.isPrimitive) {
          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
        } else {
          new RepeatedGroupConverter(parquetType, t.elementType, updater)

      case t: ArrayType =>
        new ParquetArrayConverter(parquetType.asGroupType(), t, updater)

      case t: MapType =>
        new ParquetMapConverter(parquetType.asGroupType(), t, updater)

      case t: StructType =>
        val wrappedUpdater = {
          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
          // There are two cases to handle here:
          //  1. Parent container is a map or array: we must make a deep copy of the mutable row
          //     because this converter may be invoked multiple times per Parquet input record
          //     (if the map or array contains multiple elements).
          //  2. Parent container is a struct: we don't need to copy the row here because either:
          //     (a) all ancestors are structs and therefore no copying is required because this
          //         converter will only be invoked once per Parquet input record, or
          //     (b) some ancestor is struct that is nested in a map or array and that ancestor's
          //         converter will perform deep-copying (which will recursively copy this row).
          if (updater.isInstanceOf[RowUpdater]) {
            // `updater` is a RowUpdater, implying that the parent container is a struct.
          } else {
            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
            new ParentContainerUpdater {
              override def set(value: Any): Unit = {
                updater.set(value.asInstanceOf[SpecificInternalRow].copy())  // deep copy
        new ParquetRowConverter(

      case t =>
        throw QueryExecutionErrors.cannotCreateParquetConverterForDataTypeError(
          t, parquetType.toString)

  // Only INT64 column with Timestamp logical annotation `isAdjustedToUTC=false`
  // can be read as Spark's TimestampNTZ type. This is to avoid mistakes in reading the timestamp
  // values.
  //
  // The `isInstanceOf` check precedes the cast so that `&&` short-circuits before
  // `asInstanceOf` can fail on a non-timestamp (or missing) annotation.
  private def canReadAsTimestampNTZ(parquetType: Type): Boolean =
    schemaConverter.isTimestampNTZEnabled() &&
      parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
      parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
      !parquetType.getLogicalTypeAnnotation
        .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC

   * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
  private final class ParquetStringConverter(updater: ParentContainerUpdater)
    extends ParquetPrimitiveConverter(updater) {

    private var expandedDictionary: Array[UTF8String] = null

    override def hasDictionarySupport: Boolean = true

    override def setDictionary(dictionary: Dictionary): Unit = {
      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>

    override def addValueFromDictionary(dictionaryId: Int): Unit = {

    override def addBinary(value: Binary): Unit = {
      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we
      // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
      // it.
      val buffer = value.toByteBuffer
      val offset = buffer.arrayOffset() + buffer.position()
      val numBytes = buffer.remaining()
      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))

   * Parquet converter for fixed-precision decimals.
  private abstract class ParquetDecimalConverter(
      precision: Int, scale: Int, updater: ParentContainerUpdater)
    extends ParquetPrimitiveConverter(updater) {

    protected var expandedDictionary: Array[Decimal] = _

    override def hasDictionarySupport: Boolean = true

    override def addValueFromDictionary(dictionaryId: Int): Unit = {

    // Converts decimals stored as INT32
    override def addInt(value: Int): Unit = {
      addLong(value: Long)

    // Converts decimals stored as INT64
    override def addLong(value: Long): Unit = {

    // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
    override def addBinary(value: Binary): Unit = {

    protected def decimalFromLong(value: Long): Decimal = {
      Decimal(value, precision, scale)

    protected def decimalFromBinary(value: Binary): Decimal = {
      if (precision <= Decimal.MAX_LONG_DIGITS) {
        // Constructs a `Decimal` with an unscaled `Long` value if possible.
        val unscaled = ParquetRowConverter.binaryToUnscaledLong(value)
        Decimal(unscaled, precision, scale)
      } else {
        // Otherwise, resorts to an unscaled `BigInteger` instead.
        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)

  private class ParquetIntDictionaryAwareDecimalConverter(
      precision: Int, scale: Int, updater: ParentContainerUpdater)
    extends ParquetDecimalConverter(precision, scale, updater) {

    override def setDictionary(dictionary: Dictionary): Unit = {
      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>

  private class ParquetLongDictionaryAwareDecimalConverter(
      precision: Int, scale: Int, updater: ParentContainerUpdater)
    extends ParquetDecimalConverter(precision, scale, updater) {

    override def setDictionary(dictionary: Dictionary): Unit = {
      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>

  private class ParquetBinaryDictionaryAwareDecimalConverter(
      precision: Int, scale: Int, updater: ParentContainerUpdater)
    extends ParquetDecimalConverter(precision, scale, updater) {

    override def setDictionary(dictionary: Dictionary): Unit = {
      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>

  /**
   * Parquet converter for arrays.  Spark SQL arrays are represented as Parquet lists.  Standard
   * Parquet lists are represented as a 3-level group annotated by `LIST`:
   * {{{
   *   <list-repetition> group <name> (LIST) {            <-- parquetSchema points here
   *     repeated group list {
   *       <element-repetition> <element-type> element;
   *     }
   *   }
   * }}}
   * The `parquetSchema` constructor argument points to the outermost group.
   *
   * However, before this representation was standardized, some Parquet libraries/tools also
   * used some non-standard formats to represent list-like structures.  Backwards-compatibility
   * rules for handling these cases are described in the Parquet format spec.
   *
   * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
   */
  private final class ParquetArrayConverter(
      parquetSchema: GroupType,
      catalystSchema: ArrayType,
      updater: ParentContainerUpdater)
    extends ParquetGroupConverter(updater) {

    // Elements gathered for the array currently being read: cleared in `start()`, wrapped in a
    // `GenericArrayData` and handed to `updater` in `end()`.
    private[this] val currentArray = ArrayBuffer.empty[Any]

    private[this] val elementConverter: Converter = {
      val repeatedType = parquetSchema.getType(0)
      val elementType = catalystSchema.elementType

      // At this stage, we need to figure out if the repeated field maps to the element type or is
      // just the syntactic repeated group of the 3-level standard LIST layout. Take the following
      // Parquet LIST-annotated group type as an example:
      //
      //    optional group f (LIST) {
      //      repeated group list {
      //        optional group element {
      //          optional int32 element;
      //        }
      //      }
      //    }
      //
      // This type is ambiguous:
      //
      // 1. When interpreted as a standard 3-level layout, the `list` field is just the syntactic
      //    group, and the entire type should be translated to:
      //
      //      ARRAY<STRUCT<element: INT>>
      //
      // 2. On the other hand, when interpreted as a non-standard 2-level layout, the `list` field
      //    represents the element type, and the entire type should be translated to:
      //
      //      ARRAY<STRUCT<element: STRUCT<element: INT>>>
      //
      // Here we try to convert field `list` into a Catalyst type to see whether the converted type
      // matches the Catalyst array element type.
      //
      // If the guessed element type from the above does not match the Catalyst type (for example,
      // in case of schema evolution), we need to check if the repeated type matches one of the
      // backward-compatibility rules for legacy LIST types (see the LIST section of the Parquet
      // format spec, LogicalTypes.md).
      //
      // If the element type does not match the Catalyst type and the underlying repeated type
      // does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
      //
      // Since `convertField` method requires a Parquet `ColumnIO` as input, here we first create
      // a dummy message type which wraps the given repeated type, and then convert it to the
      // `ColumnIO` using Parquet API.
      val messageType = Types.buildMessage().addField(repeatedType).named("foo")
      val column = new ColumnIOFactory().getColumnIO(messageType)
      val guessedElementType = schemaConverter.convertField(column.getChild(0)).sparkType
      val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName)

      if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
        // If the repeated field corresponds to the element type, creates a new converter using the
        // type of the repeated field.
        newConverter(repeatedType, elementType, new ParentContainerUpdater {
          override def set(value: Any): Unit = currentArray += value
        })
      } else {
        // If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
        // LIST layout, creates a new converter using the only child field of the repeated field.
        assert(!repeatedType.isPrimitive && repeatedType.asGroupType().getFieldCount == 1)
        new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
      }
    }

    // The same element converter is returned for every field index.
    override def getConverter(fieldIndex: Int): Converter = elementConverter

    override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))

    override def start(): Unit = currentArray.clear()

    /** Array element converter */
    private final class ElementConverter(parquetType: Type, catalystType: DataType)
      extends GroupConverter {

      // Reset to null in `start()`, so an element for which the inner converter is never
      // invoked is appended to the array as null.
      private var currentElement: Any = _

      private[this] val converter =
        newConverter(parquetType, catalystType, new ParentContainerUpdater {
          override def set(value: Any): Unit = currentElement = value
        })

      override def getConverter(fieldIndex: Int): Converter = converter

      override def end(): Unit = currentArray += currentElement

      override def start(): Unit = currentElement = null
    }
  }

  /** Parquet converter for maps */
  private final class ParquetMapConverter(
      parquetType: GroupType,
      catalystType: MapType,
      updater: ParentContainerUpdater)
    extends ParquetGroupConverter(updater) {

    private[this] val currentKeys = ArrayBuffer.empty[Any]
    private[this] val currentValues = ArrayBuffer.empty[Any]

    private[this] val keyValueConverter = {
      val repeatedType = parquetType.getType(0).asGroupType()
      new KeyValueConverter(

    override def getConverter(fieldIndex: Int): Converter = keyValueConverter

    override def end(): Unit = {
      // The parquet map may contain null or duplicated map keys. When it happens, the behavior is
      // undefined.
      // TODO (SPARK-26174): disallow it with a config.
        new ArrayBasedMapData(
          new GenericArrayData(currentKeys.toArray),
          new GenericArrayData(currentValues.toArray)))

    override def start(): Unit = {

    /** Parquet converter for key-value pairs within the map. */
    private final class KeyValueConverter(
        parquetKeyType: Type,
        parquetValueType: Type,
        catalystKeyType: DataType,
        catalystValueType: DataType)
      extends GroupConverter {

      // Both slots are reset to null in `start()` and appended to the enclosing converter's
      // key/value buffers in `end()`.
      private var currentKey: Any = _

      private var currentValue: Any = _

      // Index 0 holds the key converter and index 1 the value converter; `getConverter` indexes
      // directly into this array.
      private[this] val converters = Array(
        // Converter for keys
        newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
          override def set(value: Any): Unit = currentKey = value
        }),

        // Converter for values
        newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater {
          override def set(value: Any): Unit = currentValue = value
        }))

      override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)

      override def end(): Unit = {
        currentKeys += currentKey
        currentValues += currentValue
      }

      override def start(): Unit = {
        currentKey = null
        currentValue = null
      }
    }

  // Shared by the two converters for unannotated repeated fields: buffers converted elements and
  // hands them to the parent as one array.
  private trait RepeatedConverter {
    private[this] val currentArray = ArrayBuffer.empty[Any]

    // Builds an updater that clears the buffer on `start()`, appends on `set()`, and on `end()`
    // passes the buffered elements to `updater` wrapped in a `GenericArrayData`.
    protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
      override def start(): Unit = currentArray.clear()
      override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
      override def set(value: Any): Unit = currentArray += value
    }
  }

  /**
   * A primitive converter for converting unannotated repeated primitive values to required arrays
   * of required primitive values.
   */
  private final class RepeatedPrimitiveConverter(
      parquetType: Type,
      catalystType: DataType,
      parentUpdater: ParentContainerUpdater)
    extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {

    // Array-building updater from `RepeatedConverter`, wrapped around the parent's updater.
    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)

    // Converter for a single element; it writes into the array-building `updater` above.
    private[this] val elementConverter: PrimitiveConverter =
      newConverter(parquetType, catalystType, updater).asPrimitiveConverter()

    // Every callback below is a plain delegation to `elementConverter`.
    override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
    override def addInt(value: Int): Unit = elementConverter.addInt(value)
    override def addLong(value: Long): Unit = elementConverter.addLong(value)
    override def addFloat(value: Float): Unit = elementConverter.addFloat(value)
    override def addDouble(value: Double): Unit = elementConverter.addDouble(value)
    override def addBinary(value: Binary): Unit = elementConverter.addBinary(value)

    override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict)
    override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport
    override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id)
  }

  /**
   * A group converter for converting unannotated repeated group values to required arrays of
   * required struct values.
   */
  private final class RepeatedGroupConverter(
      parquetType: Type,
      catalystType: DataType,
      parentUpdater: ParentContainerUpdater)
    extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {

    // Array-building updater from `RepeatedConverter`, wrapped around the parent's updater.
    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)

    // Converter for a single element; it writes into the array-building `updater` above.
    private[this] val elementConverter: GroupConverter =
      newConverter(parquetType, catalystType, updater).asGroupConverter()

    // Field lookup and the per-element start/end callbacks are delegated to `elementConverter`.
    override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
    override def end(): Unit = elementConverter.end()
    override def start(): Unit = elementConverter.start()
  }

private[parquet] object ParquetRowConverter {
  /**
   * Decodes the bytes of `binary` into a `Long`. Bytes are accumulated most-significant first,
   * then the result is sign-extended from the binary's bit length to 64 bits.
   *
   * NOTE(review): the shift arithmetic presumably expects a binary of 1 to 8 bytes; confirm
   * against callers.
   */
  def binaryToUnscaledLong(binary: Binary): Long = {
    // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
    // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
    // copying it.
    val buffer = binary.toByteBuffer
    val bytes = buffer.array()
    val start = buffer.arrayOffset() + buffer.position()
    val end = buffer.arrayOffset() + buffer.limit()

    var unscaled = 0L
    var i = start

    while (i < end) {
      // `& 0xff` keeps the byte's bit pattern when it is widened, so a negative byte does not
      // smear sign bits over the bytes accumulated so far.
      unscaled = (unscaled << 8) | (bytes(i) & 0xff)
      i += 1
    }

    // Shift the value's top bit up to bit 63, then arithmetic-shift back to sign-extend it.
    val bits = 8 * (end - start)
    unscaled = (unscaled << (64 - bits)) >> (64 - bits)
    unscaled
  }

  /**
   * Decodes a 12-byte binary into a timestamp. The buffer is read little-endian as an 8-byte
   * time-of-day value in nanoseconds followed by a 4-byte Julian day number, and the result of
   * `DateTimeUtils.fromJulianDay` on those two values is returned.
   *
   * Fails the assertion when the binary is not exactly 12 bytes long.
   */
  def binaryToSQLTimestamp(binary: Binary): Long = {
    assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
      s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
    val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buffer.getLong
    val julianDay = buffer.getInt
    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
  }
}


