spark CatalogFileIndex source code
File path: /sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType

/**
 * A [[FileIndex]] for a metastore catalog table.
 *
 * @param sparkSession a [[SparkSession]]
 * @param table the metadata of the table
 * @param sizeInBytes the table's data size in bytes
 */
class CatalogFileIndex(
    sparkSession: SparkSession,
    val table: CatalogTable,
    override val sizeInBytes: Long) extends FileIndex {

  protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()

  /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */
  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)

  assert(table.identifier.database.isDefined,
    "The table identifier must be qualified in CatalogFileIndex")

  private val baseLocation: Option[URI] = table.storage.locationUri

  override def partitionSchema: StructType = table.partitionSchema

  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
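
  // Partition pruning happens first, against the metastore; the pruned
  // `InMemoryFileIndex` then performs the actual file listing, which is why
  // `Nil` is passed for the (already applied) partition filters below.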
  override def listFiles(
      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
  }
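
  // Invalidating the shared cache forces a fresh file-system scan on the next listing.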
  override def refresh(): Unit = fileStatusCache.invalidateAll()

  /**
   * Returns an [[InMemoryFileIndex]] for this table restricted to the subset of partitions
   * specified by the given partition-pruning filters.
   *
   * @param filters partition-pruning filters
   */
  def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
    if (table.partitionColumnNames.nonEmpty) {
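      // Push the filters down to the external catalog (e.g. the Hive metastore)
      // so that only matching partitions are returned, and time the metadata call.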
      val startTime = System.nanoTime()
      val selectedPartitions = ExternalCatalogUtils.listPartitionsByFilter(
        sparkSession.sessionState.conf, sparkSession.sessionState.catalog, table, filters)
      val partitions = selectedPartitions.map { p =>
        val path = new Path(p.location)
        val fs = path.getFileSystem(hadoopConf)
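        // Convert the partition values to an InternalRow and fully qualify the
        // partition location so downstream listing sees absolute, scheme-qualified paths.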
        PartitionPath(
          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
          path.makeQualified(fs.getUri, fs.getWorkingDirectory))
      }
      val partitionSpec = PartitionSpec(partitionSchema, partitions)
      val timeNs = System.nanoTime() - startTime
      new InMemoryFileIndex(sparkSession,
        rootPathsSpecified = partitionSpec.partitions.map(_.path),
        parameters = Map.empty,
        userSpecifiedSchema = Some(partitionSpec.partitionColumns),
        fileStatusCache = fileStatusCache,
        userSpecifiedPartitionSpec = Some(partitionSpec),
        metadataOpsTimeNs = Some(timeNs))
    } else {
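      // Non-partitioned table: nothing to prune, so index the table's root location directly.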
      new InMemoryFileIndex(sparkSession, rootPaths, parameters = table.storage.properties,
        userSpecifiedSchema = None, fileStatusCache = fileStatusCache)
    }
  }
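
  // Computing `inputFiles` prunes nothing (filters = Nil), so it enumerates
  // every partition and can be expensive on heavily partitioned tables.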
  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles

  // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
  // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
  // implement `equals` and `hashCode` here, to make it work with cache lookup.
  override def equals(o: Any): Boolean = o match {
    case other: CatalogFileIndex => this.table.identifier == other.table.identifier
    case _ => false
  }

  override def hashCode(): Int = table.identifier.hashCode()
}
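
To make the control flow above concrete, here is a minimal sketch of how a CatalogFileIndex could be driven by hand. It is illustrative only: the table name `db.events`, its partition column `dt`, and the filter value are hypothetical, and in practice this wiring happens inside Spark's own planner (e.g. when scanning a Hive table), not in user code. The exact shape of `PartitionDirectory` also varies slightly across Spark versions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.execution.datasources.CatalogFileIndex
import org.apache.spark.sql.types.StringType

// Hypothetical setup: a Hive-backed session and an existing partitioned
// table `db.events` with a string partition column `dt`.
val spark = SparkSession.builder()
  .appName("catalog-file-index-sketch")
  .enableHiveSupport()
  .getOrCreate()

val table = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("events", Some("db")))

// sizeInBytes normally comes from table statistics; fall back to the
// session's default when no stats are available.
val sizeInBytes = table.stats.map(_.sizeInBytes.toLong)
  .getOrElse(spark.sessionState.conf.defaultSizeInBytes)

val index = new CatalogFileIndex(spark, table, sizeInBytes)

// Prune to dt = '2024-01-01': only the matching partition directories are
// listed, instead of scanning the whole table location.
val dt = AttributeReference("dt", StringType)()
val pruned = index.filterPartitions(Seq(EqualTo(dt, Literal("2024-01-01"))))
pruned.listFiles(Nil, Nil).foreach { dir =>
  dir.files.foreach(f => println(f.getPath))
}

Note how filterPartitions is the single pruning point: listFiles and even inputFiles are thin wrappers around it, which keeps the metastore round trip in one place.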