hadoop TimelineV2ClientImpl 源码

  • 2022-10-20
  • 浏览 (186)

hadoop TimelineV2ClientImpl 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client.api.impl;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CancellationException;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;

import org.apache.hadoop.classification.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.core.util.MultivaluedMapImpl;

/**
 * Implementation of timeline v2 client interface.
 *
 */
public class TimelineV2ClientImpl extends TimelineV2Client {
  private static final Logger LOG =
      LoggerFactory.getLogger(TimelineV2ClientImpl.class);

  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";

  private TimelineEntityDispatcher entityDispatcher;
  private TimelineEntityDispatcher subAppEntityDispatcher;
  private volatile String timelineServiceAddress;
  @VisibleForTesting
  volatile Token currentTimelineToken = null;

  // Retry parameters for identifying new timeline service
  // TODO consider to merge with connection retry
  private int maxServiceRetries;
  private long serviceRetryInterval;

  private TimelineConnector connector;

  private ApplicationId contextAppId;

  private UserGroupInformation authUgi;

  public TimelineV2ClientImpl(ApplicationId appId) {
    super(TimelineV2ClientImpl.class.getName());
    this.contextAppId = appId;
  }

  public ApplicationId getContextAppId() {
    return contextAppId;
  }

  protected void serviceInit(Configuration conf) throws Exception {
    if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
      throw new IOException("Timeline V2 client is not properly configured. "
          + "Either timeline service is not enabled or version is not set to"
          + " 2");
    }
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    UserGroupInformation realUgi = ugi.getRealUser();
    String doAsUser = null;
    if (realUgi != null) {
      authUgi = realUgi;
      doAsUser = ugi.getShortUserName();
    } else {
      authUgi = ugi;
      doAsUser = null;
    }
    // TODO need to add/cleanup filter retry later for ATSV2. similar to V1
    DelegationTokenAuthenticatedURL.Token token =
        new DelegationTokenAuthenticatedURL.Token();
    connector = new TimelineConnector(false, authUgi, doAsUser, token);
    addIfService(connector);

