Introduce ServiceChannel.
authorMathieu Baudier <mbaudier@argeo.org>
Tue, 21 Jan 2020 08:59:43 +0000 (09:59 +0100)
committerMathieu Baudier <mbaudier@argeo.org>
Tue, 21 Jan 2020 08:59:43 +0000 (09:59 +0100)
org.argeo.util/src/org/argeo/util/ServiceChannel.java [new file with mode: 0644]

diff --git a/org.argeo.util/src/org/argeo/util/ServiceChannel.java b/org.argeo.util/src/org/argeo/util/ServiceChannel.java
new file mode 100644 (file)
index 0000000..7997384
--- /dev/null
@@ -0,0 +1,78 @@
+package org.argeo.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousByteChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/** An {@link AsynchronousByteChannel} based on an {@link ExecutorService}. */
+public class ServiceChannel implements AsynchronousByteChannel {
+       private final ReadableByteChannel in;
+       private final WritableByteChannel out;
+
+       private boolean open = true;
+
+       private ExecutorService executor;
+
+       public ServiceChannel(ReadableByteChannel in, WritableByteChannel out, ExecutorService executor) {
+               this.in = in;
+               this.out = out;
+               this.executor = executor;
+       }
+
+       @Override
+       public Future<Integer> read(ByteBuffer dst) {
+               return executor.submit(() -> in.read(dst));
+       }
+
+       @Override
+       public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
+               try {
+                       Future<Integer> res = read(dst);
+                       handler.completed(res.get(), attachment);
+               } catch (Exception e) {
+                       handler.failed(e, attachment);
+               }
+       }
+
+       @Override
+       public Future<Integer> write(ByteBuffer src) {
+               return executor.submit(() -> out.write(src));
+       }
+
+       @Override
+       public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
+               try {
+                       Future<Integer> res = write(src);
+                       handler.completed(res.get(), attachment);
+               } catch (Exception e) {
+                       handler.failed(e, attachment);
+               }
+       }
+
+       @Override
+       public synchronized void close() throws IOException {
+               try {
+                       in.close();
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+               try {
+                       out.close();
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+               open = false;
+               notifyAll();
+       }
+
+       @Override
+       public synchronized boolean isOpen() {
+               return open;
+       }
+
+}