[New Engine] Add new ThreadPoolTaskRunner and new PrefectFuture implementation #13337

Merged: 15 commits merged on May 16, 2024

@desertaxle (Member) commented May 14, 2024

This PR creates a new interface for task runners and prefect futures to work with the new engine. The new ThreadPoolTaskRunner is intended to replace the existing ConcurrentTaskRunner. The new PrefectFuture implementation is meant to serve as a wrapper for specific future implementations. More implementations will be added when we update the Dask and Ray task runners.
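To make the "wrapper for specific future implementations" idea concrete, here is a minimal sketch using only the standard library. The class names `SketchFuture` and `SketchConcurrentFuture` are hypothetical and are not the classes added in this PR; the sketch only assumes that the wrapped object is a `concurrent.futures.Future`.

```python
import abc
import concurrent.futures
from typing import Any, Optional


class SketchFuture(abc.ABC):
    """Hypothetical sync-only future interface (not Prefect's actual class)."""

    @abc.abstractmethod
    def wait(self, timeout: Optional[float] = None) -> None:
        """Block until the work finishes or the timeout elapses."""

    @abc.abstractmethod
    def result(self, timeout: Optional[float] = None) -> Any:
        """Block until the work finishes and return its value."""


class SketchConcurrentFuture(SketchFuture):
    """Wraps a concurrent.futures.Future behind the common interface."""

    def __init__(self, wrapped: concurrent.futures.Future) -> None:
        self._wrapped = wrapped

    def wait(self, timeout: Optional[float] = None) -> None:
        # concurrent.futures.wait returns quietly on timeout instead of raising.
        concurrent.futures.wait([self._wrapped], timeout=timeout)

    def result(self, timeout: Optional[float] = None) -> Any:
        return self._wrapped.result(timeout=timeout)


def demo_wrapper() -> int:
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = SketchConcurrentFuture(pool.submit(pow, 2, 5))
        future.wait()
        return future.result()
```

Other backends (for example Dask or Ray) could plug in by subclassing the same interface, which is presumably the motivation for keeping the wrapper thin.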

This PR turns on the logic to run the existing task tests through the new engine to ensure compatibility and expose differences.

The two main differences from the current engine are:

  1. .submit on a task is always a sync call
  2. .result and .wait on a PrefectFuture are always sync calls
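As a rough illustration of what "always sync" means for callers, the sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for the task runner (an assumption, not the PR's implementation). The same un-awaited calls are used from a plain function and from a coroutine.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple


def double(x: int) -> int:
    return x * 2


def sync_caller(pool: ThreadPoolExecutor) -> int:
    future = pool.submit(double, 21)  # returns a future immediately, no await
    return future.result()  # blocks the calling thread until done


async def async_caller(pool: ThreadPoolExecutor) -> int:
    # Inside a coroutine the calls look identical: nothing is awaited.
    # Note that result() blocks the event loop while it waits.
    future = pool.submit(double, 21)
    return future.result()


def demo_sync_calls() -> Tuple[int, int]:
    with ThreadPoolExecutor(max_workers=2) as pool:
        return sync_caller(pool), asyncio.run(async_caller(pool))
```

The trade-off is that an async caller no longer yields control while waiting on a result, which is one reason some tests need separate branches for the two engines.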

Example

Example of a flow submitting tasks to a ThreadPoolTaskRunner (should look familiar)

from time import sleep

from prefect import flow, task


@task
def task_the_first():
    sleep(10)
    return "hello"


@task
def task_the_second(thing_to_say):
    print(f"Upstream told me to say {thing_to_say}")


@task
def task_the_third():
    print("I'm the third task")


@flow
def my_flow():
    future1 = task_the_first.submit()
    future2 = task_the_second.submit(future1)
    task_the_third(wait_for=[future2])


if __name__ == "__main__":
    my_flow()
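The flow above passes a future as an argument and uses wait_for to order tasks. One way such dependencies could be resolved on a thread pool is sketched below. `submit_with_upstreams` is a hypothetical helper written for illustration and is not how the new engine necessarily does it.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable, List


def submit_with_upstreams(
    pool: ThreadPoolExecutor,
    fn: Callable[..., Any],
    *args: Any,
    wait_for: Iterable[Future] = (),
) -> Future:
    """Hypothetical helper: run fn once its upstream futures have finished."""
    upstreams = list(wait_for)

    def run() -> Any:
        # Block on explicit upstreams first (the wait_for list).
        wait(upstreams)
        # Replace any future passed as an argument with its result.
        resolved = [a.result() if isinstance(a, Future) else a for a in args]
        return fn(*resolved)

    return pool.submit(run)


def demo_flow() -> List[str]:
    log: List[str] = []
    with ThreadPoolExecutor(max_workers=3) as pool:
        first = submit_with_upstreams(pool, lambda: "hello")
        second = submit_with_upstreams(
            pool, lambda said: log.append(f"second heard {said}"), first
        )
        third = submit_with_upstreams(
            pool, lambda: log.append("third ran"), wait_for=[second]
        )
        third.result()
    return log
```

This sketch keeps a worker thread occupied while it waits on upstream work, which is a simplification; a real runner would likely track state and failures as well.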

Checklist

  • This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • If this pull request adds new functionality, it includes unit tests that cover the changes
  • This pull request includes a label categorizing the change e.g. maintenance, fix, feature, enhancement, docs.

For documentation changes:

  • This pull request includes redirect settings in netlify.toml for files that are removed or renamed.

For new functions or classes in the Python SDK:

  • This pull request includes helpful docstrings.
  • If a new Python file was added, this pull request contains a stub page in the Python SDK docs and an entry in mkdocs.yml navigation.

@desertaxle added the "experimental" label (Related to an experimental feature) May 14, 2024
@desertaxle marked this pull request as ready for review May 16, 2024 13:13
@desertaxle requested a review from a team as a code owner May 16, 2024 13:13
@desertaxle requested a review from cicdw May 16, 2024 13:13
Comment on lines +803 to +809
if PREFECT_EXPERIMENTAL_ENABLE_NEW_ENGINE:
    future = foo.submit()
    future.wait()
    state = future.state
else:
    future = await foo.submit()
    state = await future.wait()
Member Author

Some of the tests differ between the current engine and the new engine because the new task runner and PrefectFuture interfaces are sync-only.

Comment on lines +117 to +131
_result = self._final_state.result(
    raise_on_failure=raise_on_failure, fetch=True
)
# state.result is a `sync_compatible` function that may or may not return an awaitable
# depending on whether the parent frame is sync or not
if inspect.isawaitable(_result):
    _result = run_sync(_result)
return _result
Member Author

Result retrieval is a sharp edge in the new engine. @sync_compatible will return a coroutine for State.result() when called from a sync context if there is an active event loop. We likely need a sync implementation for State.result().
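The sharp edge can be reproduced with a toy decorator. `toy_sync_compatible` below is a simplified stand-in written for this example, not Prefect's `sync_compatible`; it only assumes the behavior described in this comment (a coroutine comes back whenever an event loop is running in the calling thread). `resolve` is likewise a hypothetical helper that mirrors the `inspect.isawaitable` check in the diff.

```python
import asyncio
import functools
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Tuple


def toy_sync_compatible(async_fn: Callable[..., Any]) -> Callable[..., Any]:
    """Toy dual-mode decorator (an assumption, not Prefect's code)."""

    @functools.wraps(async_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: run to completion here.
            return asyncio.run(coro)
        # A loop is running: hand back the coroutine, even to sync callers.
        return coro

    return wrapper


@toy_sync_compatible
async def fetch_result() -> str:
    return "hello"


def resolve(value: Any) -> Any:
    """Always return a plain value, whatever the calling context was."""
    if inspect.isawaitable(value):
        # The current thread's loop is already running, so run the coroutine
        # on a fresh loop in a helper thread and block for the answer.
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, value).result()
    return value


def sync_helper() -> Any:
    # A plain sync function; what it gets back depends on who called it.
    return fetch_result()


async def called_from_async() -> Tuple[bool, Any]:
    raw = sync_helper()
    return inspect.isawaitable(raw), resolve(raw)
```

Called from plain sync code, `sync_helper()` returns the string directly. Called underneath a running loop, it returns a coroutine that has to be resolved explicitly, which is why a dedicated sync implementation would be simpler to reason about.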

@cicdw (Member) left a comment

from sync review + read through, I think this looks good

@desertaxle merged commit 4614711 into main May 16, 2024
25 checks passed
@desertaxle deleted the new-concurrent-task-runner branch May 16, 2024 23:53