spark RpcEndpoint 源码
spark RpcEndpoint 代码
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.spark.rpc
import org.apache.spark.SparkException
* A factory class to create the [[RpcEnv]]. It must have an empty constructor so that it can be
* created using Reflection.
private[spark] trait RpcEnvFactory {
def create(config: RpcEnvConfig): RpcEnv
* An end point for the RPC that defines what functions to trigger given a message.
* It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
* The life-cycle of an endpoint is:
* {@code constructor -> onStart -> receive* -> onStop}
* Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
* [[ThreadSafeRpcEndpoint]]
* If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
* invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
private[spark] trait RpcEndpoint {
* The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
val rpcEnv: RpcEnv
* The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
* called. And `self` will become `null` when `onStop` is called.
* Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
* valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
* Process messages from `RpcEndpointRef.send` or `RpcCallContext.reply`. If receiving a
* unmatched message, `SparkException` will be thrown and sent to `onError`.
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
* Process messages from `RpcEndpointRef.ask`. If receiving a unmatched message,
* `SparkException` will be thrown and sent to `onError`.
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
* Invoked when any exception is thrown during handling messages.
def onError(cause: Throwable): Unit = {
// By default, throw e and let RpcEnv handle it
throw cause
* Invoked when `remoteAddress` is connected to the current node.
def onConnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
* Invoked when `remoteAddress` is lost.
def onDisconnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
* Invoked when some network error happens in the connection between the current node and
* `remoteAddress`.
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
* Invoked before [[RpcEndpoint]] starts to handle any message.
def onStart(): Unit = {
// By default, do nothing.
* Invoked when [[RpcEndpoint]] is stopping. `self` will be `null` in this method and you cannot
* use it to send or ask messages.
def onStop(): Unit = {
// By default, do nothing.
* A convenient method to stop [[RpcEndpoint]].
final def stop(): Unit = {
val _self = self
if (_self != null) {
* A trait that requires RpcEnv thread-safely sending messages to it.
* Thread-safety means processing of one message happens before processing of the next message by
* the same [[ThreadSafeRpcEndpoint]]. In the other words, changes to internal fields of a
* [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
* [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
* However, there is no guarantee that the same thread will be executing the same
* [[ThreadSafeRpcEndpoint]] for different messages.
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
* An endpoint that uses a dedicated thread pool for delivering messages.
private[spark] trait IsolatedRpcEndpoint extends RpcEndpoint {
* How many threads to use for delivering messages. By default, use a single thread.
* Note that requesting more than one thread means that the endpoint should be able to handle
* messages arriving from many threads at once, and all the things that entails (including
* messages being delivered to the endpoint out of order).
def threadCount(): Int = 1
spark RpcEndpointNotFoundException 源码
- 所属分类: 前端技术
- 本文标签:
2、 - 优质文章
8、 golang
9、 openharmony
10、 Vue中input框自动聚焦