spark JsonProtocol 源码
spark JsonProtocol 代码
文件路径:/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master._
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.resource.{ResourceInformation, ResourceRequirement}
private[deploy] object JsonProtocol {
private def writeResourcesInfo(info: Map[String, ResourceInformation]): JObject = {
val jsonFields = info.map {
case (k, v) => JField(k, v.toJson)
}
JObject(jsonFields.toList)
}
private def writeResourceRequirement(req: ResourceRequirement): JObject = {
("name" -> req.resourceName) ~
("amount" -> req.amount)
}
/**
* Export the [[WorkerInfo]] to a Json object. A [[WorkerInfo]] consists of the information of a
* worker.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the worker
* `host` the host that the worker is running on
* `port` the port that the worker is bound to
* `webuiaddress` the address used in web UI
* `cores` total cores of the worker
* `coresused` allocated cores of the worker
* `coresfree` free cores of the worker
* `memory` total memory of the worker
* `memoryused` allocated memory of the worker
* `memoryfree` free memory of the worker
* `resources` total resources of the worker
* `resourcesused` allocated resources of the worker
* `resourcesfree` free resources of the worker
* `state` state of the worker, see [[WorkerState]]
* `lastheartbeat` time in milliseconds that the latest heart beat message from the
* worker is received
*/
def writeWorkerInfo(obj: WorkerInfo): JObject = {
("id" -> obj.id) ~
("host" -> obj.host) ~
("port" -> obj.port) ~
("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("coresfree" -> obj.coresFree) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("memoryfree" -> obj.memoryFree) ~
("resources" -> writeResourcesInfo(obj.resourcesInfo)) ~
("resourcesused" -> writeResourcesInfo(obj.resourcesInfoUsed)) ~
("resourcesfree" -> writeResourcesInfo(obj.resourcesInfoFree)) ~
("state" -> obj.state.toString) ~
("lastheartbeat" -> obj.lastHeartbeat)
}
/**
* Export the [[ApplicationInfo]] to a Json object. An [[ApplicationInfo]] consists of the
* information of an application.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the application
* `starttime` time in milliseconds that the application starts
* `name` the description of the application
* `cores` total cores granted to the application
* `user` name of the user who submitted the application
* `memoryperexecutor` minimal memory in MB required to each executor
* `resourcesperexecutor` minimal resources required to each executor
* `submitdate` time in Date that the application is submitted
* `state` state of the application, see [[ApplicationState]]
* `duration` time in milliseconds that the application has been running
* For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
*/
def writeApplicationInfo(obj: ApplicationInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime) ~
("name" -> obj.desc.name) ~
("cores" -> obj.coresGranted) ~
("user" -> obj.desc.user) ~
("memoryperexecutor" -> obj.desc.memoryPerExecutorMB) ~
("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("resourcesperexecutor" -> obj.desc.resourceReqsPerExecutor
.toList.map(writeResourceRequirement)) ~
("resourcesperslave" -> obj.desc.resourceReqsPerExecutor
.toList.map(writeResourceRequirement)) ~
("submitdate" -> obj.submitDate.toString) ~
("state" -> obj.state.toString) ~
("duration" -> obj.duration)
}
/**
* Export the [[ApplicationDescription]] to a Json object. An [[ApplicationDescription]] consists
* of the description of an application.
*
* @return a Json object containing the following fields:
* `name` the description of the application
* `cores` max cores that can be allocated to the application, 0 means unlimited
* `memoryperexecutor` minimal memory in MB required to each executor
* `resourcesperexecutor` minimal resources required to each executor
* `user` name of the user who submitted the application
* `command` the command string used to submit the application
* For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
*/
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
("cores" -> obj.maxCores.getOrElse(0)) ~
("memoryperexecutor" -> obj.memoryPerExecutorMB) ~
("resourcesperexecutor" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
("memoryperslave" -> obj.memoryPerExecutorMB) ~
("resourcesperslave" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
("user" -> obj.user) ~
("command" -> obj.command.toString)
}
/**
* Export the [[ExecutorRunner]] to a Json object. An [[ExecutorRunner]] consists of the
* information of an executor.
*
* @return a Json object containing the following fields:
* `id` an integer identifier of the executor
* `memory` memory in MB allocated to the executor
* `resources` resources allocated to the executor
* `appid` a string identifier of the application that the executor is working on
* `appdesc` a Json object of the [[ApplicationDescription]] of the application that the
* executor is working on
*/
def writeExecutorRunner(obj: ExecutorRunner): JObject = {
("id" -> obj.execId) ~
("memory" -> obj.memory) ~
("resources" -> writeResourcesInfo(obj.resources)) ~
("appid" -> obj.appId) ~
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
/**
* Export the [[DriverInfo]] to a Json object. A [[DriverInfo]] consists of the information of a
* driver.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the driver
* `starttime` time in milliseconds that the driver starts
* `state` state of the driver, see [[DriverState]]
* `cores` cores allocated to the driver
* `memory` memory in MB allocated to the driver
* `resources` resources allocated to the driver
* `submitdate` time in Date that the driver is created
* `worker` identifier of the worker that the driver is running on
* `mainclass` main class of the command string that started the driver
*/
def writeDriverInfo(obj: DriverInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime.toString) ~
("state" -> obj.state.toString) ~
("cores" -> obj.desc.cores) ~
("memory" -> obj.desc.mem) ~
("resources" -> writeResourcesInfo(obj.resources)) ~
("submitdate" -> obj.submitDate.toString) ~
("worker" -> obj.worker.map(_.id).getOrElse("None")) ~
("mainclass" -> obj.desc.command.arguments(2))
}
/**
* Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists the
* information of a master node.
*
* @return a Json object containing the following fields:
* `url` the url of the master node
* `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the
* master
* `aliveworkers` size of alive workers allocated to the master
* `cores` total cores available of the master
* `coresused` cores used by the master
* `memory` total memory available of the master
* `memoryused` memory used by the master
* `resources` total resources available of the master
* `resourcesused` resources used by the master
* `activeapps` a list of Json objects of [[ApplicationInfo]] of the active applications
* running on the master
* `completedapps` a list of Json objects of [[ApplicationInfo]] of the applications
* completed in the master
* `activedrivers` a list of Json objects of [[DriverInfo]] of the active drivers of the
* master
* `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers
* of the master
* `status` status of the master,
* see [[org.apache.spark.deploy.master.RecoveryState.MasterState]]
*/
def writeMasterState(obj: MasterStateResponse): JObject = {
val aliveWorkers = obj.workers.filter(_.isAlive())
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("aliveworkers" -> aliveWorkers.length) ~
("cores" -> aliveWorkers.map(_.cores).sum) ~
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
("memory" -> aliveWorkers.map(_.memory).sum) ~
("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~
("resources" -> aliveWorkers.map(_.resourcesInfo).toList.map(writeResourcesInfo)) ~
("resourcesused" -> aliveWorkers.map(_.resourcesInfoUsed).toList.map(writeResourcesInfo)) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString)
}
/**
* Export the [[WorkerStateResponse]] to a Json object. A [[WorkerStateResponse]] consists the
* information of a worker node.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the worker node
* `masterurl` url of the master node of the worker
* `masterwebuiurl` the address used in web UI of the master node of the worker
* `cores` total cores of the worker
* `coreused` used cores of the worker
* `memory` total memory of the worker
* `memoryused` used memory of the worker
* `resources` total resources of the worker
* `resourcesused` used resources of the worker
* `executors` a list of Json objects of [[ExecutorRunner]] of the executors running on
* the worker
* `finishedexecutors` a list of Json objects of [[ExecutorRunner]] of the finished
* executors of the worker
*/
def writeWorkerState(obj: WorkerStateResponse): JObject = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
("masterwebuiurl" -> obj.masterWebUiUrl) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("resources" -> writeResourcesInfo(obj.resources)) ~
("resourcesused" -> writeResourcesInfo(obj.resourcesUsed)) ~
("executors" -> obj.executors.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner))
}
}
相关信息
相关文章
spark ApplicationDescription 源码
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