Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is asynchronous nack thread blocking? #183

Open
livius-ungureanu opened this issue Nov 27, 2019 · 4 comments
Open

Is asynchronous nack thread blocking? #183

livius-ungureanu opened this issue Nov 27, 2019 · 4 comments

Comments

@livius-ungureanu
Copy link

livius-ungureanu commented Nov 27, 2019

It looks like explicit nack can be used only synchronously and the only way to use it asynchronously is by supplying it as an implicit default RecoveryStrategy for BoundConsumerDefinition(i.e. the subscription).

So to consume some item asynchronously we should roughly use something like ack(Future{ body_process_result}) . The doc says that if the future fails then the RecoveryStrategy is applied(i.e the wanted nack is called). But this leads to

being hit which looks like thread blocking.

Are there any plans to improve this?

And a 2nd question: Is there any other approach to use nack asynchronously besides the approach above(also used in /op-rabbit/core/src/test/scala/com/spingo/op_rabbit/consumerSpec.scala test) ?

@kw217
Copy link

kw217 commented Feb 7, 2020

I don't think this is actually a problem - the real work is handled by

case delivery: Delivery =>
pendingDeliveries.add(delivery.envelope.getDeliveryTag)
implicit val ec = handlerExecutionContext
applyHandler("running handler", delivery)(async(subscription.handler)) pipeTo self

which runs the work (subscription.handler) asynchronously as expected. Only if the work throws does anything interesting happen - in this case the exception becomes a ReceiveResult.Fail message, which invokes the recovery strategy synchronously in

try
Await.result(
applyHandler("running recoveryStrategy", fail.delivery)(
subscription.recoveryStrategy(subscription.queue.queueName, channel, fail.exception)),
5.minutes)
catch { case ex: Throwable =>
ReceiveResult.Fail(fail.delivery, Some("exception while running recoveryStrategy"), ex) }
} match {

The recovery strategy in our application (presumably typical) is just RecoveryStrategy.nack(requeue=true), which will complete immediately (ultimately it reduces to p.success(ReceiveResult.nack)). So the thread will only be blocked momentarily, which is not a problem.

I think this issue can probably be closed.

@kw217
Copy link

kw217 commented Feb 7, 2020

To answer point 2, isn't it the case that if the body returns ReceiveResult.nack then op-rabbit will nack the message?

@livius-ungureanu
Copy link
Author

livius-ungureanu commented Feb 11, 2020

thank you @kw217

Right.

  • On nominal scenarios the RecoveryStrategy is not called so the code is 100% async for nominal scenarios.
  • Just in case of failure, the RecoveryStrategy is called synchronously, blocking the thread but this could be accepted as an insignificant amount of work done in this case (just call the RecoverStrategy.nack to send out a nack)

So the main concern does not exist. Though it would have been preferred an implementation without thread blocking at all.

Regarding the 2nd point it would be nice if op-rabbit's RecoveryStrategy would be more flexible and allow the user to specify a reject with requeue = true/false as needed. Currently the user is forced to specify only one of them.

@livius-ungureanu
Copy link
Author

We'd like to also have op-rabbit's thoughts on this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants