Use single thread for workspace indexing.
authorMathieu Baudier <mbaudier@argeo.org>
Tue, 18 Feb 2020 14:27:25 +0000 (15:27 +0100)
committerMathieu Baudier <mbaudier@argeo.org>
Tue, 18 Feb 2020 14:27:25 +0000 (15:27 +0100)
org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsWorkspaceIndexer.java

index 7782b4291f5122d9b72a2d85f0cd93fd7c170b1c..bb38d6a633c02b33e4b0ba76b6cca336ff8aeb8e 100644 (file)
@@ -1,10 +1,8 @@
 package org.argeo.cms.internal.kernel;
 
 import java.util.GregorianCalendar;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jcr.Node;
 import javax.jcr.Property;
@@ -42,6 +40,10 @@ class CmsWorkspaceIndexer implements EventListener {
        private Session session;
        private VersionManager versionManager;
 
+       private LinkedBlockingDeque<Event> toProcess = new LinkedBlockingDeque<>();
+       private IndexingThread indexingThread;
+       private AtomicBoolean stopping = new AtomicBoolean(false);
+
        public CmsWorkspaceIndexer(RepositoryImpl repositoryImpl, String cn, String workspaceName)
                        throws RepositoryException {
                this.cn = cn;
@@ -56,12 +58,24 @@ class CmsWorkspaceIndexer implements EventListener {
                        session.getWorkspace().getObservationManager().addEventListener(this,
                                        Event.NODE_ADDED | Event.PROPERTY_CHANGED, "/", true, null, nodeTypes, true);
                        versionManager = session.getWorkspace().getVersionManager();
+
+                       indexingThread = new IndexingThread();
+                       indexingThread.start();
                } catch (RepositoryException e1) {
                        throw new IllegalStateException(e1);
                }
        }
 
        public void destroy() {
+               stopping.set(true);
+               indexingThread.interrupt();
+               // TODO make it configurable
+               try {
+                       indexingThread.join(10 * 60 * 1000);
+               } catch (InterruptedException e1) {
+                       log.warn("Indexing thread interrupted. Will log out session.");
+               }
+
                try {
                        session.getWorkspace().getObservationManager().removeEventListener(this);
                } catch (RepositoryException e) {
@@ -77,7 +91,12 @@ class CmsWorkspaceIndexer implements EventListener {
                long count = 0;
                while (events.hasNext()) {
                        Event event = events.nextEvent();
-                       processEvent(event);
+                       try {
+                               toProcess.put(event);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+//                     processEvent(event);
                        count++;
                }
                long duration = System.currentTimeMillis() - begin;
@@ -210,20 +229,21 @@ class CmsWorkspaceIndexer implements EventListener {
 
        @Override
        public void onEvent(EventIterator events) {
-               Runnable toRun = new Runnable() {
-
-                       @Override
-                       public void run() {
-                               processEvents(events);
-                       }
-               };
-               Future<?> future = Activator.getInternalExecutorService().submit(toRun);
-               try {
-                       // make the call synchronous
-                       future.get(60, TimeUnit.SECONDS);
-               } catch (TimeoutException | ExecutionException | InterruptedException e) {
-                       // silent
-               }
+               processEvents(events);
+//             Runnable toRun = new Runnable() {
+//
+//                     @Override
+//                     public void run() {
+//                             processEvents(events);
+//                     }
+//             };
+//             Future<?> future = Activator.getInternalExecutorService().submit(toRun);
+//             try {
+//                     // make the call synchronous
+//                     future.get(60, TimeUnit.SECONDS);
+//             } catch (TimeoutException | ExecutionException | InterruptedException e) {
+//                     // silent
+//             }
        }
 
        static String toEtag(Value v) {
@@ -292,4 +312,32 @@ class CmsWorkspaceIndexer implements EventListener {
                return "Indexer for workspace " + workspaceName + " of repository " + cn;
        }
 
+       class IndexingThread extends Thread {
+
+               public IndexingThread() {
+                       super(CmsWorkspaceIndexer.this.toString());
+                       // TODO Auto-generated constructor stub
+               }
+
+               @Override
+               public void run() {
+                       life: while (session != null && session.isLive()) {
+                               try {
+                                       Event nextEvent = toProcess.take();
+                                       processEvent(nextEvent);
+                               } catch (InterruptedException e) {
+                                       // silent
+                                       interrupted();
+                               }
+
+                               if (stopping.get() && toProcess.isEmpty()) {
+                                       break life;
+                               }
+                       }
+                       if (log.isDebugEnabled())
+                               log.debug(CmsWorkspaceIndexer.this.toString() + " has shut down.");
+               }
+
+       }
+
 }
\ No newline at end of file