Start working around components
authorMathieu Baudier <mbaudier@argeo.org>
Mon, 3 Jan 2022 10:43:55 +0000 (11:43 +0100)
committerMathieu Baudier <mbaudier@argeo.org>
Mon, 3 Jan 2022 10:43:55 +0000 (11:43 +0100)
org.argeo.enterprise/src/org/argeo/osgi/internal/EnterpriseActivator.java
org.argeo.enterprise/src/org/argeo/osgi/util/FilterRequirement.java
org.argeo.enterprise/src/org/argeo/osgi/util/OnServiceRegistration.java [new file with mode: 0644]
org.argeo.enterprise/src/org/argeo/osgi/util/OsgiRegister.java [new file with mode: 0644]
org.argeo.enterprise/src/org/argeo/util/register/Component.java [new file with mode: 0644]
org.argeo.enterprise/src/org/argeo/util/register/Register.java [new file with mode: 0644]
org.argeo.enterprise/src/org/argeo/util/register/Singleton.java [new file with mode: 0644]

index 03054828c9820046560807a1007f93e56d6fd14b..bb495dd12df7cc0353218e4a46301b63c573e60c 100644 (file)
@@ -6,7 +6,7 @@ import org.osgi.framework.BundleContext;
 /**
  * Called to gather information about the OSGi runtime. Should not activate
  * anything else that canonical monitoring services (not creating implicit
- * APIs), which is the responsibility of higher levels..
+ * APIs), which is the responsibility of higher levels.
  */
 public class EnterpriseActivator implements BundleActivator {
 
index 7cb586310e220ec256d565657ce1fe7fefce7d20..31f1d4de6bff1e580926801bedd2d33deac4f335 100644 (file)
@@ -7,11 +7,10 @@ import org.osgi.resource.Namespace;
 import org.osgi.resource.Requirement;
 import org.osgi.resource.Resource;
 
+/** Simplify filtering resources. */
 public class FilterRequirement implements Requirement {
        private String namespace;
        private String filter;
-       
-       
 
        public FilterRequirement(String namespace, String filter) {
                this.namespace = namespace;
diff --git a/org.argeo.enterprise/src/org/argeo/osgi/util/OnServiceRegistration.java b/org.argeo.enterprise/src/org/argeo/osgi/util/OnServiceRegistration.java
new file mode 100644 (file)
index 0000000..5a6760e
--- /dev/null
@@ -0,0 +1,98 @@
+package org.argeo.osgi.util;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+
+public class OnServiceRegistration<R> implements Future<R> {
+       private BundleContext ownBundleContext = FrameworkUtil.getBundle(OnServiceRegistration.class).getBundleContext();
+
+       private ServiceTracker<?, ?> st;
+
+       private R result;
+       private boolean cancelled = false;
+       private Throwable exception;
+
+       public <T> OnServiceRegistration(Class<T> clss, Function<T, R> function) {
+               this(null, clss, function);
+       }
+
+       public <T> OnServiceRegistration(BundleContext bundleContext, Class<T> clss, Function<T, R> function) {
+               st = new ServiceTracker<T, T>(bundleContext != null ? bundleContext : ownBundleContext, clss, null) {
+
+                       @Override
+                       public T addingService(ServiceReference<T> reference) {
+                               T service = super.addingService(reference);
+                               try {
+                                       if (result != null)// we only want the first one
+                                               return service;
+                                       result = function.apply(service);
+                                       return service;
+                               } catch (Exception e) {
+                                       exception = e;
+                                       return service;
+                               } finally {
+                                       close();
+                               }
+                       }
+               };
+               st.open(bundleContext == null);
+       }
+
+       @Override
+       public boolean cancel(boolean mayInterruptIfRunning) {
+               if (result != null || exception != null || cancelled)
+                       return false;
+               st.close();
+               cancelled = true;
+               return true;
+       }
+
+       @Override
+       public boolean isCancelled() {
+               return cancelled;
+       }
+
+       @Override
+       public boolean isDone() {
+               return result != null || cancelled;
+       }
+
+       @Override
+       public R get() throws InterruptedException, ExecutionException {
+               st.waitForService(0);
+               return tryGetResult();
+       }
+
+       @Override
+       public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+               st.waitForService(TimeUnit.MILLISECONDS.convert(timeout, unit));
+               if (result == null)
+                       throw new TimeoutException("No result after " + timeout + " " + unit);
+               return tryGetResult();
+       }
+
+       protected R tryGetResult() throws ExecutionException, CancellationException {
+               if (cancelled)
+                       throw new CancellationException();
+               if (exception != null)
+                       throw new ExecutionException(exception);
+               if (result == null)// this should not happen
+                       try {
+                               throw new IllegalStateException("No result available");
+                       } catch (Exception e) {
+                               exception = e;
+                               throw new ExecutionException(e);
+                       }
+               return result;
+       }
+
+}
diff --git a/org.argeo.enterprise/src/org/argeo/osgi/util/OsgiRegister.java b/org.argeo.enterprise/src/org/argeo/osgi/util/OsgiRegister.java
new file mode 100644 (file)
index 0000000..7132b7c
--- /dev/null
@@ -0,0 +1,60 @@
+package org.argeo.osgi.util;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+import org.argeo.util.register.Register;
+import org.argeo.util.register.Singleton;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+public class OsgiRegister implements Register {
+       private final BundleContext bundleContext;
+       private Executor executor;
+
+       private CompletableFuture<Void> shutdownStarting = new CompletableFuture<Void>();
+
+       public OsgiRegister(BundleContext bundleContext) {
+               this.bundleContext = bundleContext;
+               // TODO experiment with dedicated executors
+               this.executor = ForkJoinPool.commonPool();
+       }
+
+       @Override
+       public <T> Singleton<T> set(T obj, Class<T> clss, Map<String, Object> attributes, Class<?>... classes) {
+               CompletableFuture<ServiceRegistration<?>> srf = new CompletableFuture<ServiceRegistration<?>>();
+               CompletableFuture<T> postRegistration = CompletableFuture.supplyAsync(() -> {
+                       List<String> lst = new ArrayList<>();
+                       lst.add(clss.getName());
+                       for (Class<?> c : classes) {
+                               lst.add(c.getName());
+                       }
+                       ServiceRegistration<?> sr = bundleContext.registerService(lst.toArray(new String[lst.size()]), obj,
+                                       new Hashtable<String, Object>(attributes));
+                       srf.complete(sr);
+                       return obj;
+               }, executor);
+               Singleton<T> singleton = new Singleton<T>(clss, postRegistration);
+
+               shutdownStarting. //
+                               thenCompose(singleton::prepareUnregistration). //
+                               thenRunAsync(() -> {
+                                       try {
+                                               srf.get().unregister();
+                                       } catch (InterruptedException | ExecutionException e) {
+                                               e.printStackTrace();
+                                       }
+                               }, executor);
+               return singleton;
+       }
+
+       public void shutdown() {
+               shutdownStarting.complete(null);
+       }
+}
diff --git a/org.argeo.enterprise/src/org/argeo/util/register/Component.java b/org.argeo.enterprise/src/org/argeo/util/register/Component.java
new file mode 100644 (file)
index 0000000..e3f3c2f
--- /dev/null
@@ -0,0 +1,265 @@
+package org.argeo.util.register;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+public class Component {
+       private final static AtomicBoolean started = new AtomicBoolean(false);
+       private final static IdentityHashMap<Object, Component> components = new IdentityHashMap<>();
+
+       private static synchronized void registerComponent(Component component) {
+               if (started.get()) // TODO make it rellay dynamic
+                       throw new IllegalStateException("Already activated");
+               if (components.containsKey(component.instance))
+                       throw new IllegalArgumentException("Already registered as component");
+               components.put(component.instance, component);
+       }
+
+       static synchronized Component get(Object instance) {
+               if (!components.containsKey(instance))
+                       throw new IllegalArgumentException("Not registered as component");
+               return components.get(instance);
+       }
+
+       public synchronized static void activate() {
+               if (started.get())
+                       throw new IllegalStateException("Already activated");
+               for (Component component : components.values()) {
+                       component.activationStarted.complete(null);
+               }
+               started.set(true);
+       }
+
+       public synchronized static void deactivate() {
+               if (!started.get())
+                       throw new IllegalStateException("Not activated");
+               for (Component component : components.values()) {
+                       component.deactivationStarted.complete(null);
+               }
+               started.set(false);
+       }
+
+       private final Object instance;
+
+       private Runnable init;
+       private Runnable close;
+
+       private final Map<Class<?>, PublishedType<?>> types;
+       private final Set<Dependency<?>> dependencies;
+
+       private CompletableFuture<Void> activationStarted = new CompletableFuture<Void>();
+       private CompletableFuture<Void> activated = new CompletableFuture<Void>();
+
+       private CompletableFuture<Void> deactivationStarted = new CompletableFuture<Void>();
+       private CompletableFuture<Void> deactivated = new CompletableFuture<Void>();
+
+       private Set<Dependency<?>> dependants = new HashSet<>();
+
+       Component(Object instance, Runnable init, Runnable close, Set<Dependency<?>> dependencies, Set<Class<?>> classes) {
+               assert instance != null;
+               assert init != null;
+               assert close != null;
+               assert dependencies != null;
+               assert classes != null;
+
+               this.instance = instance;
+               this.init = init;
+               this.close = close;
+
+               // types
+               Map<Class<?>, PublishedType<?>> types = new HashMap<>(classes.size());
+               for (Class<?> clss : classes) {
+                       if (!clss.isAssignableFrom(instance.getClass()))
+                               throw new IllegalArgumentException(
+                                               "Type " + clss.getName() + " is not compatible with " + instance.getClass().getName());
+                       types.put(clss, new PublishedType<>(clss));
+               }
+               this.types = Collections.unmodifiableMap(types);
+
+               // dependencies
+               this.dependencies = Collections.unmodifiableSet(new HashSet<>(dependencies));
+               for (Dependency<?> dependency : this.dependencies) {
+                       dependency.setDependantComponent(this);
+               }
+
+               // future activation
+               activated = activationStarted //
+                               .thenCompose(this::dependenciesActivated) //
+                               .thenRun(this.init);
+
+               // future deactivation
+               deactivated = deactivationStarted //
+                               .thenCompose(this::dependantsDeactivated) //
+                               .thenRun(this.close);
+
+               registerComponent(this);
+       }
+
+       CompletableFuture<Void> dependenciesActivated(Void v) {
+               Set<CompletableFuture<?>> constraints = new HashSet<>(this.dependencies.size());
+               for (Dependency<?> dependency : this.dependencies) {
+                       CompletableFuture<Void> dependencyActivated = dependency.getPublisher().activated //
+                                       .thenCompose(dependency::set);
+                       constraints.add(dependencyActivated);
+               }
+               return CompletableFuture.allOf(constraints.toArray(new CompletableFuture[constraints.size()]));
+       }
+
+       CompletableFuture<Void> dependantsDeactivated(Void v) {
+               Set<CompletableFuture<?>> constraints = new HashSet<>(this.dependants.size());
+               for (Dependency<?> dependant : this.dependants) {
+                       CompletableFuture<Void> dependantDeactivated = dependant.getDependantComponent().deactivated //
+                                       .thenCompose(dependant::unset);
+                       constraints.add(dependantDeactivated);
+               }
+               CompletableFuture<Void> dependantsDeactivated = CompletableFuture
+                               .allOf(constraints.toArray(new CompletableFuture[constraints.size()]));
+               return dependantsDeactivated;
+
+       }
+
+       void addDependant(Dependency<?> dependant) {
+               dependants.add(dependant);
+       }
+
+       public <T> PublishedType<T> getType(Class<T> clss) {
+               if (!types.containsKey(clss))
+                       throw new IllegalArgumentException(clss.getName() + " is not a type published by this component");
+               return (PublishedType<T>) types.get(clss);
+       }
+
+       public class PublishedType<T> {
+               private Class<T> clss;
+
+               private CompletableFuture<T> value;
+
+               public PublishedType(Class<T> clss) {
+                       this.clss = clss;
+
+                       value = CompletableFuture.completedFuture((T) Component.this.instance);
+               }
+
+               Component getPublisher() {
+                       return Component.this;
+               }
+
+               Class<T> getType() {
+                       return clss;
+               }
+       }
+
+       public static class Builder<I> {
+               private final I instance;
+
+               private Runnable init;
+               private Runnable close;
+
+               private Set<Dependency<?>> dependencies = new HashSet<>();
+               private Set<Class<?>> types = new HashSet<>();
+
+               public Builder(I instance) {
+                       this.instance = instance;
+               }
+
+               public Component build() {
+                       if (types.isEmpty()) {
+                               types.add(instance.getClass());
+                       }
+
+                       if (init == null)
+                               init = () -> {
+                               };
+                       if (close == null)
+                               close = () -> {
+                               };
+
+                       Component component = new Component(instance, init, close, dependencies, types);
+                       for (Dependency<?> dependency : dependencies) {
+                               dependency.type.getPublisher().addDependant(dependency);
+                       }
+                       return component;
+               }
+
+               public Builder<I> addType(Class<?>... classes) {
+                       types.addAll(Arrays.asList(classes));
+                       return this;
+               }
+
+               public Builder<I> addInit(Runnable init) {
+                       if (this.init != null)
+                               throw new IllegalArgumentException("init method is already set");
+                       this.init = init;
+                       return this;
+               }
+
+               public Builder<I> addClose(Runnable close) {
+                       if (this.close != null)
+                               throw new IllegalArgumentException("close method is already set");
+                       this.close = close;
+                       return this;
+               }
+
+               public <D> Builder<I> addDependency(PublishedType<D> type, Predicate<?> filter, Consumer<D> set,
+                               Consumer<D> unset) {
+                       dependencies.add(new Dependency<D>(type, filter, set, unset));
+                       return this;
+               }
+
+               public I get() {
+                       return instance;
+               }
+
+       }
+
+       static class Dependency<D> {
+               private PublishedType<D> type;
+               private Predicate<?> filter;
+               private Consumer<D> set;
+               private Consumer<D> unset;
+
+               // live
+               Component dependantComponent;
+               CompletableFuture<Void> setStage;
+               CompletableFuture<Void> unsetStage;
+
+               public Dependency(PublishedType<D> types, Predicate<?> filter, Consumer<D> set, Consumer<D> unset) {
+                       super();
+                       this.type = types;
+                       this.filter = filter;
+                       this.set = set;
+                       this.unset = unset != null ? unset : (v) -> set.accept(null);
+               }
+
+               // live
+               void setDependantComponent(Component component) {
+                       this.dependantComponent = component;
+               }
+
+               Component getPublisher() {
+                       return type.getPublisher();
+               }
+
+               Component getDependantComponent() {
+                       return dependantComponent;
+               }
+
+               CompletableFuture<Void> set(Void v) {
+                       return type.value.thenAccept(set);
+               }
+
+               CompletableFuture<Void> unset(Void v) {
+                       return type.value.thenAccept(unset);
+               }
+
+       }
+}
+
diff --git a/org.argeo.enterprise/src/org/argeo/util/register/Register.java b/org.argeo.enterprise/src/org/argeo/util/register/Register.java
new file mode 100644 (file)
index 0000000..1706280
--- /dev/null
@@ -0,0 +1,10 @@
+package org.argeo.util.register;
+
+import java.util.Map;
+
+/** A dynamic register of objects. */
+public interface Register {
+       <T> Singleton<T> set(T obj, Class<T> clss, Map<String, Object> attributes, Class<?>... classes);
+
+       void shutdown();
+}
diff --git a/org.argeo.enterprise/src/org/argeo/util/register/Singleton.java b/org.argeo.enterprise/src/org/argeo/util/register/Singleton.java
new file mode 100644 (file)
index 0000000..5d70e9a
--- /dev/null
@@ -0,0 +1,41 @@
+package org.argeo.util.register;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+public class Singleton<T> {
+       private final Class<T> clss;
+       private final CompletableFuture<T> registrationStage;
+       private final List<Consumer<T>> unregistrationHooks = new ArrayList<>();
+
+       public Singleton(Class<T> clss, CompletableFuture<T> registrationStage) {
+               this.clss = clss;
+               this.registrationStage = registrationStage;
+       }
+
+       CompletionStage<T> getRegistrationStage() {
+               return registrationStage.minimalCompletionStage();
+       }
+
+       public void addUnregistrationHook(Consumer<T> todo) {
+               unregistrationHooks.add(todo);
+       }
+
+       public Future<T> getValue() {
+               return registrationStage.copy();
+       }
+
+       public CompletableFuture<Void> prepareUnregistration(Void v) {
+               List<CompletableFuture<Void>> lst = new ArrayList<>();
+               for (Consumer<T> hook : unregistrationHooks) {
+                       lst.add(registrationStage.thenAcceptAsync(hook));
+               }
+               CompletableFuture<Void> prepareUnregistrationStage = CompletableFuture
+                               .allOf(lst.toArray(new CompletableFuture<?>[lst.size()]));
+               return prepareUnregistrationStage;
+       }
+}