Spark Exchange Source Code
File path: /sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.exchange
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.vectorized.ColumnarBatch
/**
 * Base class for operators that exchange data among multiple threads or processes.
 *
 * Exchanges are the key class of operators that enable parallelism. Although the implementation
 * differs significantly, the concept is similar to the exchange operator described in
 * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
 */
abstract class Exchange extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  final override val nodePatterns: Seq[TreePattern] = Seq(EXCHANGE)

  override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[plan_id=$id]")
}
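Exchange itself is abstract; the planner instantiates its concrete subclasses, ShuffleExchangeExec and BroadcastExchangeExec (defined in sibling files of this package), whenever rows must be redistributed across partitions or broadcast to every executor. The following snippet is not part of Exchange.scala; it is a minimal, illustrative sketch (assuming a local SparkSession) of how such an exchange surfaces in a physical plan:

import org.apache.spark.sql.SparkSession

object ExchangePlanDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("exchange-plan-demo")
      .getOrCreate()

    // groupBy needs all rows with the same key in the same partition, so the planner
    // inserts an Exchange hashpartitioning(id, ...) node (a ShuffleExchangeExec).
    spark.range(0, 1000).groupBy("id").count().explain()

    spark.stop()
  }
}

In the printed plan the Exchange node typically sits between the partial and final HashAggregate, which is exactly the parallelism boundary the Volcano-style exchange operator models.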
/**
 * A wrapper for reused exchange to have different output, because two exchanges which produce
 * logically identical output will have distinct sets of output attribute ids, so we need to
 * preserve the original ids because they're what downstream operators are expecting.
 */
case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchange)
  extends LeafExecNode {

  override def supportsColumnar: Boolean = child.supportsColumnar

  // Ignore this wrapper for canonicalizing.
  override def doCanonicalize(): SparkPlan = child.canonicalized

  def doExecute(): RDD[InternalRow] = {
    child.execute()
  }

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    child.executeColumnar()
  }

  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
    child.executeBroadcast()
  }

  // `ReusedExchangeExec` can have distinct set of output attribute ids from its child, we need
  // to update the attribute ids in `outputPartitioning` and `outputOrdering`.
  private[sql] lazy val updateAttr: Expression => Expression = {
    val originalAttrToNewAttr = AttributeMap(child.output.zip(output))
    e => e.transform {
      case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr)
    }
  }

  override def outputPartitioning: Partitioning = child.outputPartitioning match {
    case e: Expression => updateAttr(e).asInstanceOf[Partitioning]
    case other => other
  }

  override def outputOrdering: Seq[SortOrder] = {
    child.outputOrdering.map(updateAttr(_).asInstanceOf[SortOrder])
  }

  override def verboseStringWithOperatorId(): String = {
    val reuse_op_str = ExplainUtils.getOpId(child)
    s"""
       |$formattedNodeName [Reuses operator id: $reuse_op_str]
       |${ExplainUtils.generateFieldString("Output", output)}
       |""".stripMargin
  }
}
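ReusedExchangeExec comes into play when a physical plan contains two semantically identical exchanges: Spark's exchange-reuse rule keeps one of them and replaces the other with a ReusedExchangeExec whose output preserves the attribute ids downstream operators expect, remapping outputPartitioning and outputOrdering through updateAttr as shown above. A hedged, illustrative sketch (assuming a local SparkSession on Spark 3.x; not part of the source file) that typically triggers this reuse:

import org.apache.spark.sql.SparkSession

object ReusedExchangeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("reused-exchange-demo")
      .getOrCreate()
    import spark.implicits._

    val agg = spark.range(0, 1000)
      .groupBy(($"id" % 10).as("bucket"))
      .count()

    // Both join sides are the same aggregation, so both would need the same shuffle;
    // the planner usually replaces the second shuffle with a ReusedExchange node.
    agg.as("a").join(agg.as("b"), "bucket").explain("formatted")

    spark.stop()
  }
}

In the formatted output the reused side is reported as ReusedExchange together with a "Reuses operator id" line, which is exactly the string produced by verboseStringWithOperatorId above.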