-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[New Engine] Add new ThreadPoolTaskRunner
and new PrefectFuture
implementation
#13337
Conversation
2909332
to
ea5fafd
Compare
d1b4c69
to
36481b9
Compare
if PREFECT_EXPERIMENTAL_ENABLE_NEW_ENGINE: | ||
future = foo.submit() | ||
future.wait() | ||
state = future.state | ||
else: | ||
future = await foo.submit() | ||
state = await future.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of the tests differ between the current engine and the new engine due to the sync-only nature of the new task runner and PrefectFuture
interfaces
_result = self._final_state.result( | ||
raise_on_failure=raise_on_failure, fetch=True | ||
) | ||
# state.result is a `sync_compatible` function that may or may not return an awaitable | ||
# depending on whether the parent frame is sync or not | ||
if inspect.isawaitable(_result): | ||
_result = run_sync(_result) | ||
return _result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Result retrieval is a sharp edge in the new engine. @sync_compatible
will return a coroutine for State.result()
when called from a sync context if there is an active event loop. We likely need a sync implementation for State.result()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from sync review + read through, I think this looks good
91f4127
to
aaeba59
Compare
8e05c03
to
d5b4ddb
Compare
This PR creates a new interface for task runners and prefect futures to work with the new engine. The new
ThreadPoolTaskRunner
is intended to replace the existingConcurrentTaskRunner
. The newPrefectFuture
implementation is meant to serve as a wrapper for specific future implementations. More implementations will be added when we update the Dask and Ray task runners.This PR turns on the logic to run the existing task tests through the new engine to ensure compatibility and expose differences.
Two main differences are:
.submit
on a task is always a sync call.result
and.wait
on aPrefectFuture
are always sync callsExample
Example of a flow submitting tasks to a
ThreadPoolTaskRunner
(should look familiar)Checklist
<link to issue>
"maintenance
,fix
,feature
,enhancement
,docs
.For documentation changes:
netlify.toml
for files that are removed or renamed.For new functions or classes in the Python SDK:
mkdocs.yml
navigation.