Spark DDL command source code
File path: /sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import java.util.Locale
import java.util.concurrent.TimeUnit._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.errors.QueryExecutionErrors.hiveTableWithAnsiIntervalsError
import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils, FileFormat, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.PartitioningUtils
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
/**
* A command for users to create a new database.
*
* It will issue an error message when a database with the same name already exists,
* unless 'ifNotExists' is true.
* The syntax of using this command in SQL is:
* {{{
* CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
* [COMMENT database_comment]
* [LOCATION database_directory]
* [WITH DBPROPERTIES (property_name=property_value, ...)];
* }}}
*/
case class CreateDatabaseCommand(
databaseName: String,
ifNotExists: Boolean,
path: Option[String],
comment: Option[String],
props: Map[String, String])
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
catalog.createDatabase(
CatalogDatabase(
databaseName,
comment.getOrElse(""),
path.map(CatalogUtils.stringToURI).getOrElse(catalog.getDefaultDBPath(databaseName)),
props),
ifNotExists)
Seq.empty[Row]
}
}
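// Illustrative usage sketch (not part of the command implementation above): in spark-shell,
// where `spark` is the predefined SparkSession, the documented syntax is planned as a
// CreateDatabaseCommand. The database name, location and properties are hypothetical values.
spark.sql("""
  CREATE DATABASE IF NOT EXISTS demo_db
  COMMENT 'demo database'
  LOCATION '/tmp/warehouse/demo_db'
  WITH DBPROPERTIES ('owner' = 'etl')
""")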
/**
* A command for users to remove a database from the system.
*
* 'ifExists':
* - true, if database_name doesn't exist, no action
* - false (default), if database_name doesn't exist, a warning message will be issued
* 'cascade':
* - true, the dependent objects are automatically dropped before dropping database.
* - false (default), RESTRICT mode: the database cannot be dropped if it is not empty;
* the tables it contains must be dropped first.
*
* The syntax of using this command in SQL is:
* {{{
* DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
* }}}
*/
case class DropDatabaseCommand(
databaseName: String,
ifExists: Boolean,
cascade: Boolean)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
Seq.empty[Row]
}
}
/**
* A command for users to add new (key, value) pairs into DBPROPERTIES.
* If the database does not exist, an error message will be issued to indicate that the database
* does not exist.
* The syntax of using this command in SQL is:
* {{{
* ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
* }}}
*/
case class AlterDatabasePropertiesCommand(
databaseName: String,
props: Map[String, String])
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
catalog.alterDatabase(db.copy(properties = db.properties ++ props))
Seq.empty[Row]
}
}
/**
* A command for users to set a new location path for a database.
* If the database does not exist, an error message will be issued to indicate that the database
* does not exist.
* The syntax of using this command in SQL is:
* {{{
* ALTER (DATABASE|SCHEMA) database_name SET LOCATION path
* }}}
*/
case class AlterDatabaseSetLocationCommand(databaseName: String, location: String)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val oldDb = catalog.getDatabaseMetadata(databaseName)
catalog.alterDatabase(oldDb.copy(locationUri = CatalogUtils.stringToURI(location)))
Seq.empty[Row]
}
}
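// Illustrative spark-shell sketch for the two ALTER DATABASE commands above (hypothetical
// database name, property key and path; assumes `spark` is the predefined SparkSession):
spark.sql("ALTER DATABASE demo_db SET DBPROPERTIES ('ttl.days' = '30')")
spark.sql("ALTER DATABASE demo_db SET LOCATION '/tmp/warehouse/demo_db_v2'")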
/**
* A command for users to show the name of the database, its comment (if one has been set), and its
* root location on the filesystem. When extended is true, it also shows the database's properties.
* If the database does not exist, an error message will be issued to indicate the database
* does not exist.
* The syntax of using this command in SQL is
* {{{
* DESCRIBE DATABASE [EXTENDED] db_name
* }}}
*/
case class DescribeDatabaseCommand(
databaseName: String,
extended: Boolean,
override val output: Seq[Attribute])
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val dbMetadata: CatalogDatabase =
sparkSession.sessionState.catalog.getDatabaseMetadata(databaseName)
val allDbProperties = dbMetadata.properties
val result =
Row("Catalog Name", SESSION_CATALOG_NAME) ::
Row("Database Name", dbMetadata.name) ::
Row("Comment", dbMetadata.description) ::
Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri))::
Row("Owner", allDbProperties.getOrElse(PROP_OWNER, "")) :: Nil
if (extended) {
val properties = allDbProperties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
val propertiesStr =
if (properties.isEmpty) {
""
} else {
conf.redactOptions(properties).toSeq.sortBy(_._1).mkString("(", ", ", ")")
}
result :+ Row("Properties", propertiesStr)
} else {
result
}
}
}
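// Illustrative spark-shell sketch (hypothetical database name): the EXTENDED keyword adds the
// "Properties" row produced above.
spark.sql("DESCRIBE DATABASE EXTENDED demo_db").show(truncate = false)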
/**
* Drops a table/view from the metastore and removes it if it is cached. This command does not drop
* temp views, which should be handled by [[DropTempViewCommand]].
*
* The syntax of this command is:
* {{{
* DROP TABLE [IF EXISTS] table_name;
* DROP VIEW [IF EXISTS] [db_name.]view_name;
* }}}
*/
case class DropTableCommand(
tableName: TableIdentifier,
ifExists: Boolean,
isView: Boolean,
purge: Boolean) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (catalog.tableExists(tableName)) {
// If the command DROP VIEW is used to drop a table, or DROP TABLE is used to drop a view,
// issue an exception.
catalog.getTableMetadata(tableName).tableType match {
case CatalogTableType.VIEW if !isView =>
throw QueryCompilationErrors.cannotDropViewWithDropTableError()
case o if o != CatalogTableType.VIEW && isView =>
throw QueryCompilationErrors.cannotDropViewWithDropTableError()
case _ =>
}
try {
sparkSession.sharedState.cacheManager.uncacheQuery(
sparkSession.table(tableName), cascade = true)
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
catalog.refreshTable(tableName)
catalog.dropTable(tableName, ifExists, purge)
} else if (ifExists) {
// no-op
} else {
throw QueryCompilationErrors.noSuchTableError(
tableName.catalog.toSeq ++ tableName.database :+ tableName.table)
}
Seq.empty[Row]
}
}
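// Illustrative spark-shell sketch (hypothetical table/view names): DROP TABLE on a view, or
// DROP VIEW on a table, fails as implemented above; with IF EXISTS a missing name is a no-op.
spark.sql("DROP TABLE IF EXISTS demo_db.events")
spark.sql("DROP VIEW IF EXISTS demo_db.events_view")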
case class DropTempViewCommand(ident: Identifier) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
assert(ident.namespace().isEmpty || ident.namespace().length == 1)
val nameParts = ident.namespace() :+ ident.name()
val catalog = sparkSession.sessionState.catalog
catalog.getRawLocalOrGlobalTempView(nameParts).foreach { view =>
val hasViewText = view.tableMeta.viewText.isDefined
sparkSession.sharedState.cacheManager.uncacheTableOrView(
sparkSession, nameParts, cascade = hasViewText)
view.refresh()
if (ident.namespace().isEmpty) {
catalog.dropTempView(ident.name())
} else {
catalog.dropGlobalTempView(ident.name())
}
}
Seq.empty[Row]
}
}
/**
* A command that sets table/view properties.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
* ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
* }}}
*/
case class AlterTableSetPropertiesCommand(
tableName: TableIdentifier,
properties: Map[String, String],
isView: Boolean)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableRawMetadata(tableName)
// This overrides old properties and update the comment parameter of CatalogTable
// with the newly added/modified comment since CatalogTable also holds comment as its
// direct property.
val newTable = table.copy(
properties = table.properties ++ properties,
comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
catalog.alterTable(newTable)
catalog.invalidateCachedTable(tableName)
Seq.empty[Row]
}
}
/**
* A command that unsets table/view properties.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
* ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
* }}}
*/
case class AlterTableUnsetPropertiesCommand(
tableName: TableIdentifier,
propKeys: Seq[String],
ifExists: Boolean,
isView: Boolean)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableRawMetadata(tableName)
if (!ifExists) {
propKeys.foreach { k =>
if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) {
throw QueryCompilationErrors.unsetNonExistentPropertyError(k, table.identifier)
}
}
}
// If comment is in the table property, we reset it to None
val tableComment = if (propKeys.contains(TableCatalog.PROP_COMMENT)) None else table.comment
val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.copy(properties = newProperties, comment = tableComment)
catalog.alterTable(newTable)
catalog.invalidateCachedTable(tableName)
Seq.empty[Row]
}
}
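// Illustrative spark-shell sketch for the SET/UNSET TBLPROPERTIES commands above (hypothetical
// table name and keys). Note that the 'comment' key is mirrored into CatalogTable.comment.
spark.sql("ALTER TABLE demo_db.events SET TBLPROPERTIES ('comment' = 'raw events', 'retention' = '30d')")
spark.sql("ALTER TABLE demo_db.events UNSET TBLPROPERTIES IF EXISTS ('retention')")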
/**
* A command to change a column of a table; for now it only supports changing the comment of a
* non-partition column.
*
* The syntax of using this command in SQL is:
* {{{
* ALTER TABLE table_identifier
* CHANGE [COLUMN] column_old_name column_new_name column_dataType [COMMENT column_comment]
* [FIRST | AFTER column_name];
* }}}
*/
case class AlterTableChangeColumnCommand(
tableName: TableIdentifier,
columnName: String,
newColumn: StructField) extends LeafRunnableCommand {
// TODO: support change column name/dataType/metadata/position.
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableRawMetadata(tableName)
val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
// Find the origin column from dataSchema by column name.
val originColumn = findColumnByName(table.dataSchema, columnName, resolver)
// Throw an AnalysisException if the column name/dataType is changed.
if (!columnEqual(originColumn, newColumn, resolver)) {
throw QueryCompilationErrors.alterTableChangeColumnNotSupportedForColumnTypeError(
originColumn, newColumn)
}
val newDataSchema = table.dataSchema.fields.map { field =>
if (field.name == originColumn.name) {
// Create a new column from the origin column with the new comment.
val withNewComment: StructField =
addComment(field, newColumn.getComment)
// Create a new column from the origin column with the new current default value.
if (newColumn.getCurrentDefaultValue().isDefined) {
if (newColumn.getCurrentDefaultValue().get.nonEmpty) {
val result: StructField =
addCurrentDefaultValue(withNewComment, newColumn.getCurrentDefaultValue())
// Check that the proposed default value parses and analyzes correctly, and that the
// type of the resulting expression is equivalent or coercible to the destination column
// type.
ResolveDefaultColumns.analyze(result, "ALTER TABLE ALTER COLUMN")
result
} else {
withNewComment.clearCurrentDefaultValue()
}
} else {
withNewComment
}
} else {
field
}
}
catalog.alterTableDataSchema(tableName, StructType(newDataSchema))
Seq.empty[Row]
}
// Find the origin column from schema by column name, throw an AnalysisException if the column
// reference is invalid.
private def findColumnByName(
schema: StructType, name: String, resolver: Resolver): StructField = {
schema.fields.collectFirst {
case field if resolver(field.name, name) => field
}.getOrElse(throw QueryCompilationErrors.cannotFindColumnError(name, schema.fieldNames))
}
// Add the comment to a column, if comment is empty, return the original column.
private def addComment(column: StructField, comment: Option[String]): StructField =
comment.map(column.withComment).getOrElse(column)
// Add the current default value to a column, if default value is empty, return the original
// column.
private def addCurrentDefaultValue(column: StructField, value: Option[String]): StructField =
value.map(column.withCurrentDefaultValue).getOrElse(column)
// Compare a [[StructField]] to another, return true if they have the same column
// name(by resolver) and dataType.
private def columnEqual(
field: StructField, other: StructField, resolver: Resolver): Boolean = {
resolver(field.name, other.name) && field.dataType == other.dataType
}
}
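// Illustrative spark-shell sketch (hypothetical table with a TIMESTAMP column `ts`): the column
// name and data type must be repeated unchanged; only the comment is updated, as enforced above.
spark.sql("ALTER TABLE demo_db.events CHANGE COLUMN ts ts TIMESTAMP COMMENT 'event time (UTC)'")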
/**
* A command that sets the serde class and/or serde properties of a table/view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
* ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
* }}}
*/
case class AlterTableSerDePropertiesCommand(
tableName: TableIdentifier,
serdeClassName: Option[String],
serdeProperties: Option[Map[String, String]],
partSpec: Option[TablePartitionSpec])
extends LeafRunnableCommand {
// should never happen if we parsed things correctly
require(serdeClassName.isDefined || serdeProperties.isDefined,
"ALTER TABLE attempted to set neither serde class name nor serde properties")
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableRawMetadata(tableName)
// For datasource tables, disallow setting serde or specifying partition
if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
throw QueryCompilationErrors.alterTableSetSerdeForSpecificPartitionNotSupportedError()
}
if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
throw QueryCompilationErrors.alterTableSetSerdeNotSupportedError()
}
if (partSpec.isEmpty) {
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
val part = catalog.getPartition(table.identifier, spec)
val newPart = part.copy(storage = part.storage.copy(
serde = serdeClassName.orElse(part.storage.serde),
properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
catalog.alterPartitions(table.identifier, Seq(newPart))
}
Seq.empty[Row]
}
}
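// Illustrative spark-shell sketch (hypothetical Hive-format table; setting a SerDe is rejected
// for datasource tables, as checked above):
spark.sql("""
  ALTER TABLE demo_db.hive_events SET SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  WITH SERDEPROPERTIES ('field.delim' = ',')
""")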
/**
* Add Partition in ALTER TABLE: add the table partitions.
*
* An error message will be issued if the partition exists, unless 'ifNotExists' is true.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
* PARTITION spec2 [LOCATION 'loc2']
* }}}
*/
case class AlterTableAddPartitionCommand(
tableName: TableIdentifier,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
ifNotExists: Boolean)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
spec,
table.partitionSchema,
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
// inherit table storage format (possibly except for location)
CatalogTablePartition(normalizedSpec, table.storage.copy(
locationUri = location.map(CatalogUtils.stringToURI)))
}
// The Hive metastore may not have enough memory to handle millions of partitions in a single RPC,
// and the request to the metastore also times out when adding a lot of partitions in one shot,
// so we should split them into smaller batches.
val batchSize = conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
parts.iterator.grouped(batchSize).foreach { batch =>
catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
}
sparkSession.catalog.refreshTable(table.identifier.quotedString)
if (table.stats.nonEmpty && sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
// Update table stats only if the new partitions are not empty
val addedSize = CommandUtils.calculateMultipleLocationSizes(sparkSession, table.identifier,
parts.map(_.storage.locationUri)).sum
if (addedSize > 0) {
val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
catalog.alterTableStats(table.identifier, Some(newStats))
catalog.alterPartitions(table.identifier, parts)
}
} else {
// Re-calculate the table size including all partitions
CommandUtils.updateTableStats(sparkSession, table)
}
Seq.empty[Row]
}
}
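// Illustrative spark-shell sketch (hypothetical table partitioned by `dt`; the optional LOCATION
// overrides the inherited table storage location for that partition):
spark.sql("""
  ALTER TABLE demo_db.events ADD IF NOT EXISTS
  PARTITION (dt = '2024-01-01') LOCATION '/data/events/dt=2024-01-01'
  PARTITION (dt = '2024-01-02')
""")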
/**
* Alter a table partition's spec.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
* }}}
*/
case class AlterTableRenamePartitionCommand(
tableName: TableIdentifier,
oldPartition: TablePartitionSpec,
newPartition: TablePartitionSpec)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")
val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
oldPartition,
table.partitionSchema,
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
val normalizedNewPartition = PartitioningUtils.normalizePartitionSpec(
newPartition,
table.partitionSchema,
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
catalog.renamePartitions(
tableName, Seq(normalizedOldPartition), Seq(normalizedNewPartition))
sparkSession.catalog.refreshTable(table.identifier.quotedString)
Seq.empty[Row]
}
}
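// Illustrative spark-shell sketch (hypothetical table partitioned by `dt`):
spark.sql("ALTER TABLE demo_db.events PARTITION (dt = '2024-01-01') RENAME TO PARTITION (dt = '2024-01-03')")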
/**
* Drop Partition in ALTER TABLE: to drop a particular partition for a table.
*
* This removes the data and metadata for this partition.
* The data is actually moved to the .Trash/Current directory if Trash is configured
* (unless 'purge' is true), but the metadata is removed permanently.
* An error message will be issued if the partition does not exist, unless 'ifExists' is true.
* Note: purge is always false when the target is a view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* }}}
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
val normalizedSpecs = specs.map { spec =>
PartitioningUtils.normalizePartitionSpec(
spec,
table.partitionSchema,
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
}
catalog.dropPartitions(
table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
retainData = retainData)
sparkSession.catalog.refreshTable(table.identifier.quotedString)
CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
}
}
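// Illustrative spark-shell sketch (hypothetical table partitioned by `dt`; PURGE skips the trash
// directory, as described above):
spark.sql("ALTER TABLE demo_db.events DROP IF EXISTS PARTITION (dt = '2024-01-02') PURGE")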
case class PartitionStatistics(numFiles: Int, totalSize: Long)
/**
* Repair a table by recovering all the partitions in the directory of the table and
* updating the catalog.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table RECOVER PARTITIONS;
* MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS];
* }}}
*/
case class RepairTableCommand(
tableName: TableIdentifier,
enableAddPartitions: Boolean,
enableDropPartitions: Boolean,
cmd: String = "MSCK REPAIR TABLE") extends LeafRunnableCommand {
// These are statistics that can be collected quickly without requiring a scan of the data
// see https://github.com/apache/hive/blob/master/
// common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
val NUM_FILES = "numFiles"
val TOTAL_SIZE = "totalSize"
val DDL_TIME = "transient_lastDdlTime"
private def getPathFilter(hadoopConf: Configuration): PathFilter = {
// Dummy JobConf to get to the pathFilter defined in the configuration.
// It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow)
val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
path: Path => {
val name = path.getName
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
pathFilter == null || pathFilter.accept(path)
} else {
false
}
}
}
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableRawMetadata(tableName)
val tableIdentWithDB = table.identifier.quotedString
if (table.partitionColumnNames.isEmpty) {
throw QueryCompilationErrors.cmdOnlyWorksOnPartitionedTablesError(cmd, tableIdentWithDB)
}
if (table.storage.locationUri.isEmpty) {
throw QueryCompilationErrors.cmdOnlyWorksOnTableWithLocationError(cmd, tableIdentWithDB)
}
val root = new Path(table.location)
logInfo(s"Recover all the partitions in $root")
val hadoopConf = spark.sessionState.newHadoopConf()
val fs = root.getFileSystem(hadoopConf)
val droppedAmount = if (enableDropPartitions) {
dropPartitions(catalog, fs)
} else 0
val addedAmount = if (enableAddPartitions) {
val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
val pathFilter = getPathFilter(hadoopConf)
val evalPool = ThreadUtils.newForkJoinPool("RepairTableCommand", 8)
val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
try {
scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
} finally {
evalPool.shutdown()
}
val total = partitionSpecsAndLocs.length
logInfo(s"Found $total partitions in $root")
val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
} else {
Map.empty[String, PartitionStatistics]
}
logInfo(s"Finished to gather the fast stats for all $total partitions.")
addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
total
} else 0
// Updates the table to indicate that its partition metadata is stored in the Hive metastore.
// This is always the case for Hive format tables, but is not true for Datasource tables created
// before Spark 2.1 unless they are converted via `msck repair table`.
spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
try {
spark.catalog.refreshTable(tableIdentWithDB)
} catch {
case NonFatal(e) =>
logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of the table " +
"might return wrong result if the table was cached. To avoid such issue, you should " +
"uncache the table manually via the UNCACHE TABLE command after table recovering will " +
"complete fully.", e)
}
logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
Seq.empty[Row]
}
private def scanPartitions(
spark: SparkSession,
fs: FileSystem,
filter: PathFilter,
path: Path,
spec: TablePartitionSpec,
partitionNames: Seq[String],
threshold: Int,
resolver: Resolver,
evalTaskSupport: ForkJoinTaskSupport): Seq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}
val statuses = fs.listStatus(path, filter)
val statusPar: Seq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
// parallelize the list of partitions here, then we can have better parallelism later.
val parArray = new ParVector(statuses.toVector)
parArray.tasksupport = evalTaskSupport
parArray.seq
} else {
statuses
}
statusPar.flatMap { st =>
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
val columnName = ExternalCatalogUtils.unescapePathName(ps(0))
// TODO: Validate the value
val value = ExternalCatalogUtils.unescapePathName(ps(1))
if (resolver(columnName, partitionNames.head)) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
partitionNames.drop(1), threshold, resolver, evalTaskSupport)
} else {
logWarning(
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
Seq.empty
}
} else {
logWarning(s"ignore ${new Path(path, name)}")
Seq.empty
}
}
}
private def gatherPartitionStats(
spark: SparkSession,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
fs: FileSystem,
pathFilter: PathFilter,
threshold: Int): Map[String, PartitionStatistics] = {
if (partitionSpecsAndLocs.length > threshold) {
val hadoopConf = spark.sessionState.newHadoopConf()
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
// Cap the parallelism to prevent the following file listing from generating too many tasks
// in case of a large #defaultParallelism.
val numParallelism = Math.min(serializedPaths.length,
Math.min(spark.sparkContext.defaultParallelism, 10000))
// Gather the fast stats for all the partitions, otherwise the Hive metastore will list all the
// files for all the new partitions sequentially, which is super slow.
logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
spark.sparkContext.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
val pathFilter = getPathFilter(serializableConfiguration.value)
paths.map(new Path(_)).map{ path =>
val fs = path.getFileSystem(serializableConfiguration.value)
val statuses = fs.listStatus(path, pathFilter)
(path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
}
}.collectAsMap().toMap
} else {
partitionSpecsAndLocs.map { case (_, location) =>
val statuses = fs.listStatus(location, pathFilter)
(location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
}.toMap
}
}
private def addPartitions(
spark: SparkSession,
table: CatalogTable,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
partitionStats: Map[String, PartitionStatistics]): Unit = {
val total = partitionSpecsAndLocs.length
var done = 0L
// Hive metastore may not have enough memory to handle millions of partitions in single RPC,
// we should split them into smaller batches. Since Hive client is not thread safe, we cannot
// do this in parallel.
val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE)
partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
val parts = batch.map { case (spec, location) =>
val params = partitionStats.get(location.toString).map {
case PartitionStatistics(numFiles, totalSize) =>
// These two fast stats prevent the Hive metastore from listing the files again.
Map(NUM_FILES -> numFiles.toString,
TOTAL_SIZE -> totalSize.toString,
// Work around a bug in HiveMetastore that tries to mutate read-only parameters.
// see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
DDL_TIME -> now.toString)
}.getOrElse(Map.empty)
// inherit table storage format (possibly except for location)
CatalogTablePartition(
spec,
table.storage.copy(locationUri = Some(location.toUri)),
params)
}
spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
done += parts.length
logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
}
}
// Drops the partitions that do not exist in the file system
private def dropPartitions(catalog: SessionCatalog, fs: FileSystem): Int = {
val dropPartSpecs = ThreadUtils.parmap(
catalog.listPartitions(tableName),
"RepairTableCommand: non-existing partitions",
maxThreads = 8) { partition =>
partition.storage.locationUri.flatMap { uri =>
if (fs.exists(new Path(uri))) None else Some(partition.spec)
}
}.flatten
catalog.dropPartitions(
tableName,
dropPartSpecs,
ignoreIfNotExists = true,
purge = false,
// Since we have already checked that partition directories do not exist, we can avoid
// additional calls to the file system at the catalog side by setting this flag.
retainData = true)
dropPartSpecs.length
}
}
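// Illustrative spark-shell sketch (hypothetical table whose partition directories were written
// directly to the filesystem): both statements are planned as RepairTableCommand; SYNC PARTITIONS
// both adds missing partitions and drops partitions whose directories no longer exist.
spark.sql("ALTER TABLE demo_db.events RECOVER PARTITIONS")
spark.sql("MSCK REPAIR TABLE demo_db.events SYNC PARTITIONS")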
/**
* A command that sets the location of a table or a partition.
*
* For normal tables, this just sets the location URI in the table/partition's storage format.
* For datasource tables, this sets a "path" parameter in the table/partition's serde properties.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
* }}}
*/
case class AlterTableSetLocationCommand(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
location: String)
extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val locUri = CatalogUtils.stringToURI(location)
partitionSpec match {
case Some(spec) =>
DDLUtils.verifyPartitionProviderIsHive(
sparkSession, table, "ALTER TABLE ... SET LOCATION")
// Partition spec is specified, so we set the location only for this partition
val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
spec,
table.partitionSchema,
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
val part = catalog.getPartition(table.identifier, normalizedSpec)
val newPart = part.copy(storage = part.storage.copy(locationUri = Some(locUri)))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
}
sparkSession.catalog.refreshTable(table.identifier.quotedString)
CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
}
}
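// Illustrative spark-shell sketch (hypothetical table and paths): without a PARTITION spec the
// table's own location is changed; with one, only that partition's location is changed.
spark.sql("ALTER TABLE demo_db.events SET LOCATION '/data/events_v2'")
spark.sql("ALTER TABLE demo_db.events PARTITION (dt = '2024-01-03') SET LOCATION '/data/events_v2/dt=2024-01-03'")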
object DDLUtils extends Logging {
val HIVE_PROVIDER = "hive"
def isHiveTable(table: CatalogTable): Boolean = {
isHiveTable(table.provider)
}
def isHiveTable(provider: Option[String]): Boolean = {
provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
}
def isDatasourceTable(table: CatalogTable): Boolean = {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}
def readHiveTable(table: CatalogTable): HiveTableRelation = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}
/**
* Throws a standard error for actions that require partitionProvider = hive.
*/
def verifyPartitionProviderIsHive(
spark: SparkSession, table: CatalogTable, action: String): Unit = {
val tableName = table.identifier.table
if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
throw QueryCompilationErrors
.actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(action, tableName)
}
if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) {
throw QueryCompilationErrors.actionNotAllowedOnTableSincePartitionMetadataNotStoredError(
action, tableName)
}
}
/**
* If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
* issue an exception [[AnalysisException]].
*
* Note: temporary views can be altered by both ALTER VIEW and ALTER TABLE commands,
* since temporary views can also be created by CREATE TEMPORARY TABLE. In the future,
* when we decide to drop that support, we should disallow users from altering temporary views
* via ALTER TABLE.
*/
def verifyAlterTableType(
catalog: SessionCatalog,
tableMetadata: CatalogTable,
isView: Boolean): Unit = {
if (!catalog.isTempView(tableMetadata.identifier)) {
tableMetadata.tableType match {
case CatalogTableType.VIEW if !isView =>
throw QueryCompilationErrors.cannotAlterViewWithAlterTableError()
case o if o != CatalogTableType.VIEW && isView =>
throw QueryCompilationErrors.cannotAlterTableWithAlterViewError()
case _ =>
}
}
}
private[sql] def checkTableColumns(table: CatalogTable): Unit = {
checkTableColumns(table, table.dataSchema)
}
// Checks correctness of table's column names and types.
private[sql] def checkTableColumns(table: CatalogTable, schema: StructType): Unit = {
table.provider.foreach {
_.toLowerCase(Locale.ROOT) match {
case HIVE_PROVIDER =>
val serde = table.storage.serde
if (schema.exists(_.dataType.isInstanceOf[AnsiIntervalType])) {
throw hiveTableWithAnsiIntervalsError(table.identifier.toString)
} else if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
checkDataColNames("orc", schema)
} else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
serde == Some("parquet.hive.serde.ParquetHiveSerDe") ||
serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
checkDataColNames("parquet", schema)
} else if (serde == HiveSerDe.sourceToSerDe("avro").get.serde) {
checkDataColNames("avro", schema)
}
case "parquet" => checkDataColNames("parquet", schema)
case "orc" => checkDataColNames("orc", schema)
case "avro" => checkDataColNames("avro", schema)
case _ =>
}
}
}
def checkDataColNames(provider: String, schema: StructType): Unit = {
val source = try {
DataSource.lookupDataSource(provider, SQLConf.get).getConstructor().newInstance()
} catch {
case e: Throwable =>
logError(s"Failed to find data source: $provider when check data column names.", e)
return
}
source match {
case f: FileFormat => DataSourceUtils.checkFieldNames(f, schema)
case f: FileDataSourceV2 =>
DataSourceUtils.checkFieldNames(f.fallbackFileFormat.newInstance(), schema)
case _ =>
}
}
/**
* Throws an exception if outputPath tries to overwrite an input path.
*/
def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
val inputPaths = query.collect {
case LogicalRelation(r: HadoopFsRelation, _, _, _) =>
r.location.rootPaths
}.flatten
if (inputPaths.contains(outputPath)) {
throw QueryCompilationErrors.cannotOverwritePathBeingReadFromError()
}
}
}