Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[compact]support compact unaware bucket for combine_mode #2858

Merged

Conversation

wg1026688210
Copy link
Contributor

@wg1026688210 wg1026688210 commented Feb 5, 2024

Refact the compaction of combine mode for supporting compacting unaware bucket table

Purpose

Linked issue: close #2670

Tests

CompactDatabaseActionITCase.java for IT case in stream and batch mode
CompactionTaskSerializerTest.java for the test of serializing compaction task
AppendOnlyMultiTableCompactionWorkerOperatorTest.java for the test of AppendOnlyMultiTableCompactionWorkerOperator

API and Format

Documentation

@wg1026688210 wg1026688210 force-pushed the compaction/support_unaware_mode branch 7 times, most recently from f6274ef to a43c51e Compare February 6, 2024 10:46
@wg1026688210 wg1026688210 changed the title poc/support_compact_unaware_bucket_for_combine_mode poc/support compact unaware bucket for combine_mode Feb 6, 2024
@wg1026688210 wg1026688210 force-pushed the compaction/support_unaware_mode branch 3 times, most recently from 65f0781 to cc99417 Compare February 6, 2024 14:00
@wg1026688210 wg1026688210 marked this pull request as ready for review February 25, 2024 14:55
@wg1026688210 wg1026688210 changed the title poc/support compact unaware bucket for combine_mode compact/support compact unaware bucket for combine_mode Feb 26, 2024
@wg1026688210 wg1026688210 changed the title compact/support compact unaware bucket for combine_mode [compact]/support compact unaware bucket for combine_mode Feb 27, 2024
@wg1026688210 wg1026688210 changed the title [compact]/support compact unaware bucket for combine_mode [compact]support compact unaware bucket for combine_mode Feb 27, 2024
@yuzelin yuzelin self-requested a review February 28, 2024 07:40
@wg1026688210 wg1026688210 force-pushed the compaction/support_unaware_mode branch from 1d80aac to c4ef352 Compare March 31, 2024 10:30
}

@VisibleForTesting
Iterable<Future<CommitMessage>> result() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any place using this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done ,this has been removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been finished at #3175


private final FileStoreTable table;
private final String commitUser;

private transient AppendOnlyFileStoreWrite write;
private UnwareBucketCompactionHelper compactionHelper;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done,this has been finished at #3175

}

public AppendOnlyCompactionTask(
BinaryRow partition, List<DataFileMeta> files, Identifier identifier) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a new class MultiTableAppendOnlyCompactionTask which extends AppendOnlyCompactionTask seems better, in this way, we don't need Identifier.EMPTY which may cause hidden bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done ,this change has been finished at #3174

Copy link
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove MultiTablesCompactorSourceFunction if you have renamed it

* unaware bucket table.
* </ol>
*/
public abstract class AbstractBucketScanLogic<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe another name? Like MultiTableScanBase

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done ,this has been finished at #3179

* This class is responsible for implementing the scanning logic {@link AbstractBucketScanLogic} for
* the table with multi bucket such as dynamic or fixed bucket table.
*/
public class MultiBucketScanLogic extends AbstractBucketScanLogic<Tuple2<Split, String>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MultiAwareBucketTableScan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done ,this has been finished at #3179

* This class is responsible for implementing the scanning logic {@link AbstractBucketScanLogic} for
* the table with fix single bucket such as unaware bucket table.
*/
public class UnwareBucketScanLogic extends AbstractBucketScanLogic<AppendOnlyCompactionTask> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MultiUnawareBuckeTableScan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done ,this has been finished at #3179

* </ol>
*/
public abstract class CompactionFileScanner<T> {
protected final AtomicBoolean isRunning;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以在 CompactionFileScanner 和 AtomicBoolean 之间加个换行

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

其他的类也一样

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done ,this has been finished at #3179

@Override
public void scan(SourceFunction.SourceContext<T> ctx) throws Exception {
while (isRunning.get()) {
Boolean isEmpty = tableScanLogic.collectFiles(ctx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of weird here, use 'null' to tag 'end', can we find a better way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a simple Enum was added for a more clear expression at #3179

@wg1026688210 wg1026688210 force-pushed the compaction/support_unaware_mode branch from c4ef352 to 5366f01 Compare May 10, 2024 10:03
@wg1026688210 wg1026688210 force-pushed the compaction/support_unaware_mode branch 2 times, most recently from 39ef4b0 to ad6e873 Compare May 12, 2024 05:42
splits.forEach(ctx::collect);
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScanLogic =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just tableScan?

/** It is responsible for monitoring compactor source of aware bucket table in batch mode. */
public class CombinedAwareBatchSourceFunction
extends CombinedCompactorSourceFunction<Tuple2<Split, String>> {
private static final Logger LOGGER =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed

splits.forEach(ctx::collect);
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScanLogic =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just tableScan

*/
public class CombinedUnawareBatchSourceFunction
extends CombinedCompactorSourceFunction<MultiTableAppendOnlyCompactionTask> {
private static final Logger LOGGER =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScanLogic =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

*/
public class CombinedUnawareStreamingSourceFunction
extends CombinedCompactorSourceFunction<MultiTableAppendOnlyCompactionTask> {
private final long monitorInterval;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScanLogic =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


/** Compaction task for multi table . */
public class MultiTableAppendOnlyCompactionTask extends AppendOnlyCompactionTask {
private final Identifier tableIdentifier;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed

/** Serializer for {@link MultiTableAppendOnlyCompactionTask}. */
public class MultiTableCompactionTaskSerializer
implements VersionedSerializer<MultiTableAppendOnlyCompactionTask> {
private static final int CURRENT_VERSION = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed

* table with multi bucket such as dynamic or fixed bucket table.
*/
public class MultiAwareBucketTableScan extends MultiTableScanBase<Tuple2<Split, String>> {
protected transient Map<Identifier, BucketsTable> tablesMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed (new line)

* </ol>
*/
public abstract class MultiTableScanBase<T> {
private static final Logger LOG = LoggerFactory.getLogger(MultiTableScanBase.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed


/** The Compactor of unaware bucket table to execute {@link AppendOnlyCompactionTask}. */
public class UnawareBucketCompactor {
private final FileStoreTable table;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line feed

Copy link
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Whole database compression combined mode does not support append unware bucket table
2 participants