Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle async cancelled error explicitly #811

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
AsyncEvent,
AsyncLock,
AsyncShieldCancellation,
)
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

Expand Down Expand Up @@ -231,7 +236,7 @@ async def handle_async_request(self, request: Request) -> Response:
timeout = timeouts.get("pool", None)
try:
connection = await status.wait_for_connection(timeout=timeout)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
Expand All @@ -256,7 +261,7 @@ async def handle_async_request(self, request: Request) -> Response:
# status so that the request becomes queued again.
status.unset_connection()
await self._attempt_to_acquire_connection(status)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with AsyncShieldCancellation():
await self.response_closed(status)
raise exc
Expand Down
10 changes: 7 additions & 3 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
map_exceptions,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
AsyncLock,
AsyncShieldCancellation,
)
from .._trace import Trace
from .interfaces import AsyncConnectionInterface

Expand Down Expand Up @@ -126,7 +130,7 @@ async def handle_async_request(self, request: Request) -> Response:
"network_stream": self._network_stream,
},
)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with AsyncShieldCancellation():
async with Trace("response_closed", logger, request) as trace:
await self._response_closed()
Expand Down Expand Up @@ -328,7 +332,7 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
async with Trace("receive_response_body", logger, self._request, kwargs):
async for chunk in self._connection._receive_response_body(**kwargs):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
13 changes: 9 additions & 4 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
RemoteProtocolError,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncSemaphore, AsyncShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
AsyncLock,
AsyncSemaphore,
AsyncShieldCancellation,
)
from .._trace import Trace
from .interfaces import AsyncConnectionInterface

Expand Down Expand Up @@ -107,7 +112,7 @@ async def handle_async_request(self, request: Request) -> Response:
kwargs = {"request": request}
async with Trace("send_connection_init", logger, request, kwargs):
await self._send_connection_init(**kwargs)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with AsyncShieldCancellation():
await self.aclose()
raise exc
Expand Down Expand Up @@ -160,7 +165,7 @@ async def handle_async_request(self, request: Request) -> Response:
"stream_id": stream_id,
},
)
except BaseException as exc: # noqa: PIE786
except EXCEPTION_OR_CANCELLED as exc: # noqa: PIE786
with AsyncShieldCancellation():
kwargs = {"stream_id": stream_id}
async with Trace("response_closed", logger, request, kwargs):
Expand Down Expand Up @@ -573,7 +578,7 @@ async def __aiter__(self) -> typing.AsyncIterator[bytes]:
request=self._request, stream_id=self._stream_id
):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
11 changes: 8 additions & 3 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock, ShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
Event,
Lock,
ShieldCancellation,
)
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

Expand Down Expand Up @@ -231,7 +236,7 @@ def handle_request(self, request: Request) -> Response:
timeout = timeouts.get("pool", None)
try:
connection = status.wait_for_connection(timeout=timeout)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
Expand All @@ -256,7 +261,7 @@ def handle_request(self, request: Request) -> Response:
# status so that the request becomes queued again.
status.unset_connection()
self._attempt_to_acquire_connection(status)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with ShieldCancellation():
self.response_closed(status)
raise exc
Expand Down
10 changes: 7 additions & 3 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
map_exceptions,
)
from .._models import Origin, Request, Response
from .._synchronization import Lock, ShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
Lock,
ShieldCancellation,
)
from .._trace import Trace
from .interfaces import ConnectionInterface

Expand Down Expand Up @@ -126,7 +130,7 @@ def handle_request(self, request: Request) -> Response:
"network_stream": self._network_stream,
},
)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with ShieldCancellation():
with Trace("response_closed", logger, request) as trace:
self._response_closed()
Expand Down Expand Up @@ -328,7 +332,7 @@ def __iter__(self) -> Iterator[bytes]:
with Trace("receive_response_body", logger, self._request, kwargs):
for chunk in self._connection._receive_response_body(**kwargs):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
13 changes: 9 additions & 4 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
RemoteProtocolError,
)
from .._models import Origin, Request, Response
from .._synchronization import Lock, Semaphore, ShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
Lock,
Semaphore,
ShieldCancellation,
)
from .._trace import Trace
from .interfaces import ConnectionInterface

Expand Down Expand Up @@ -107,7 +112,7 @@ def handle_request(self, request: Request) -> Response:
kwargs = {"request": request}
with Trace("send_connection_init", logger, request, kwargs):
self._send_connection_init(**kwargs)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with ShieldCancellation():
self.close()
raise exc
Expand Down Expand Up @@ -160,7 +165,7 @@ def handle_request(self, request: Request) -> Response:
"stream_id": stream_id,
},
)
except BaseException as exc: # noqa: PIE786
except EXCEPTION_OR_CANCELLED as exc: # noqa: PIE786
with ShieldCancellation():
kwargs = {"stream_id": stream_id}
with Trace("response_closed", logger, request, kwargs):
Expand Down Expand Up @@ -573,7 +578,7 @@ def __iter__(self) -> typing.Iterator[bytes]:
request=self._request, stream_id=self._stream_id
):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
18 changes: 17 additions & 1 deletion httpcore/_synchronization.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,37 @@
import threading
from types import TracebackType
from typing import Optional, Type
from typing import Optional, Tuple, Type, Union

import sniffio

from ._exceptions import ExceptionMapping, PoolTimeout, map_exceptions


EXCEPTION_OR_CANCELLED: Union[
Tuple[Type[BaseException]],
Tuple[Type[BaseException], Type[BaseException]],
Tuple[Type[BaseException], Type[BaseException], Type[BaseException]],
] = (Exception,)
T-256 marked this conversation as resolved.
Show resolved Hide resolved

# Our async synchronization primatives use either 'anyio' or 'trio' depending
# on if they're running under asyncio or trio.

try:
import trio

EXCEPTION_OR_CANCELLED += (trio.Cancelled,)
except ImportError: # pragma: nocover
trio = None # type: ignore

try:
import anyio

try:
import asyncio

EXCEPTION_OR_CANCELLED += (asyncio.CancelledError,)
except ImportError:
pass
except ImportError: # pragma: nocover
anyio = None # type: ignore

Expand Down