spark KinesisUtilsPythonHelper 源码

  • 2022-10-20
  • 浏览 (318)

spark KinesisUtilsPythonHelper 代码

文件路径:/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.kinesis

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}

/**
 * This is a helper class that wraps the methods in KinesisUtils into more Python-friendly class and
 * function so that it can be easily instantiated and called from Python's KinesisUtils.
 */
private class KinesisUtilsPythonHelper {

  // scalastyle:off
  def createStream(
      jssc: JavaStreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: Int,
      checkpointInterval: Duration,
      metricsLevel: Int,
      storageLevel: StorageLevel,
      awsAccessKeyId: String,
      awsSecretKey: String,
      stsAssumeRoleArn: String,
      stsSessionName: String,
      stsExternalId: String): JavaReceiverInputDStream[Array[Byte]] = {
    // scalastyle:on
    if (!(stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != null)
        && !(stsAssumeRoleArn == null && stsSessionName == null && stsExternalId == null)) {
      throw new IllegalArgumentException("stsAssumeRoleArn, stsSessionName, and stsExternalId " +
        "must all be defined or all be null")
    }
    if (awsAccessKeyId == null && awsSecretKey != null) {
      throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
    }
    if (awsAccessKeyId != null && awsSecretKey == null) {
      throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
    }

    val kinesisInitialPosition = initialPositionInStream match {
      case 0 => InitialPositionInStream.LATEST
      case 1 => InitialPositionInStream.TRIM_HORIZON
      case _ => throw new IllegalArgumentException(
        "Illegal InitialPositionInStream. Please use " +
          "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
    }

    val cloudWatchMetricsLevel = metricsLevel match {
      case 0 => MetricsLevel.DETAILED
      case 1 => MetricsLevel.SUMMARY
      case 2 => MetricsLevel.NONE
      case _ => MetricsLevel.DETAILED
    }

    val builder = KinesisInputDStream.builder.
      streamingContext(jssc).
      checkpointAppName(kinesisAppName).
      streamName(streamName).
      endpointUrl(endpointUrl).
      regionName(regionName).
      initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(kinesisInitialPosition)).
      checkpointInterval(checkpointInterval).
      metricsLevel(cloudWatchMetricsLevel).
      storageLevel(storageLevel)

    if (stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != null) {
      val kinesisCredsProvider = STSCredentials(
        stsAssumeRoleArn, stsSessionName, Option(stsExternalId),
        BasicCredentials(awsAccessKeyId, awsSecretKey))
      builder.
        kinesisCredentials(kinesisCredsProvider).
        buildWithMessageHandler(KinesisInputDStream.defaultMessageHandler)
    } else {
      if (awsAccessKeyId == null && awsSecretKey == null) {
        builder.build()
      } else {
        builder.kinesisCredentials(BasicCredentials(awsAccessKeyId, awsSecretKey)).build()
      }
    }
  }

}

相关信息

spark 源码目录

相关文章

spark KinesisBackedBlockRDD 源码

spark KinesisCheckpointer 源码

spark KinesisInputDStream 源码

spark KinesisReadConfigurations 源码

spark KinesisReceiver 源码

spark KinesisRecordProcessor 源码

spark KinesisTestUtils 源码

spark SparkAWSCredentials 源码

0  赞