Skip to content

Commit

Permalink
fix #4154: adding a callback for stream consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed May 10, 2023
1 parent f1185a0 commit 73b9367
Show file tree
Hide file tree
Showing 22 changed files with 228 additions and 372 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client;

import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletionStage;

public interface StreamConsumer {

public static StreamConsumer newStreamConsumer(OutputStream os) {
checkForPiped(os);
return newBlockingStreamConsumer(Channels.newChannel(os));
}

public static StreamConsumer newBlockingStreamConsumer(WritableByteChannel channel) {
return buffer -> {
int remaining = buffer.remaining();
if (channel.write(buffer) != remaining) {
throw new KubernetesClientException("Unsucessful blocking write");
}
return null;
};
}

public static void checkForPiped(Object object) {
if (object instanceof PipedOutputStream || object instanceof PipedInputStream) {
throw new KubernetesClientException("Piped streams should not be used");
}
}

/**
* A callback for consuming a stream as a series of {@link ByteBuffer}s
*
* @param buffer
* @return a {@link CompletionStage} that is completed when the buffer has been fully consumed,
* or null if it was already consumed
* @throws Exception
*/
CompletionStage<?> consume(ByteBuffer buffer) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.StreamConsumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
Expand Down Expand Up @@ -70,7 +72,11 @@ public interface Loggable {
* @param out {@link OutputStream} for storing logs
* @return returns a Closeable interface for log watch
*/
LogWatch watchLog(OutputStream out);
default LogWatch watchLog(OutputStream out) {
return watchLog(StreamConsumer.newStreamConsumer(out), true);
}

LogWatch watchLog(StreamConsumer consumer, boolean blocking);

/**
* While waiting for Pod logs, how long shall we wait until a Pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.StreamConsumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
Expand All @@ -27,7 +29,11 @@ public interface TtyExecErrorable extends
* <p>
* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingError()} instead
*/
TtyExecErrorChannelable writingError(OutputStream in);
default TtyExecErrorChannelable writingError(OutputStream in) {
return writingError(StreamConsumer.newStreamConsumer(in), true);
}

TtyExecErrorChannelable writingError(StreamConsumer consumer, boolean blocking);

/**
* If the {@link ExecWatch} should terminate when a stdErr message is received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.StreamConsumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
Expand All @@ -27,7 +29,11 @@ public interface TtyExecOutputErrorable extends
* <p>
* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingOutput()} instead
*/
TtyExecErrorable writingOutput(OutputStream in);
default TtyExecErrorable writingOutput(OutputStream in) {
return writingOutput(StreamConsumer.newStreamConsumer(in), true);
}

TtyExecErrorable writingOutput(StreamConsumer consumer, boolean blocking);

/**
* Will provide an {@link InputStream} via {@link ExecWatch#getOutput()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ public static ByteBuffer copy(ByteBuffer buffer) {

/**
* Very rudimentary method to check if the provided ByteBuffer contains text.
*
*
* @return true if the buffer contains text, false otherwise.
*/
public static boolean isPlainText(ByteBuffer originalBuffer) {
if (originalBuffer == null) {
return false;
}
final ByteBuffer buffer = copy(originalBuffer);
final ByteBuffer buffer = originalBuffer.asReadOnlyBuffer();
final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
try {
decoder.decode(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ default void onMessage(WebSocket webSocket, String text) {
/**
* Called once the full binary message has been built. {@link WebSocket#request()} must
* be called to receive more messages.
*
* @param bytes which will not further used nor modified by the {@link HttpClient}
*/
default void onMessage(WebSocket webSocket, ByteBuffer bytes) {
webSocket.request();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusCause;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.StreamConsumer;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecListener.Response;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
Expand All @@ -37,8 +38,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -51,6 +50,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
* A {@link WebSocket.Listener} for exec operations.
Expand Down Expand Up @@ -106,7 +106,7 @@ public ListenerStream(String name) {
this.name = name;
}

private void handle(ByteBuffer byteString, WebSocket webSocket) throws IOException {
private void handle(ByteBuffer byteString, WebSocket webSocket) throws Exception {
if (handler != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("exec message received {} bytes on channel {}", byteString.remaining(), name);
Expand Down Expand Up @@ -178,37 +178,46 @@ private ListenerStream createStream(String name, StreamContext streamContext) {
if (streamContext == null) {
return stream;
}
OutputStream os = streamContext.getOutputStream();
if (os == null) {
StreamConsumer consumer = streamContext.getConsumer();
if (consumer == null) {
// redirecting
stream.inputStream = new ExecWatchInputStream(() -> this.webSocketRef.get().request());
this.exitCode.whenComplete(stream.inputStream::onExit);
stream.handler = b -> stream.inputStream.consume(Arrays.asList(b));
} else {
WritableByteChannel channel = Channels.newChannel(os);
stream.handler = b -> asyncWrite(channel, b);
stream.handler = b -> consume(consumer, b, streamContext.isBlocking() ? serialExecutor : Runnable::run,
this::postConsume);
}
return stream;
}

private void asyncWrite(WritableByteChannel channel, ByteBuffer b) {
CompletableFuture.runAsync(() -> {
public static void consume(StreamConsumer consumer, ByteBuffer bytes, Executor executor, Consumer<Throwable> postConsume) {
CompletableFuture.supplyAsync(() -> {
try {
channel.write(b);
} catch (IOException e) {
return consumer.consume(bytes);
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
}, serialExecutor).whenComplete((v, t) -> {
webSocketRef.get().request();
if (t != null) {
if (closed.get()) {
LOGGER.debug("Stream write failed after close", t);
} else {
// This could happen if the user simply closes their stream prior to completion
LOGGER.warn("Stream write failed", t);
}
}, executor)
.whenComplete((cs, t) -> {
if (cs != null) {
cs.whenComplete((v, t1) -> postConsume.accept(t1));
} else {
postConsume.accept(t);
}
});
}

private void postConsume(Throwable t) {
webSocketRef.get().request();
if (t != null) {
if (closed.get()) {
LOGGER.debug("Stream write failed after close", t);
} else {
// This could happen if the user simply closes their stream prior to completion
LOGGER.warn("Stream write failed", t);
}
});
}
}

@Override
Expand Down Expand Up @@ -299,6 +308,7 @@ public void onError(WebSocket webSocket, Throwable t, boolean connectionError) {
@Override
public void onMessage(WebSocket webSocket, String text) {
LOGGER.debug("Exec Web Socket: onMessage(String)");
// this is unexpected and will likely just result in an exception
onMessage(webSocket, ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
}

Expand Down Expand Up @@ -337,7 +347,7 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) {
default:
throw new IOException("Unknown stream ID " + streamID);
}
} catch (IOException e) {
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
} finally {
if (close) {
Expand Down
Loading

0 comments on commit 73b9367

Please sign in to comment.