Deferrable support for HttpOperator #45228

Open
TakayukiTanabeSS

Enabling deferrable mode for HttpOperator causes an error.
The minimal test case is as follows:

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.http.operators.http import HttpOperator

with DAG(
    dag_id="my_dag_name",
    catchup=False,
) as dag:

    test = HttpOperator(
        task_id="test",
        method="GET",
        http_conn_id="google_http_default",
        endpoint="search?q=airflow",
        log_response=True,
        deferrable=True,
    )

    EmptyOperator(task_id="start") >> test >> EmptyOperator(task_id="end")

For connections, set Connection Id to “google_http_default”, Connection Type to “HTTP” and HOST to “https://www.google.com”.
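
As an alternative to the UI, the same connection could be supplied through an environment variable. This is only a sketch: it assumes an Airflow version that accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` variables, and the variable must be set in the environment of the Airflow processes before they start.

```python
import json
import os

# Connection fields taken from the description above.
connection = {
    "conn_type": "http",
    "host": "https://www.google.com",
}

# Airflow looks up AIRFLOW_CONN_<CONN_ID in upper case>.
# Assumption: the running Airflow version supports the JSON format here.
os.environ["AIRFLOW_CONN_GOOGLE_HTTP_DEFAULT"] = json.dumps(connection)
```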

If you place it in airflow/files/dags/ and run it, you will get the following error:

[2024-12-27, 01:53:21 UTC] {taskinstance.py:3311} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/baseoperator.py", line 1814, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/providers/http/operators/http.py", line 261, in execute_complete
    raise AirflowException(f"Unexpected error in the operation: {event['message']}")
airflow.exceptions.AirflowException: Unexpected error in the operation: Connection closed.

Cause:

https://github.com/apache/airflow/blob/9178f8f0b1ffd80b8eacf4d02b732528fde218e5/providers/src/airflow/providers/http/triggers/http.py#L97
The call at the link above creates a session object.

The session is created here:
https://github.com/apache/airflow/blob/9178f8f0b1ffd80b8eacf4d02b732528fde218e5/providers/src/airflow/providers/http/hooks/http.py#L413

The following is then called:
https://github.com/apache/airflow/blob/9178f8f0b1ffd80b8eacf4d02b732528fde218e5/providers/src/airflow/providers/http/triggers/http.py#L103

https://github.com/apache/airflow/blob/9178f8f0b1ffd80b8eacf4d02b732528fde218e5/providers/src/airflow/providers/http/triggers/http.py#L117
The ClientSession is expected to be alive at this point, but it has already gone out of scope and the connection is closed.

To fix this properly, we changed the scoping to the following. Previously the nesting was reversed, which caused the bug.

Correct:
ClientSession scope {
    ClientResponse scope {
        response.read
    }
}

Incorrect:
ClientResponse scope {
    ClientSession scope {
    }
    response.read expects the session to still be alive (error)
}
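
The two nestings can be illustrated with a small self-contained simulation. This is a sketch, not the provider's code: `FakeSession` and `FakeResponse` are hypothetical stand-ins for `aiohttp.ClientSession` and `aiohttp.ClientResponse`, and the sketch assumes that reading a response body requires the owning session to still be open.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession (not the real class)."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Leaving the `async with` block closes the connection.
        self.closed = True

    async def get(self, url):
        return FakeResponse(self)


class FakeResponse:
    """Hypothetical stand-in for aiohttp.ClientResponse."""

    def __init__(self, session):
        self._session = session

    async def read(self):
        # Assumption: the body is read lazily and needs a live session.
        if self._session.closed:
            raise ConnectionError("Connection closed.")
        return b"payload"


async def read_inside_session():
    # Correct: the body is read while the session is still open.
    async with FakeSession() as session:
        response = await session.get("https://example.invalid/")
        return await response.read()


async def read_after_session():
    # Incorrect: the session is already closed when the body is read.
    async with FakeSession() as session:
        response = await session.get("https://example.invalid/")
    return await response.read()


def run_both():
    ok = asyncio.run(read_inside_session())
    try:
        asyncio.run(read_after_session())
        failure = None
    except ConnectionError as err:
        failure = str(err)
    return ok, failure
```

Calling `run_both()` exercises both orderings: the first read succeeds, while the second fails in the same way as the error in the traceback, because the read happens after the session scope has ended.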

Because of the interface changes, we updated the affected tests, re-ran them, and verified that they pass. I would appreciate it if you could check and review this. Best regards.

Takayuki Tanabe




boring-cyborg bot commented Dec 27, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack
