spark ConfigEntry 源码
spark ConfigEntry 代码
文件路径:/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.config
// ====================================================================================
// The guideline for naming configurations
// ====================================================================================
/*
In general, the config name should be a noun that describes its basic purpose. It's
recommended to add a prefix to the config name to make the scope clearer. For example,
`spark.scheduler.mode` clearly indicates that this config is for the scheduler.
A config name can have multiple prefixes that are structured, which is similar to a
qualified Java class name. Each prefix behaves like a namespace. We should only create
a namespace if it's meaningful and can be shared by multiple configs. For example,
`buffer.inMemoryThreshold` is preferred over `buffer.in.memory.threshold`.
The following are best practices for naming configs in some common cases:
1. When adding configs for a big feature, it's better to create an umbrella config that
can turn the feature on/off, with a name like `featureName.enabled`. The other configs
of this feature should be put under the `featureName` namespace. For example:
- spark.sql.cbo.enabled
- spark.sql.cbo.starSchemaDetection
- spark.sql.cbo.joinReorder.enabled
- spark.sql.cbo.joinReorder.dp.threshold
2. When adding a boolean config, the name should be a verb that describes what
happens if this config is set to true, e.g. `spark.shuffle.sort.useRadixSort`.
3. When adding a config to specify a time duration, it's better to put the time unit
in the config name. For example, `featureName.timeoutMs`, which clearly indicates
that the time unit is millisecond. The config entry should be created with
`ConfigBuilder#timeConf`, to support time strings like `2 minutes`.
*/
/**
* An entry contains all meta information for a configuration.
*
* When applying variable substitution to config values, only references starting with "spark." are
* considered in the default namespace. For known Spark configuration keys (i.e. those created using
* `ConfigBuilder`), references will also consider the default value when it exists.
*
* Variable expansion is also applied to the default values of config entries that have a default
* value declared as a string.
*
* @param key the key for the configuration
* @param prependedKey the key for the configuration which will be prepended
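* @param prependSeparator the separator which is used for prepending
* @param alternatives alternative keys for the configuration, consulted in order when `key`
* itself has no value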
* @param valueConverter how to convert a string to the value. It should throw an exception if the
* string does not have the required format.
* @param stringConverter how to convert a value to a string that the user can use as a valid
* string value. It's usually `toString`. But sometimes, a custom converter
* is necessary. E.g., if T is List[String], `a, b, c` is better than
* `List(a, b, c)`.
* @param doc the documentation for the configuration
* @param isPublic if this configuration is public to the user. If it's `false`, this
* configuration is only used internally and we should not expose it to users.
* @param version the Spark version in which the configuration was released.
* @tparam T the value type
*/
private[spark] abstract class ConfigEntry[T] (
    val key: String,
    val prependedKey: Option[String],
    val prependSeparator: String,
    val alternatives: List[String],
    val valueConverter: String => T,
    val stringConverter: T => String,
    val doc: String,
    val isPublic: Boolean,
    val version: String) {

  import ConfigEntry._

  // Every entry adds itself to the shared registry while it is being constructed; registering
  // a key that is already present makes `registerEntry` fail its `require` check.
  registerEntry(this)

  /** The default value of this entry rendered as a string. */
  def defaultValueString: String

  /**
   * Looks up the raw string value of this entry.
   *
   * Two lookups are combined: the value stored under `prependedKey` (when one is configured),
   * and the first value found under `key` or, in declaration order, one of the `alternatives`.
   * The values that exist are joined with `prependSeparator`, the prepended one first.
   *
   * @param reader the reader used to resolve keys
   * @return the joined value, or `None` when none of the keys yields a value
   */
  protected def readString(reader: ConfigReader): Option[String] = {
    val prependedValue = prependedKey.flatMap(k => reader.get(k))
    // Probe `key` first and then each alternative lazily, stopping at the first hit so that
    // later keys are never read once a value has been found.
    val ownValue = (key :: alternatives).iterator
      .map(k => reader.get(k))
      .collectFirst { case Some(found) => found }

    val parts = prependedValue.toList ++ ownValue.toList
    if (parts.isEmpty) None else Some(parts.mkString(prependSeparator))
  }

  /** Reads and converts the value of this entry using the given reader. */
  def readFrom(reader: ConfigReader): T

  /** The typed default value; `None` unless a subclass overrides it. */
  def defaultValue: Option[T] = None

  override def toString: String = {
    val head = s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, "
    val tail = s"public=$isPublic, version=$version)"
    head + tail
  }
}
/**
 * A config entry whose default is a fixed, already-typed value.
 */
private class ConfigEntryWithDefault[T] (
    key: String,
    prependedKey: Option[String],
    prependSeparator: String,
    alternatives: List[String],
    _defaultValue: T,
    valueConverter: String => T,
    stringConverter: T => String,
    doc: String,
    isPublic: Boolean,
    version: String)
  extends ConfigEntry(
    key,
    prependedKey,
    prependSeparator,
    alternatives,
    valueConverter,
    stringConverter,
    doc,
    isPublic,
    version
  ) {

  /** Always defined: wraps the default supplied at construction. */
  override def defaultValue: Option[T] = Some(_defaultValue)

  /** The default rendered through `stringConverter`. */
  override def defaultValueString: String = stringConverter(_defaultValue)

  /** Converts the configured string if one is found, otherwise yields the default. */
  def readFrom(reader: ConfigReader): T = readString(reader) match {
    case Some(raw) => valueConverter(raw)
    case None => _defaultValue
  }
}
/**
 * A config entry whose default is produced by calling a function. The function is invoked
 * each time the default is needed; its result is not cached here.
 */
private class ConfigEntryWithDefaultFunction[T] (
    key: String,
    prependedKey: Option[String],
    prependSeparator: String,
    alternatives: List[String],
    _defaultFunction: () => T,
    valueConverter: String => T,
    stringConverter: T => String,
    doc: String,
    isPublic: Boolean,
    version: String)
  extends ConfigEntry(
    key,
    prependedKey,
    prependSeparator,
    alternatives,
    valueConverter,
    stringConverter,
    doc,
    isPublic,
    version
  ) {

  /** Always defined: evaluates the default function on every call. */
  override def defaultValue: Option[T] = {
    val current = _defaultFunction()
    Some(current)
  }

  /** The freshly computed default rendered through `stringConverter`. */
  override def defaultValueString: String = {
    val current = _defaultFunction()
    stringConverter(current)
  }

  /** Converts the configured string if one is found, otherwise calls the default function. */
  def readFrom(reader: ConfigReader): T = readString(reader) match {
    case Some(raw) => valueConverter(raw)
    case None => _defaultFunction()
  }
}
/**
 * A config entry whose default is declared as a string and converted on demand.
 */
private class ConfigEntryWithDefaultString[T] (
    key: String,
    prependedKey: Option[String],
    prependSeparator: String,
    alternatives: List[String],
    _defaultValue: String,
    valueConverter: String => T,
    stringConverter: T => String,
    doc: String,
    isPublic: Boolean,
    version: String)
  extends ConfigEntry(
    key,
    prependedKey,
    prependSeparator,
    alternatives,
    valueConverter,
    stringConverter,
    doc,
    isPublic,
    version
  ) {

  /** Always defined: the default string passed through `valueConverter` as-is. */
  override def defaultValue: Option[T] = {
    val converted = valueConverter(_defaultValue)
    Some(converted)
  }

  /** The default string exactly as declared. */
  override def defaultValueString: String = _defaultValue

  /**
   * Converts the configured string if one is found. Otherwise the default string is run
   * through `reader.substitute` before conversion.
   */
  def readFrom(reader: ConfigReader): T = {
    val raw = readString(reader) match {
      case Some(configured) => configured
      case None => reader.substitute(_defaultValue)
    }
    valueConverter(raw)
  }
}
/**
* A config entry that does not have a default value.
*/
private[spark] class OptionalConfigEntry[T](
    key: String,
    prependedKey: Option[String],
    prependSeparator: String,
    alternatives: List[String],
    val rawValueConverter: String => T,
    val rawStringConverter: T => String,
    doc: String,
    isPublic: Boolean,
    version: String)
  extends ConfigEntry[Option[T]](
    key,
    prependedKey,
    prependSeparator,
    alternatives,
    // Lift the raw converters to work on Option[T]: parsing always yields Some, and an
    // absent value is rendered as null.
    raw => Some(rawValueConverter(raw)),
    maybeValue => maybeValue.map(rawStringConverter).orNull,
    doc,
    isPublic,
    version
  ) {

  /** There is no default, so the shared "undefined" marker is reported. */
  override def defaultValueString: String = ConfigEntry.UNDEFINED

  /** Yields the converted value when a string is configured, `None` otherwise. */
  override def readFrom(reader: ConfigReader): Option[T] = {
    for (raw <- readString(reader)) yield rawValueConverter(raw)
  }
}
/**
* A config entry whose default value is defined by another config entry.
*/
private[spark] class FallbackConfigEntry[T] (
    key: String,
    prependedKey: Option[String],
    prependSeparator: String,
    alternatives: List[String],
    doc: String,
    isPublic: Boolean,
    version: String,
    val fallback: ConfigEntry[T])
  extends ConfigEntry[T](
    key,
    prependedKey,
    prependSeparator,
    alternatives,
    // Both converters are borrowed from the entry this one falls back to.
    fallback.valueConverter,
    fallback.stringConverter,
    doc,
    isPublic,
    version
  ) {

  /** A placeholder naming the fallback entry's key rather than a concrete value. */
  override def defaultValueString: String = s"<value of ${fallback.key}>"

  /**
   * Converts the string configured for this entry if one is found; only otherwise is the
   * fallback entry read from the same reader.
   */
  override def readFrom(reader: ConfigReader): T = readString(reader) match {
    case Some(raw) => valueConverter(raw)
    case None => fallback.readFrom(reader)
  }
}
private[spark] object ConfigEntry {

  /** Marker string used as the default-value text of entries that have no default. */
  val UNDEFINED = "<undefined>"

  /** Registry of every constructed entry, indexed by its key. */
  private[spark] val knownConfigs =
    new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

  /**
   * Adds `entry` to the registry under its key.
   *
   * Fails the `require` check (an `IllegalArgumentException`) when an entry with the same key
   * has already been registered.
   */
  def registerEntry(entry: ConfigEntry[_]): Unit = {
    // putIfAbsent hands back the previous mapping, or null when the key was free.
    val previous = knownConfigs.putIfAbsent(entry.key, entry)
    require(previous == null, s"Config entry ${entry.key} already registered!")
  }

  /** Returns the entry registered under `key`, or `null` when there is none. */
  def findEntry(key: String): ConfigEntry[_] = {
    knownConfigs.get(key)
  }
}
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