spark HiveServer2 源码
spark HiveServer2 代码
文件路径:/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.service.server;
import java.util.Properties;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.util.ShutdownHookManager;
import org.apache.spark.util.SparkExitCode;
/**
* HiveServer2.
*
*/
public class HiveServer2 extends CompositeService {
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
private CLIService cliService;
private ThriftCLIService thriftCLIService;
/**
 * Creates the server, naming the composite service after this class's simple name.
 * Also calls {@code HiveConf.setLoadHiveServer2Config(true)}.
 * NOTE(review): presumably this makes later {@code HiveConf} instances pick up
 * HiveServer2-specific configuration - confirm against HiveConf.
 */
public HiveServer2() {
super(HiveServer2.class.getSimpleName());
HiveConf.setLoadHiveServer2Config(true);
}
/**
 * Wires up the child services and initializes the composite service.
 *
 * <p>A {@link CLIService} is always registered. The Thrift front-end registered after it
 * is the HTTP variant when {@link #isHTTPTransportMode(HiveConf)} returns true and the
 * binary variant otherwise. Once {@code super.init} has run, a shutdown hook that calls
 * {@link #stop()} is installed. Note that a new hook is added on every call.
 *
 * @param hiveConf configuration used to pick the transport and passed to {@code super.init}
 */
@Override
public synchronized void init(HiveConf hiveConf) {
  cliService = new CLIService(this);
  addService(cliService);

  thriftCLIService = isHTTPTransportMode(hiveConf)
      ? new ThriftHttpCLIService(cliService)
      : new ThriftBinaryCLIService(cliService);
  addService(thriftCLIService);

  super.init(hiveConf);

  // Shutdown hook for SIGTERM / SIGINT. No explicit priority is passed: the hook relies on
  // the default priority being higher than the Hadoop FileSystem hook priority of 10
  // (NOTE(review): confirm against ShutdownHookManager).
  // The callback has to look like a Scala `() -> Unit` function, hence AbstractFunction0.
  ShutdownHookManager.addShutdownHook(
      new AbstractFunction0<BoxedUnit>() {
        @Override
        public BoxedUnit apply() {
          try {
            LOG.info("Hive Server Shutdown hook invoked");
            stop();
          } catch (Throwable stopFailure) {
            // Best effort: the JVM is going down anyway, so only report the problem.
            LOG.warn("Ignoring Exception while stopping Hive Server from shutdown hook",
                stopFailure);
          }
          return BoxedUnit.UNIT;
        }
      });
}
/**
 * Tells whether the server should use the HTTP transport.
 *
 * <p>The {@code HIVE_SERVER2_TRANSPORT_MODE} environment variable wins when it is set;
 * otherwise the value of {@code HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE} is used.
 *
 * @param hiveConf configuration consulted when the environment variable is not set
 * @return {@code true} only if the resolved mode equals "http", ignoring case
 */
public static boolean isHTTPTransportMode(HiveConf hiveConf) {
  String envMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
  String mode = (envMode != null)
      ? envMode
      : hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
  // Literal-first comparison is null-safe: a null mode simply yields false.
  return "http".equalsIgnoreCase(mode);
}
/**
 * Starts this server by delegating to {@code super.start()}.
 * The method is synchronized on this instance, like {@code init} and {@code stop}.
 */
@Override
public synchronized void start() {
super.start();
}
/**
 * Stops this server: logs the shutdown and then delegates to {@code super.stop()}.
 * Besides explicit callers, the shutdown hook registered in {@code init} invokes this.
 */
@Override
public synchronized void stop() {
LOG.info("Shutting down HiveServer2");
super.stop();
}
/**
 * Creates, initializes and starts a HiveServer2 instance, retrying when start-up fails.
 *
 * <p>Each attempt builds a fresh {@link HiveConf} and re-reads
 * {@code HIVE_SERVER2_MAX_START_ATTEMPTS} from it. After a successful start a
 * {@link JvmPauseMonitor} is started on a best-effort basis (a failure there is only logged).
 * After a failed attempt the partially started server is stopped, and the method waits
 * 60 seconds before trying again.
 *
 * <p>If the waiting thread is interrupted, start-up is aborted instead of retried. The
 * interrupt flag is restored first, so continuing would make every later sleep return
 * immediately and turn the back-off into a tight retry loop.
 *
 * @throws Error when the number of failed attempts reaches the configured maximum, or when
 *     the wait between attempts is interrupted; the last start-up failure is the cause
 * @throws Throwable anything thrown while constructing the {@link HiveConf}
 */
private static void startHiveServer2() throws Throwable {
  long attempts = 0;
  while (true) {
    LOG.info("Starting HiveServer2");
    HiveConf hiveConf = new HiveConf();
    long maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
    HiveServer2 server = null;
    try {
      server = new HiveServer2();
      server.init(hiveConf);
      server.start();
      try {
        JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
        pauseMonitor.start();
      } catch (Throwable t) {
        // The monitor is optional; the server itself is already running.
        LOG.warn("Could not initiate the JvmPauseMonitor thread.", t);
      }
      return;
    } catch (Throwable throwable) {
      if (server != null) {
        // Release whatever the failed attempt managed to start before trying again.
        try {
          server.stop();
        } catch (Throwable t) {
          LOG.info("Exception caught when calling stop of HiveServer2 before retrying start", t);
        }
      }
      if (++attempts >= maxAttempts) {
        throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
      }
      LOG.warn("Error starting HiveServer2 on attempt " + attempts
          + ", will retry in 60 seconds", throwable);
      try {
        Thread.sleep(60L * 1000L);
      } catch (InterruptedException e) {
        // Preserve the interrupt for callers and give up: with the flag set, further
        // sleeps would throw immediately and the retries would no longer be spaced out.
        Thread.currentThread().interrupt();
        throw new Error("Interrupted while waiting to retry HiveServer2 start", throwable);
      }
    }
  }
}
/**
 * Command-line entry point.
 *
 * <p>Parses the arguments with a {@link ServerOptionsProcessor}, logs the startup banner,
 * emits the processor's buffered debug output, and then runs the executor selected by the
 * parsed options.
 *
 * @param args command-line arguments, e.g. {@code --hiveconf property=value} or {@code --help}
 */
public static void main(String[] args) {
  HiveConf.setLoadHiveServer2Config(true);
  ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
  ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
  HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
  // The processor buffers its "Setting k=v" messages instead of logging them while parsing;
  // emit them here so they are not silently dropped.
  StringBuilder parseDebugOutput = oproc.getDebugMessage();
  if (LOG.isDebugEnabled() && parseDebugOutput.length() > 0) {
    LOG.debug(parseDebugOutput.toString());
  }
  // Call the executor which will execute the appropriate command based on the parsed options
  oprocResponse.getServerOptionsExecutor().execute();
}
/**
 * Parses the arguments given to HiveServer2.
 *
 * <p>Recognized options are {@code --hiveconf property=value}, whose pairs are copied into
 * the JVM system properties, and {@code -H} / {@code --help}. The outcome of parsing is a
 * {@link ServerOptionsProcessorResponse} carrying the executor that should be run.
 */
public static class ServerOptionsProcessor {
  private final Options options = new Options();
  private org.apache.commons.cli.CommandLine commandLine;
  private final String serverName;
  private final StringBuilder debugMessage = new StringBuilder();

