Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bitsail][connector]Doris batch replace model use recordStream buffer #305

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

DongLiang-0
Copy link
Contributor

Signed-off-by:

Pre-Checklist

Note: Please complete ALL items in the following checklist.

  • I have read through the CONTRIBUTING.md documentation.
  • My code has the necessary comments and documentation (if needed).
  • I have added relevant tests.

Purpose

Some description about what this PR wants to do.

Approaches

Some description about how this PR achives the purpose.

Related Issues

e.g. Close #796

New Behavior (screenshots if needed)

N/A

Copy link
Collaborator

@BlockLiu BlockLiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -73,6 +73,10 @@ public interface DorisWriterOptions extends WriterOptions.BaseWriterOptions {
key(WRITER_PREFIX + "sink_flush_interval_ms")
.defaultValue(5000);

ConfigOption<Integer> SINK_CHECK_INTERVAL =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be long type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SINK_CHECK_INTERVAL represents the time interval, not too large

}

private synchronized void startLoad(List<byte[]> flushCache) throws IOException {
this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(), true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Label maybe duplicate when multi task start load at same time, so label prefix should contains subtaskid info?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Label is generated according to timestamp. Due to the synchronized keyword in this method, only a single thread can be entered at a time

private AtomicInteger cacheRecordSize;
private AtomicInteger cacheRecordCount;
private volatile boolean loading = false;
private final ArrayList<byte[]> cache = new ArrayList<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: replace to linkedqueue? I think queue more match list in here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good suggestion, I will improve here

this.cacheRecordSize = new AtomicInteger();
this.cacheRecordCount = new AtomicInteger();
this.scheduler = Executors.newScheduledThreadPool(1,
new BasicThreadFactory.Builder().namingPattern("Doris-replace-writer").daemon(true).build());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this thread is daemon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that when the user thread exits, the JVM not necessary to manage the checkDone program after setting it as a daemon thread

this.dorisOptions = dorisOptions;
this.labelGenerator = new LabelGenerator(dorisExecutionOptions.getLabelPrefix(), dorisExecutionOptions.isEnable2PC());
this.recordStream = new RecordStream(dorisExecutionOptions.getBufferSize(), dorisExecutionOptions.getBufferCount());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need not be a field for DorisReplaceProxy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will improve here

}
byte[] array = buf.array();
dorisStreamLoad.writeRecord(buf.array());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there need invoke twice writeRecord?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be what I forgot to delete when I deleted the redundant code at the end, I will improve here

LOG.info("not loading, skip timer checker");
return;
}
if (dorisStreamLoad.getPendingLoadFuture() != null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have question about this line,does the pending load future change between the interval? i think the right pipeline should be

  1. Future pendingLoadFuture = dorisStreamLoad.getPendingLoadFuture();
  2. check pendingLoadFuture same with dorisStreamLoad.getPendingLoadFuture()
  3. check pendingLoadFuture is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a problem. As long as dorisStreamLoad.getPendingLoadFuture() is not done, it needs to be stopped.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Development

Successfully merging this pull request may close these issues.

None yet

3 participants