spark SubtractedRDD 源码
spark SubtractedRDD 代码
文件路径:/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.rdd
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.Dependency
import org.apache.spark.OneToOneDependency
import org.apache.spark.Partition
import org.apache.spark.Partitioner
import org.apache.spark.ShuffleDependency
import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
/**
* An optimized version of cogroup for set difference/subtraction.
*
* It is possible to implement this operation with just `cogroup`, but
* that is less efficient because all of the entries from `rdd2`, for
* both matching and non-matching values in `rdd1`, are kept in the
* JHashMap until the end.
*
* With this implementation, only the entries from `rdd1` are kept in-memory,
* and the entries from `rdd2` are essentially streamed, as we only need to
* touch each once to decide if the value needs to be removed.
*
* This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
* you can use `rdd1`'s partitioner/partition size and not worry about running
* out of memory because of the size of `rdd2`.
*/
private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
@transient var rdd1: RDD[_ <: Product2[K, V]],
@transient var rdd2: RDD[_ <: Product2[K, W]],
part: Partitioner)
extends RDD[(K, V)](rdd1.context, Nil) {
override def getDependencies: Seq[Dependency[_]] = {
def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
: Dependency[_] = {
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[T1, T2, Any](rdd, part)
}
}
Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
}
override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- array.indices) {
// Each CoGroupPartition will depend on rdd1 and rdd2
array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
dependencies(j) match {
case s: ShuffleDependency[_, _, _] =>
None
case _ =>
Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
}
}.toArray)
}
array
}
override val partitioner = Some(part)
override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
val partition = p.asInstanceOf[CoGroupPartition]
val map = new JHashMap[K, ArrayBuffer[V]]
def getSeq(k: K): ArrayBuffer[V] = {
val seq = map.get(k)
if (seq != null) {
seq
} else {
val seq = new ArrayBuffer[V]()
map.put(k, seq)
seq
}
}
def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
dependencies(depNum) match {
case oneToOneDependency: OneToOneDependency[_] =>
val dependencyPartition = partition.narrowDeps(depNum).get.split
oneToOneDependency.rdd.iterator(dependencyPartition, context)
.asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
case shuffleDependency: ShuffleDependency[_, _, _] =>
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
val iter = SparkEnv.get.shuffleManager
.getReader(
shuffleDependency.shuffleHandle,
partition.index,
partition.index + 1,
context,
metrics)
.read()
iter.foreach(op)
}
}
// the first dep is rdd1; add all values to the map
integrate(0, t => getSeq(t._1) += t._2)
// the second dep is rdd2; remove all of its keys
integrate(1, t => map.remove(t._1))
map.asScala.iterator.flatMap(t => t._2.iterator.map((t._1, _)))
}
override def clearDependencies(): Unit = {
super.clearDependencies()
rdd1 = null
rdd2 = null
}
}
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