spark KinesisInputDStream source code
File path: /connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
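This post walks through the source of KinesisInputDStream from Apache Spark's kinesis-asl connector. The class is a ReceiverInputDStream that consumes an AWS Kinesis stream via the Kinesis Client Library (KCL). Its companion object exposes a Builder for assembling instances, along with the defaults (endpoint URL, region, initial position, storage level, and CloudWatch metrics settings) that the builder falls back to.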
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kinesis
import scala.reflect.ClassTag
import scala.collection.JavaConverters._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
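// DStream backed by a Kinesis receiver. Each block written by the receiver carries
// the Kinesis sequence-number ranges it covers, so on failure a block's records can
// be re-read directly from Kinesis rather than recovered from replicated blocks.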
private[kinesis] class KinesisInputDStream[T: ClassTag](
_ssc: StreamingContext,
val streamName: String,
val endpointUrl: String,
val regionName: String,
val initialPosition: KinesisInitialPosition,
val checkpointAppName: String,
val checkpointInterval: Duration,
val _storageLevel: StorageLevel,
val messageHandler: Record => T,
val kinesisCreds: SparkAWSCredentials,
val dynamoDBCreds: Option[SparkAWSCredentials],
val cloudWatchCreds: Option[SparkAWSCredentials],
val metricsLevel: MetricsLevel,
val metricsEnabledDimensions: Set[String]
) extends ReceiverInputDStream[T](_ssc) {
private[streaming]
override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
// This returns true even when blockInfos is empty
val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
if (allBlocksHaveRanges) {
// Create a KinesisBackedBlockRDD, even when there are no blocks
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
val seqNumRanges = blockInfos.map {
_.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
new KinesisBackedBlockRDD(
context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
isBlockIdValid = isBlockIdValid,
messageHandler = messageHandler,
kinesisCreds = kinesisCreds,
kinesisReadConfigs = KinesisReadConfigurations(ssc))
} else {
logWarning("Kinesis sequence number information was not present with some block metadata," +
" it may not be possible to recover from failures")
super.createBlockRDD(time, blockInfos)
}
}
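// Instantiated on the driver, run on an executor: the receiver starts a KCL worker
// that stores incoming records as blocks tagged with their sequence-number ranges.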
override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition,
checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
kinesisCreds, dynamoDBCreds, cloudWatchCreds,
metricsLevel, metricsEnabledDimensions)
}
}
object KinesisInputDStream {
/**
* Builder for [[KinesisInputDStream]] instances.
*
* @since 2.2.0
*/
class Builder {
// Required params
private var streamingContext: Option[StreamingContext] = None
private var streamName: Option[String] = None
private var checkpointAppName: Option[String] = None
// Params with defaults
private var endpointUrl: Option[String] = None
private var regionName: Option[String] = None
private var initialPosition: Option[KinesisInitialPosition] = None
private var checkpointInterval: Option[Duration] = None
private var storageLevel: Option[StorageLevel] = None
private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
private var dynamoDBCredsProvider: Option[SparkAWSCredentials] = None
private var cloudWatchCredsProvider: Option[SparkAWSCredentials] = None
private var metricsLevel: Option[MetricsLevel] = None
private var metricsEnabledDimensions: Option[Set[String]] = None
/**
* Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a
* required parameter.
*
* @param ssc [[StreamingContext]] used to construct Kinesis DStreams
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def streamingContext(ssc: StreamingContext): Builder = {
streamingContext = Option(ssc)
this
}
/**
* Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a
* required parameter.
*
* @param jssc [[JavaStreamingContext]] used to construct Kinesis DStreams
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def streamingContext(jssc: JavaStreamingContext): Builder = {
streamingContext = Option(jssc.ssc)
this
}
/**
* Sets the name of the Kinesis stream that the DStream will read from. This is a required
* parameter.
*
* @param streamName Name of Kinesis stream that the DStream will read from
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def streamName(streamName: String): Builder = {
this.streamName = Option(streamName)
this
}
/**
* Sets the KCL application name to use when checkpointing state to DynamoDB. This is a
* required parameter.
*
* @param appName Value to use for the KCL app name (used when creating the DynamoDB checkpoint
* table and when writing metrics to CloudWatch)
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def checkpointAppName(appName: String): Builder = {
checkpointAppName = Option(appName)
this
}
/**
* Sets the AWS Kinesis endpoint URL. Defaults to "https://kinesis.us-east-1.amazonaws.com" if
* no custom value is specified.
*
* @param url Kinesis endpoint URL to use
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def endpointUrl(url: String): Builder = {
endpointUrl = Option(url)
this
}
/**
* Sets the AWS region to construct clients for. Defaults to "us-east-1" if no custom value
* is specified.
*
* @param regionName Name of AWS region to use (e.g. "us-west-2")
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def regionName(regionName: String): Builder = {
this.regionName = Option(regionName)
this
}
/**
* Sets the initial position data is read from in the Kinesis stream. Defaults to
* [[KinesisInitialPositions.Latest]] if no custom value is specified.
*
* @param initialPosition [[KinesisInitialPosition]] value specifying where Spark Streaming
* will start reading records in the Kinesis stream from
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def initialPosition(initialPosition: KinesisInitialPosition): Builder = {
this.initialPosition = Option(initialPosition)
this
}
/**
* Sets the initial position data is read from in the Kinesis stream. Defaults to
* [[InitialPositionInStream.LATEST]] if no custom value is specified.
* This function will be removed when KinesisUtils is deprecated.
*
* @param initialPosition InitialPositionInStream value specifying where Spark Streaming
* will start reading records in the Kinesis stream from
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
@deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = {
this.initialPosition = Option(
KinesisInitialPositions.fromKinesisInitialPosition(initialPosition))
this
}
/**
* Sets how often the KCL application state is checkpointed to DynamoDB. Defaults to the Spark
* Streaming batch interval if no custom value is specified.
*
* @param interval [[Duration]] specifying how often the KCL state should be checkpointed to
* DynamoDB.
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def checkpointInterval(interval: Duration): Builder = {
checkpointInterval = Option(interval)
this
}
/**
* Sets the storage level of the blocks for the DStream created. Defaults to
* [[StorageLevel.MEMORY_AND_DISK_2]] if no custom value is specified.
*
* @param storageLevel [[StorageLevel]] to use for the DStream data blocks
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def storageLevel(storageLevel: StorageLevel): Builder = {
this.storageLevel = Option(storageLevel)
this
}
/**
* Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS Kinesis
* endpoint. Defaults to [[DefaultCredentialsProvider]] if no custom value is specified.
*
* @param credentials [[SparkAWSCredentials]] to use for Kinesis authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def kinesisCredentials(credentials: SparkAWSCredentials): Builder = {
kinesisCredsProvider = Option(credentials)
this
}
/**
* Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS DynamoDB
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for DynamoDB authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder = {
dynamoDBCredsProvider = Option(credentials)
this
}
/**
* Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for CloudWatch authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder = {
cloudWatchCredsProvider = Option(credentials)
this
}
/**
* Sets the CloudWatch metrics level. Defaults to
* [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified.
*
* @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level
* @return Reference to this [[KinesisInputDStream.Builder]]
* @see
* [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
*/
def metricsLevel(metricsLevel: MetricsLevel): Builder = {
this.metricsLevel = Option(metricsLevel)
this
}
/**
* Sets the enabled CloudWatch metrics dimensions. Defaults to
* [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]]
* if no custom value is specified.
*
* @param metricsEnabledDimensions Set[String] to specify which CloudWatch metrics dimensions
* should be enabled
* @return Reference to this [[KinesisInputDStream.Builder]]
* @see
* [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
*/
def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = {
this.metricsEnabledDimensions = Option(metricsEnabledDimensions)
this
}
/**
* Create a new instance of [[KinesisInputDStream]] with configured parameters and the provided
* message handler.
*
* @param handler Function converting [[Record]] instances read by the KCL to DStream type [[T]]
* @return Instance of [[KinesisInputDStream]] constructed with configured parameters
*/
def buildWithMessageHandler[T: ClassTag](
handler: Record => T): KinesisInputDStream[T] = {
val ssc = getRequiredParam(streamingContext, "streamingContext")
new KinesisInputDStream(
ssc,
getRequiredParam(streamName, "streamName"),
endpointUrl.getOrElse(DEFAULT_KINESIS_ENDPOINT_URL),
regionName.getOrElse(DEFAULT_KINESIS_REGION_NAME),
initialPosition.getOrElse(DEFAULT_INITIAL_POSITION),
getRequiredParam(checkpointAppName, "checkpointAppName"),
checkpointInterval.getOrElse(ssc.graph.batchDuration),
storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
ssc.sc.clean(handler),
kinesisCredsProvider.getOrElse(DefaultCredentials),
dynamoDBCredsProvider,
cloudWatchCredsProvider,
metricsLevel.getOrElse(DEFAULT_METRICS_LEVEL),
metricsEnabledDimensions.getOrElse(DEFAULT_METRICS_ENABLED_DIMENSIONS))
}
/**
* Create a new instance of [[KinesisInputDStream]] with configured parameters and using the
* default message handler, which returns [[Array[Byte]]].
*
* @return Instance of [[KinesisInputDStream]] constructed with configured parameters
*/
def build(): KinesisInputDStream[Array[Byte]] = buildWithMessageHandler(defaultMessageHandler)
private def getRequiredParam[T](param: Option[T], paramName: String): T = param.getOrElse {
throw new IllegalArgumentException(s"No value provided for required parameter $paramName")
}
}
/**
* Creates a [[KinesisInputDStream.Builder]] for constructing [[KinesisInputDStream]] instances.
*
* @since 2.2.0
* @return [[KinesisInputDStream.Builder]] instance
*/
def builder: Builder = new Builder
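// Default handler used by build(): copies the record payload out of its ByteBuffer
// into a standalone byte array.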
private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = {
if (record == null) return null
val byteBuffer = record.getData()
val byteArray = new Array[Byte](byteBuffer.remaining())
byteBuffer.get(byteArray)
byteArray
}
private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String =
"https://kinesis.us-east-1.amazonaws.com"
private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
private[kinesis] val DEFAULT_METRICS_LEVEL: MetricsLevel =
KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
private[kinesis] val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] =
KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
}
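For context, here is a minimal usage sketch of the Builder above. The StreamingContext setup and the stream, app, and region values are illustrative placeholders, not part of the Spark source:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

val conf = new SparkConf().setAppName("kinesis-builder-example").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

// streamingContext, streamName and checkpointAppName are the three required
// parameters; the remaining setters override documented defaults. AWS credentials
// fall back to the default provider chain (DefaultCredentials).
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("ExampleStream")            // hypothetical stream name
  .checkpointAppName("example-kcl-app")   // hypothetical KCL app / DynamoDB table name
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()                                // DStream[Array[Byte]] via defaultMessageHandler

stream.foreachRDD { rdd => println(s"Batch record count: ${rdd.count()}") }

ssc.start()
ssc.awaitTermination()

Calling buildWithMessageHandler(record => ...) instead of build() yields a stream of a custom element type; note that the handler is closure-cleaned with ssc.sc.clean because it runs on executors.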
Related articles:
- spark KinesisBackedBlockRDD source code
- spark KinesisReadConfigurations source code
- spark KinesisRecordProcessor source code