Prevent start trigger initialization in scheduler #39585

Open
wants to merge 13 commits into
base: main
from

Conversation

Lee-W
Member

@Lee-W Lee-W commented May 13, 2024

Why

In #38674, I introduced logic that might run user code in the scheduler here. This PR proposes new logic that avoids this.

What

  • Introduce a StartTriggerArgs class containing trigger_cls (which could perhaps be renamed trigger_cls_path), trigger_kwargs, timeout, and next_method, which are what we need to run an operator in deferrable mode.
    • suggestion needed: airflow.models.abstractoperator might not be the best module for this data class; one suggestion is to move it to airflow.triggers.base (Thanks @jedcunningham !)
  • Introduce a start_from_trigger flag so that an operator can either be executed as normal or start from the triggerer

Operator authors will now need to set start_trigger_args and start_from_trigger (or maybe start_from_triggerer?) to start execution directly from the triggerer.

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator, StartTriggerArgs


class AsyncOperator(BaseOperator):
    # Everything the triggerer needs; the trigger class is given as an import path string.
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.testing.SuccessTrigger",
        trigger_kwargs={},
        next_method="execute_complete",
        timeout=None,
    )
    # Start execution directly from the triggerer.
    start_from_trigger = True

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")
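
For readers wondering what the StartTriggerArgs container might hold, here is a minimal sketch, assuming a plain dataclass with the four fields listed above. `StartTriggerArgsSketch` is a hypothetical stand-in for illustration, not the class added by this PR; the field types and defaults are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any


@dataclass
class StartTriggerArgsSketch:
    """Illustrative stand-in; not the class added by this PR."""

    # Dotted import path of the trigger class, kept as a string so that
    # nothing has to be imported where the operator is parsed.
    trigger_cls: str
    # Name of the operator method to call once the trigger fires.
    next_method: str
    # Keyword arguments for the trigger (assumed to be serializable).
    trigger_kwargs: dict[str, Any] = field(default_factory=dict)
    # Optional deferral timeout (assumed type).
    timeout: timedelta | None = None


args = StartTriggerArgsSketch(
    trigger_cls="airflow.triggers.testing.SuccessTrigger",
    next_method="execute_complete",
)
```

Keeping the trigger class as a string rather than a class object is what lets these arguments be passed around without importing the trigger.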

Note that mapped operators are not supported yet, but a draft PR #39912 has been created.



@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 7 times, most recently from d621388 to 657a1cf Compare May 14, 2024 15:29
@Lee-W Lee-W changed the title fix(baseoperator): change start_trigger into start_trigger_cls and start_trigger_kwargs Prevent start trigger initialization in scheduler May 14, 2024
@Lee-W Lee-W marked this pull request as ready for review May 14, 2024 15:38
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch from 657a1cf to 926b3d3 Compare May 14, 2024 15:42
@dstandish
Contributor

maybe we should mark this experimental.

trigger_kwargs being None vs {} does not seem like an explicit / obvious enough way to decide whether to start from trigger. why wouldn't we just look at the start_from_trigger boolean?

@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch from 926b3d3 to ec0a768 Compare May 15, 2024 02:27
@Lee-W
Member Author

Lee-W commented May 15, 2024

maybe we should mark this experimental.

I'm ok with it. Maybe wait for others' comments? If we're to mark it as experimental, where should I do so?

trigger_kwargs being None vs {} does not seem like an explicit / obvious enough way to decide whether to start from trigger. why wouldn't we just look at the start_from_trigger boolean?

In the previous PR, @uranusjr and I discussed whether we could infer start_from_trigger from existing args. I do not have a strong opinion on this one, but I created a commit with start_from_trigger so that, even if we don't want it, I can easily revert it.
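
The two options discussed here can be compared in a rough sketch. Both helper functions and `DemoOperator` are hypothetical, and the mapping "None means run normally, anything else means start from the trigger" is an assumption about how the inference would work, not something taken from the PR's code.

```python
from __future__ import annotations

from typing import Any


def infer_from_kwargs(trigger_kwargs: dict[str, Any] | None) -> bool:
    """Implicit variant (assumed semantics): only None means 'run normally'."""
    return trigger_kwargs is not None


def read_explicit_flag(operator: Any) -> bool:
    """Explicit variant: look at a dedicated boolean on the operator."""
    return bool(getattr(operator, "start_from_trigger", False))


class DemoOperator:
    # Hypothetical operator: kwargs are empty, but the author opted out explicitly.
    trigger_kwargs: dict[str, Any] = {}
    start_from_trigger = False


op = DemoOperator()
implicit = infer_from_kwargs(op.trigger_kwargs)  # {} still counts as "start from trigger"
explicit = read_explicit_flag(op)
```

Under these assumptions, an empty dict and None lead to opposite behavior in the implicit variant, which is the kind of subtlety the explicit boolean avoids.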

@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 4 times, most recently from 529ecc1 to 12fb69d Compare May 17, 2024 12:59
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 2 times, most recently from 445274d to dac10f5 Compare May 20, 2024 11:58
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 2 times, most recently from 4b0ac4a to 5f99ce7 Compare May 28, 2024 07:50
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 4 times, most recently from 5848604 to 68cbe7c Compare May 29, 2024 10:23

@internal_api_call
@provide_session
def _defer_task(
Contributor

i think it's unnecessary to change the signature of this function and add the other two wrapper functions.

just build the exception object and pass it

e = TaskDeferred(trigger=BaseTrigger(...), method_name="...", ...)

unfortunately, we don't have integration tests set up for AIP-44 but if we did then it would reveal that this will break that mode of execution because for example there's no mechanism for serializing Trigger, possibly among other things.

obviously it's not your fault and a shortcoming of the current state, but in any case i think it will be simpler anyway to keep the existing signature and just build an exc object that you don't raise.
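
The "build an exception object that you don't raise" idea can be sketched as follows. `FakeTrigger`, `FakeTaskDeferred`, and `defer_task_sketch` are hypothetical stand-ins written for this example; they do not reproduce Airflow's TaskDeferred, BaseTrigger, or _defer_task, and the returned dict is only an assumed illustration of what a deferral routine might read.

```python
from __future__ import annotations

from typing import Any


class FakeTrigger:
    """Stand-in trigger that only remembers its keyword arguments."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class FakeTaskDeferred(Exception):
    """Stand-in exception used purely as a data carrier here."""

    def __init__(self, *, trigger: FakeTrigger, method_name: str) -> None:
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


def defer_task_sketch(exception: FakeTaskDeferred) -> dict[str, Any]:
    """Keep a single 'takes an exception' signature and read fields off it."""
    return {
        "next_method": exception.method_name,
        "trigger_kwargs": exception.trigger.kwargs,
    }


# The exception is constructed and passed along, never raised.
exc = FakeTaskDeferred(trigger=FakeTrigger(moment="now"), method_name="execute_complete")
deferred = defer_task_sketch(exc)
```

Note that this variant requires instantiating the trigger object, which is the step the PR author wants to keep out of the scheduler.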

Member Author

i think it's unnecessary to change the signature of this function and add the other two wrapper functions.

The main reason for this PR to exist is that we don't want to run user code in the scheduler: https://github.com/apache/airflow/pull/38674/files#diff-807ca0a4fd53aeb41166621c9842b0f89b7931fc64e9a60befa36c776db45efaR1215-R1221. To do that, I changed the code to accept only the args needed for marking a task as deferred directly, instead of going through the import and serialization process.

        trigger_row = Trigger(
            classpath=start_trigger_args.trigger_cls, kwargs=start_trigger_args.trigger_kwargs
        )
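
To illustrate why passing only the class path and kwargs avoids running user code, here is a minimal sketch. `make_trigger_row` and the path `my_dags.triggers.CustomTrigger` are hypothetical, and a plain dict stands in for the Trigger row shown above.

```python
from __future__ import annotations

import sys
from typing import Any


def make_trigger_row(classpath: str, kwargs: dict[str, Any]) -> dict[str, Any]:
    """Record where the trigger lives and how to build it, without importing it."""
    return {"classpath": classpath, "kwargs": dict(kwargs)}


row = make_trigger_row("my_dags.triggers.CustomTrigger", {"poll_interval": 5})

# The user's module was never imported, so none of its code has run in this process.
user_module_loaded = "my_dags" in sys.modules
```

In this sketch, the import would only happen later in whichever process actually instantiates the trigger from the stored path.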

unfortunately, we don't have integration tests set up for AIP-44 but if we did then it would reveal that this will break that mode of execution because for example there's no mechanism for serializing Trigger, possibly among other things.

I didn't quite understand that part. Could you provide more details or clarify it for me? Thanks!

@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 2 times, most recently from 742800e to 642bb42 Compare May 30, 2024 08:29
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch from 6931473 to c273cf9 Compare May 30, 2024 09:41