Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue of new dag getting old dataset events. #39603

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

tosheer
Copy link

@tosheer tosheer commented May 14, 2024

If a new dataset triggreed DAG is created for an already existing dataset. (Dataset has already existing dataset events) DAG see all dataset events from very first event for dataset.

Fixes: #39456

@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label May 14, 2024
Copy link

boring-cyborg bot commented May 14, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@@ -3776,6 +3776,11 @@ def test_create_dag_runs_datasets(self, session, dag_maker):
dataset1 = Dataset(uri="ds1")
dataset2 = Dataset(uri="ds2")

# Create DAG before the arrival of dataset events.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be part of its own, new test?

test_new_dagrun_ignores_old_datasets or similar?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RNHTTR I've added a new unit test. Please review it and let me know if it needs any adjustments.

Unit test is checking newly added Dag should just get the Dataset arrived after it was created(event3) and not the dataset events(event 1 and event2)

@@ -3811,20 +3816,40 @@ def test_create_dag_runs_datasets(self, session, dag_maker):
)
session.add(event2)

# Create a third event, creation time is more recent, but data interval is even older
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is reproducible with a single, previous DAG run having a dataset event, shouldn't the test only require 3 total DAG runs?

  • One DAG run from the original DAG (let's call this Run A)
  • One more DAG run (Run B) after the new DAG is introduced, which triggers the new DAG's DAG run which should only have dataset info from Run B.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly designed test case perfectly addresses the scenario you have mentioned. Let me know if i missing anything.

@jedcunningham
Copy link
Member

cc @uranusjr

@uranusjr
Copy link
Member

To be honest I don’t really consider the current behaviour (in 2.9.1) a bug, but I can see why people expect it this way. I think we probably need an entry in newsfragment to warn users about possible breakages.

@uranusjr
Copy link
Member

Also the tests need improvements, as mentioned in above reviews.

@tosheer
Copy link
Author

tosheer commented May 17, 2024

@RNHTTR / @uranusjr Can you guys please review it and let me know if it needs any adjustments.

Comment on lines 1278 to 1283
if previous_dag_run:
dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
else:
dataset_event_filters.append(
DatasetEvent.timestamp >= DagScheduleDatasetReference.created_at
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm just wondering if the difference between >= and > is something we should worry about.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, moved it back to >

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler Scheduler or dag parsing Issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

DAGs are able to see historical dataset events when created new
4 participants