    // new version need to auto discovery (with retry till ATS v2 address is
    // got).
    maxServiceRetries =
        conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
    serviceRetryInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
    entityDispatcher = new TimelineEntityDispatcher(conf);
    subAppEntityDispatcher = new TimelineEntityDispatcher(conf);
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    entityDispatcher.start();
    subAppEntityDispatcher.start();
  }

  @Override
  protected void serviceStop() throws Exception {
    entityDispatcher.stop();
    subAppEntityDispatcher.stop();
    super.serviceStop();
  }

  @Override
  public void putEntities(TimelineEntity... entities)
      throws IOException, YarnException {
    entityDispatcher.dispatchEntities(true, entities, false);
  }

  @Override
  public void putEntitiesAsync(TimelineEntity... entities)
      throws IOException, YarnException {
    entityDispatcher.dispatchEntities(false, entities, false);
  }

  @Override
  public void putSubAppEntities(TimelineEntity... entities)
      throws IOException, YarnException {
    subAppEntityDispatcher.dispatchEntities(true, entities, true);
  }

  @Override
  public void putSubAppEntitiesAsync(TimelineEntity... entities)
      throws IOException, YarnException {
    subAppEntityDispatcher.dispatchEntities(false, entities, true);
  }

  @Override
  public void setTimelineCollectorInfo(CollectorInfo collectorInfo) {
    if (collectorInfo == null) {
      LOG.warn("Not setting collector info as it is null.");
      return;
    }
    // First update the token so that it is available when collector address is
    // used.
    if (collectorInfo.getCollectorToken() != null) {
      // Use collector address to update token service if its not available.
      setTimelineDelegationToken(
          collectorInfo.getCollectorToken(), collectorInfo.getCollectorAddr());
    }
    // Update timeline service address.
    if (collectorInfo.getCollectorAddr() != null &&
        !collectorInfo.getCollectorAddr().isEmpty() &&
        !collectorInfo.getCollectorAddr().equals(timelineServiceAddress)) {
      this.timelineServiceAddress = collectorInfo.getCollectorAddr();
      LOG.info("Updated timeline service address to " + timelineServiceAddress);
    }
  }

  private void setTimelineDelegationToken(Token delegationToken,
      String collectorAddr) {
    // Checks below are to ensure that an invalid token is not updated in UGI.
    // This is required because timeline token is set via a public API.
    if (!delegationToken.getKind().equals(
        TimelineDelegationTokenIdentifier.KIND_NAME.toString())) {
      LOG.warn("Timeline token to be updated should be of kind " +
          TimelineDelegationTokenIdentifier.KIND_NAME);
      return;
    }
    if (collectorAddr == null || collectorAddr.isEmpty()) {
      collectorAddr = timelineServiceAddress;
    }
    // Token need not be updated if both address and token service do not exist.
    String service = delegationToken.getService();
    if ((service == null || service.isEmpty()) &&
        (collectorAddr == null || collectorAddr.isEmpty())) {
      LOG.warn("Timeline token does not have service and timeline service " +
          "address is not yet set. Not updating the token");
      return;
    }
    // No need to update a duplicate token.
    if (currentTimelineToken != null &&
        currentTimelineToken.equals(delegationToken)) {
      return;
    }
    currentTimelineToken = delegationToken;
    // Convert the token, sanitize the token service and add it to UGI.
    org.apache.hadoop.security.token.
        Token<TimelineDelegationTokenIdentifier> timelineToken =
            new org.apache.hadoop.security.token.
            Token<TimelineDelegationTokenIdentifier>(
                delegationToken.getIdentifier().array(),
                delegationToken.getPassword().array(),
                new Text(delegationToken.getKind()),
                service == null ? new Text() : new Text(service));
    // Prefer timeline service address over service coming in the token for
    // updating the token service.
    InetSocketAddress serviceAddr =
        (collectorAddr != null && !collectorAddr.isEmpty()) ?
        NetUtils.createSocketAddr(collectorAddr) :
        SecurityUtil.getTokenServiceAddr(timelineToken);
    SecurityUtil.setTokenService(timelineToken, serviceAddr);
    authUgi.addToken(timelineToken);
    LOG.info("Updated timeline delegation token " + timelineToken);
  }

  @Private
  protected void putObjects(String path, MultivaluedMap<String, String> params,
      Object obj) throws IOException, YarnException {

    int retries = verifyRestEndPointAvailable();

    // timelineServiceAddress could be stale, add retry logic here.
    boolean needRetry = true;
    while (needRetry) {
      try {
        URI uri = TimelineConnector.constructResURI(getConfig(),
            timelineServiceAddress, RESOURCE_URI_STR_V2);
        putObjects(uri, path, params, obj);
        needRetry = false;
      } catch (IOException e) {
        // handle exception for timelineServiceAddress being updated.
        checkRetryWithSleep(retries, e);
        retries--;
      }
    }
  }

  /**
   * Check if reaching to maximum of retries.
   *
   * @param retries
   * @param e
   */
  private void checkRetryWithSleep(int retries, IOException e)
      throws YarnException, IOException {
    if (retries > 0) {
      try {
        Thread.sleep(this.serviceRetryInterval);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new YarnException("Interrupted while retrying to connect to ATS");
      }
    } else {
      StringBuilder msg =
          new StringBuilder("TimelineClient has reached to max retry times : ");
      msg.append(this.maxServiceRetries)
          .append(" for service address: ")
          .append(timelineServiceAddress);
      LOG.error(msg.toString());
      throw new IOException(msg.toString(), e);
    }
  }

  private ClientResponse doPutObjects(URI base, String path,
      MultivaluedMap<String, String> params, Object obj) {
    return connector.getClient().resource(base).path(path).queryParams(params)
        .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
        .put(ClientResponse.class, obj);
  }

  protected void putObjects(URI base, String path,
      MultivaluedMap<String, String> params, Object obj)
      throws IOException, YarnException {
    ClientResponse resp = null;
    try {
      resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
        @Override
        public ClientResponse run() throws Exception {
          return doPutObjects(base, path, params, obj);
        }
      });
    } catch (UndeclaredThrowableException ue) {
      Throwable cause = ue.getCause();
      if (cause instanceof IOException) {
        throw (IOException)cause;
      } else {
        throw new IOException(cause);
      }
    } catch (InterruptedException ie) {
      throw (IOException) new InterruptedIOException().initCause(ie);
    }

    //Close ClientResponse's input stream as we are done posting objects.
    //ClientResponse#getEntity closes the input stream upon failure in
    //processing HTTP response.
    if (resp == null) {
      String msg = "Error getting HTTP response from the timeline server.";
      LOG.error(msg);
      throw new YarnException(msg);
    } else if (resp.getStatusInfo().getStatusCode()
            == ClientResponse.Status.OK.getStatusCode()) {
      try {
        resp.close();
      } catch(ClientHandlerException che) {
        LOG.warn("Error closing the HTTP response's inputstream. ", che);
      }
    } else {
      String msg = "";
      try {
        String stringType = resp.getEntity(String.class);
        msg = "Server response:\n" + stringType;
      } catch (ClientHandlerException | UniformInterfaceException chuie) {
        msg = "Error getting entity from the HTTP response."
                + chuie.getLocalizedMessage();
      } catch (Throwable t) {
        msg = "Error getting entity from the HTTP response."
                + t.getLocalizedMessage();
      } finally {
        msg = "Response from the timeline server is not successful"
                  + ", HTTP error code: " + resp.getStatus()
                  + ", "
                  + msg;
        LOG.error(msg);
        throw new YarnException(msg);
      }
    }
  }

  private int verifyRestEndPointAvailable() throws YarnException {
    // timelineServiceAddress could haven't be initialized yet
    // or stale (only for new timeline service)
    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
    if (timelineServiceAddress == null) {
      String errMessage = "TimelineClient has reached to max retry times : "
          + this.maxServiceRetries
          + ", but failed to fetch timeline service address. Please verify"
          + " Timeline Auxiliary Service is configured in all the NMs";
      LOG.error(errMessage);
      throw new YarnException(errMessage);
    }
    return retries;
  }

  /**
   * Poll TimelineServiceAddress for maximum of retries times if it is null.
   *
   * @param retries
   * @return the left retry times
   * @throws IOException
   */
  private int pollTimelineServiceAddress(int retries) throws YarnException {
    while (timelineServiceAddress == null && retries > 0) {
      try {
        Thread.sleep(this.serviceRetryInterval);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new YarnException("Interrupted while trying to connect ATS");
      }
      retries--;
    }
    return retries;
  }

  private final class EntitiesHolder extends FutureTask<Void> {
    private final TimelineEntities entities;
    private final boolean isSync;

    EntitiesHolder(final TimelineEntities entities, final boolean isSync,
        final boolean subappwrite) {
      super(new Callable<Void>() {
        // publishEntities()
        public Void call() throws Exception {
          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
          params.add("appid", getContextAppId().toString());
          params.add("async", Boolean.toString(!isSync));
          params.add("subappwrite", Boolean.toString(subappwrite));
          putObjects("entities", params, entities);
          return null;
        }
      });
      this.entities = entities;
      this.isSync = isSync;
    }

    public boolean isSync() {
      return isSync;
    }

    public TimelineEntities getEntities() {
      return entities;
    }
  }

  /**
   * This class is responsible for collecting the timeline entities and
   * publishing them in async.
   */
  private class TimelineEntityDispatcher {
    /**
     * Time period for which the timelineclient will wait for draining after
     * stop.
     */
    private final long drainTimeoutPeriod;

    private int numberOfAsyncsToMerge;
    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
    private ExecutorService executor;

    TimelineEntityDispatcher(Configuration conf) {
      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
      numberOfAsyncsToMerge =
          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
      drainTimeoutPeriod = conf.getLong(
          YarnConfiguration.TIMELINE_V2_CLIENT_DRAIN_TIME_MILLIS,
          YarnConfiguration.DEFAULT_TIMELINE_V2_CLIENT_DRAIN_TIME_MILLIS);
    }

    Runnable createRunnable() {
      return new Runnable() {
        @Override
        public void run() {
          try {
            EntitiesHolder entitiesHolder;
            while (!Thread.currentThread().isInterrupted()) {
              // Merge all the async calls and make one push, but if its sync
              // call push immediately
              try {
                entitiesHolder = timelineEntityQueue.take();
              } catch (InterruptedException ie) {
                LOG.info("Timeline dispatcher thread was interrupted ");
                Thread.currentThread().interrupt();
                return;
              }
              if (entitiesHolder != null) {
                publishWithoutBlockingOnQueue(entitiesHolder);
              }
            }
          } finally {
            if (!timelineEntityQueue.isEmpty()) {
              LOG.info("Yet to publish " + timelineEntityQueue.size()
                  + " timelineEntities, draining them now. ");
            }
            // Try to drain the remaining entities to be published @ the max for
            // 2 seconds
            long timeTillweDrain =
                System.currentTimeMillis() + drainTimeoutPeriod;
            while (!timelineEntityQueue.isEmpty()) {
              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
              if (System.currentTimeMillis() > timeTillweDrain) {
                // time elapsed stop publishing further....
                if (!timelineEntityQueue.isEmpty()) {
                  LOG.warn("Time to drain elapsed! Remaining "
                      + timelineEntityQueue.size() + "timelineEntities will not"
                      + " be published");
                  // if some entities were not drained then we need interrupt
                  // the threads which had put sync EntityHolders to the queue.
                  EntitiesHolder nextEntityInTheQueue = null;
                  while ((nextEntityInTheQueue =
                      timelineEntityQueue.poll()) != null) {
                    nextEntityInTheQueue.cancel(true);
                  }
                }
                break;
              }
            }
          }
        }

        /**
         * Publishes the given EntitiesHolder and return immediately if sync
         * call, else tries to fetch the EntitiesHolder from the queue in non
         * blocking fashion and collate the Entities if possible before
         * publishing through REST.
         *
         * @param entitiesHolder
         */
        private void publishWithoutBlockingOnQueue(
            EntitiesHolder entitiesHolder) {
          if (entitiesHolder.isSync()) {
            entitiesHolder.run();
            return;
          }
          int count = 1;
          while (true) {
            // loop till we find a sync put Entities or there is nothing
            // to take
            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
            if (nextEntityInTheQueue == null) {
              // Nothing in the queue just publish and get back to the
              // blocked wait state
              entitiesHolder.run();
              break;
            } else if (nextEntityInTheQueue.isSync()) {
              // flush all the prev async entities first
              entitiesHolder.run();
              // and then flush the sync entity
              nextEntityInTheQueue.run();
              break;
            } else {
              // append all async entities together and then flush
              entitiesHolder.getEntities().addEntities(
                  nextEntityInTheQueue.getEntities().getEntities());
              count++;
              if (count == numberOfAsyncsToMerge) {
                // Flush the entities if the number of the async
                // putEntites merged reaches the desired limit. To avoid
                // collecting multiple entities and delaying for a long
                // time.
                entitiesHolder.run();
                break;
              }
            }
          }
        }
      };
    }

    public void dispatchEntities(boolean sync,
        TimelineEntity[] entitiesTobePublished, boolean subappwrite)
        throws YarnException {
      if (executor.isShutdown()) {
        throw new YarnException("Timeline client is in the process of stopping,"
            + " not accepting any more TimelineEntities");
      }

      // wrap all TimelineEntity into TimelineEntities object
      TimelineEntities entities = new TimelineEntities();
      for (TimelineEntity entity : entitiesTobePublished) {
        entities.addEntity(entity);
      }

      // created a holder and place it in queue
      EntitiesHolder entitiesHolder =
          new EntitiesHolder(entities, sync, subappwrite);
      try {
        timelineEntityQueue.put(entitiesHolder);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new YarnException(
            "Failed while adding entity to the queue for publishing", e);
      }

      if (sync) {
        // In sync call we need to wait till its published and if any error then
        // throw it back
        try {
          entitiesHolder.get();
        } catch (ExecutionException e) {
          throw new YarnException("Failed while publishing entity",
              e.getCause());
        } catch (InterruptedException | CancellationException e) {
          Thread.currentThread().interrupt();
          throw new YarnException("Interrupted while publishing entity", e);
        } catch (Exception e) {
          throw new YarnException("Encountered error while publishing entity", e);
        }
      }
    }

    public void start() {
      executor = Executors.newSingleThreadExecutor();
      executor.execute(createRunnable());
    }

    public void stop() {
      LOG.info("Stopping TimelineClient.");
      executor.shutdownNow();
      try {
        executor.awaitTermination(drainTimeoutPeriod, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop DirectTimelineWriter 源码

hadoop FileSystemTimelineWriter 源码

hadoop TimelineClientImpl 源码

hadoop TimelineConnector 源码

hadoop TimelineReaderClientImpl 源码

hadoop TimelineWriter 源码

hadoop package-info 源码

0  赞