spark KinesisWordCountASL source code
File path: /connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.streaming
import java.nio.ByteBuffer
import scala.util.Random
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
/**
* Consumes messages from an Amazon Kinesis stream and does word count.
*
* This example spins up 1 Kinesis Receiver per shard for the given stream.
* It then starts pulling from the last checkpointed sequence number of the given stream.
*
* Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
* <app-name> is the name of the consumer app, used to track the read data in DynamoDB
* <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
* <endpoint-url> endpoint of the Kinesis service
* (e.g. https://kinesis.us-east-1.amazonaws.com)
*
*
* Example:
* # export AWS keys if necessary
* $ export AWS_ACCESS_KEY_ID=<your-access-key>
* $ export AWS_SECRET_ACCESS_KEY=<your-secret-key>
*
* # run the example
* $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
* https://kinesis.us-east-1.amazonaws.com
*
* There is a companion helper class called KinesisWordProducerASL which puts dummy data
* onto the Kinesis stream.
*
* This code uses the DefaultAWSCredentialsProviderChain to find credentials
* in the following order:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
* Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
* Instance profile credentials - delivered through the Amazon EC2 metadata service
* For more information, see
* http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
*
* See https://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
* the Kinesis Spark Streaming integration.
*/
object KinesisWordCountASL extends Logging {
def main(args: Array[String]): Unit = {
// Check that all required args were passed in.
if (args.length != 3) {
System.err.println(
"""
|Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
|
| <app-name> is the name of the consumer app, used to track the read data in DynamoDB
| <stream-name> is the name of the Kinesis stream
| <endpoint-url> is the endpoint of the Kinesis service
| (e.g. https://kinesis.us-east-1.amazonaws.com)
|
|Generate input data for the Kinesis stream using the example KinesisWordProducerASL.
|See https://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
|details.
""".stripMargin)
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Populate the appropriate variables from the given args
val Array(appName, streamName, endpointUrl) = args
// Determine the number of shards from the stream using the low-level Kinesis Client
// from the AWS Java SDK.
val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
require(credentials != null,
"No AWS credentials found. Please specify credentials using one of the methods specified " +
"in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
// In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
// This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
// then the shards will be automatically distributed among the receivers and each receiver
// will receive data from multiple shards.
val numStreams = numShards
// Spark Streaming batch interval
val batchInterval = Milliseconds(2000)
// Kinesis checkpoint interval is the interval at which the DynamoDB table is updated with
// the sequence numbers of records that have been received. Same as batchInterval for this
// example.
val kinesisCheckpointInterval = batchInterval
// Get the region name from the endpoint URL so the Kinesis Client Library metadata is saved
// in a DynamoDB table in the same region as the Kinesis stream
val regionName = KinesisExampleUtils.getRegionNameByEndpoint(endpointUrl)
// Setup the SparkConfig and StreamingContext
val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
val ssc = new StreamingContext(sparkConfig, batchInterval)
// Create the Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
KinesisInputDStream.builder
.streamingContext(ssc)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPosition(new Latest())
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
}
// Union all the streams
val unionStreams = ssc.union(kinesisStreams)
// Convert each record (Array[Byte]) to a String, and split it into words
val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
// Map each word to a (word, 1) tuple so we can reduce by key to count the words
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Print the first 10 wordCounts
wordCounts.print()
// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()
}
}
/**
* Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
* <records-per-sec> <words-per-record>
*
* <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
* <endpoint-url> is the endpoint of the Kinesis service
* (e.g. https://kinesis.us-east-1.amazonaws.com)
* <records-per-sec> is the rate of records per second to put onto the stream
* <words-per-record> is the number of words per record
*
* Example:
* $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
* https://kinesis.us-east-1.amazonaws.com 10 5
*/
object KinesisWordProducerASL {
def main(args: Array[String]): Unit = {
if (args.length != 4) {
System.err.println(
"""
|Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec>
| <words-per-record>
|
| <stream-name> is the name of the Kinesis stream
| <endpoint-url> is the endpoint of the Kinesis service
| (e.g. https://kinesis.us-east-1.amazonaws.com)
| <records-per-sec> is the rate of records per second to put onto the stream
| <words-per-record> is the number of words per record
|
""".stripMargin)
System.exit(1)
}
// Set default log4j logging level to WARN to hide Spark logs
StreamingExamples.setStreamingLogLevels()
// Populate the appropriate variables from the given args
val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
// Generate the records and return the totals
val totals = generate(stream, endpoint, recordsPerSecond.toInt,
wordsPerRecord.toInt)
// Print the array of (word, total) tuples
println("Totals for the words sent")
totals.foreach(println(_))
}
def generate(stream: String,
endpoint: String,
recordsPerSecond: Int,
wordsPerRecord: Int): Seq[(String, Int)] = {
val randomWords = List("spark", "you", "are", "my", "father")
val totals = scala.collection.mutable.Map[String, Int]()
// Create the low-level Kinesis Client from the AWS Java SDK.
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpoint)
println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
s" $recordsPerSecond records per second and $wordsPerRecord words per record")
// Iterate and put records onto the stream per the given recordsPerSecond and wordsPerRecord
for (i <- 1 to 10) {
// Generate recordsPerSecond records to put onto the stream
(1 to recordsPerSecond).foreach { recordNum =>
// Randomly generate wordsPerRecord number of words
val data = (1 to wordsPerRecord).map(x => {
// Get a random index to a word
val randomWordIdx = Random.nextInt(randomWords.size)
val randomWord = randomWords(randomWordIdx)
// Increment total count to compare to server counts later
totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
randomWord
}).mkString(" ")
// Create a partitionKey based on recordNum
val partitionKey = s"partitionKey-$recordNum"
// Create a PutRecordRequest with an Array[Byte] version of the data
val putRecordRequest = new PutRecordRequest().withStreamName(stream)
.withPartitionKey(partitionKey)
.withData(ByteBuffer.wrap(data.getBytes()))
// Put the record onto the stream and capture the PutRecordResult
val putRecordResult = kinesisClient.putRecord(putRecordRequest)
}
// Sleep for a second
Thread.sleep(1000)
println("Sent " + recordsPerSecond + " records")
}
// Convert the totals to a sequence of (word, total) tuples, sorted by word
totals.toSeq.sortBy(_._1)
}
}
/**
* Utility functions for Spark Streaming examples.
* This has been lifted from the examples/ project to remove the circular dependency.
*/
private[streaming] object StreamingExamples extends Logging {
// Set reasonable logging levels for streaming if the user has not configured log4j.
def setStreamingLogLevels(): Unit = {
if (Logging.islog4j2DefaultConfigured()) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Configurator.setRootLevel(Level.WARN)
}
}
}
// scalastyle:on println
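
The consumer above resolves the AWS region by calling KinesisExampleUtils.getRegionNameByEndpoint, a helper that lives in a separate file of the kinesis-asl examples and is not shown in this listing. The following is a minimal sketch, assuming the AWS SDK v1 RegionUtils API, of how such an endpoint-to-region lookup can be implemented; the object name KinesisRegionLookupSketch is illustrative, not the actual Spark helper.

import java.net.URI
import scala.collection.JavaConverters._ // use scala.jdk.CollectionConverters._ on Scala 2.13+
import com.amazonaws.regions.RegionUtils

// Illustrative sketch: map a Kinesis endpoint URL (e.g. https://kinesis.us-east-1.amazonaws.com)
// to its region name (e.g. us-east-1) by searching the SDK's region metadata.
object KinesisRegionLookupSketch {
  def getRegionNameByEndpoint(endpoint: String): String = {
    val host = new URI(endpoint).getHost
    RegionUtils.getRegionsForService("kinesis") // "kinesis" is the Kinesis endpoint prefix
      .asScala
      .find(_.getAvailableEndpoints.asScala.toSeq.contains(host))
      .map(_.getName)
      .getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
  }
}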
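
To see just the word-count transformation (flatMap, map, reduceByKey) without any AWS setup, here is a minimal local sketch that feeds the same pipeline from an in-memory queue via queueStream instead of Kinesis receivers; the object name LocalWordCountSketch and the sample sentences are assumptions for illustration only.

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object LocalWordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LocalWordCountSketch")
    val ssc = new StreamingContext(conf, Milliseconds(2000))

    // Replace the Kinesis receivers with an in-memory queue of RDD[String]
    val rddQueue = new mutable.Queue[RDD[String]]()
    rddQueue += ssc.sparkContext.parallelize(
      Seq("spark you are my father", "spark spark are you"))
    val lines = ssc.queueStream(rddQueue)

    // Same transformation as in KinesisWordCountASL above
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    // Run briefly and then stop; a real job would call awaitTermination()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop()
  }
}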