From: Mathieu Baudier Date: Fri, 6 Oct 2023 05:23:32 +0000 (+0200) Subject: Introduce AsyncPipedOutputStream X-Git-Tag: v2.3.19~4 X-Git-Url: https://git.argeo.org/?a=commitdiff_plain;h=dbbdf57196d2c064dd37ea473059b2fc8d711cdf;p=lgpl%2Fargeo-commons.git Introduce AsyncPipedOutputStream --- diff --git a/org.argeo.cms/src/org/argeo/cms/util/AsyncPipedOutputStream.java b/org.argeo.cms/src/org/argeo/cms/util/AsyncPipedOutputStream.java new file mode 100644 index 000000000..636f5e33a --- /dev/null +++ b/org.argeo.cms/src/org/argeo/cms/util/AsyncPipedOutputStream.java @@ -0,0 +1,71 @@ +package org.argeo.cms.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.io.UncheckedIOException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * An output stream whose {@link #close()} method will wait for read actions to + * be completed. It is meant to be used transparently as an + * {@link OutputStream}, fulfilling the expectation that everything has been + * done when the {@link #close()} method has returned. + */ +public class AsyncPipedOutputStream extends PipedOutputStream { +// private final static Logger logger = System.getLogger(AsyncPipedOutputStream.class.getName()); + + private CompletableFuture readingDone; + + private long timeout = 60 * 1000; + + /** + * Provides the actions which will read (and close) the related piped input + * stream. Reading starts immediately asynchronously, but the provided + * {@link InputStream} will block until data starts to be written to this output + * stream. + */ + public void asyncRead(Consumer readActions) { + try { + PipedInputStream in = new PipedInputStream(this); + readingDone = CompletableFuture.runAsync(() -> { + readActions.accept(in); + }); + } catch (IOException e) { + throw new UncheckedIOException("Cannot create piped input stream", e); + } + } + + /** + * Closes this output stream immediately but then wait for the reading of the + * related input stream to be completed. + */ + @Override + public void close() throws IOException { + Objects.requireNonNull(readingDone, "Async read must have started"); + super.flush(); + super.close(); + readingDone.orTimeout(timeout, TimeUnit.MILLISECONDS).join(); +// logger.log(Logger.Level.DEBUG, "OUT waiting " + timeout); +// try { +// readingDone.get(timeout, TimeUnit.MILLISECONDS); +// } catch (InterruptedException | ExecutionException | TimeoutException e) { +// logger.log(Logger.Level.ERROR, "Reading was not completed", e); +// } +// logger.log(Logger.Level.DEBUG, "OUT closed"); + } + + /** + * Sets the timeout in milliseconds when waiting for reading to be completed + * before returning in the {@link #close()} method. + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + +}