-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨ low-code: Add Incremental Parent State Handling to SubstreamPartitionRouter #38211
base: master
Are you sure you want to change the base?
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
611acee
to
11851ee
Compare
Please split CDK and connectors changes into different PRs |
@artem1205 I will create separate PRs for connectors; I just added these changes here for now for easier testing. This way we can execute regression tests from this branch. |
@@ -28,3 +28,21 @@ def stream_slices(self) -> Iterable[StreamSlice]: | |||
|
|||
:return: List of stream slices | |||
""" | |||
|
|||
def set_parent_state(self, stream_state: StreamState) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method is only used on partition routers so I think it would be worth creating a new PartitionRouter
interface that would be more specific.
We should also leave the two methods as abstract to ensure developers explicitly define them instead of relying on a default behavior that may or may not make sense for their use case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any stream slicer can handle parent states. For example CartesianProductStreamSlicer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DeclarativeCursor
doesn't, right?
I think it makes sense for CartesianProductStreamSlicer
for also being a PartitionRouter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed CartesianProductStreamSlicer to PartitionRouter
|
||
def set_parent_state(self, stream_state: StreamState) -> None: | ||
""" | ||
Set the state of the parent streams. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the docstring should be more specific about the parameter expectations. stream_state must have parent_state
which is not obvious from the signature. It might even be worth creating a new class to codify the expectation (kind of like how StreamSlice codifies the presence of a partition and a cursor_slice)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated docs for all classes
|
||
yield from stream_slices_for_parent | ||
|
||
def set_parent_state(self, stream_state: Optional[StreamState]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is stream_state
an Optional
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -86,6 +86,8 @@ def set_initial_state(self, stream_state: StreamState) -> None: | |||
for state in stream_state["states"]: | |||
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"]) | |||
|
|||
self._partition_router.set_parent_state(stream_state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this deserves a comment as it's not obvious why we need to update the router's parent state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reading through the code again. We're passing the full state, not just the parent_state. Is there a reason this can't just call the router's set_initial_state
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream slicers and partition routers don't keep track of the cursor and don't have a set_initial_state
method. set_parent_state
sets the state to the parent streams that partition routers operate on, if such parent streams exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems to me this means we want to add a set_initial_state
method to the routers - from the caller's perspective, it doesn't really matter that it's setting a parent state. It's an implementation detail.
Furthermore, not all stream slicers know or care about parents (eg list partition router)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
edit: I do think the state being set is somewhat different, but the interface feels very strange to me for two reasons
- Not every stream slicers should know or care about parent streams
- even from the caller, the fact that it's updating a parent stream state is irrelevant. It's just passing the whole state object
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same goes with set_initial_state
for stream slicers, as not every stream slicer cares about the state. Introducing a set_initial_state
method could create confusion, implying that partition routers are somehow dependent on the state. set_parent_state
clearly articulates that it is only connected to the parent streams, as the stream interacts with the parent stream through the partition router.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's exactly the point - we're now saying that SubstreamPartitionRouter
and CartesianProductStreamSlicer
are incremental and have a concept of state they need to keep track of.
I'm not sure if it'll make the code simpler or more complicated, but conceptually, this makes me think the SubstreamPartitionRouter
should have a Cursor
when incremental_dependency
is True.
This would essentially mean that instead of keeping the parent state in a dict, it could pass the parent records to its cursor's observe method while iterating over them.
I feel 4/10 strongly on this issue so not worth blocking on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the methods' names. Regarding using a Cursor to keep track of the state, it is already managed by the cursor of parent streams. Partition routers just retrieve the state from the internal parent streams.
@@ -28,3 +28,21 @@ def stream_slices(self) -> Iterable[StreamSlice]: | |||
|
|||
:return: List of stream slices | |||
""" | |||
|
|||
def set_parent_state(self, stream_state: StreamState) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DeclarativeCursor
doesn't, right?
I think it makes sense for CartesianProductStreamSlicer
for also being a PartitionRouter
@@ -28,3 +28,45 @@ def stream_slices(self) -> Iterable[StreamSlice]: | |||
|
|||
:return: List of stream slices | |||
""" | |||
|
|||
def set_parent_state(self, stream_state: StreamState) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's leave the method as abstract to force the concrete class to implement it (or intentionally implement a no-op)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
""" | ||
pass | ||
|
||
def get_parent_state(self) -> Optional[Mapping[str, StreamState]]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's leave the method as abstract to force the concrete class to implement it (or intentionally implement a no-op)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -86,6 +86,8 @@ def set_initial_state(self, stream_state: StreamState) -> None: | |||
for state in stream_state["states"]: | |||
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"]) | |||
|
|||
self._partition_router.set_parent_state(stream_state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's exactly the point - we're now saying that SubstreamPartitionRouter
and CartesianProductStreamSlicer
are incremental and have a concept of state they need to keep track of.
I'm not sure if it'll make the code simpler or more complicated, but conceptually, this makes me think the SubstreamPartitionRouter
should have a Cursor
when incremental_dependency
is True.
This would essentially mean that instead of keeping the parent state in a dict, it could pass the parent records to its cursor's observe method while iterating over them.
I feel 4/10 strongly on this issue so not worth blocking on.
parent_partition = parent_stream_slice.partition if parent_stream_slice else {} | ||
|
||
# we need to read all records for slice to update the parent stream cursor | ||
stream_slices_for_parent = [] | ||
for parent_record in parent_stream.read_records( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add a comment here explaining that the sync mode and the stream state passed are irrelevant since we set the initial state from set_parent_state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! could you please add more tests to ensure that changes and stream state is handled correctly.
e.g. test_incremental_parent_state
- create test_source
- create stream_A (parent)
- create stream_B (child)
- run test_source.read() with/without stream_state
- assert:
- requests
- records
- stream_state (in output)
What
This PR resolves the issue with incremental substream reads in the low-code. It ensures that the state is passed to the parent stream when generating partitions to avoid reading the parent stream in full refresh mode. Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/7369
Connector's changes:
How
The code changes introduce a new field
incremental_dependency
in theParentStreamConfig
class. TheSubstreamPartitionRouter
was updated to read parent state incrementally and save the parent state when the stream slice is processed. Additionally, new default methods were added to StreamSlicer for cases when a stream slicer does not have parent streams.Review guide
User Impact
Can this PR be safely reverted and rolled back?