Skip to content

Commit

Permalink
Support kotlin coroutines
Browse files Browse the repository at this point in the history
Resolves: #383

Inspired by https://github.com/stas29a/coroutine-feign-client

## TODO

- [ ] Separate Kotlin support module
- [ ] Enhance test case
- [ ] Refactoring
- [ ] Clean up pom.xml
  • Loading branch information
wplong11 committed Jul 31, 2022
1 parent 8e5e8ce commit c532664
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 6 deletions.
53 changes: 53 additions & 0 deletions feign-reactor-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
<version>3.2.4-SNAPSHOT</version>
</parent>

<properties>
<kotlin.version>1.5.30</kotlin.version>
<kotlinx.coroutines.version>1.5.2</kotlinx.coroutines.version>
</properties>

<artifactId>feign-reactor-core</artifactId>
<packaging>jar</packaging>
<name>Feign Reactive Core</name>
Expand Down Expand Up @@ -63,6 +68,23 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
<version>${kotlinx.coroutines.version}</version>
</dependency>

<!-- Tests -->
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down Expand Up @@ -185,6 +207,37 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
<sourceDir>${project.basedir}/src/main/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/test/kotlin</sourceDir>
<sourceDir>${project.basedir}/src/test/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
28 changes: 28 additions & 0 deletions feign-reactor-core/src/main/java/reactivefeign/KotlinExtensions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
@file:JvmName("KotlinExtensions")

package reactivefeign

import kotlinx.coroutines.reactor.awaitSingle
import reactivefeign.client.ReactiveHttpResponse
import reactor.core.publisher.Mono
import java.lang.reflect.Method
import java.lang.reflect.Type
import kotlin.reflect.jvm.javaType
import kotlin.reflect.jvm.kotlinFunction

internal suspend fun Mono<*>.awaitReactiveHttpResponse(): Any {
val result = awaitSingle()
if (result is ReactiveHttpResponse<*>) {
val body = result.body()
require(body is Mono<*>) { "Only Mono type is allowed for suspend method" }
return body.awaitSingle()
}

return result
}

internal fun Method.isSuspendMethod(): Boolean =
kotlinFunction?.isSuspend ?: false

internal fun Method.getKotlinMethodReturnType(): Type? =
kotlinFunction?.returnType?.javaType
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import static feign.Util.checkNotNull;
import static java.util.Arrays.asList;
import static reactivefeign.KotlinExtensions.getKotlinMethodReturnType;
import static reactivefeign.KotlinExtensions.isSuspendMethod;
import static reactivefeign.utils.FeignUtils.*;

