spark ConfigBuilder 源码
spark ConfigBuilder 代码
文件路径:/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.config
import java.util.concurrent.TimeUnit
import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.util.Utils
/**
 * String parsing and formatting helpers shared by the config builders in this file.
 */
private object ConfigHelpers {

  /**
   * Parses a numeric config value.
   *
   * The input is trimmed before `converter` is applied to it.
   *
   * @param s raw string value of the config
   * @param converter function turning the trimmed string into the numeric type
   * @param key config key; only used to build the error message
   * @param configType readable name of the expected type; only used in the error message
   * @throws IllegalArgumentException when `converter` raises a `NumberFormatException`
   */
  def toNumber[T](s: String, converter: String => T, key: String, configType: String): T = {
    try converter(s.trim) catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException(s"$key should be $configType, but was $s")
    }
  }

  /**
   * Parses a boolean config value after trimming it.
   *
   * @param s raw string value of the config
   * @param key config key; only used to build the error message
   * @throws IllegalArgumentException (with a message naming `key`) when the trimmed string
   *                                  cannot be read as a boolean
   */
  def toBoolean(s: String, key: String): Boolean = {
    try {
      val trimmed = s.trim
      trimmed.toBoolean
    } catch {
      case _: IllegalArgumentException =>
        throw new IllegalArgumentException(s"$key should be boolean, but was $s")
    }
  }

  /**
   * Splits `str` with `Utils.stringToSeq` and converts every resulting element.
   * NOTE(review): the exact splitting rules live in `Utils.stringToSeq`, not shown here.
   */
  def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
    val elements = Utils.stringToSeq(str)
    elements.map(converter)
  }

  /** Renders every element with `stringConverter` and joins the results with a comma. */
  def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
    val rendered = v.map(stringConverter)
    rendered.mkString(",")
  }

  /** Parses a time string into a value expressed in `unit` via `JavaUtils.timeStringAs`. */
  def timeFromString(str: String, unit: TimeUnit): Long = {
    JavaUtils.timeStringAs(str, unit)
  }

  /** Converts `v` (expressed in `unit`) to milliseconds and appends the "ms" suffix. */
  def timeToString(v: Long, unit: TimeUnit): String = {
    val millis = TimeUnit.MILLISECONDS.convert(v, unit)
    s"${millis}ms"
  }

  /**
   * Parses a byte-size string into a value expressed in `unit`.
   *
   * A leading '-' is stripped before delegating to `JavaUtils.byteStringAs`, and the parsed
   * magnitude is then negated, so negative sizes are accepted.
   */
  def byteFromString(str: String, unit: ByteUnit): Long = {
    if (str.startsWith("-")) {
      -JavaUtils.byteStringAs(str.substring(1), unit)
    } else {
      JavaUtils.byteStringAs(str, unit)
    }
  }

  /**
   * Renders `v` (expressed in `unit`) as the result of `unit.convertTo(v, ByteUnit.BYTE)`
   * followed by the "b" suffix.
   */
  def byteToString(v: Long, unit: ByteUnit): String = {
    val converted = unit.convertTo(v, ByteUnit.BYTE)
    s"${converted}b"
  }

  /**
   * Compiles `str` into a [[Regex]].
   *
   * @param str pattern text
   * @param key config key; only used to build the error message
   * @throws IllegalArgumentException wrapping the original `PatternSyntaxException` when the
   *                                  pattern is malformed
   */
  def regexFromString(str: String, key: String): Regex = {
    try {
      str.r
    } catch {
      case e: PatternSyntaxException =>
        throw new IllegalArgumentException(s"$key should be a regex, but was $str", e)
    }
  }
}
/**
 * Type-aware stage of the config builder.
 *
 * Holds the string-to-value `converter` and the value-to-string `stringConverter` for a config
 * of type `T`, lets callers layer transformations / validations on top of the converter, and
 * finally produces the config entry through one of the `create*` methods. Each transformation
 * returns a new builder; this instance is not modified.
 *
 * @param parent the untyped builder carrying the key, doc, version and other metadata
 * @param converter parses the user-provided string into a `T`
 * @param stringConverter renders a `T` back to its string form
 */
