Introduce AsyncPipedOutputStream
authorMathieu Baudier <mbaudier@argeo.org>
Fri, 6 Oct 2023 05:23:32 +0000 (07:23 +0200)
committerMathieu Baudier <mbaudier@argeo.org>
Fri, 6 Oct 2023 05:23:32 +0000 (07:23 +0200)
org.argeo.cms/src/org/argeo/cms/util/AsyncPipedOutputStream.java [new file with mode: 0644]

diff --git a/org.argeo.cms/src/org/argeo/cms/util/AsyncPipedOutputStream.java b/org.argeo.cms/src/org/argeo/cms/util/AsyncPipedOutputStream.java
new file mode 100644 (file)
index 0000000..636f5e3
--- /dev/null
@@ -0,0 +1,71 @@
+package org.argeo.cms.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.UncheckedIOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * An output stream whose {@link #close()} method will wait for read actions to
+ * be completed. It is meant to be used transparently as an
+ * {@link OutputStream}, fulfilling the expectation that everything has been
+ * done when the {@link #close()} method has returned.
+ */
+public class AsyncPipedOutputStream extends PipedOutputStream {
+//     private final static Logger logger = System.getLogger(AsyncPipedOutputStream.class.getName());
+
+       private CompletableFuture<Void> readingDone;
+
+       private long timeout = 60 * 1000;
+
+       /**
+        * Provides the actions which will read (and close) the related piped input
+        * stream. Reading starts immediately asynchronously, but the provided
+        * {@link InputStream} will block until data starts to be written to this output
+        * stream.
+        */
+       public void asyncRead(Consumer<InputStream> readActions) {
+               try {
+                       PipedInputStream in = new PipedInputStream(this);
+                       readingDone = CompletableFuture.runAsync(() -> {
+                               readActions.accept(in);
+                       });
+               } catch (IOException e) {
+                       throw new UncheckedIOException("Cannot create piped input stream", e);
+               }
+       }
+
+       /**
+        * Closes this output stream immediately but then wait for the reading of the
+        * related input stream to be completed.
+        */
+       @Override
+       public void close() throws IOException {
+               Objects.requireNonNull(readingDone, "Async read must have started");
+               super.flush();
+               super.close();
+               readingDone.orTimeout(timeout, TimeUnit.MILLISECONDS).join();
+//             logger.log(Logger.Level.DEBUG, "OUT waiting " + timeout);
+//             try {
+//                     readingDone.get(timeout, TimeUnit.MILLISECONDS);
+//             } catch (InterruptedException | ExecutionException | TimeoutException e) {
+//                     logger.log(Logger.Level.ERROR, "Reading was not completed", e);
+//             }
+//             logger.log(Logger.Level.DEBUG, "OUT closed");
+       }
+
+       /**
+        * Sets the timeout in milliseconds when waiting for reading to be completed
+        * before returning in the {@link #close()} method.
+        */
+       public void setTimeout(long timeout) {
+               this.timeout = timeout;
+       }
+
+}