From: Mathieu Baudier Date: Mon, 3 Jan 2022 10:43:55 +0000 (+0100) Subject: Start working around components X-Git-Tag: argeo-commons-2.3.5~109 X-Git-Url: https://git.argeo.org/?a=commitdiff_plain;h=11dff0ce7fa612be5557822d8a0128cd61f54f8d;p=lgpl%2Fargeo-commons.git Start working around components --- diff --git a/org.argeo.enterprise/src/org/argeo/osgi/internal/EnterpriseActivator.java b/org.argeo.enterprise/src/org/argeo/osgi/internal/EnterpriseActivator.java index 03054828c..bb495dd12 100644 --- a/org.argeo.enterprise/src/org/argeo/osgi/internal/EnterpriseActivator.java +++ b/org.argeo.enterprise/src/org/argeo/osgi/internal/EnterpriseActivator.java @@ -6,7 +6,7 @@ import org.osgi.framework.BundleContext; /** * Called to gather information about the OSGi runtime. Should not activate * anything else that canonical monitoring services (not creating implicit - * APIs), which is the responsibility of higher levels.. + * APIs), which is the responsibility of higher levels. */ public class EnterpriseActivator implements BundleActivator { diff --git a/org.argeo.enterprise/src/org/argeo/osgi/util/FilterRequirement.java b/org.argeo.enterprise/src/org/argeo/osgi/util/FilterRequirement.java index 7cb586310..31f1d4de6 100644 --- a/org.argeo.enterprise/src/org/argeo/osgi/util/FilterRequirement.java +++ b/org.argeo.enterprise/src/org/argeo/osgi/util/FilterRequirement.java @@ -7,11 +7,10 @@ import org.osgi.resource.Namespace; import org.osgi.resource.Requirement; import org.osgi.resource.Resource; +/** Simplify filtering resources. */ public class FilterRequirement implements Requirement { private String namespace; private String filter; - - public FilterRequirement(String namespace, String filter) { this.namespace = namespace; diff --git a/org.argeo.enterprise/src/org/argeo/osgi/util/OnServiceRegistration.java b/org.argeo.enterprise/src/org/argeo/osgi/util/OnServiceRegistration.java new file mode 100644 index 000000000..5a6760e0f --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/osgi/util/OnServiceRegistration.java @@ -0,0 +1,98 @@ +package org.argeo.osgi.util; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; + +public class OnServiceRegistration implements Future { + private BundleContext ownBundleContext = FrameworkUtil.getBundle(OnServiceRegistration.class).getBundleContext(); + + private ServiceTracker st; + + private R result; + private boolean cancelled = false; + private Throwable exception; + + public OnServiceRegistration(Class clss, Function function) { + this(null, clss, function); + } + + public OnServiceRegistration(BundleContext bundleContext, Class clss, Function function) { + st = new ServiceTracker(bundleContext != null ? bundleContext : ownBundleContext, clss, null) { + + @Override + public T addingService(ServiceReference reference) { + T service = super.addingService(reference); + try { + if (result != null)// we only want the first one + return service; + result = function.apply(service); + return service; + } catch (Exception e) { + exception = e; + return service; + } finally { + close(); + } + } + }; + st.open(bundleContext == null); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (result != null || exception != null || cancelled) + return false; + st.close(); + cancelled = true; + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return result != null || cancelled; + } + + @Override + public R get() throws InterruptedException, ExecutionException { + st.waitForService(0); + return tryGetResult(); + } + + @Override + public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + st.waitForService(TimeUnit.MILLISECONDS.convert(timeout, unit)); + if (result == null) + throw new TimeoutException("No result after " + timeout + " " + unit); + return tryGetResult(); + } + + protected R tryGetResult() throws ExecutionException, CancellationException { + if (cancelled) + throw new CancellationException(); + if (exception != null) + throw new ExecutionException(exception); + if (result == null)// this should not happen + try { + throw new IllegalStateException("No result available"); + } catch (Exception e) { + exception = e; + throw new ExecutionException(e); + } + return result; + } + +} diff --git a/org.argeo.enterprise/src/org/argeo/osgi/util/OsgiRegister.java b/org.argeo.enterprise/src/org/argeo/osgi/util/OsgiRegister.java new file mode 100644 index 000000000..7132b7c3f --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/osgi/util/OsgiRegister.java @@ -0,0 +1,60 @@ +package org.argeo.osgi.util; + +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; + +import org.argeo.util.register.Register; +import org.argeo.util.register.Singleton; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; + +public class OsgiRegister implements Register { + private final BundleContext bundleContext; + private Executor executor; + + private CompletableFuture shutdownStarting = new CompletableFuture(); + + public OsgiRegister(BundleContext bundleContext) { + this.bundleContext = bundleContext; + // TODO experiment with dedicated executors + this.executor = ForkJoinPool.commonPool(); + } + + @Override + public Singleton set(T obj, Class clss, Map attributes, Class... classes) { + CompletableFuture> srf = new CompletableFuture>(); + CompletableFuture postRegistration = CompletableFuture.supplyAsync(() -> { + List lst = new ArrayList<>(); + lst.add(clss.getName()); + for (Class c : classes) { + lst.add(c.getName()); + } + ServiceRegistration sr = bundleContext.registerService(lst.toArray(new String[lst.size()]), obj, + new Hashtable(attributes)); + srf.complete(sr); + return obj; + }, executor); + Singleton singleton = new Singleton(clss, postRegistration); + + shutdownStarting. // + thenCompose(singleton::prepareUnregistration). // + thenRunAsync(() -> { + try { + srf.get().unregister(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }, executor); + return singleton; + } + + public void shutdown() { + shutdownStarting.complete(null); + } +} diff --git a/org.argeo.enterprise/src/org/argeo/util/register/Component.java b/org.argeo.enterprise/src/org/argeo/util/register/Component.java new file mode 100644 index 000000000..e3f3c2f72 --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/util/register/Component.java @@ -0,0 +1,265 @@ +package org.argeo.util.register; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Predicate; + +public class Component { + private final static AtomicBoolean started = new AtomicBoolean(false); + private final static IdentityHashMap components = new IdentityHashMap<>(); + + private static synchronized void registerComponent(Component component) { + if (started.get()) // TODO make it rellay dynamic + throw new IllegalStateException("Already activated"); + if (components.containsKey(component.instance)) + throw new IllegalArgumentException("Already registered as component"); + components.put(component.instance, component); + } + + static synchronized Component get(Object instance) { + if (!components.containsKey(instance)) + throw new IllegalArgumentException("Not registered as component"); + return components.get(instance); + } + + public synchronized static void activate() { + if (started.get()) + throw new IllegalStateException("Already activated"); + for (Component component : components.values()) { + component.activationStarted.complete(null); + } + started.set(true); + } + + public synchronized static void deactivate() { + if (!started.get()) + throw new IllegalStateException("Not activated"); + for (Component component : components.values()) { + component.deactivationStarted.complete(null); + } + started.set(false); + } + + private final Object instance; + + private Runnable init; + private Runnable close; + + private final Map, PublishedType> types; + private final Set> dependencies; + + private CompletableFuture activationStarted = new CompletableFuture(); + private CompletableFuture activated = new CompletableFuture(); + + private CompletableFuture deactivationStarted = new CompletableFuture(); + private CompletableFuture deactivated = new CompletableFuture(); + + private Set> dependants = new HashSet<>(); + + Component(Object instance, Runnable init, Runnable close, Set> dependencies, Set> classes) { + assert instance != null; + assert init != null; + assert close != null; + assert dependencies != null; + assert classes != null; + + this.instance = instance; + this.init = init; + this.close = close; + + // types + Map, PublishedType> types = new HashMap<>(classes.size()); + for (Class clss : classes) { + if (!clss.isAssignableFrom(instance.getClass())) + throw new IllegalArgumentException( + "Type " + clss.getName() + " is not compatible with " + instance.getClass().getName()); + types.put(clss, new PublishedType<>(clss)); + } + this.types = Collections.unmodifiableMap(types); + + // dependencies + this.dependencies = Collections.unmodifiableSet(new HashSet<>(dependencies)); + for (Dependency dependency : this.dependencies) { + dependency.setDependantComponent(this); + } + + // future activation + activated = activationStarted // + .thenCompose(this::dependenciesActivated) // + .thenRun(this.init); + + // future deactivation + deactivated = deactivationStarted // + .thenCompose(this::dependantsDeactivated) // + .thenRun(this.close); + + registerComponent(this); + } + + CompletableFuture dependenciesActivated(Void v) { + Set> constraints = new HashSet<>(this.dependencies.size()); + for (Dependency dependency : this.dependencies) { + CompletableFuture dependencyActivated = dependency.getPublisher().activated // + .thenCompose(dependency::set); + constraints.add(dependencyActivated); + } + return CompletableFuture.allOf(constraints.toArray(new CompletableFuture[constraints.size()])); + } + + CompletableFuture dependantsDeactivated(Void v) { + Set> constraints = new HashSet<>(this.dependants.size()); + for (Dependency dependant : this.dependants) { + CompletableFuture dependantDeactivated = dependant.getDependantComponent().deactivated // + .thenCompose(dependant::unset); + constraints.add(dependantDeactivated); + } + CompletableFuture dependantsDeactivated = CompletableFuture + .allOf(constraints.toArray(new CompletableFuture[constraints.size()])); + return dependantsDeactivated; + + } + + void addDependant(Dependency dependant) { + dependants.add(dependant); + } + + public PublishedType getType(Class clss) { + if (!types.containsKey(clss)) + throw new IllegalArgumentException(clss.getName() + " is not a type published by this component"); + return (PublishedType) types.get(clss); + } + + public class PublishedType { + private Class clss; + + private CompletableFuture value; + + public PublishedType(Class clss) { + this.clss = clss; + + value = CompletableFuture.completedFuture((T) Component.this.instance); + } + + Component getPublisher() { + return Component.this; + } + + Class getType() { + return clss; + } + } + + public static class Builder { + private final I instance; + + private Runnable init; + private Runnable close; + + private Set> dependencies = new HashSet<>(); + private Set> types = new HashSet<>(); + + public Builder(I instance) { + this.instance = instance; + } + + public Component build() { + if (types.isEmpty()) { + types.add(instance.getClass()); + } + + if (init == null) + init = () -> { + }; + if (close == null) + close = () -> { + }; + + Component component = new Component(instance, init, close, dependencies, types); + for (Dependency dependency : dependencies) { + dependency.type.getPublisher().addDependant(dependency); + } + return component; + } + + public Builder addType(Class... classes) { + types.addAll(Arrays.asList(classes)); + return this; + } + + public Builder addInit(Runnable init) { + if (this.init != null) + throw new IllegalArgumentException("init method is already set"); + this.init = init; + return this; + } + + public Builder addClose(Runnable close) { + if (this.close != null) + throw new IllegalArgumentException("close method is already set"); + this.close = close; + return this; + } + + public Builder addDependency(PublishedType type, Predicate filter, Consumer set, + Consumer unset) { + dependencies.add(new Dependency(type, filter, set, unset)); + return this; + } + + public I get() { + return instance; + } + + } + + static class Dependency { + private PublishedType type; + private Predicate filter; + private Consumer set; + private Consumer unset; + + // live + Component dependantComponent; + CompletableFuture setStage; + CompletableFuture unsetStage; + + public Dependency(PublishedType types, Predicate filter, Consumer set, Consumer unset) { + super(); + this.type = types; + this.filter = filter; + this.set = set; + this.unset = unset != null ? unset : (v) -> set.accept(null); + } + + // live + void setDependantComponent(Component component) { + this.dependantComponent = component; + } + + Component getPublisher() { + return type.getPublisher(); + } + + Component getDependantComponent() { + return dependantComponent; + } + + CompletableFuture set(Void v) { + return type.value.thenAccept(set); + } + + CompletableFuture unset(Void v) { + return type.value.thenAccept(unset); + } + + } +} + diff --git a/org.argeo.enterprise/src/org/argeo/util/register/Register.java b/org.argeo.enterprise/src/org/argeo/util/register/Register.java new file mode 100644 index 000000000..17062809e --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/util/register/Register.java @@ -0,0 +1,10 @@ +package org.argeo.util.register; + +import java.util.Map; + +/** A dynamic register of objects. */ +public interface Register { + Singleton set(T obj, Class clss, Map attributes, Class... classes); + + void shutdown(); +} diff --git a/org.argeo.enterprise/src/org/argeo/util/register/Singleton.java b/org.argeo.enterprise/src/org/argeo/util/register/Singleton.java new file mode 100644 index 000000000..5d70e9aeb --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/util/register/Singleton.java @@ -0,0 +1,41 @@ +package org.argeo.util.register; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +public class Singleton { + private final Class clss; + private final CompletableFuture registrationStage; + private final List> unregistrationHooks = new ArrayList<>(); + + public Singleton(Class clss, CompletableFuture registrationStage) { + this.clss = clss; + this.registrationStage = registrationStage; + } + + CompletionStage getRegistrationStage() { + return registrationStage.minimalCompletionStage(); + } + + public void addUnregistrationHook(Consumer todo) { + unregistrationHooks.add(todo); + } + + public Future getValue() { + return registrationStage.copy(); + } + + public CompletableFuture prepareUnregistration(Void v) { + List> lst = new ArrayList<>(); + for (Consumer hook : unregistrationHooks) { + lst.add(registrationStage.thenAcceptAsync(hook)); + } + CompletableFuture prepareUnregistrationStage = CompletableFuture + .allOf(lst.toArray(new CompletableFuture[lst.size()])); + return prepareUnregistrationStage; + } +}