[apex-core] branch master updated: APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[apex-core] branch master updated: APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure

vrozov
This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new d17f464  APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure
d17f464 is described below

commit d17f464fcaf19778e2f8edbe2b03419151558068
Author: Pramod Immaneni <[hidden email]>
AuthorDate: Thu Sep 14 17:23:54 2017 -0700

    APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure
---
 api/src/main/java/com/datatorrent/api/Context.java |  9 ++++++
 api/src/main/java/com/datatorrent/api/DAG.java     |  3 ++
 .../main/java/com/datatorrent/api/Operator.java    | 24 ++++++++++++++-
 .../api/annotation/OperatorAnnotation.java         | 11 +++++++
 .../java/com/datatorrent/stram/engine/Node.java    | 13 ++++++--
 .../stram/engine/StreamingContainer.java           | 36 +++++++++++++++++++---
 .../stram/plan/logical/LogicalPlan.java            |  6 ++++
 7 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 9fe0c46..0da930d 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.api.Operator.RecoveryMode;
 import com.datatorrent.api.StringCodec.Class2String;
 import com.datatorrent.api.StringCodec.Collection2String;
 import com.datatorrent.api.StringCodec.Integer2String;
@@ -317,6 +318,14 @@ public interface Context
     Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<>(Object2String.<AutoMetric.DimensionsScheme>getInstance());
 
     /**
+     * Specify how to recover the operator in cases of a failure event. The default is to load from checkpoint. However,
+     * in some cases reusing same instance of the operator from before the failure event may be desired. See
+     * {@link RecoveryMode}. The latter is only applicable in cases where the recovery is due to a failure of the
+     * upstream operator and not the operator itself.
+     */
+    Attribute<RecoveryMode> RECOVERY_MODE = new Attribute<RecoveryMode>(RecoveryMode.CHECKPOINT);
+
+    /**
      * Return the operator runtime id.
      *
      * @return The id
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 93936d7..471950b 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -27,6 +27,7 @@ import javax.annotation.Nonnull;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.annotation.OperatorAnnotation;
 
 /**
  * DAG contains the logical declarations of operators and streams.
@@ -190,6 +191,8 @@ public interface DAG extends DAGContext, Serializable
 
     OutputPortMeta getMeta(Operator.OutputPort<?> port);
 
+    OperatorAnnotation getOperatorAnnotation();
+
     /**
      * Return collection of stream which are connected to this operator's
      * input ports.
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index dd694d0..f0357e5 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.api;
 
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.GenericOperator;
@@ -71,6 +73,25 @@ public interface Operator extends Component<OperatorContext>, GenericOperator
 
   }
 
+  @Evolving
+  enum RecoveryMode
+  {
+    /**
+     * Recover the operator from checkpoint
+     */
+    CHECKPOINT,
+    /**
+     * Reuse the same instance of the operator from before the failure event.
+     *
+     * This applies to scenarios where the failure is in an upstream operator and not the operator itself.
+     * Reusing the same instance may not be applicable in all cases as it can lead to incorrect results because the
+     * operator state will not be consistent with the processing position in the stream. This should be used only for
+     * operators that are either invariant to reusing the same state with the stream processing position modified
+     * according to the processing mode or tolerant to it.
+     */
+    REUSE_INSTANCE
+  }
+
   /**
    * This method gets called at the beginning of each window.
    *
@@ -227,7 +248,8 @@ public interface Operator extends Component<OperatorContext>, GenericOperator
   {
     /**
      * Do the operations just before the operator starts processing tasks within the windows.
-     * e.g. establish a network connection.
+     * e.g. establish a network connection. This method is called irrespective of what {@link RecoveryMode}
+     * is being used.
      * @param context - the context in which the operator is executing.
      */
     void activate(CONTEXT context);
diff --git a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
index 16fd370..e7922b3 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
@@ -24,6 +24,8 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+import com.datatorrent.api.Operator;
+
 /**
  * Annotation to specify characteristics of an operator.
  *
@@ -49,4 +51,13 @@ public @interface OperatorAnnotation
    * @return whether operator can be checkpointed in middle of an application window.
    */
   boolean checkpointableWithinAppWindow() default true;
+
+  /**
+   * Element specifying the recovery mode for the operator.
+   * By default the operator state is recovered from checkpoint. The operator developer can indicate a preference for
+   * the recovery mode with this element, see {@link Operator.RecoveryMode}. The application developer can override this
+   * by setting the {@link Context.OperatorContext#RECOVERY_MODE} attribute.
+   * @return the recovery mode preferred for the operator
+   */
+  Operator.RecoveryMode recoveryMode() default Operator.RecoveryMode.CHECKPOINT;
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 88b002f..5f222c5 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -135,6 +135,8 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
   public long firstWindowMillis;
   public long windowWidthMillis;
 
