[OpenLineage] Add new DAG job facet #39467

Open
2 tasks done
kacpermuda opened this issue May 7, 2024 · 1 comment · May be fixed by #39520
Comments

@kacpermuda
Contributor

Description

I would like to add a new facet that would allow the DAG graph to be re-created from OL events. It could look like my example below, but if anybody has ideas on how to improve it, I would love to hear them.

The facet could look like:

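from attrs import define
from openlineage.client.facet import BaseFacet

@define(slots=False)
class AirflowDagJobFacet(BaseFacet):
    taskTree: dict  # nested task dependency tree
    taskGroups: dict  # task group id -> group metadata
    tasks: dict  # task id -> task metadata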

where taskGroups could look like:

{
    "tg1": {
        "parentGroup": "tg2"
    },
    "tg2": {
        "parentGroup": "tg3"
    },
    "tg3": {},
}
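
As a rough sketch of how a consumer might use this shape, the snippet below walks the parentGroup links to recover the full nesting of a group. The helper name group_path is hypothetical and not part of the proposal; it only assumes that each entry optionally carries a "parentGroup" key, as in the example above.

```python
def group_path(group_id, task_groups):
    """Return the chain of groups from the outermost ancestor down to group_id."""
    path = []
    seen = set()
    current = group_id
    while current is not None:
        if current in seen:
            # Guard against malformed input where the parent links form a loop.
            raise ValueError(f"cycle in parentGroup links at {current!r}")
        seen.add(current)
        path.append(current)
        # A group without "parentGroup" is a top-level group and ends the walk.
        current = task_groups.get(current, {}).get("parentGroup")
    return list(reversed(path))


task_groups = {
    "tg1": {"parentGroup": "tg2"},
    "tg2": {"parentGroup": "tg3"},
    "tg3": {},
}
print(group_path("tg1", task_groups))  # ['tg3', 'tg2', 'tg1']
```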

tasks could look like:

{
    "tg1.task.id.1": {
        "operator": "BashOperator",
        "task_group": "tg1"
    },
    "task.id.with.dots.2": {
        "operator": "EmptyOperator",
        "task_group": "tg2"
    },
    "task_3": {
        "operator": "SomeCustomOperator",
    }
}

and the taskTree could look like:

# Example task dependencies (the dotted ids are not valid Python names, so this would raise an error as written, but it reads more clearly)
# tg1.task.id.1 >> task_3
# tg1.task.id.1 >> task_4 >> task_5
# task.id.with.dots.2
# task_6 >> [task_7, task_8] >> task_9
task_tree = {
    "tg1.task.id.1": {
        "task_3": {},
        "task_4": {
            "task_5": {}
        }
    },
    "task.id.with.dots.2": {},
    "task_6": {
        "task_7": {
            "task_9": {}  # yes, it would be duplicated because of the [] (list) dependency
        },
        "task_8": {
            "task_9": {}  # yes, it would be duplicated because of the [] (list) dependency
        },
    }
}
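
One possible way to produce such a nested tree is sketched below. It assumes a plain mapping from each task id to its direct downstream task ids (the real source would be the DAG object, which is not modelled here), and the function name build_task_tree is hypothetical. Tasks with several upstreams are repeated under each parent, matching the duplication noted in the example.

```python
def build_task_tree(downstream):
    """Build a nested dict of task ids from a task_id -> [downstream ids] mapping."""
    # Root tasks are those that never appear as anyone's downstream.
    has_upstream = {child for children in downstream.values() for child in children}
    roots = [task_id for task_id in downstream if task_id not in has_upstream]

    def subtree(task_id, ancestors):
        if task_id in ancestors:
            # A DAG has no cycles; fail loudly on malformed input.
            raise ValueError(f"cycle detected at {task_id!r}")
        return {
            child: subtree(child, ancestors | {task_id})
            for child in downstream.get(task_id, [])
        }

    return {root: subtree(root, frozenset()) for root in roots}


# Same dependencies as in the example above, as a flat mapping.
downstream = {
    "tg1.task.id.1": ["task_3", "task_4"],
    "task_3": [],
    "task_4": ["task_5"],
    "task_5": [],
    "task.id.with.dots.2": [],
    "task_6": ["task_7", "task_8"],
    "task_7": ["task_9"],
    "task_8": ["task_9"],
    "task_9": [],
}
tree = build_task_tree(downstream)
```

Because shared downstream tasks are duplicated, the nested form can grow quickly for wide fan-in DAGs; a flat adjacency mapping like the input above would avoid that, at the cost of being less readable.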

Use case/motivation

Some OL consumers try to re-create the DAG graph, or simply deduce task dependencies, from OL events. Currently this mostly relies on the upstream/downstream task id attributes delivered in AirflowRunFacet for each task. This approach has downsides when using EmptyOperators (which may not be executed and thus won't emit events) or operators excluded from OpenLineage (disabled using disabled_for_operators). In both cases, tasks that emit no events leave gaps that make it impossible to re-create the dependency graph correctly.

To make it easier for the consumers, I think we should provide all the necessary information to re-create the DAG graph in the DAG START event, and for that I would like to propose adding a new facet.

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@kacpermuda added the kind:feature and needs-triage labels on May 7, 2024
@kacpermuda
Contributor Author

We could also include, for each task, whether selective lineage is on or off for that task and whether its operator is disabled or enabled via config. This way, the OL consumer would know exactly which events to expect.

Also, for DAG complete/fail events, we could provide the state of each task in the DAG run. The consumer could then compare that information with the events received and see why some events are missing, for example because tasks were skipped by a branching operator.
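
A minimal sketch of that comparison on the consumer side follows. Both inputs are assumptions for illustration: task_states stands for a hypothetical task id to state mapping carried by the DAG complete/fail event, and received_task_ids for the set of task ids the consumer actually saw events for. Neither name comes from an existing facet.

```python
def explain_missing_events(task_states, received_task_ids):
    """Return the reported state of every task for which no event was received."""
    return {
        task_id: state
        for task_id, state in task_states.items()
        if task_id not in received_task_ids
    }


# Hypothetical DAG run in which a branching operator skipped one path.
task_states = {"branch": "success", "path_a": "success", "path_b": "skipped"}
received_task_ids = {"branch", "path_a"}

missing = explain_missing_events(task_states, received_task_ids)
print(missing)  # {'path_b': 'skipped'}
```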

@nathadfield added the area:providers label and removed the needs-triage label on May 21, 2024
2 participants