Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-6388] Fix kafka source can not consumer all history data when use batch mode #6685

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from

Conversation

SbloodyS
Copy link
Member

Purpose of this pull request

fix #6388

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@@ -171,6 +171,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
&& record.offset()
>= sourceSplit
.getEndOffset()) {
// signal to the source that we have
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @SbloodyS for create this PR. Could you explain to us the purpose of move context.signalNoMoreElement(); to here? And why this change can fix kafka source can not consumer all history data when use batch mode?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, In Batch mode The pollNext will only be executed once because KafkaConsumer.poll can only be consume limited data restricted by max.poll.records and its timeout. Moving it internally can enable poolNext to execute multiple times to consume data. @Hisoka-X

Copy link
Member

@Hisoka-X Hisoka-X Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add test case to reproduce this bug? Like we can set max.poll.records to 1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add test case to reproduce this bug? Like we can set max.poll.records to 1?

Can you tell me where to add it? I can't find any relavant unit tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

kafka source can not consumer all history data when use batch mode
2 participants