[apex-core] branch master updated: APEXCORE-807 Added renewal of tokens before renewal expiry interval functionality into the engine, refactored token renewal component

classic Classic list List threaded Threaded
1 message Options
thw
Reply | Threaded
Open this post in threaded view
|

[apex-core] branch master updated: APEXCORE-807 Added renewal of tokens before renewal expiry interval functionality into the engine, refactored token renewal component

thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new a8bbec7  APEXCORE-807 Added renewal of tokens before renewal expiry interval functionality into the engine, refactored token renewal component
a8bbec7 is described below

commit a8bbec7f54e94d67106a46f8b1ca6d8e7890f126
Author: Pramod Immaneni <[hidden email]>
AuthorDate: Sun Feb 4 18:59:18 2018 -0800

    APEXCORE-807 Added renewal of tokens before renewal expiry interval functionality into the engine, refactored token renewal component
---
 .../stram/StreamingAppMasterService.java           |  26 +--
 .../datatorrent/stram/client/StramAppLauncher.java |  12 +-
 .../datatorrent/stram/client/StramClientUtils.java |   4 +
 .../stram/engine/StreamingContainer.java           |  24 +--
 .../stram/plan/logical/LogicalPlan.java            |   2 +
 .../datatorrent/stram/security/StramUserLogin.java |  69 -------
 .../apache/apex/engine/security/TokenRenewer.java  | 207 +++++++++++++++++++++
 7 files changed, 239 insertions(+), 105 deletions(-)

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 6c640ee..e172541 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -28,7 +28,6 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -52,13 +51,12 @@ import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher;
 import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
-import org.apache.commons.io.FileUtils;
+import org.apache.apex.engine.security.TokenRenewer;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.CompositeService;
@@ -111,7 +109,6 @@ import com.datatorrent.stram.plan.physical.PTContainer;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
 import com.datatorrent.stram.security.StramDelegationTokenManager;
-import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.util.ConfigUtils;
 import com.datatorrent.stram.util.SecurityUtils;
 import com.datatorrent.stram.webapp.AppInfo;
@@ -166,6 +163,7 @@ public class StreamingAppMasterService extends CompositeService
   private ApexPluginDispatcher apexPluginDispatcher;
   private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
   private static final long REMOVE_CONTAINER_TIMEOUT = PropertiesHelper.getLong("org.apache.apex.nodemanager.containerKill.timeout", 30 * 1000, 0, Long.MAX_VALUE);
+  private TokenRenewer tokenRenewer;
 
   public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
   {
@@ -693,19 +691,10 @@ public class StreamingAppMasterService extends CompositeService
   private void execute() throws YarnException, IOException
   {
     LOG.info("Starting ApplicationMaster");
-    final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    LOG.info("number of tokens: {}", credentials.getAllTokens().size());
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      LOG.debug("token: {}", token);
-    }
     final Configuration conf = getConfig();
-    long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
-    long expiryTime = System.currentTimeMillis() + tokenLifeTime;
-    LOG.debug(" expiry token time {}", tokenLifeTime);
-    String principal = dag.getValue(LogicalPlan.PRINCIPAL);
-    String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      tokenRenewer = new TokenRenewer(dag, true, conf, appAttemptID.getApplicationId().toString());
+    }
 
     // Register self with ResourceManager
     RegisterApplicationMasterResponse response = amRmClient.registerApplicationMaster(appMasterHostname, 0, appMasterTrackingUrl);
@@ -778,9 +767,8 @@ public class StreamingAppMasterService extends CompositeService
         loopCounter++;
         final long currentTimeMillis = System.currentTimeMillis();
 
