Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ low-code: Add Incremental Parent State Handling to SubstreamPartitionRouter #38211

Open
wants to merge 19 commits into
base: master
Choose a base branch
from

Conversation

tolik0
Copy link
Contributor

@tolik0 tolik0 commented May 15, 2024

What

This PR resolves the issue with incremental substream reads in the low-code. It ensures that the state is passed to the parent stream when generating partitions to avoid reading the parent stream in full refresh mode. Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/7369

Connector's changes:

How

The code changes introduce a new field incremental_dependency in the ParentStreamConfig class. The SubstreamPartitionRouter was updated to read parent state incrementally and save the parent state when the stream slice is processed. Additionally, new default methods were added to StreamSlicer for cases when a stream slicer does not have parent streams.

Review guide

User Impact

  • The end result is an optimized incremental substream reading process, which improves sync performance by reducing the need to read the parent stream in full refresh mode.
  • There are no negative side effects anticipated.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Copy link

vercel bot commented May 15, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Jun 7, 2024 6:24pm

@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues CDK Connector Development Kit connectors/source/slack labels May 15, 2024
@tolik0 tolik0 self-assigned this May 15, 2024
@tolik0 tolik0 force-pushed the tolik0/airbyte-cdk/add-incremental-substreams branch from 611acee to 11851ee Compare May 16, 2024 15:03
@tolik0 tolik0 marked this pull request as ready for review May 23, 2024 12:59
@tolik0 tolik0 requested a review from a team as a code owner May 23, 2024 12:59
@tolik0 tolik0 requested a review from artem1205 May 23, 2024 13:00
@artem1205
Copy link
Collaborator

Please split CDK and connectors changes into different PRs

@tolik0
Copy link
Contributor Author

tolik0 commented May 23, 2024

@artem1205 I will create separate PRs for connectors; I just added these changes here for now for easier testing. This way we can execute regression tests from this branch.

@@ -28,3 +28,21 @@ def stream_slices(self) -> Iterable[StreamSlice]:

:return: List of stream slices
"""

def set_parent_state(self, stream_state: StreamState) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is only used on partition routers so I think it would be worth creating a new PartitionRouter interface that would be more specific.

We should also leave the two methods as abstract to ensure developers explicitly define them instead of relying on a default behavior that may or may not make sense for their use case

Copy link
Contributor Author

@tolik0 tolik0 May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any stream slicer can handle parent states. For example CartesianProductStreamSlicer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeclarativeCursor doesn't, right?

I think it makes sense for CartesianProductStreamSlicer for also being a PartitionRouter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed CartesianProductStreamSlicer to PartitionRouter


def set_parent_state(self, stream_state: StreamState) -> None:
"""
Set the state of the parent streams.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the docstring should be more specific about the parameter expectations. stream_state must have parent_state which is not obvious from the signature. It might even be worth creating a new class to codify the expectation (kind of like how StreamSlice codifies the presence of a partition and a cursor_slice)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated docs for all classes


yield from stream_slices_for_parent

def set_parent_state(self, stream_state: Optional[StreamState]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is stream_state an Optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -86,6 +86,8 @@ def set_initial_state(self, stream_state: StreamState) -> None:
for state in stream_state["states"]:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])

self._partition_router.set_parent_state(stream_state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this deserves a comment as it's not obvious why we need to update the router's parent state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reading through the code again. We're passing the full state, not just the parent_state. Is there a reason this can't just call the router's set_initial_state method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream slicers and partition routers don't keep track of the cursor and don't have a set_initial_state method. set_parent_state sets the state to the parent streams that partition routers operate on, if such parent streams exist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to me this means we want to add a set_initial_state method to the routers - from the caller's perspective, it doesn't really matter that it's setting a parent state. It's an implementation detail.

Furthermore, not all stream slicers know or care about parents (eg list partition router)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edit: I do think the state being set is somewhat different, but the interface feels very strange to me for two reasons

  1. Not every stream slicers should know or care about parent streams
  2. even from the caller, the fact that it's updating a parent stream state is irrelevant. It's just passing the whole state object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same goes with set_initial_state for stream slicers, as not every stream slicer cares about the state. Introducing a set_initial_state method could create confusion, implying that partition routers are somehow dependent on the state. set_parent_state clearly articulates that it is only connected to the parent streams, as the stream interacts with the parent stream through the partition router.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's exactly the point - we're now saying that SubstreamPartitionRouter and CartesianProductStreamSlicer are incremental and have a concept of state they need to keep track of.

I'm not sure if it'll make the code simpler or more complicated, but conceptually, this makes me think the SubstreamPartitionRouter should have a Cursor when incremental_dependency is True.

This would essentially mean that instead of keeping the parent state in a dict, it could pass the parent records to its cursor's observe method while iterating over them.

I feel 4/10 strongly on this issue so not worth blocking on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the methods' names. Regarding using a Cursor to keep track of the state, it is already managed by the cursor of parent streams. Partition routers just retrieve the state from the internal parent streams.

@girarda girarda requested a review from brianjlai May 26, 2024 22:25
@tolik0 tolik0 requested a review from girarda May 28, 2024 15:03
@@ -28,3 +28,21 @@ def stream_slices(self) -> Iterable[StreamSlice]:

:return: List of stream slices
"""

def set_parent_state(self, stream_state: StreamState) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeclarativeCursor doesn't, right?

I think it makes sense for CartesianProductStreamSlicer for also being a PartitionRouter

@@ -28,3 +28,45 @@ def stream_slices(self) -> Iterable[StreamSlice]:

:return: List of stream slices
"""

def set_parent_state(self, stream_state: StreamState) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's leave the method as abstract to force the concrete class to implement it (or intentionally implement a no-op)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

"""
pass

def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's leave the method as abstract to force the concrete class to implement it (or intentionally implement a no-op)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -86,6 +86,8 @@ def set_initial_state(self, stream_state: StreamState) -> None:
for state in stream_state["states"]:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])

self._partition_router.set_parent_state(stream_state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's exactly the point - we're now saying that SubstreamPartitionRouter and CartesianProductStreamSlicer are incremental and have a concept of state they need to keep track of.

I'm not sure if it'll make the code simpler or more complicated, but conceptually, this makes me think the SubstreamPartitionRouter should have a Cursor when incremental_dependency is True.

This would essentially mean that instead of keeping the parent state in a dict, it could pass the parent records to its cursor's observe method while iterating over them.

I feel 4/10 strongly on this issue so not worth blocking on.

parent_partition = parent_stream_slice.partition if parent_stream_slice else {}

# we need to read all records for slice to update the parent stream cursor
stream_slices_for_parent = []
for parent_record in parent_stream.read_records(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a comment here explaining that the sync mode and the stream state passed are irrelevant since we set the initial state from set_parent_state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Collaborator

@artem1205 artem1205 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! could you please add more tests to ensure that changes and stream state is handled correctly.
e.g. test_incremental_parent_state

  1. create test_source
  2. create stream_A (parent)
  3. create stream_B (child)
  4. run test_source.read() with/without stream_state
  5. assert:
    • requests
    • records
    • stream_state (in output)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants