[apex-core] branch master updated: APEXCORE-810 Fixing race condition between publisher and subscriber teardowns

View: Classic | List | Threaded
1 message Options
Reply | Threaded | Open this post in threaded view

[apex-core] branch master updated: APEXCORE-810 Fixing race condition between publisher and subscriber teardowns

vrozov
This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 436785b  APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
436785b is described below

commit 436785bd63be0e90265cf8f8f18882647b8ecab0
Author: Pramod Immaneni <[hidden email]>
AuthorDate: Wed Jan 17 13:48:32 2018 -0800

    APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
---
 .../bufferserver/internal/LogicalNode.java         |  7 +--
 .../datatorrent/bufferserver/server/Server.java    | 62 +++++++++-------------
 2 files changed, 27 insertions(+), 42 deletions(-)

diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 3e8846d..af5db09 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -115,12 +115,7 @@ public class LogicalNode implements DataListener
    */
   public void removeChannel(WriteOnlyClient client)
   {
-    for (PhysicalNode pn : physicalNodes) {
-      if (pn.getClient() == client) {
-        physicalNodes.remove(pn);
-        break;
-      }
-    }
+    physicalNodes.removeIf(node -> (node.getClient().equals(client)));
   }
 
   /**
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c5700f2..6332a18 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -24,9 +24,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -117,9 +114,11 @@ public class Server extends AbstractServer
   @Override
   public void unregistered(SelectionKey key)
   {
+    logger.debug("Unregistered {}", this);
     for (LogicalNode ln : subscriberGroups.values()) {
       ln.boot();
     }
+    super.unregistered(key);
     /*
      * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor.
      */
@@ -860,41 +859,32 @@ public class Server extends AbstractServer
       }
       torndown = true;
 
-      /*
-       * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
-       * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
-       * are not being written to, just stick around till the next publisher shows up and eat into
-       * the data it's publishing for the new subscribers.
-       */
-
-      /**
-       * since the publisher server died, the queue which it was using would stop pumping the data unless
-       * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
-       * with the same identifier as the one which just died.
-       */
-      if (publisherChannels.containsValue(this)) {
-        final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();
-        while (i.hasNext()) {
-          if (i.next().getValue() == this) {
-            i.remove();
-            break;
-          }
-        }
-      }
-
-      ArrayList<LogicalNode> list = new ArrayList<>();
-      String publisherIdentifier = datalist.getIdentifier();
-      Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
-      while (iterator.hasNext()) {
-        LogicalNode ln = iterator.next();
-        if (publisherIdentifier.equals(ln.getUpstream())) {
-          list.add(ln);
+      serverHelperExecutor.submit(() ->
+      {
+        /*
+         * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
+         * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
+         * are not being written to, just stick around till the next publisher shows up and eat into
+         * the data it's publishing for the new subscribers.
+         */
+
+        /**
+         * since the publisher server died, the queue which it was using would stop pumping the data unless
+         * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
+         * with the same identifier as the one which just died.
+         */
+        String publisherIdentifier = datalist.getIdentifier();
+        if (!publisherChannels.remove(publisherIdentifier, Publisher.this)) {
+          logger.warn("{} could not be removed from channels", Publisher.this);
         }
-      }
 
-      for (LogicalNode ln : list) {
-        ln.boot();
-      }
+        subscriberGroups.forEach((type, ln) ->  {
+          if (publisherIdentifier.equals(ln.getUpstream())) {
+            logger.debug("Booting logical node {} from publisher", ln);
+            ln.boot();
+          }
+        });
+      });
     }
 
   }

--
To stop receiving notification emails like this one, please contact
[hidden email].