spark TransportChannelHandler 源码
spark TransportChannelHandler 代码
文件路径:/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.spark.network.TransportContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportResponseHandler;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.ResponseMessage;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
/**
* The single Transport-level Channel handler which is used for delegating requests to the
* {@link TransportRequestHandler} and responses to the {@link TransportResponseHandler}.
*
* All channels created in the transport layer are bidirectional. When the Client initiates a Netty
* Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server
* will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server
* also gets a handle on the same Channel, so it may then begin to send RequestMessages to the
* Client.
* This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
* for the Client's responses to the Server's requests.
*
* This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
* We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
* on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
* time out if the client is continuously sending but getting no responses, for simplicity.
*/
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
  private static final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);

  /** Client for this channel; it is marked as timed out before an idle channel is closed. */
  private final TransportClient client;

  /** Receives {@link ResponseMessage}s and the channel lifecycle callbacks. */
  private final TransportResponseHandler responseHandler;

  /** Receives {@link RequestMessage}s and the channel lifecycle callbacks. */
  private final TransportRequestHandler requestHandler;

  /** Request timeout in nanoseconds, converted from the millisecond constructor argument. */
  private final long requestTimeoutNs;

  /** Whether an overdue, fully idle channel is closed even without outstanding requests. */
  private final boolean closeIdleConnections;

  /** Whether {@link ChunkFetchRequest} messages are rejected by {@link #acceptInboundMessage}. */
  private final boolean skipChunkFetchRequest;

  /** Source of the registered-connections counter and of the module name used in log output. */
  private final TransportContext transportContext;

  /**
   * Creates a handler that delegates to the given request and response handlers.
   *
   * @param client client associated with the channel
   * @param responseHandler handler for inbound responses
   * @param requestHandler handler for inbound requests
   * @param requestTimeoutMs request timeout in milliseconds
   * @param skipChunkFetchRequest if true, {@link ChunkFetchRequest} messages are not accepted here
   * @param closeIdleConnections if true, idle channels without outstanding requests are closed too
   * @param transportContext context providing connection metrics and configuration
   */
  public TransportChannelHandler(
      TransportClient client,
      TransportResponseHandler responseHandler,
      TransportRequestHandler requestHandler,
      long requestTimeoutMs,
      boolean skipChunkFetchRequest,
      boolean closeIdleConnections,
      TransportContext transportContext) {
    this.client = client;
    this.responseHandler = responseHandler;
    this.requestHandler = requestHandler;
    this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
    this.skipChunkFetchRequest = skipChunkFetchRequest;
    this.closeIdleConnections = closeIdleConnections;
    this.transportContext = transportContext;
  }

  /** Returns the client associated with this channel. */
  public TransportClient getClient() {
    return client;
  }

  /**
   * Logs the failure, forwards it to both delegate handlers and closes the channel.
   *
   * Each delegate is invoked in its own try/catch, mirroring {@link #channelActive} and
   * {@link #channelInactive}, so that a RuntimeException thrown by one of them can neither
   * prevent the other from being notified nor prevent the channel from being closed.
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // A trailing Throwable argument is logged by SLF4J as the exception, not as a placeholder.
    logger.warn("Exception in connection from {}", getRemoteAddress(ctx.channel()), cause);
    try {
      requestHandler.exceptionCaught(cause);
    } catch (RuntimeException e) {
      logger.error("Exception from request handler while handling channel exception", e);
    }
    try {
      responseHandler.exceptionCaught(cause);
    } catch (RuntimeException e) {
      logger.error("Exception from response handler while handling channel exception", e);
    }
    ctx.close();
  }

  /**
   * Notifies both delegate handlers that the channel is active, then propagates the event.
   * A RuntimeException from either delegate is logged and does not stop the propagation.
   */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    try {
      requestHandler.channelActive();
    } catch (RuntimeException e) {
      logger.error("Exception from request handler while channel is active", e);
    }
    try {
      responseHandler.channelActive();
    } catch (RuntimeException e) {
      logger.error("Exception from response handler while channel is active", e);
    }
    super.channelActive(ctx);
  }

  /**
   * Notifies both delegate handlers that the channel is inactive, then propagates the event.
   * A RuntimeException from either delegate is logged and does not stop the propagation.
   */
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    try {
      requestHandler.channelInactive();
    } catch (RuntimeException e) {
      logger.error("Exception from request handler while channel is inactive", e);
    }
    try {
      responseHandler.channelInactive();
    } catch (RuntimeException e) {
      logger.error("Exception from response handler while channel is inactive", e);
    }
    super.channelInactive(ctx);
  }

  /**
   * Overwrite acceptInboundMessage to properly delegate ChunkFetchRequest messages
   * to ChunkFetchRequestHandler.
   *
   * @return false for a {@link ChunkFetchRequest} when skipChunkFetchRequest is set; otherwise
   *         whatever the superclass decides
   */
  @Override
  public boolean acceptInboundMessage(Object msg) throws Exception {
    if (skipChunkFetchRequest && msg instanceof ChunkFetchRequest) {
      return false;
    } else {
      return super.acceptInboundMessage(msg);
    }
  }

  /**
   * Dispatches a message to the request or response handler according to its type. A message
   * that is neither a request nor a response is passed on through the pipeline.
   */
  @Override
  public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
    } else {
      ctx.fireChannelRead(request);
    }
  }

  /** Triggered based on events from an {@link io.netty.handler.timeout.IdleStateHandler}. */
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent e = (IdleStateEvent) evt;
      // See class comment for timeout semantics. In addition to ensuring we only time out while
      // there are outstanding requests, we also do a secondary consistency check to ensure
      // there's no race between the idle timeout and incrementing the numOutstandingRequests
      // (see SPARK-7003).
      //
      // To avoid a race between TransportClientFactory.createClient() and this code which could
      // result in an inactive client being returned, this needs to run in a synchronized block.
      synchronized (this) {
        boolean isActuallyOverdue =
          System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
        if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
          if (responseHandler.hasOutstandingRequests()) {
            String address = getRemoteAddress(ctx.channel());
            logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
              "requests. Assuming connection is dead; please adjust" +
              " spark.{}.io.connectionTimeout if this is wrong.",
              address, requestTimeoutNs / 1000 / 1000, transportContext.getConf().getModuleName());
            client.timeOut();
            ctx.close();
          } else if (closeIdleConnections) {
            // When closeIdleConnections is enabled, idle connections are closed as well.
            client.timeOut();
            ctx.close();
          }
        }
      }
    }
    // The event is always forwarded, whether or not the channel was closed above.
    ctx.fireUserEventTriggered(evt);
  }

  /** Returns the handler that processes responses on this channel. */
  public TransportResponseHandler getResponseHandler() {
    return responseHandler;
  }

  /** Increments the registered-connections counter, then propagates the event. */
  @Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    transportContext.getRegisteredConnections().inc();
    super.channelRegistered(ctx);
  }

  /** Decrements the registered-connections counter, then propagates the event. */
  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    transportContext.getRegisteredConnections().dec();
    super.channelUnregistered(ctx);
  }
}
相关信息
相关文章
spark AbstractAuthRpcHandler 源码
spark BlockPushNonFatalFailure 源码
spark ChunkFetchRequestHandler 源码
spark OneForOneStreamManager 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