From 2cec4f83e7e1455be01b5a9771dfdf1cc719fa4a Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Tue, 18 Feb 2020 15:27:25 +0100 Subject: [PATCH] Use single thread for workspace indexing. --- .../internal/kernel/CmsWorkspaceIndexer.java | 86 +++++++++++++++---- 1 file changed, 67 insertions(+), 19 deletions(-) diff --git a/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsWorkspaceIndexer.java b/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsWorkspaceIndexer.java index 7782b4291..bb38d6a63 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsWorkspaceIndexer.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsWorkspaceIndexer.java @@ -1,10 +1,8 @@ package org.argeo.cms.internal.kernel; import java.util.GregorianCalendar; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jcr.Node; import javax.jcr.Property; @@ -42,6 +40,10 @@ class CmsWorkspaceIndexer implements EventListener { private Session session; private VersionManager versionManager; + private LinkedBlockingDeque toProcess = new LinkedBlockingDeque<>(); + private IndexingThread indexingThread; + private AtomicBoolean stopping = new AtomicBoolean(false); + public CmsWorkspaceIndexer(RepositoryImpl repositoryImpl, String cn, String workspaceName) throws RepositoryException { this.cn = cn; @@ -56,12 +58,24 @@ class CmsWorkspaceIndexer implements EventListener { session.getWorkspace().getObservationManager().addEventListener(this, Event.NODE_ADDED | Event.PROPERTY_CHANGED, "/", true, null, nodeTypes, true); versionManager = session.getWorkspace().getVersionManager(); + + indexingThread = new IndexingThread(); + indexingThread.start(); } catch (RepositoryException e1) { throw new IllegalStateException(e1); } } public void destroy() { + stopping.set(true); + indexingThread.interrupt(); + // TODO make it configurable + try { + indexingThread.join(10 * 60 * 1000); + } catch (InterruptedException e1) { + log.warn("Indexing thread interrupted. Will log out session."); + } + try { session.getWorkspace().getObservationManager().removeEventListener(this); } catch (RepositoryException e) { @@ -77,7 +91,12 @@ class CmsWorkspaceIndexer implements EventListener { long count = 0; while (events.hasNext()) { Event event = events.nextEvent(); - processEvent(event); + try { + toProcess.put(event); + } catch (InterruptedException e) { + e.printStackTrace(); + } +// processEvent(event); count++; } long duration = System.currentTimeMillis() - begin; @@ -210,20 +229,21 @@ class CmsWorkspaceIndexer implements EventListener { @Override public void onEvent(EventIterator events) { - Runnable toRun = new Runnable() { - - @Override - public void run() { - processEvents(events); - } - }; - Future future = Activator.getInternalExecutorService().submit(toRun); - try { - // make the call synchronous - future.get(60, TimeUnit.SECONDS); - } catch (TimeoutException | ExecutionException | InterruptedException e) { - // silent - } + processEvents(events); +// Runnable toRun = new Runnable() { +// +// @Override +// public void run() { +// processEvents(events); +// } +// }; +// Future future = Activator.getInternalExecutorService().submit(toRun); +// try { +// // make the call synchronous +// future.get(60, TimeUnit.SECONDS); +// } catch (TimeoutException | ExecutionException | InterruptedException e) { +// // silent +// } } static String toEtag(Value v) { @@ -292,4 +312,32 @@ class CmsWorkspaceIndexer implements EventListener { return "Indexer for workspace " + workspaceName + " of repository " + cn; } + class IndexingThread extends Thread { + + public IndexingThread() { + super(CmsWorkspaceIndexer.this.toString()); + // TODO Auto-generated constructor stub + } + + @Override + public void run() { + life: while (session != null && session.isLive()) { + try { + Event nextEvent = toProcess.take(); + processEvent(nextEvent); + } catch (InterruptedException e) { + // silent + interrupted(); + } + + if (stopping.get() && toProcess.isEmpty()) { + break life; + } + } + if (log.isDebugEnabled()) + log.debug(CmsWorkspaceIndexer.this.toString() + " has shut down."); + } + + } + } \ No newline at end of file -- 2.30.2