Spark HiveOptions Source Code

File path: /sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.orc.OrcConf.COMPRESS
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.orc.OrcOptions
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf

/**
 * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive
 * serde/format information from these options.
 */
class HiveOptions(@transient private val parameters: CaseInsensitiveMap[String])
  extends Serializable {
  import HiveOptions._

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
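
  // The checks below run in the class body (the primary constructor), so invalid
  // option combinations fail fast with an IllegalArgumentException at construction time.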
  val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase(Locale.ROOT))
  val inputFormat = parameters.get(INPUT_FORMAT)
  val outputFormat = parameters.get(OUTPUT_FORMAT)

  if (inputFormat.isDefined != outputFormat.isDefined) {
    throw new IllegalArgumentException("Cannot specify only inputFormat or outputFormat, you " +
      "have to specify both of them.")
  }
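
  // The check above guarantees inputFormat and outputFormat are always set together,
  // so testing one of them is enough.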
  def hasInputOutputFormat: Boolean = inputFormat.isDefined

  if (fileFormat.isDefined && inputFormat.isDefined) {
    throw new IllegalArgumentException("Cannot specify fileFormat and inputFormat/outputFormat " +
      "together for Hive data source.")
  }

  val serde = parameters.get(SERDE)

  if (fileFormat.isDefined && serde.isDefined) {
    if (!Set("sequencefile", "textfile", "rcfile").contains(fileFormat.get)) {
      throw new IllegalArgumentException(
        s"fileFormat '${fileFormat.get}' already specifies a serde.")
    }
  }
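
  // Delimiter options are only valid for the "textfile" format, and only when no
  // custom serde is given; the three checks below enforce exactly that.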
  val containsDelimiters = delimiterOptions.keys.exists(parameters.contains)

  if (containsDelimiters) {
    if (serde.isDefined) {
      throw new IllegalArgumentException("Cannot specify delimiters with a custom serde.")
    }
    if (fileFormat.isEmpty) {
      throw new IllegalArgumentException("Cannot specify delimiters without fileFormat.")
    }
    if (fileFormat.get != "textfile") {
      throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " +
        s"with fileFormat 'textfile', not ${fileFormat.get}.")
    }
  }

  for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") {
    throw new IllegalArgumentException("Hive data source only support newline '\\n' as " +
      s"line delimiter, but given: $lineDelim.")
  }
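
  // Every option Spark does not consume itself is passed through to the Hive serde,
  // with the public delimiter keys translated to their underlying Hive property names.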
  def serdeProperties: Map[String, String] = parameters.filterKeys {
    k => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT))
  }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }.toMap
}

object HiveOptions {
  private val lowerCasedOptionNames = collection.mutable.Set[String]()

  private def newOption(name: String): String = {
    lowerCasedOptionNames += name.toLowerCase(Locale.ROOT)
    name
  }
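
  // Registering a name via newOption also excludes it from serdeProperties above.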
  val FILE_FORMAT = newOption("fileFormat")
  val INPUT_FORMAT = newOption("inputFormat")
  val OUTPUT_FORMAT = newOption("outputFormat")
  val SERDE = newOption("serde")

  // A map from the public delimiter option keys to the underlying Hive serde property keys.
  val delimiterOptions = Map(
    "fieldDelim" -> "field.delim",
    "escapeDelim" -> "escape.delim",
    // The following typo is inherited from Hive...
    "collectionDelim" -> "colelction.delim",
    "mapkeyDelim" -> "mapkey.delim",
    "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
    val tableProps = tableInfo.getProperties.asScala.toMap
    tableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
      case formatName if formatName.endsWith("parquetoutputformat") =>
        val compressionCodec = new ParquetOptions(tableProps, sqlConf).compressionCodecClassName
        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
      case formatName if formatName.endsWith("orcoutputformat") =>
        val compressionCodec = new OrcOptions(tableProps, sqlConf).compressionCodec
        Option((COMPRESS.getAttribute, compressionCodec))
      case _ => None
    }
  }
}
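
To see how these options behave in practice, here is a minimal usage sketch (the option values and the HiveOptionsDemo wrapper are hypothetical, and it assumes Spark's spark-hive module is on the classpath). It builds a HiveOptions from a plain Map and reads back the normalized results:

import org.apache.spark.sql.hive.execution.HiveOptions

object HiveOptionsDemo {
  def main(args: Array[String]): Unit = {
    // A hypothetical option map for a comma-delimited text table. Keys are matched
    // case-insensitively, and fileFormat values are normalized to lowercase.
    val options = new HiveOptions(Map(
      "fileFormat" -> "TEXTFILE",
      "fieldDelim" -> ","))

    println(options.fileFormat)           // Some(textfile)
    println(options.hasInputOutputFormat) // false

    // Spark-level keys such as fileFormat are stripped, and the public "fieldDelim"
    // key is translated to Hive's "field.delim" property.
    println(options.serdeProperties)      // Map(field.delim -> ,)

    // Invalid combinations fail at construction time, for example delimiters
    // together with a custom serde:
    // new HiveOptions(Map("serde" -> "MySerde", "fieldDelim" -> ",")) // throws
  }
}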