+  protected boolean reuseOperator;
+
   public Node(OPERATOR operator, OperatorContext context)
   {
     this.operator = operator;
@@ -180,12 +182,19 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
     return operator;
   }
 
+  public void setReuseOperator(boolean reuseOperator)
+  {
+    this.reuseOperator = reuseOperator;
+  }
+
   @Override
   public void setup(OperatorContext context)
   {
     shutdown = false;
-    logger.debug("Operator Context = {}", context);
-    operator.setup(context);
+    if (!reuseOperator) {
+      logger.debug("Operator Context = {}", context);
+      operator.setup(context);
+    }
 //    this is where the ports should be setup but since the
 //    portcontext is not available here, we are doing it in
 //    StramChild. In future version, we should move that code here
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 927ad6d..699d646 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -66,6 +66,7 @@ import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.StringCodec;
+import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.bufferserver.server.Server;
 import com.datatorrent.bufferserver.storage.DiskStorage;
@@ -163,6 +164,8 @@ public class StreamingContainer extends YarnContainerMain
   private RequestFactory requestFactory;
   private TokenRenewer tokenRenewer;
 
+  private Map<Integer, Node<?>> reuseOpNodes = new HashMap<>();
+
   static {
     try {
       eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
@@ -518,7 +521,7 @@ public class StreamingContainer extends YarnContainerMain
     }
   }
 
-  private synchronized void undeploy(List<Integer> nodeList)
+  private synchronized Map<Integer, Node<?>> undeploy(List<Integer> nodeList)
   {
     /**
      * make sure that all the operators which we are asked to undeploy are in this container.
@@ -565,6 +568,8 @@ public class StreamingContainer extends YarnContainerMain
     for (Integer operatorId : nodeList) {
       nodes.remove(operatorId);
     }
+
+    return toUndeploy;
   }
 
   public void teardown()
@@ -800,7 +805,9 @@ public class StreamingContainer extends YarnContainerMain
     if (rsp.undeployRequest != null) {
       logger.info("Undeploy request: {}", rsp.undeployRequest);
       processNodeRequests(false);
-      undeploy(rsp.undeployRequest);
+      Map<Integer, Node<?>> undeployNodes = undeploy(rsp.undeployRequest);
+      undeployNodes.entrySet().removeIf((entry) -> !isReuseOperator(entry.getValue()));
+      reuseOpNodes.putAll(undeployNodes);
     }
 
     if (rsp.shutdown != null) {
@@ -833,6 +840,19 @@ public class StreamingContainer extends YarnContainerMain
     processNodeRequests(true);
   }
 
+  private boolean isReuseOperator(Node<?> node)
+  {
+    if (node.context.getAttributes().contains(OperatorContext.RECOVERY_MODE)) {
+      return node.context.getValue(OperatorContext.RECOVERY_MODE) == Operator.RecoveryMode.REUSE_INSTANCE;
+    } else {
+      if (node.operator.getClass().isAnnotationPresent(OperatorAnnotation.class)) {
+        return node.operator.getClass().getAnnotation(OperatorAnnotation.class).recoveryMode() == Operator.RecoveryMode.REUSE_INSTANCE;
+      }
+    }
+    logger.debug("Is reuse operator {} {}", node, false);
+    return false;
+  }
+
   private void stopInputNodes()
   {
     for (Entry<Integer, Node<?>> e : nodes.entrySet()) {
@@ -924,8 +944,16 @@ public class StreamingContainer extends YarnContainerMain
 
       OperatorContext ctx = new OperatorContext(ndi.id, ndi.name, ndi.contextAttributes, parentContext);
       ctx.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, ndi.checkpoint.windowId);
-      logger.debug("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
-      Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
+      Node<?> node = reuseOpNodes.get(ndi.id);
+      if (node == null) {
+        logger.info("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
+        node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
+      } else {
+        logger.info("Reusing previous operator instance {}", ndi.id);
+        node = Node.retrieveNode(node.operator, ctx, ndi.type);
+        node.setReuseOperator(true);
+        reuseOpNodes.remove(ndi.id);
+      }
       node.currentWindowId = ndi.checkpoint.windowId;
       node.applicationWindowCount = ndi.checkpoint.applicationWindowCount;
       node.firstWindowMillis = firstWindowMillis;
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 18a9a63..74510a6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1166,6 +1166,12 @@ public class LogicalPlan implements Serializable, DAG
     }
 
     @Override
+    public OperatorAnnotation getOperatorAnnotation()
+    {
+      return operatorAnnotation;
+    }
+
+    @Override
     public InputPortMeta getMeta(Operator.InputPort<?> port)
     {
       return getPortMapping().inPortMap.get(port);