private[spark] class TypedConfigBuilder[T](
    val parent: ConfigBuilder,
    val converter: String => T,
    val stringConverter: T => String) {

  import ConfigHelpers._

  /**
   * Builds a typed builder whose string form is `toString` of the value; a `null` value is
   * rendered as `null`.
   */
  def this(parent: ConfigBuilder, converter: String => T) = {
    this(parent, converter, value => Option(value).map(_.toString).orNull)
  }

  /**
   * Returns a new builder whose converter applies `fn` to the value parsed by the current
   * converter.
   */
  def transform(fn: T => T): TypedConfigBuilder[T] = {
    val chained: String => T = converter.andThen(fn)
    new TypedConfigBuilder(parent, chained, stringConverter)
  }

  /**
   * Returns a new builder that rejects parsed values for which `validator` is false.
   *
   * @param validator predicate a value must satisfy
   * @param errorMsg text appended to the error raised for a rejected value
   */
  def checkValue(validator: T => Boolean, errorMsg: String): TypedConfigBuilder[T] = {
    transform { candidate =>
      if (validator(candidate)) {
        candidate
      } else {
        throw new IllegalArgumentException(
          s"'$candidate' in ${parent.key} is invalid. $errorMsg")
      }
    }
  }

  /** Returns a new builder that only accepts parsed values contained in `validValues`. */
  def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
    transform { candidate =>
      if (validValues.contains(candidate)) {
        candidate
      } else {
        throw new IllegalArgumentException(
          s"The value of ${parent.key} should be one of ${validValues.mkString(", ")}, " +
            s"but was $candidate")
      }
    }
  }

  /**
   * Returns a builder for a sequence of `T`, reusing the current converters for the
   * individual elements.
   */
  def toSequence: TypedConfigBuilder[Seq[T]] = {
    val parseAll: String => Seq[T] = raw => stringToSeq(raw, converter)
    val renderAll: Seq[T] => String = values => seqToString(values, stringConverter)
    new TypedConfigBuilder(parent, parseAll, renderAll)
  }

  /** Creates a [[ConfigEntry]] without a default value. */
  def createOptional: OptionalConfigEntry[T] = {
    notifyCreated(
      new OptionalConfigEntry[T](parent.key, parent._prependedKey, parent._prependSeparator,
        parent._alternatives, converter, stringConverter, parent._doc, parent._public,
        parent._version))
  }

  /**
   * Creates a [[ConfigEntry]] with the given default value.
   *
   * A `String` default is routed through [[createWithDefaultString]] so that both methods treat
   * string defaults the same way (e.g. w.r.t. variable expansion). Any other default is first
   * rendered with `stringConverter` and parsed back with `converter`, so transformations and
   * validations registered on this builder also apply to the default.
   */
  def createWithDefault(default: T): ConfigEntry[T] = default match {
    case text: String =>
      createWithDefaultString(text)
    case _ =>
      val normalizedDefault = converter(stringConverter(default))
      notifyCreated(
        new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
          parent._prependSeparator, parent._alternatives, normalizedDefault, converter,
          stringConverter, parent._doc, parent._public, parent._version))
  }

  /** Creates a [[ConfigEntry]] whose default value is supplied by `defaultFunc`. */
  def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
    notifyCreated(
      new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
        parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
        parent._doc, parent._public, parent._version))
  }

  /**
   * Creates a [[ConfigEntry]] whose default is given in string form. The string must be a
   * valid value for the entry.
   */
  def createWithDefaultString(default: String): ConfigEntry[T] = {
    notifyCreated(
      new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
        parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
        parent._doc, parent._public, parent._version))
  }

  /** Passes a freshly built entry to the parent's creation callback, if any, and returns it. */
  private def notifyCreated[E <: ConfigEntry[_]](entry: E): E = {
    parent._onCreate.foreach(callback => callback(entry))
    entry
  }
}
/**
 * Entry point of the config builder: collects the metadata of a config (documentation, version,
 * visibility, alternative keys, ...) and hands over to a type-specific builder.
 *
 * The metadata setters mutate this instance and return it, so calls can be chained.
 *
 * @param key the config key
 * @see TypedConfigBuilder
 */
