Use single thread for workspace indexing.
[lgpl/argeo-commons.git] / org.argeo.cms / src / org / argeo / cms / internal / kernel / CmsWorkspaceIndexer.java
index 45c71afd1f3854e1abf214f153cc503506b0b4d4..bb38d6a633c02b33e4b0ba76b6cca336ff8aeb8e 100644 (file)
@@ -1,10 +1,8 @@
 package org.argeo.cms.internal.kernel;
 
 import java.util.GregorianCalendar;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jcr.Node;
 import javax.jcr.Property;
@@ -30,9 +28,9 @@ class CmsWorkspaceIndexer implements EventListener {
 
 //     private final static String MIX_ETAG = "mix:etag";
        private final static String JCR_ETAG = "jcr:etag";
-       private final static String JCR_LAST_MODIFIED = "jcr:lastModified";
-       private final static String JCR_LAST_MODIFIED_BY = "jcr:lastModifiedBy";
-       private final static String JCR_MIXIN_TYPES = "jcr:mixinTypes";
+//     private final static String JCR_LAST_MODIFIED = "jcr:lastModified";
+//     private final static String JCR_LAST_MODIFIED_BY = "jcr:lastModifiedBy";
+//     private final static String JCR_MIXIN_TYPES = "jcr:mixinTypes";
        private final static String JCR_DATA = "jcr:data";
        private final static String JCR_CONTENT = "jcr:data";
 
@@ -42,6 +40,10 @@ class CmsWorkspaceIndexer implements EventListener {
        private Session session;
        private VersionManager versionManager;
 
+       private LinkedBlockingDeque<Event> toProcess = new LinkedBlockingDeque<>();
+       private IndexingThread indexingThread;
+       private AtomicBoolean stopping = new AtomicBoolean(false);
+
        public CmsWorkspaceIndexer(RepositoryImpl repositoryImpl, String cn, String workspaceName)
                        throws RepositoryException {
                this.cn = cn;
@@ -54,14 +56,26 @@ class CmsWorkspaceIndexer implements EventListener {
                try {
                        String[] nodeTypes = { NodeType.NT_FILE, NodeType.MIX_LAST_MODIFIED };
                        session.getWorkspace().getObservationManager().addEventListener(this,
-                                       Event.NODE_ADDED | Event.NODE_REMOVED | Event.PROPERTY_CHANGED, "/", true, null, nodeTypes, true);
+                                       Event.NODE_ADDED | Event.PROPERTY_CHANGED, "/", true, null, nodeTypes, true);
                        versionManager = session.getWorkspace().getVersionManager();
+
+                       indexingThread = new IndexingThread();
+                       indexingThread.start();
                } catch (RepositoryException e1) {
                        throw new IllegalStateException(e1);
                }
        }
 
        public void destroy() {
+               stopping.set(true);
+               indexingThread.interrupt();
+               // TODO make it configurable
+               try {
+                       indexingThread.join(10 * 60 * 1000);
+               } catch (InterruptedException e1) {
+                       log.warn("Indexing thread interrupted. Will log out session.");
+               }
+
                try {
                        session.getWorkspace().getObservationManager().removeEventListener(this);
                } catch (RepositoryException e) {
@@ -77,7 +91,12 @@ class CmsWorkspaceIndexer implements EventListener {
                long count = 0;
                while (events.hasNext()) {
                        Event event = events.nextEvent();
-                       processEvent(event);
+                       try {
+                               toProcess.put(event);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+//                     processEvent(event);
                        count++;
                }
                long duration = System.currentTimeMillis() - begin;
@@ -95,6 +114,7 @@ class CmsWorkspaceIndexer implements EventListener {
                                if (log.isTraceEnabled())
                                        log.trace("NODE_ADDED " + eventPath);
 //                             session.refresh(true);
+                               session.refresh(false);
                                Node node = session.getNode(eventPath);
                                Node parentNode = node.getParent();
                                if (parentNode.isNodeType(NodeType.NT_FILE)) {
@@ -103,7 +123,10 @@ class CmsWorkspaceIndexer implements EventListener {
                                                        node.addMixin(NodeType.MIX_LAST_MODIFIED);
                                                Property property = node.getProperty(Property.JCR_DATA);
                                                String etag = toEtag(property.getValue());
+                                               session.save();
                                                node.setProperty(JCR_ETAG, etag);
+                                               if (log.isTraceEnabled())
+                                                       log.trace("ETag and last modified added to new " + node);
                                        } else if (node.isNodeType(NodeType.NT_RESOURCE)) {
 //                                             if (!node.isNodeType(MIX_ETAG))
 //                                                     node.addMixin(MIX_ETAG);
@@ -113,47 +136,55 @@ class CmsWorkspaceIndexer implements EventListener {
 //                                             node.setProperty(JCR_ETAG, etag);
 //                                             session.save();
                                        }
-                                       setLastModified(parentNode, event);
-                                       session.save();
-                                       if (log.isTraceEnabled())
-                                               log.trace("ETag and last modified added to new " + node);
+//                                     setLastModifiedRecursive(parentNode, event);
+//                                     session.save();
+//                                     if (log.isTraceEnabled())
+//                                             log.trace("ETag and last modified added to new " + node);
                                }
 
-                               if (node.isNodeType(NodeType.NT_FOLDER)) {
-                                       setLastModified(node, event);
-                                       session.save();
-                                       if (log.isTraceEnabled())
-                                               log.trace("Last modified added to new " + node);
-                               }
+//                             if (node.isNodeType(NodeType.NT_FOLDER)) {
+//                                     setLastModifiedRecursive(node, event);
+//                                     session.save();
+//                                     if (log.isTraceEnabled())
+//                                             log.trace("Last modified added to new " + node);
+//                             }
                        } else if (event.getType() == Event.PROPERTY_CHANGED) {
                                String propertyName = extractItemName(eventPath);
                                // skip if last modified properties are explicitly set
-                               if (propertyName.equals(JCR_LAST_MODIFIED))
-                                       return;
-                               if (propertyName.equals(JCR_LAST_MODIFIED_BY))
-                                       return;
-                               if (propertyName.equals(JCR_MIXIN_TYPES))
-                                       return;
-                               if (propertyName.equals(JCR_ETAG))
+                               if (!propertyName.equals(JCR_DATA))
                                        return;
+//                             if (propertyName.equals(JCR_LAST_MODIFIED))
+//                                     return;
+//                             if (propertyName.equals(JCR_LAST_MODIFIED_BY))
+//                                     return;
+//                             if (propertyName.equals(JCR_MIXIN_TYPES))
+//                                     return;
+//                             if (propertyName.equals(JCR_ETAG))
+//                                     return;
 
                                if (log.isTraceEnabled())
                                        log.trace("PROPERTY_CHANGED " + eventPath);
 
                                if (!session.propertyExists(eventPath))
                                        return;
-//                             session.refresh(true);
+                               session.refresh(false);
                                Property property = session.getProperty(eventPath);
                                Node node = property.getParent();
                                if (property.getType() == PropertyType.BINARY && propertyName.equals(JCR_DATA)
                                                && node.isNodeType(NodeType.NT_UNSTRUCTURED)) {
                                        String etag = toEtag(property.getValue());
                                        node.setProperty(JCR_ETAG, etag);
+                                       Node parentNode = node.getParent();
+                                       if (parentNode.isNodeType(NodeType.MIX_LAST_MODIFIED)) {
+                                               setLastModified(parentNode, event);
+                                       }
+                                       if (log.isTraceEnabled())
+                                               log.trace("ETag and last modified updated for " + node);
                                }
-                               setLastModified(node, event);
-                               session.save();
-                               if (log.isTraceEnabled())
-                                       log.trace("ETag and last modified updated for " + node);
+//                             setLastModified(node, event);
+//                             session.save();
+//                             if (log.isTraceEnabled())
+//                                     log.trace("ETag and last modified updated for " + node);
                        } else if (event.getType() == Event.NODE_REMOVED) {
                                String removeNodePath = eventPath;
                                String nodeName = extractItemName(eventPath);
@@ -161,7 +192,7 @@ class CmsWorkspaceIndexer implements EventListener {
                                        return;
                                if (log.isTraceEnabled())
                                        log.trace("NODE_REMOVED " + eventPath);
-                               String parentPath = JcrUtils.parentPath(removeNodePath);
+//                             String parentPath = JcrUtils.parentPath(removeNodePath);
 //                             session.refresh(true);
 //                             setLastModified(parentPath, event);
 //                             session.save();
@@ -198,20 +229,21 @@ class CmsWorkspaceIndexer implements EventListener {
 
        @Override
        public void onEvent(EventIterator events) {
-               Runnable toRun = new Runnable() {
-
-                       @Override
-                       public void run() {
-                               processEvents(events);
-                       }
-               };
-               Future<?> future = Activator.getInternalExecutorService().submit(toRun);
-               try {
-                       // make the call synchronous
-                       future.get(60, TimeUnit.SECONDS);
-               } catch (TimeoutException | ExecutionException | InterruptedException e) {
-                       // silent
-               }
+               processEvents(events);
+//             Runnable toRun = new Runnable() {
+//
+//                     @Override
+//                     public void run() {
+//                             processEvents(events);
+//                     }
+//             };
+//             Future<?> future = Activator.getInternalExecutorService().submit(toRun);
+//             try {
+//                     // make the call synchronous
+//                     future.get(60, TimeUnit.SECONDS);
+//             } catch (TimeoutException | ExecutionException | InterruptedException e) {
+//                     // silent
+//             }
        }
 
        static String toEtag(Value v) {
@@ -224,16 +256,20 @@ class CmsWorkspaceIndexer implements EventListener {
 
        }
 
-       /** Recursively set the last updated time on parents. */
        protected synchronized void setLastModified(Node node, Event event) throws RepositoryException {
+               GregorianCalendar calendar = new GregorianCalendar();
+               calendar.setTimeInMillis(event.getDate());
+               node.setProperty(Property.JCR_LAST_MODIFIED, calendar);
+               node.setProperty(Property.JCR_LAST_MODIFIED_BY, event.getUserID());
+               if (log.isTraceEnabled())
+                       log.trace("Last modified set on " + node);
+       }
+
+       /** Recursively set the last updated time on parents. */
+       protected synchronized void setLastModifiedRecursive(Node node, Event event) throws RepositoryException {
                if (versionManager.isCheckedOut(node.getPath())) {
                        if (node.isNodeType(NodeType.MIX_LAST_MODIFIED)) {
-                               GregorianCalendar calendar = new GregorianCalendar();
-                               calendar.setTimeInMillis(event.getDate());
-                               node.setProperty(Property.JCR_LAST_MODIFIED, calendar);
-                               node.setProperty(Property.JCR_LAST_MODIFIED_BY, event.getUserID());
-                               if (log.isTraceEnabled())
-                                       log.trace("Last modified set on " + node);
+                               setLastModified(node, event);
                        }
                        if (node.isNodeType(NodeType.NT_FOLDER) && !node.isNodeType(NodeType.MIX_LAST_MODIFIED)) {
                                node.addMixin(NodeType.MIX_LAST_MODIFIED);
@@ -253,7 +289,7 @@ class CmsWorkspaceIndexer implements EventListener {
                        return;
                } else {
                        Node parent = node.getParent();
-                       setLastModified(parent, event);
+                       setLastModifiedRecursive(parent, event);
                }
        }
 
@@ -261,13 +297,13 @@ class CmsWorkspaceIndexer implements EventListener {
         * Recursively set the last updated time on parents. Useful to use paths when
         * dealing with deletions.
         */
-       protected synchronized void setLastModified(String path, Event event) throws RepositoryException {
+       protected synchronized void setLastModifiedRecursive(String path, Event event) throws RepositoryException {
                // root node will always exist, so end condition is delegated to the other
                // recursive setLastModified method
                if (session.nodeExists(path)) {
-                       setLastModified(session.getNode(path), event);
+                       setLastModifiedRecursive(session.getNode(path), event);
                } else {
-                       setLastModified(JcrUtils.parentPath(path), event);
+                       setLastModifiedRecursive(JcrUtils.parentPath(path), event);
                }
        }
 
@@ -276,4 +312,32 @@ class CmsWorkspaceIndexer implements EventListener {
                return "Indexer for workspace " + workspaceName + " of repository " + cn;
        }
 
+       class IndexingThread extends Thread {
+
+               public IndexingThread() {
+                       super(CmsWorkspaceIndexer.this.toString());
+                       // TODO Auto-generated constructor stub
+               }
+
+               @Override
+               public void run() {
+                       life: while (session != null && session.isLive()) {
+                               try {
+                                       Event nextEvent = toProcess.take();
+                                       processEvent(nextEvent);
+                               } catch (InterruptedException e) {
+                                       // silent
+                                       interrupted();
+                               }
+
+                               if (stopping.get() && toProcess.isEmpty()) {
+                                       break life;
+                               }
+                       }
+                       if (log.isDebugEnabled())
+                               log.debug(CmsWorkspaceIndexer.this.toString() + " has shut down.");
+               }
+
+       }
+
 }
\ No newline at end of file