-        if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
-          String applicationId = appAttemptID.getApplicationId().toString();
-          expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
+        if (tokenRenewer != null) {
+          tokenRenewer.checkAndRenew();
         }
 
         if (currentTimeMillis > nodeReportUpdateTime) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 2019f48..d3079d0 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -608,13 +608,23 @@ public class StramAppLauncher
     if (UserGroupInformation.isSecurityEnabled()) {
       long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
       dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
+      LOG.debug("HDFS token life time {}", hdfsTokenMaxLifeTime);
+      long hdfsTokenRenewInterval = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_RENEW_INTERVAL, conf.getLong(StramClientUtils.HDFS_TOKEN_RENEW_INTERVAL, StramClientUtils.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
+      dag.setAttribute(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL, hdfsTokenRenewInterval);
+      LOG.debug("HDFS token renew interval {}", hdfsTokenRenewInterval);
       long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
       dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
+      LOG.debug("RM token life time {}", rmTokenMaxLifeTime);
+      long rmTokenRenewInterval = conf.getLong(StramClientUtils.DT_RM_TOKEN_RENEW_INTERVAL, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
+      dag.setAttribute(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL, rmTokenRenewInterval);
+      LOG.debug("RM token renew interval {}", rmTokenRenewInterval);
       setTokenRefreshCredentials(dag, conf);
     }
     String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR);
     if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) {
-      dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, Double.parseDouble(tokenRefreshFactor));
+      double refreshFactor = Double.parseDouble(tokenRefreshFactor);
+      dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, refreshFactor);
+      LOG.debug("Token refresh anticipatory factor {}", refreshFactor);
     }
     StramClient client = new StramClient(conf, dag);
     try {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index a310ee2..d4f190f 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -111,12 +111,16 @@ public class StramClientUtils
   public static final String SUBDIR_CONF = "conf";
   public static final long RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE = 10 * 1000;
   public static final String DT_HDFS_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "namenode.delegation.token.max-lifetime";
+  public static final String DT_HDFS_TOKEN_RENEW_INTERVAL = StreamingApplication.DT_PREFIX + "namenode.delegation.token.renew-interval";
   public static final String HDFS_TOKEN_MAX_LIFE_TIME = "dfs.namenode.delegation.token.max-lifetime";
+  public static final String HDFS_TOKEN_RENEW_INTERVAL = "dfs.namenode.delegation.token.renew-interval";
   public static final String DT_RM_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "resourcemanager.delegation.token.max-lifetime";
+  public static final String DT_RM_TOKEN_RENEW_INTERVAL = StreamingApplication.DT_PREFIX + "resourcemanager.delegation.token.renew-interval";
   @Deprecated
   public static final String KEY_TAB_FILE = StramUserLogin.DT_AUTH_PREFIX + "store.keytab";
   public static final String TOKEN_ANTICIPATORY_REFRESH_FACTOR = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.factor";
   public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60 * 1000;
+  public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24 * 60 * 60 * 1000;
   public static final String TOKEN_REFRESH_PRINCIPAL = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.principal";
   public static final String TOKEN_REFRESH_KEYTAB = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.keytab";
   /**
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index f5aaf35..927ad6d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -42,15 +42,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.security.TokenRenewer;
 import org.apache.apex.log.LogFileInformation;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.LogManager;
 
@@ -106,7 +104,6 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor;
 import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
-import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.stream.BufferServerPublisher;
 import com.datatorrent.stram.stream.BufferServerSubscriber;
 import com.datatorrent.stram.stream.FastPublisher;
@@ -164,6 +161,7 @@ public class StreamingContainer extends YarnContainerMain
   private final MBassador<ContainerEvent> eventBus; // event bus for publishing container events
   HashSet<Component<ContainerContext>> components;
   private RequestFactory requestFactory;
+  private TokenRenewer tokenRenewer;
 
   static {
     try {
@@ -608,22 +606,16 @@ public class StreamingContainer extends YarnContainerMain
     logger.debug("Entering heartbeat loop (interval is {} ms)", this.heartbeatIntervalMillis);
     umbilical.log(containerId, "[" + containerId + "] Entering heartbeat loop..");
     final YarnConfiguration conf = new YarnConfiguration();
-    long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
-    long expiryTime = System.currentTimeMillis();
-    final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    String stackTrace = null;
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      logger.debug("token: {}", token);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      tokenRenewer = new TokenRenewer(containerContext, false, conf, containerId);
     }
-    String principal = containerContext.getValue(LogicalPlan.PRINCIPAL);
-    String hdfsKeyTabFile = containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
+    String stackTrace = null;
     while (!exitHeartbeatLoop) {
 
-      if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
-        expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, principal, hdfsKeyTabFile, credentials, null, false);
+      if (tokenRenewer != null) {
+        tokenRenewer.checkAndRenew();
       }
+
       synchronized (this.heartbeatTrigger) {
         try {
           this.heartbeatTrigger.wait(heartbeatIntervalMillis);
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index bf4b2cb..18a9a63 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -161,7 +161,9 @@ public class LogicalPlan implements Serializable, DAG
    * Then it can be moved back to DAGContext.
    */
   public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false);
+  public static Attribute<Long> HDFS_TOKEN_RENEWAL_INTERVAL = new Attribute<>(86400000L);
   public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L);
+  public static Attribute<Long> RM_TOKEN_RENEWAL_INTERVAL = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
   public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
   public static Attribute<String> PRINCIPAL = new Attribute<>(null, StringCodec.String2String.getInstance());
   public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, StringCodec.String2String.getInstance());
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 71eb825..7522906 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -18,28 +18,16 @@
  */
 package com.datatorrent.stram.security;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.datatorrent.api.StreamingApplication;
 
