package org.argeo.cms.internal.kernel;
import java.util.GregorianCalendar;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jcr.Node;
import javax.jcr.Property;
private Session session;
private VersionManager versionManager;
+ private LinkedBlockingDeque<Event> toProcess = new LinkedBlockingDeque<>();
+ private IndexingThread indexingThread;
+ private AtomicBoolean stopping = new AtomicBoolean(false);
+
public CmsWorkspaceIndexer(RepositoryImpl repositoryImpl, String cn, String workspaceName)
throws RepositoryException {
this.cn = cn;
session.getWorkspace().getObservationManager().addEventListener(this,
Event.NODE_ADDED | Event.PROPERTY_CHANGED, "/", true, null, nodeTypes, true);
versionManager = session.getWorkspace().getVersionManager();
+
+ indexingThread = new IndexingThread();
+ indexingThread.start();
} catch (RepositoryException e1) {
throw new IllegalStateException(e1);
}
}
public void destroy() {
+ stopping.set(true);
+ indexingThread.interrupt();
+ // TODO make it configurable
+ try {
+ indexingThread.join(10 * 60 * 1000);
+ } catch (InterruptedException e1) {
+ log.warn("Indexing thread interrupted. Will log out session.");
+ }
+
try {
session.getWorkspace().getObservationManager().removeEventListener(this);
} catch (RepositoryException e) {
long count = 0;
while (events.hasNext()) {
Event event = events.nextEvent();
- processEvent(event);
+ try {
+ toProcess.put(event);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+// processEvent(event);
count++;
}
long duration = System.currentTimeMillis() - begin;
@Override
public void onEvent(EventIterator events) {
- Runnable toRun = new Runnable() {
-
- @Override
- public void run() {
- processEvents(events);
- }
- };
- Future<?> future = Activator.getInternalExecutorService().submit(toRun);
- try {
- // make the call synchronous
- future.get(60, TimeUnit.SECONDS);
- } catch (TimeoutException | ExecutionException | InterruptedException e) {
- // silent
- }
+ processEvents(events);
+// Runnable toRun = new Runnable() {
+//
+// @Override
+// public void run() {
+// processEvents(events);
+// }
+// };
+// Future<?> future = Activator.getInternalExecutorService().submit(toRun);
+// try {
+// // make the call synchronous
+// future.get(60, TimeUnit.SECONDS);
+// } catch (TimeoutException | ExecutionException | InterruptedException e) {
+// // silent
+// }
}
static String toEtag(Value v) {
return "Indexer for workspace " + workspaceName + " of repository " + cn;
}
+ class IndexingThread extends Thread {
+
+ public IndexingThread() {
+ super(CmsWorkspaceIndexer.this.toString());
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public void run() {
+ life: while (session != null && session.isLive()) {
+ try {
+ Event nextEvent = toProcess.take();
+ processEvent(nextEvent);
+ } catch (InterruptedException e) {
+ // silent
+ interrupted();
+ }
+
+ if (stopping.get() && toProcess.isEmpty()) {
+ break life;
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug(CmsWorkspaceIndexer.this.toString() + " has shut down.");
+ }
+
+ }
+
}
\ No newline at end of file