spark DescribeTableExec 源码
spark DescribeTableExec 代码
文件路径:/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.quoteIfNeeded
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.IdentityTransform
case class DescribeTableExec(
output: Seq[Attribute],
table: Table,
isExtended: Boolean) extends LeafV2CommandExec {
override protected def run(): Seq[InternalRow] = {
val rows = new ArrayBuffer[InternalRow]()
addSchema(rows)
addPartitioning(rows)
if (isExtended) {
addMetadataColumns(rows)
addTableDetails(rows)
}
rows.toSeq
}
private def addTableDetails(rows: ArrayBuffer[InternalRow]): Unit = {
rows += emptyRow()
rows += toCatalystRow("# Detailed Table Information", "", "")
rows += toCatalystRow("Name", table.name(), "")
val tableType = if (table.properties().containsKey(TableCatalog.PROP_EXTERNAL)) {
CatalogTableType.EXTERNAL.name
} else {
CatalogTableType.MANAGED.name
}
rows += toCatalystRow("Type", tableType, "")
CatalogV2Util.TABLE_RESERVED_PROPERTIES
.filterNot(_ == TableCatalog.PROP_EXTERNAL)
.foreach(propKey => {
if (table.properties.containsKey(propKey)) {
rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "")
}
})
val properties =
conf.redactOptions(table.properties.asScala.toMap).toList
.filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
.sortBy(_._1).map {
case (key, value) => key + "=" + value
}.mkString("[", ",", "]")
rows += toCatalystRow("Table Properties", properties, "")
}
private def addSchema(rows: ArrayBuffer[InternalRow]): Unit = {
rows ++= table.schema.map{ column =>
toCatalystRow(
column.name, column.dataType.simpleString, column.getComment().orNull)
}
}
private def addMetadataColumns(rows: ArrayBuffer[InternalRow]): Unit = table match {
case hasMeta: SupportsMetadataColumns if hasMeta.metadataColumns.nonEmpty =>
rows += emptyRow()
rows += toCatalystRow("# Metadata Columns", "", "")
rows ++= hasMeta.metadataColumns.map { column =>
toCatalystRow(
column.name,
column.dataType.simpleString,
Option(column.comment()).getOrElse(""))
}
case _ =>
}
private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
if (table.partitioning.nonEmpty) {
val partitionColumnsOnly = table.partitioning.forall(t => t.isInstanceOf[IdentityTransform])
if (partitionColumnsOnly) {
rows += toCatalystRow("# Partition Information", "", "")
rows += toCatalystRow(s"# ${output(0).name}", output(1).name, output(2).name)
rows ++= table.partitioning
.map(_.asInstanceOf[IdentityTransform].ref.fieldNames())
.map { fieldNames =>
val nestedField = table.schema.findNestedField(fieldNames)
assert(nestedField.isDefined,
s"Not found the partition column ${fieldNames.map(quoteIfNeeded).mkString(".")} " +
s"in the table schema ${table.schema().catalogString}.")
nestedField.get
}.map { case (path, field) =>
toCatalystRow(
(path :+ field.name).map(quoteIfNeeded(_)).mkString("."),
field.dataType.simpleString,
field.getComment().orNull)
}
} else {
rows += emptyRow()
rows += toCatalystRow("# Partitioning", "", "")
rows ++= table.partitioning.zipWithIndex.map {
case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
}
}
}
}
private def emptyRow(): InternalRow = toCatalystRow("", "", "")
}
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