Skip to content

jcustenborder/kafka-connect-cdc-postgres

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Limitations

  1. Timestamp is when the record is read by Kafka, not when it is generated.
  2. Everything is done in the initial database. Can logical replication span databases or is the replication slot to a single database?

Configuration

PostgreSqlSourceConnector

The PostgreSql uses the Logical Decoding feature in PostgreSql 9.x. This will allow this connector to work with Amazon RDS for PostgreSQL.

name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.postgres.PostgreSqlSourceConnector

# Set these required values
initial.database=
server.name=
postgres.replication.slot.name=
password=
server.port=
username=
Name Description Type Default Valid Values Importance
initial.database The initial database to connect to. string high
password JDBC Password to connect to the database with. password high
postgres.replication.slot.name The PostgreSQL replication slot name to connect to. string high
server.name The server to connect to. string high
server.port The port on the server to connect to. int high
username JDBC Username to connect to the database with. string high
schema.key.name.format Format used to generate the name for the key schema. The following template properties are available for string replacement. ${databaseName}, ${schemaName}, ${tableName}, ${namespace} string ${namespace}.${tableName}Key high
schema.namespace.format The namespace for the schemas generated by the connector. The following template properties are available for string replacement. ${databaseName}, ${schemaName}, ${tableName}, ${namespace} string com.example.data.${databaseName} high
schema.value.name.format Format used to generate the name for the value schema. The following template properties are available for string replacement. ${databaseName}, ${schemaName}, ${tableName}, ${namespace} string ${namespace}.${tableName}Value high
topicFormat.format The topicFormat to write the data to. string ${databaseName}.${tableName} high
jdbc.pool.max.idle The maximum number of idle CONNECTIONS in the connection pool. int 10 medium
jdbc.pool.max.total The maximum number of CONNECTIONS for the connection pool to open. If a number greater than this value is requested, the caller will block waiting for a connection to be returned. int 30 medium
jdbc.pool.min.idle The minimum number of idle CONNECTIONS in the connection pool. int 3 medium
backoff.time.ms The number of milliseconds to wait when no records are returned. int 1000 [50,...] low
batch.size The number of records to return in a batch. int 512 [1,...] low
schema.cache.ms The number of milliseconds to cache schema metadata in memory. int 300000 [60000,...] low
schema.caseformat.column.name This setting is used to control how the column names are cased when the resulting schemas are generated. string NONE ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} low
schema.caseformat.database.name This setting is used to control how the ${databaseName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, topicFormat.format settings. This allows you to control the naming applied to these properties. For example this can be used to take a database name of USER_TRACKING to a more java like case of userTracking or all lowercase usertracking. string NONE ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} low
schema.caseformat.input The naming convention used by the database format. This is used to define the source naming convention used by the other schema.caseformat.* properties. string UPPER_UNDERSCORE ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} low
schema.caseformat.schema.name This setting is used to control how the ${schemaName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, topicFormat.format settings. This allows you to control the naming applied to these properties. For example this can be used to take a schema name of SCOTT to a more java like case of Scott or all lowercase scott. string NONE ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} low
schema.caseformat.table.name This setting is used to control how the ${tableName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, topicFormat.format settings. This allows you to control the naming applied to these properties. For example this can be used to take a table name of USER_SETTING to a more java like case of UserSetting or all lowercase usersetting. string NONE ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} low

About

Kafka Connect connector for CDC data from postgres

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published