-import com.datatorrent.stram.client.StramClientUtils;
-import com.datatorrent.stram.util.FSUtil;
-
 /**
  * <p>StramUserLogin class.</p>
  *
@@ -85,63 +73,6 @@ public class StramUserLogin
     }
   }
 
-  public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String principal, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException
-  {
-    long expiryTime = System.currentTimeMillis() + tokenLifeTime;
-    //renew tokens
-    final String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
-    if (tokenRenewer == null || tokenRenewer.length() == 0) {
-      throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
-    }
-
-    File keyTabFile;
-    try (FileSystem fs = FileSystem.newInstance(conf)) {
-      keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf);
-    }
-
-    if (principal == null) {
-      principal = UserGroupInformation.getCurrentUser().getUserName();
-    }
-    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath());
-    try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>()
-      {
-        @Override
-        public Object run() throws Exception
-        {
-
-          Credentials creds = new Credentials();
-          try (FileSystem fs1 = FileSystem.newInstance(conf)) {
-            fs1.addDelegationTokens(tokenRenewer, creds);
-          }
-          if (renewRMToken) {
-            try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
-              new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds);
-            }
-          }
-          credentials.addAll(creds);
-
-          return null;
-        }
-      });
-      UserGroupInformation.getCurrentUser().addCredentials(credentials);
-    } catch (InterruptedException e) {
-      LOG.error("Error while renewing tokens ", e);
-      expiryTime = System.currentTimeMillis();
-    } catch (IOException e) {
-      LOG.error("Error while renewing tokens ", e);
-      expiryTime = System.currentTimeMillis();
-    }
-    LOG.debug("number of tokens: {}", credentials.getAllTokens().size());
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      LOG.debug("updated token: {}", token);
-    }
-    keyTabFile.delete();
-    return expiryTime;
-  }
-
   public static String getPrincipal()
   {
     return principal;
diff --git a/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java
new file mode 100644
index 0000000..cda7c98
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.client.StramClientUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.util.FSUtil;
+
+public class TokenRenewer
+{
+  // Purpose: keeps delegation tokens usable; checkAndRenew() renews the held tokens or obtains fresh ones via a keytab login.
+  // HDFS_TOKEN_KIND: the constant is not available hence defining here. If in future it is available this can be removed
+  private static final Text HDFS_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
+  private static final Logger logger = LoggerFactory.getLogger(TokenRenewer.class);
+
+  boolean renewRMToken; // when true, RM delegation tokens are refreshed/renewed in addition to HDFS tokens
+  Configuration conf; // used to create FileSystem / YarnClient instances and passed to Token.renew
+  String destinationFile; // passed to FSUtil.copyToLocalFileSystem as the destination file argument for the keytab copy
+
+  long tokenLifeTime; // anticipatory factor * token life time; added to System.currentTimeMillis() to get expiryTime
+  long tokenRenewalInterval; // anticipatory factor * token renewal interval; added to System.currentTimeMillis() to get renewTime
+  String principal; // from LogicalPlan.PRINCIPAL; replaced by the login user's name in renewTokens() when null
+  String hdfsKeyTabFile; // from LogicalPlan.KEY_TAB_FILE; checkAndRenew() only refreshes when this is non-null
+  InetSocketAddress rmAddress; // only assigned when renewRMToken is true; not read anywhere else in this class
+
+  long expiryTime; // time (ms since epoch) at/after which checkAndRenew() refreshes tokens, provided a keytab is set
+  long renewTime; // time (ms since epoch) at/after which checkAndRenew() renews the tokens already held
+  Credentials credentials; // credentials of the current user, captured in the constructor; new tokens are added on refresh
+  // Reads timing attributes, principal and keytab location from the context and sets the initial deadlines. IOException can come from reading the current user or from the initial check.
+  public TokenRenewer(Context context, boolean renewRMToken, Configuration conf, String destinationFile) throws IOException
+  {
+    this.renewRMToken = renewRMToken;
+    this.destinationFile = destinationFile;
+    this.conf = conf;
+    // When RM tokens are handled too, the smaller of the HDFS and RM values is used; otherwise only the HDFS values apply.
+    if (renewRMToken) {
+      tokenLifeTime = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), context.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
+      tokenRenewalInterval = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL), context.getValue(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL)));
+      rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+    } else {
+      tokenLifeTime = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
+      tokenRenewalInterval = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL));
+    }
+
+    principal = context.getValue(LogicalPlan.PRINCIPAL);
+    hdfsKeyTabFile = context.getValue(LogicalPlan.KEY_TAB_FILE);
+
+    expiryTime = System.currentTimeMillis() + tokenLifeTime;
+    renewTime = expiryTime; // NOTE(review): starts equal to expiryTime, so no renewal is triggered before the first expiry deadline - confirm intended
+
+    logger.debug("token life time {} renewal interval {}", tokenLifeTime, tokenRenewalInterval);
+    logger.debug("Token expiry time {} renew time {}", expiryTime, renewTime);
+
+    credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    // Check credentials up front: check-only mode does the keytab copy and login but skips refresh/renewal
+    if (renewRMToken) {
+      renewTokens(false, true);
+    }
+  }
+  // Called from the callers' periodic loops: refreshes tokens once expiryTime has passed (keytab required), otherwise renews them once renewTime has passed.
+  public void checkAndRenew() throws IOException
+  {
+    boolean renew = false;
+    boolean refresh = false;
+    long currentTimeMillis = System.currentTimeMillis();
+    if (currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
+      refresh = true;
+    } else if (currentTimeMillis >= renewTime) {
+      renew = true;
+    }
+    if (refresh || renew) {
+      long updateTime = renewTokens(refresh, false); // next deadline, or roughly "now" if the attempt failed
+      if (refresh) {
+        expiryTime = updateTime;
+        renewTime = currentTimeMillis + tokenRenewalInterval; // fresh tokens: restart the renewal schedule from this check
+        logger.debug("Token expiry time {} renew time {}", expiryTime, renewTime);
+      } else {
+        renewTime = updateTime;
+        logger.debug("Token renew time {}", renewTime);
+      }
+    }
+  }
+  // refresh: obtain new tokens instead of renewing held ones; checkOnly: keytab copy and login only. Returns the next deadline, or the current time if the refresh/renew attempt failed.
+  private long renewTokens(final boolean refresh, boolean checkOnly) throws IOException
+  {
+    logger.info("{}", checkOnly ? "Checking renewal" : (refresh ? "Refreshing tokens" : "Renewing tokens"));
+    long expiryTime = System.currentTimeMillis() + (refresh ? tokenLifeTime : tokenRenewalInterval); // local; shadows the expiryTime field
+
+    final String tokenRenewer = UserGroupInformation.getCurrentUser().getUserName(); // used as renewer for new tokens and as the proxied user below
+    logger.debug("Token renewer {}", tokenRenewer);
+
+    File keyTabFile = null;
+    try (FileSystem fs = FileSystem.newInstance(conf)) {
+      String destinationDir = FileUtils.getTempDirectoryPath();
+      keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf); // returned file is deleted in the finally block
+
+      if (principal == null) {
+        // No principal configured: fall back to the login user's name (the current user's name is no longer used here)
+        principal = UserGroupInformation.getLoginUser().getUserName();
+      }
+      logger.debug("Principal {}", principal);
+      UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath());
+      if (!checkOnly) {
+        try {
+          UserGroupInformation currUGI = UserGroupInformation.createProxyUser(tokenRenewer, ugi); // act as the current user on top of the keytab login
+          currUGI.doAs(new PrivilegedExceptionAction<Object>()
+          {
+            @Override
+            public Object run() throws Exception
+            {
+              // Refresh collects new tokens in a scratch Credentials and merges them; renew calls renew() on matching held tokens.
+              if (refresh) {
+                Credentials creds = new Credentials();
+                try (FileSystem fs1 = FileSystem.newInstance(conf)) {
+                  logger.info("Refreshing fs tokens");
+                  fs1.addDelegationTokens(tokenRenewer, creds);
+                  logger.info("Refreshed tokens");
+                }
+                if (renewRMToken) {
+                  try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) {
+                    logger.info("Refreshing rm tokens");
+                    new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds);
+                    logger.info("Refreshed tokens");
+                  }
+                }
+                credentials.addAll(creds);
+              } else {
+                Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
+                for (Token<? extends TokenIdentifier> token : tokens) {
+                  logger.debug("Token {}", token);
+                  if (token.getKind().equals(HDFS_TOKEN_KIND) || (renewRMToken && token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME))) { // other token kinds are left untouched
+                    logger.info("Renewing token {}", token.getKind());
+                    token.renew(conf);
+                    logger.info("Renewed token");
+                  }
+                }
+              }
+
+              return null;
+            }
+          });
+          UserGroupInformation.getCurrentUser().addCredentials(credentials);
+        } catch (InterruptedException e) {
+          logger.error("Error while renewing tokens ", e); // NOTE(review): the thread's interrupt status is not restored here
+          expiryTime = System.currentTimeMillis(); // failure: return "now" so the caller's deadline is already due at its next check
+        } catch (IOException e) {
+          logger.error("Error while renewing tokens ", e);
+          expiryTime = System.currentTimeMillis(); // failure: return "now" so the caller's deadline is already due at its next check
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("number of tokens: {}", credentials.getAllTokens().size());
+        Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+        while (iter.hasNext()) {
+          Token<?> token = iter.next();
+          logger.debug("updated token: {}", token);
+        }
+      }
+    } finally {
+      if (keyTabFile != null) {
+        keyTabFile.delete(); // remove the local keytab copy whether or not the attempt succeeded
+      }
+    }
+    return expiryTime;
+  }
+
+}

--
To stop receiving notification emails like this one, please contact
[hidden email].