/**
Expand All @@ -48,7 +50,13 @@ public List<MethodMetadata> parseAndValidateMetadata(final Class<?> targetType)

for (final MethodMetadata metadata : methodsMetadata) {
final Type type = metadata.returnType();
if (!isReactorType(type)) {

boolean isSuspend = isSuspendMethod(metadata.method());
if (isSuspend) {
modifySuspendMethodMetadata(metadata);
}

if (!isReactorType(type) && !isSuspend) {
throw new IllegalArgumentException(String.format(
"Method %s of contract %s doesn't returns reactor.core.publisher.Mono or reactor.core.publisher.Flux",
metadata.configKey(), targetType.getSimpleName()));
Expand All @@ -64,6 +72,23 @@ public List<MethodMetadata> parseAndValidateMetadata(final Class<?> targetType)
return methodsMetadata;
}

private static void modifySuspendMethodMetadata(MethodMetadata metadata) {
Type kotlinMethodReturnType = getKotlinMethodReturnType(metadata.method());
if (kotlinMethodReturnType == null) {
throw new IllegalArgumentException(String.format(
"Method %s can't have continuation argument, only kotlin method is allowed",
metadata.configKey()));
}
metadata.returnType(kotlinMethodReturnType);

int continuationIndex = metadata.method().getParameterCount() - 1;
metadata.ignoreParamater(continuationIndex);

if(metadata.bodyIndex() != null && metadata.bodyIndex().equals(continuationIndex)) {
metadata.bodyIndex(null);
}
}

private static final Set<Class> REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class));

private boolean isReactorType(final Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import static feign.Util.checkNotNull;
import static feign.Util.isDefault;
import static reactivefeign.KotlinExtensions.isSuspendMethod;
import static reactivefeign.client.ReactiveHttpExchangeFilterFunction.ofRequestProcessor;
import static reactivefeign.client.ReactiveHttpExchangeFilterFunction.ofResponseProcessor;
import static reactivefeign.client.StatusHandlerPostProcessor.handleStatus;
Expand Down Expand Up @@ -287,7 +288,9 @@ public static PublisherHttpClient retry(
MethodMetadata methodMetadata,
Retry retry) {
Type returnPublisherType = returnPublisherType(methodMetadata);
if(returnPublisherType == Mono.class){
if (isSuspendMethod(methodMetadata.method())) {
return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
} else if (returnPublisherType == Mono.class) {
return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
} else if(returnPublisherType == Flux.class) {
return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
Expand All @@ -297,7 +300,9 @@ public static PublisherHttpClient retry(
}

protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient, MethodMetadata methodMetadata){
if(isResponsePublisher(methodMetadata.returnType())){
if (isSuspendMethod(methodMetadata.method())) {
return new ResponsePublisherHttpClient(reactiveHttpClient);
} else if (isResponsePublisher(methodMetadata.returnType())) {
return new ResponsePublisherHttpClient(reactiveHttpClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
import feign.InvocationHandlerFactory;
import feign.InvocationHandlerFactory.MethodHandler;
import feign.Target;
import kotlin.coroutines.Continuation;
import reactor.core.publisher.Mono;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Map;

import static feign.Util.checkNotNull;
import static reactivefeign.KotlinExtensions.awaitReactiveHttpResponse;
import static reactivefeign.KotlinExtensions.isSuspendMethod;

/**
* {@link InvocationHandler} implementation that transforms calls to methods of feign contract into
Expand Down Expand Up @@ -61,6 +66,13 @@ private void defineObjectMethodsHandlers() {

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
if (isSuspendMethod(method)) {
Object[] newArgs = Arrays.copyOfRange(args, 0, args.length - 1);
Mono<?> result = (Mono<?>) dispatch.get(method).invoke(newArgs);
Continuation<Object> continuation = (Continuation<Object>) args[args.length - 1];
return awaitReactiveHttpResponse(result, continuation);
}

return dispatch.get(method).invoke(args);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.lang.reflect.Type;

import static feign.Util.checkNotNull;
import static reactivefeign.KotlinExtensions.isSuspendMethod;
import static reactivefeign.utils.FeignUtils.isResponsePublisher;
import static reactivefeign.utils.FeignUtils.returnPublisherType;

Expand All @@ -35,7 +36,9 @@ public MethodHandler create(MethodMetadata metadata) {
MethodHandler methodHandler = new PublisherClientMethodHandler(
target, metadata, publisherClientFactory.create(metadata));

if(isResponsePublisher(metadata.returnType())){
if (isSuspendMethod(metadata.method())) {
return new MonoMethodHandler(methodHandler);
} else if (isResponsePublisher(metadata.returnType())) {
return new MonoMethodHandler(methodHandler);
}

Expand All @@ -53,7 +56,9 @@ public MethodHandler create(MethodMetadata metadata) {
public MethodHandler createDefault(Method method) {
MethodHandler defaultMethodHandler = new DefaultMethodHandler(method);

if(method.getReturnType() == Mono.class){
if (isSuspendMethod(method)) {
return new MonoMethodHandler(defaultMethodHandler);
} else if (method.getReturnType() == Mono.class) {
return new MonoMethodHandler(defaultMethodHandler);
} else if(method.getReturnType() == Flux.class) {
return new FluxMethodHandler(defaultMethodHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public static Type returnActualType(MethodMetadata methodMetadata) {
}

public static Type returnActualType(Type returnType) {
if (!(returnType instanceof ParameterizedType)) {
return returnType;
}

Class<?> publisher = (Class)((ParameterizedType) returnType).getRawType();
Type typeInPublisher = resolveLastTypeParameter(returnType, publisher);
if(isResponsePublisher(publisher, typeInPublisher)){
Expand Down
66 changes: 66 additions & 0 deletions feign-reactor-core/src/test/java/reactivefeign/SuspendTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package reactivefeign

import com.fasterxml.jackson.core.JsonProcessingException
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.AfterClass
import org.junit.BeforeClass
import org.junit.Test
import reactivefeign.resttemplate.client.RestTemplateFakeReactiveFeign
import reactivefeign.testcase.SuspendIceCreamServiceApi
import reactivefeign.testcase.domain.OrderGenerator
import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
import reactor.netty.http.HttpProtocol
import reactor.netty.http.server.HttpServer
import reactor.netty.http.server.HttpServerRequest
import reactor.netty.http.server.HttpServerResponse
import reactor.netty.http.server.HttpServerRoutes
import java.time.Duration

class SuspendTest {
companion object {
private lateinit var server: DisposableServer
private const val DELAY_IN_MILLIS: Long = 500L
private val cannedValue = OrderGenerator().generate(1)

@BeforeClass
@JvmStatic
@Throws(JsonProcessingException::class)
fun startServer() {
val data = TestUtils.MAPPER.writeValueAsString(cannedValue).toByteArray()
server = HttpServer.create()
.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C)
.route { routes: HttpServerRoutes ->
routes[
"/icecream/orders/1", { req: HttpServerRequest?, res: HttpServerResponse ->
res.header("Content-Type", "application/json")
Mono.delay(Duration.ofMillis(DELAY_IN_MILLIS))
.thenEmpty(res.sendByteArray(Mono.just(data)))
}
]
}
.bindNow()
}

@JvmStatic
@AfterClass
fun stopServer() {
server.disposeNow()
}
}

@Test
fun shouldRun(): Unit = runBlocking {
val client = RestTemplateFakeReactiveFeign
.builder<SuspendIceCreamServiceApi>()
.target(
SuspendIceCreamServiceApi::class.java,
"http://localhost:" + server.port()
)

val firstOrder = client.findOrder(orderId = 1)

assertThat(firstOrder).usingRecursiveComparison().isEqualTo(cannedValue)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;

import static org.springframework.core.ParameterizedTypeReference.forType;
import static reactivefeign.KotlinExtensions.isSuspendMethod;
import static reactivefeign.utils.FeignUtils.returnActualType;
import static reactivefeign.utils.FeignUtils.returnPublisherType;

Expand All @@ -51,7 +52,11 @@ public class RestTemplateFakeReactiveHttpClient implements ReactiveHttpClient {
this.restTemplate = restTemplate;
this.acceptGzip = acceptGzip;

returnPublisherType = returnPublisherType(methodMetadata);
if (isSuspendMethod(methodMetadata.method())) {
returnPublisherType = Mono.class;
} else {
returnPublisherType = returnPublisherType(methodMetadata);
}
returnActualType = forType(returnActualType(methodMetadata));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package reactivefeign.testcase

import feign.Param
import feign.RequestLine
import reactivefeign.testcase.domain.IceCreamOrder

interface SuspendIceCreamServiceApi {
@RequestLine("GET /icecream/orders/{orderId}")
suspend fun findOrder(@Param("orderId") orderId: Int): IceCreamOrder
}

0 comments on commit c532664

Please sign in to comment.