private[spark] case class ConfigBuilder(key: String) {

  import ConfigHelpers._

  // Metadata accumulated by the setters below and read when the entry is created.
  private[config] var _prependedKey: Option[String] = None
  private[config] var _prependSeparator: String = ""
  private[config] var _public: Boolean = true
  private[config] var _doc: String = ""
  private[config] var _version: String = ""
  private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
  private[config] var _alternatives: List[String] = Nil

  /** Marks the config as internal (non-public). */
  def internal(): ConfigBuilder = {
    _public = false
    this
  }

  /** Sets the documentation text of the config. */
  def doc(s: String): ConfigBuilder = {
    _doc = s
    this
  }

  /** Sets the version string recorded for the config. */
  def version(v: String): ConfigBuilder = {
    _version = v
    this
  }

  /**
   * Registers a callback invoked with the config entry once it is instantiated. Currently used
   * by SQLConf to keep track of SQL configuration entries.
   */
  def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
    _onCreate = Option(callback)
    this
  }

  /**
   * Records a key to prepend together with the separator to use. Only string configs may use
   * this: the other typed builders reject a builder on which a prepended key is set.
   */
  def withPrepended(key: String, separator: String = " "): ConfigBuilder = {
    _prependSeparator = separator
    _prependedKey = Option(key)
    this
  }

  /** Appends an alternative key for the config. */
  def withAlternative(key: String): ConfigBuilder = {
    _alternatives :+= key
    this
  }

  /** Typed builder for `Int` configs. */
  def intConf: TypedConfigBuilder[Int] = numberConf[Int](_.toInt, "int")

  /** Typed builder for `Long` configs. */
  def longConf: TypedConfigBuilder[Long] = numberConf[Long](_.toLong, "long")

  /** Typed builder for `Double` configs. */
  def doubleConf: TypedConfigBuilder[Double] = numberConf[Double](_.toDouble, "double")

  /** Typed builder for `Boolean` configs. */
  def booleanConf: TypedConfigBuilder[Boolean] = {
    checkPrependConfig()
    new TypedConfigBuilder[Boolean](this, raw => toBoolean(raw, key))
  }

  /** Typed builder for `String` configs; the value is used as-is. */
  def stringConf: TypedConfigBuilder[String] = {
    new TypedConfigBuilder[String](this, (raw: String) => raw)
  }

  /** Typed builder for time configs whose value is expressed in `unit`. */
  def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
    checkPrependConfig()
    val parse: String => Long = raw => timeFromString(raw, unit)
    val render: Long => String = value => timeToString(value, unit)
    new TypedConfigBuilder[Long](this, parse, render)
  }

  /** Typed builder for byte-size configs whose value is expressed in `unit`. */
  def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
    checkPrependConfig()
    val parse: String => Long = raw => byteFromString(raw, unit)
    val render: Long => String = value => byteToString(value, unit)
    new TypedConfigBuilder[Long](this, parse, render)
  }

  /** Creates an entry built from this builder's metadata and the given `fallback` entry. */
  def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
    val created = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives,
      _doc, _public, _version, fallback)
    _onCreate.foreach(callback => callback(created))
    created
  }

  /** Typed builder for regular-expression configs. */
  def regexConf: TypedConfigBuilder[Regex] = {
    checkPrependConfig()
    val parse: String => Regex = raw => regexFromString(raw, this.key)
    val render: Regex => String = pattern => pattern.toString
    new TypedConfigBuilder[Regex](this, parse, render)
  }

  /** Shared implementation of the numeric builders; `typeName` only appears in error messages. */
  private def numberConf[N](parse: String => N, typeName: String): TypedConfigBuilder[N] = {
    checkPrependConfig()
    new TypedConfigBuilder[N](this, raw => toNumber(raw, parse, key, typeName))
  }

  /** Fails when a prepended key has been set, which only string configs support. */
  private def checkPrependConfig(): Unit = {
    if (_prependedKey.nonEmpty) {
      throw new IllegalArgumentException(s"$key type must be string if prepend used")
    }
  }
}
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