
Kafka Connect: Commit coordination #10351

Open · bryanck wants to merge 5 commits into base: main

Conversation

@bryanck (Contributor) commented May 18, 2024:

This PR is the next stage in submitting the Kafka Connect Iceberg sink connector, and is a follow-up to #8701, #9466, and #9641. It includes the commit coordinator and related tests.

The integration tests, distribution build, and docs are still not included for the sink; they will be added in follow-up PRs. For reference, the current sink implementation can be found at https://github.com/tabular-io/iceberg-kafka-connect, and existing docs at https://github.com/tabular-io/iceberg-kafka-connect/tree/main/docs.

@@ -45,7 +45,7 @@ private EventTestUtil() {}
       new Schema(ImmutableList.of(Types.NestedField.required(1, "id", Types.LongType.get())));

   static final PartitionSpec SPEC =
-      PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build();
+      PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(0).build();
Member:

nit: default is 0, so no need to explicitly set it.

bryanck (Contributor Author):

Thanks, I changed this.

}

// use reflection here to avoid requiring Hadoop as a dependency
private static Object loadHadoopConfig(IcebergSinkConfig config) {
Member:

Should this be moved to org.apache.iceberg.CatalogUtil so that Java API users can also use it if they have a Hadoop conf directory?

The method could accept a String hadoopConfDir and config.hadoopProps() instead of the sink config; in that case this class wouldn't be needed.
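
For illustration, a rough sketch of the suggested utility shape (hypothetical placement and signature, not part of the PR; it keeps Hadoop an optional dependency by loading Configuration reflectively):

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical CatalogUtil-style helper along the lines suggested above, not the actual PR code.
public static Object loadHadoopConfig(String hadoopConfDir, Map<String, String> hadoopProps) {
  try {
    Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
    Object conf = confClass.getDeclaredConstructor().newInstance();

    if (hadoopConfDir != null) {
      Method addResource = confClass.getMethod("addResource", URL.class);
      addResource.invoke(conf, Paths.get(hadoopConfDir, "core-site.xml").toUri().toURL());
      addResource.invoke(conf, Paths.get(hadoopConfDir, "hdfs-site.xml").toUri().toURL());
    }

    Method set = confClass.getMethod("set", String.class, String.class);
    for (Map.Entry<String, String> entry : hadoopProps.entrySet()) {
      set.invoke(conf, entry.getKey(), entry.getValue());
    }
    return conf;
  } catch (Exception e) {
    throw new RuntimeException("Failed to load Hadoop Configuration via reflection", e);
  }
}
```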

bryanck (Contributor Author):

I agree, that could be useful, though it may require some discussion to get right, so I'd rather start with this here.

import org.apache.iceberg.connect.channel.CommitterImpl;

public class CommitterFactory {
public static Committer createCommitter(IcebergSinkConfig config) {
Member:

Is the config parameter needed?

bryanck (Contributor Author):

It isn't needed now, but possibly in the future, to indicate the type of committer to create. The API is public so I designed it with that in mind.

@@ -80,7 +80,6 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
       "iceberg.tables.schema-case-insensitive";
   private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
-  private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
Member:

Why was this removed?

bryanck (Contributor Author):

We use the connect consumer group now, rather than a separate consumer group that we keep in sync.

public void put(Collection<SinkRecord> sinkRecords) {
  if (committer != null) {
    committer.save(sinkRecords);
  }
Member:

Should this throw an exception when committer is null? Otherwise the producer will assume the records have been put.

bryanck (Contributor Author) commented Jun 9, 2024:

This should never happen, so I changed this to a precondition check. I'll revisit this if needed when I add the integration tests.
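
A minimal sketch of what such a precondition check could look like (illustrative only; it assumes Iceberg's relocated Guava Preconditions and a hypothetical error message):

```java
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Illustrative sketch, not necessarily the exact PR code.
@Override
public void put(Collection<SinkRecord> sinkRecords) {
  Preconditions.checkNotNull(committer, "Committer has not been initialized, open() must be called first");
  committer.save(sinkRecords);
}
```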

.filter(distinctByKey(dataFile -> dataFile.path().toString()))
.collect(Collectors.toList());

List<DeleteFile> deleteFiles =
Member:

Since we are only supporting append, do we need this code?

Member:

Similar comment for the RowDelta block below.

bryanck (Contributor Author):

I wanted to keep the coordinator capable of handling delete files, so when we do add in delta support, it should be fairly straightforward, and won't require changes to the control message data model.

}

@Test
public void testCommitDelta() {
Member:

Maybe the delete file tests can be added when the delete writers feature is added.

bryanck (Contributor Author):

See my comment above, I'd prefer to add this into the coordinator now, rather than later.

@ajantha-bhat added this to the Iceberg 1.6.0 milestone on May 21, 2024
Comment on lines 87 to 90
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
  committer.save(null);
}
Contributor:

Why are we overriding the flush method when we don't have any flush-specific code path in Committer?

The put method will be called on a regular basis (potentially with an empty collection of sink records) so this feels redundant.

Also, I'm fairly certain that this flush method will never actually be called since we are overriding the preCommit method. The default preCommit implementation is the only place where flush is called by the Kafka Connect runtime. Unless you call flush yourself in the preCommit method you've defined (or anywhere else), flush will never actually be called.

Overall, I would recommend you just omit this flush method definition from this class.
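
For context, the default SinkTask.preCommit in the Kafka Connect runtime does roughly the following (paraphrased, not a verbatim copy), which is why flush stops being invoked once preCommit is overridden without calling it:

```java
// Roughly the default behavior: flush, then hand back the current offsets to be committed.
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
  flush(currentOffsets);
  return currentOffsets;
}
```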

bryanck (Contributor Author):

This is here in case future committer implementations want to perform an action on flush. The current committer doesn't do anything when the record collection is null.

Comment on lines +57 to +78
@Override
public void close(Collection<TopicPartition> partitions) {
  close();
}

private void close() {
  if (committer != null) {
    committer.stop();
    committer = null;
  }

  if (catalog != null) {
    if (catalog instanceof AutoCloseable) {
      try {
        ((AutoCloseable) catalog).close();
      } catch (Exception e) {
        LOG.warn("An error occurred closing catalog instance, ignoring...", e);
      }
    }
    catalog = null;
  }
}
Contributor:

nit: can you move these close methods so they're after preCommit but before stop? Just so these methods are arranged in life-cycle order.

bryanck (Contributor Author):

I'm not sure I follow your suggestion. When the KC close() or stop() lifecycle methods are called, we close the committer and the catalog.


@Override
public void open(Collection<TopicPartition> partitions) {
catalog = CatalogUtils.loadCatalog(config);
Contributor:

I'm not 100% sure about this, but IIRC from past experience, Kafka Connect doesn't really guarantee that close will be called before open if a SinkTask instance is reused; this can lead to resource leaks (or worse). So I usually defensively call close myself in the open method before opening new resources (the catalog and committer in this case).

Note: I can't find any documentation/issues to explicitly support this right now but looking at how things are implemented in the Kafka Connect runtime using the ConsumerRebalanceListener API I think I can see at least one way where this would be possible (open called without/before close).

bryanck (Contributor Author):

Thanks, I added a precondition check here to be safer.
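
A minimal sketch of the kind of guard being discussed (illustrative only; committer start-up calls are elided and the message text is hypothetical):

```java
// Illustrative sketch, not the actual PR code.
@Override
public void open(Collection<TopicPartition> partitions) {
  // Fail fast if the runtime reuses this task instance without a prior close().
  Preconditions.checkState(catalog == null, "Catalog already open, close() was not called");

  catalog = CatalogUtils.loadCatalog(config);
  // ... create and start the committer here ...
}
```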


@Override
public void open(Collection<TopicPartition> partitions) {
catalog = CatalogUtils.loadCatalog(config);
Contributor:

The open method is called with only the newly assigned partitions. Is there a strong reason to pass just the newly assigned partitions to the Committer.start method when the Committer can just retrieve all partitions assigned to this task via context.assignment anyway?

I'm also worried we might have a bug here. The Committer implementation uses this partitions argument to check if partition 0 of the first topic is assigned to this task and if so, it spawns a Coordinator process. I'm worried that if there was a rebalance where the partition 0 of the first topic doesn't move between tasks, then it would not be included in the partitions argument for any Task and thus we could potentially end up with a Connector that doesn't have any Coordinator process running on any Task. Thoughts?

bryanck (Contributor Author):

That's not my understanding; open() will be called with the new assignment, i.e. all assigned topic partitions. See the Javadoc: "The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)"
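
The distinction under discussion is whether the leader check runs over the partitions passed to open() or over the task's full assignment from the context; a hypothetical sketch of the latter:

```java
import org.apache.kafka.connect.sink.SinkTaskContext;

// Hypothetical illustration only: check leadership against the full assignment
// from SinkTaskContext rather than the partitions argument of open().
private boolean isLeader(SinkTaskContext context, String firstTopic) {
  return context.assignment().stream()
      .anyMatch(tp -> tp.topic().equals(firstTopic) && tp.partition() == 0);
}
```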


@Override
public String version() {
return IcebergSinkConfig.version();
Contributor:

Looks like this method was implemented before this PR but I have a question about this.
This method returns a String that looks something like this:

IcebergBuild.version() + "-kc-" + kcVersion;

where kcVersion = IcebergSinkConfig.class.getPackage().getImplementationVersion().

Won't kcVersion and IcebergBuild.version() be the same value, since AFAIK this connector's releases will be tied to the general Iceberg releases? CMIIW

Note: when I run the existing unit test locally, I currently get 1.6.0-SNAPSHOT-kc-unknown.

bryanck (Contributor Author):

Thanks for catching this, we can just use the Iceberg version now. I left in the config method in case we want to add to this later.
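
Presumably the simplified version method ends up along these lines (a sketch, assuming it just delegates to the Iceberg build version):

```java
import org.apache.iceberg.IcebergBuild;

// Sketch: report the Iceberg build version as the connector version.
public static String version() {
  return IcebergBuild.version();
}
```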

private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);

private IcebergSinkConfig config;
private Catalog catalog;
Contributor:

As far as I know Catalog objects are not guaranteed to be thread-safe.
And at least on the "leader" tasks, we can have multiple threads using the same Catalog instance at the same time ("leader" tasks have both a main thread as well as a CoordinatorThread).
I haven't heard users report any issues about this but it would be better to avoid this risk entirely?


private void routeRecordDynamically(SinkRecord record) {
String routeField = config.tablesRouteField();
Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
Contributor:

This should really be checked at config parsing time or at SinkWriter construction time, instead of on every record?
To be clear; I'm not worried about this from a performance perspective (I'm confident the JVM will optimize this away) but it just seems awkward.

bryanck (Contributor Author):

We already check this in the config, so I'll just remove this.

new TopicPartition(record.topic(), record.kafkaPartition()),
new Offset(record.kafkaOffset() + 1, timestamp));

if (config.dynamicTablesEnabled()) {
Contributor:

Same here, we really don't need to check this on every record?

Feels like we're missing an abstraction here, something like the concept of a Router which has a StaticRouter implementation and a DynamicRouter implementation and only one of those is constructed for the lifetime of a SinkWriter based on the config.dynamicTablesEnabled() setting.
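
A rough sketch of the suggested shape (hypothetical interface and class names, not part of the PR); the choice would happen once when the SinkWriter is constructed:

```java
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical abstraction: pick one router implementation up front instead of
// branching on config.dynamicTablesEnabled() for every record.
interface RecordRouter {
  void route(SinkRecord record);
}

class DynamicRecordRouter implements RecordRouter {
  @Override
  public void route(SinkRecord record) {
    // resolve the target table name from the record's route field, then write
  }
}

class StaticRecordRouter implements RecordRouter {
  @Override
  public void route(SinkRecord record) {
    // match the route value against each configured table's route regex, then write
  }
}

// At SinkWriter construction time (sketch):
// this.router = config.dynamicTablesEnabled() ? new DynamicRecordRouter() : new StaticRecordRouter();
```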

Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");

String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
Contributor:

IMO we should throw an error instead of dropping the record.
Users can filter out messages easily using an SMT to get the same behaviour, if necessary.
In the future, I imagine we can allow users to supply custom exception-handler implementations which could also allow users to drop records on error.

bryanck (Contributor Author):

I feel skipping is better than getting into a state where the sink can no longer progress. When we add DLQ support then we could route to that.

Comment on lines +120 to +124
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
  String tableName = routeValue.toLowerCase();
  writerForTable(tableName, record, true).write(record);
}
@fqaiser94 (Contributor) commented May 30, 2024:

Can we support writing to multiple tables if this is a comma-separated list of table names, like we do in "static" mode?

Unless there are other concerns, I think this is an important building block for more advanced functionality, e.g. we could remove the route-value-regex table routing mode, as it could be implemented entirely as an SMT + dynamic mode.
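
A minimal sketch of what that could look like in dynamic mode, reusing the names from the snippet above (illustrative only):

```java
// Illustrative only: write the record to every table named in a comma-separated route value.
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
  for (String tableName : routeValue.split(",")) {
    writerForTable(tableName.trim().toLowerCase(), record, true).write(record);
  }
}
```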

Comment on lines +101 to +112
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
  config
      .tables()
      .forEach(
          tableName -> {
            Pattern regex = config.tableConfig(tableName).routeRegex();
            if (regex != null && regex.matcher(routeValue).matches()) {
              writerForTable(tableName, record, false).write(record);
            }
          });
}
Contributor:

nit: is this really "static" routing? I guess it's a static list of tables, but the table(s) each message is written to are determined dynamically based on the route value...

More importantly, is this an important enough use-case to support within the connector? I would strongly prefer if we didn't support this within the connector itself (users can easily implement this by writing an SMT + dynamic mode).

bryanck (Contributor Author):

Static in the sense that the list of tables is fixed and doesn't change, rather than deriving it from the record. This feature is in use by some.
