When attempting to use the Neo4j Kafka Connect Source connector configured with an AVRO value converter and corresponding schema in Schema Registry, the connector is unable to correctly produce records due to serialization errors.
Expected Behavior (Mandatory)
When the connector is configured as Source in QUERY mode with a valid AVRO schema, the messages should be correctly serialized and produced to the configured topic.
Actual Behavior (Mandatory)
The connector throws an error because it is unable to serialize messages properly with the configured AVRO schema.
While testing different configurations, a few issues surfaced:
When the AVRO schema has fields configured as enum and an incoming message has a valid value for such a field, the connector fails with org.apache.avro.AvroTypeException: Not an enum: {EXPECTED_VALUE} for schema.
When the neo4j.enforce.schema option is disabled and a valid message comes in, the connector fails with java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.IndexedRecord.
When one of the fields returned by the Cypher query is null, the connector fails with org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field {FIELD_NAME} cannot be null.
How to Reproduce the Problem
Dataset and configurations
Connector config
{
"name": "Neo4jSourceConnectorAVRO",
"config": {
"topic": "neo4j.test.avro",
"connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.use.latest.version": "true",
"value.converter.enhanced.avro.schema.support": "true",
"value.converter.latest.compatibility.strict": "false",
"value.converter.auto.register.schemas": "false",
"value.converter.connect.meta.data": "true",
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.streaming.poll.interval.msecs": 5000,
"neo4j.streaming.from": "LAST_COMMITTED",
"neo4j.enforce.schema": "true",
"neo4j.source.query": "MATCH (n:TestNode) WHERE n.lastModifiedAt > datetime({epochMillis: toInteger($lastCheck)}) RETURN n.uuid AS uuid, n.name AS name, n.company AS company, n.numericId AS numericId, n.entityType AS entityType, n.status AS status, n.isDeleted AS isDeleted;"
}
}
Steps (Mandatory)
Create a topic and an AVRO schema for it, with the following field types:
A mandatory STRING field
A mandatory STRING field of uuid logicalType
An optional STRING field [null, string]
A mandatory INT field
A mandatory ENUM field
An optional ENUM field (null, [enum1, enum2])
A mandatory boolean field
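For reference, a minimal Avro schema matching the field list above might look like the following. The record and field names are illustrative (taken from the source query and the stacktraces in this report); adjust names, namespace, and symbols to your own setup:

```json
{
  "type": "record",
  "name": "TestNode",
  "namespace": "com.mycompany.neo4j.avro",
  "fields": [
    {"name": "uuid", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "name", "type": "string"},
    {"name": "company", "type": ["null", "string"], "default": null},
    {"name": "numericId", "type": "int"},
    {"name": "entityType", "type": {"type": "enum", "name": "entityTypeTypes", "symbols": ["Company", "Subcompany"]}},
    {"name": "status", "type": ["null", {"type": "enum", "name": "statusTypes", "symbols": ["enum1", "enum2"]}], "default": null},
    {"name": "isDeleted", "type": "boolean"}
  ]
}
```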
Create a Neo4j connector with the provided configuration
Create or update a TestNode to stream the expected values matching the defined schema
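A test node can be created with Cypher along these lines (values are illustrative; lastModifiedAt must be newer than the connector's last check so the source query picks the node up):

```cypher
// Create or update a TestNode with properties matching the schema fields
MERGE (n:TestNode {uuid: '2f6c0a2e-1111-2222-3333-444455556666'})
SET n.name = 'Test name',
    n.company = 'Test company',
    n.numericId = 42,
    n.entityType = 'Company',
    n.status = 'enum1',
    n.isDeleted = false,
    n.lastModifiedAt = datetime()
```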
Test cases and stacktraces
These are the results when:
I create a node with all properties correctly populated (as in the provided sample): the connector fails with the stacktrace below, indicating that the value Company is not among the expected enum symbols (Company, Subcompany) for the entityType field:
connect | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic neo4j.test.avro :
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
connect | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$9(AbstractWorkerSourceTask.java:495)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
connect | ... 12 more
connect | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
connect | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
connect | ... 16 more
connect | Caused by: org.apache.avro.AvroTypeException: Not an enum: Company for schema: {"type":"enum","name":"entityTypeTypes","namespace":"com.mycompany.neo4j.avro","symbols":["Company","Subcompany"]} in field entityType
connect | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic neo4j.test.avro :
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
connect | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$9(AbstractWorkerSourceTask.java:495)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
connect | ... 12 more
connect | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
connect | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
connect | ... 16 more
connect | Caused by: org.apache.avro.AvroTypeException: Not an enum: Company for schema: {"type":"enum","name":"entityTypeTypes","namespace":"com.mycompany.neo4j.avro","symbols":["Company","Subcompany"]} in field entityType
connect | at org.apache.avro.generic.GenericDatumWriter.addAvroTypeMsg(GenericDatumWriter.java:198)
connect | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:231)
connect | at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
connect | at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
connect | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
connect | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:180)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
connect | ... 18 more
connect | Caused by: org.apache.avro.AvroTypeException: Not an enum: Company for schema: {"type":"enum","name":"entityTypeTypes","namespace":"com.mycompany.neo4j.avro","symbols":["Company","Subcompany"]}
connect | at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:241)
connect | at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:134)
connect | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
connect | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
connect | ... 24 more
With the same configuration and the same node update, but with neo4j.enforce.schema set to false, the connector fails with a different stack trace:
connect | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic neo4j.test.avro :
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
connect | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$9(AbstractWorkerSourceTask.java:495)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
connect | ... 12 more
connect | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
connect | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
connect | ... 16 more
connect | Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.IndexedRecord (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.avro.generic.IndexedRecord is in unnamed module of loader 'app')
connect | at org.apache.avro.generic.GenericData.getField(GenericData.java:846)
connect | at org.apache.avro.generic.GenericData.getField(GenericData.java:865)
connect | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:219)
connect | at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
connect | at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
connect | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
connect | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:180)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
connect | ... 18 more
When I update the company property on the node to null, with neo4j.enforce.schema set to true, the connector fails and produces:
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field company cannot be null.
connect | at streams.kafka.connect.source.Neo4jSourceService.checkError(Neo4jSourceService.kt:131)
connect | at streams.kafka.connect.source.Neo4jSourceService.poll(Neo4jSourceService.kt:139)
connect | at streams.kafka.connect.source.Neo4jSourceTask.poll(Neo4jSourceTask.kt:31)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:470)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:349)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field company cannot be null.
connect | at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:327)
connect | at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.schema(ConnectExtensionFunctions.kt:33)
connect | at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.asStruct(ConnectExtensionFunctions.kt:39)
connect | at streams.kafka.connect.source.SourceRecordBuilder.build(SourceRecordBuilder.kt:59)
connect | at streams.kafka.connect.source.Neo4jSourceService.toSourceRecord(Neo4jSourceService.kt:107)
connect | at streams.kafka.connect.source.Neo4jSourceService.access$toSourceRecord(Neo4jSourceService.kt:28)
connect | at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend$lambda$1(Neo4jSourceService.kt:84)
connect | at org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)
connect | at org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)
connect | at org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)
connect | at org.neo4j.driver.internal.InternalSession.readTransaction(InternalSession.java:103)
connect | at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend(Neo4jSourceService.kt:79)
connect | at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
connect | at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
connect | at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
connect | at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
connect | at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
connect | at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
connect | at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
connect | at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
With the same update as in the previous point, but with neo4j.enforce.schema set to false, a different error occurs:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic neo4j.test.avro :
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
connect | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$9(AbstractWorkerSourceTask.java:495)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
connect | ... 12 more
connect | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
connect | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
connect | ... 16 more
connect | Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.IndexedRecord (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.avro.generic.IndexedRecord is in unnamed module of loader 'app')
connect | at org.apache.avro.generic.GenericData.getField(GenericData.java:846)
connect | at org.apache.avro.generic.GenericData.getField(GenericData.java:865)
connect | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:219)
connect | at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
connect | at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
connect | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
connect | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:180)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
connect | ... 18 more
Specifications and Versions (Mandatory)
The initial issues with the AVRO schema were found with the following specs:
Neo4j Enterprise edition on version 4.4.19
Neo4j Kafka Connect connector on version 5.0.0-oss over confluentinc/cp-kafka-connect-base:7.3.1
Schema Registry on version 7.0.0
Kafka cluster on version 3.5.1
To replicate the issue and rule out problems with older versions, I used the configuration above with a docker-compose setup running:
Neo4j Enterprise edition on latest version (5.16.0)
Neo4j Kafka Connect connector on latest version (5.0.3) from Confluent Hub
Schema Registry on version 7.3.0
I have also tested the schema separately with other connectors and with a custom Producer using the schema without any issues.
Hi @Romsick. Thank you for flagging it.
Unfortunately, the Kafka Connect Neo4j Source Connector doesn't support the Avro enum type. I would recommend converting enum fields to string and implementing validation on the consumer side.
Regarding nullable properties and the error you receive on the null company property: Neo4j is a schema-less database. The schema is derived from individual messages and can therefore conflict when nodes or relationships with the same label or type have different sets of properties. I would recommend ensuring all nodes have the same set of properties. In your example, you could treat null properties as empty strings and filter them out on the consumer side. You could also leverage existence and type constraints, although some of these require a later Neo4j Enterprise version. Alternatively, you can try a schema-less format that does not require a Schema Registry, such as JSON.
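For example, the null-handling and enum-to-string suggestions could be applied directly in the source query, coalescing nullable properties to empty strings (a sketch based on the query from this issue; property names are from the reporter's setup):

```cypher
MATCH (n:TestNode)
WHERE n.lastModifiedAt > datetime({epochMillis: toInteger($lastCheck)})
RETURN n.uuid AS uuid,
       n.name AS name,
       coalesce(n.company, '') AS company,        // null -> empty string
       n.numericId AS numericId,
       toString(n.entityType) AS entityType,      // enum values carried as plain strings
       coalesce(n.status, '') AS status,
       n.isDeleted AS isDeleted
```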
Edit: The main reason the Kafka Connect Neo4j Source Connector doesn't support the Avro enum type is that the Kafka Connect schema does not have an enum type.
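The consumer-side validation mentioned above could look something like this minimal Python sketch. The field names and allowed values mirror this issue's schema and are assumptions; adapt them to your own records:

```python
# Validate records whose enum-like fields arrive as plain strings,
# and normalize empty strings (stand-ins for Neo4j nulls) back to None.
ALLOWED_ENTITY_TYPES = {"Company", "Subcompany"}

def validate_record(record: dict) -> dict:
    """Check enum-like fields and normalize null stand-ins on a deserialized record."""
    entity_type = record.get("entityType")
    if entity_type not in ALLOWED_ENTITY_TYPES:
        raise ValueError(f"Unexpected entityType: {entity_type!r}")
    # Optional string fields coalesced to '' at the source become None again here.
    if record.get("company") == "":
        record["company"] = None
    return record
```

This keeps the Kafka Connect schema free of enum types while still rejecting unexpected values before they reach downstream consumers.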