  /**
   * Registers the supported command-line options.
   *
   * @param serverName name handed to the help executor when help is requested
   */
  @SuppressWarnings("static-access")
  public ServerOptionsProcessor(String serverName) {
    this.serverName = serverName;

    // --hiveconf x=y
    Option hiveConfOption = OptionBuilder
        .withValueSeparator()
        .hasArgs(2)
        .withArgName("property=value")
        .withLongOpt("hiveconf")
        .withDescription("Use value for given property")
        .create();
    options.addOption(hiveConfOption);

    Option helpOption = new Option("H", "help", false, "Print help information");
    options.addOption(helpOption);
  }

  /**
   * Parses {@code argv} and picks the executor to run.
   *
   * <p>Every {@code --hiveconf} pair is written to the system properties and recorded in the
   * buffer returned by {@link #getDebugMessage()}. If the arguments cannot be parsed, the
   * error is printed to standard error and the JVM exits with status -1.
   *
   * @param argv raw command-line arguments
   * @return a response holding the help executor when {@code -H} is present, otherwise the
   *     start executor
   */
  public ServerOptionsProcessorResponse parse(String[] argv) {
    try {
      commandLine = new GnuParser().parse(options, argv);

      // --hiveconf: copy each property into the system properties.
      Properties hiveConfOverrides = commandLine.getOptionProperties("hiveconf");
      for (String name : hiveConfOverrides.stringPropertyNames()) {
        String value = hiveConfOverrides.getProperty(name);
        // Buffered rather than logged, so it can be output later once logging is set up.
        debugMessage.append("Setting ").append(name).append("=").append(value).append(";\n");
        System.setProperty(name, value);
      }

      // --help
      if (commandLine.hasOption('H')) {
        return new ServerOptionsProcessorResponse(new HelpOptionExecutor(serverName, options));
      }
    } catch (ParseException e) {
      // Unparseable arguments: report and terminate.
      System.err.println("Error starting HiveServer2 with given arguments: ");
      System.err.println(e.getMessage());
      System.exit(-1);
    }

    // No special option given: start the server.
    return new ServerOptionsProcessorResponse(new StartOptionExecutor());
  }

  /** Returns the buffer of messages recorded while processing {@code --hiveconf} values. */
  StringBuilder getDebugMessage() {
    return debugMessage;
  }
}
/**
 * Result of {@link ServerOptionsProcessor#parse(String[])}: a holder for the executor
 * that matches the parsed options.
 */
static class ServerOptionsProcessorResponse {
  private final ServerOptionsExecutor executor;

  /**
   * @param serverOptionsExecutor executor to hand back from {@link #getServerOptionsExecutor()}
   */
  ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) {
    this.executor = serverOptionsExecutor;
  }

  /** Returns the executor supplied at construction time. */
  ServerOptionsExecutor getServerOptionsExecutor() {
    return this.executor;
  }
}
/**
 * The executor interface for running the appropriate HiveServer2 command based on parsed
 * options. Implementations in this file are {@code HelpOptionExecutor} and
 * {@code StartOptionExecutor}.
 */
interface ServerOptionsExecutor {
// Runs the command; takes no arguments and returns nothing.
void execute();
}
/**
 * Executor for the {@code --help} option: prints the usage text and exits the JVM.
 */
static class HelpOptionExecutor implements ServerOptionsExecutor {
  private final Options options;
  private final String serverName;

  /**
   * @param serverName name passed to the help formatter along with the options
   * @param options the option set to describe
   */
  HelpOptionExecutor(String serverName, Options options) {
    this.serverName = serverName;
    this.options = options;
  }

  /** Prints help for the configured options, then exits with the success exit code. */
  @Override
  public void execute() {
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp(serverName, options);
    System.exit(SparkExitCode.EXIT_SUCCESS());
  }
}
/**
* StartOptionExecutor: starts HiveServer2.
* This is the default executor, when no option is specified.
*/
static class StartOptionExecutor implements ServerOptionsExecutor {
@Override
public void execute() {
try {
startHiveServer2();
} catch (Throwable t) {
LOG.error("Error starting HiveServer2", t);
System.exit(-1);
}
}
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