Source Connector Configuration
The AMQP source connector reads AMQP messages from a queue and writes them to Kafka. It formats the messages as described in the architecture documentation.
NOTE: The Key and Value sent to Kafka are Strings. Only the String converter is supported for the Key; the Value supports either the JSON converter or the String converter. No other converters are supported.
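As a minimal sketch of the converter restriction above, a connector configuration could set the converters as follows. The class names are the standard String and JSON converters shipped with Apache Kafka Connect; that these are the converters meant here is an assumption.

```properties
# Key: only the String converter is supported.
key.converter=org.apache.kafka.connect.storage.StringConverter
# Value: either the JSON converter (shown) or the String converter.
value.converter=org.apache.kafka.connect.json.JsonConverter
```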
NOTE: At this time, the Aiven source connector for AMQP on Apache Kafka supports only queue-type messages. Messages are therefore distributed across all the tasks, so in effect the maximum number of Kafka Connect tasks is also the number of messages that can be processed simultaneously.
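To illustrate the note above: assuming four tasks are configured, up to four messages could be processed at the same time. The value 4 is an arbitrary example.

```properties
# Hypothetical value: four tasks, so at most four messages are processed at once.
tasks.max=4
```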
Configuration
The Apache Kafka project has produced a user guide that describes how to run Kafka Connect, including the Connect worker configuration. The project has also produced a good description of how Kafka Connect resumes after a failure or stoppage.
Configuration options
All the configuration options are listed below in alphabetical order.
amqp.address
- Configuration option: amqp.address
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: non-empty string without ISO control characters
- Importance: MEDIUM
The address (topic) to listen to.
amqp.host
- Configuration option: amqp.host
- Since: 0.1.0
- Default value: none
- Type: STRING
- Importance: MEDIUM
The host address for the AMQP service.
amqp.password
- Configuration option: amqp.password
- Since: 0.1.0
- Default value: none
- Type: PASSWORD
- Valid values: non-empty string without ISO control characters
- Importance: MEDIUM
The password for the user to log into the AMQP server.
amqp.port
- Configuration option: amqp.port
- Since: 0.1.0
- Default value: 5672
- Type: INT
- Valid values: [1,…,65534]
- Importance: MEDIUM
The port for the AMQP server.
amqp.user
- Configuration option: amqp.user
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: non-empty string without ISO control characters
- Importance: MEDIUM
The user to log into the AMQP server.
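The AMQP connection options above might be combined as in the following sketch. The host name, credentials, and address are placeholders invented for illustration, not values from any real deployment.

```properties
# All values below are hypothetical placeholders.
amqp.host=amqp.example.com
# 5672 is the documented default, repeated here only for clarity.
amqp.port=5672
amqp.user=connect-user
amqp.password=change-me
amqp.address=orders
```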
data.compression.type
- Configuration option: data.compression.type
- Since: 0.1.0
- Default value: NONE
- Type: STRING
- Valid values: (case insensitive) [ZSTD, GZIP, NONE, SNAPPY]
- Importance: MEDIUM
The compression to use when reading or writing data streams from the data storage. Note: different backends define the data stream differently. Please check the documentation.
errors.tolerance
- Configuration option: errors.tolerance
- Since: 0.1.0
- Default value: NONE
- Type: STRING
- Valid values: (case insensitive) [ALL, NONE]
- Importance: MEDIUM
Indicates to the connector what level of exceptions is tolerated before the connector stops.
kafka.retry.backoff.ms
- Configuration option: kafka.retry.backoff.ms
- Since: 0.1.0
- Default value: none
- Type: LONG
- Valid values: [0 Milliseconds,…,1.0 Days]
- Importance: MEDIUM
The retry backoff in milliseconds. This setting tells Kafka Connect to retry delivering a message batch or performing recovery when a transient exception occurs. The maximum value is 1 day (86400000 milliseconds).
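For example, a five-second backoff could be configured as shown below. The value is an arbitrary illustration; any value from 0 to 86400000 milliseconds falls within the documented range.

```properties
# Hypothetical value: 5000 ms (5 seconds).
kafka.retry.backoff.ms=5000
```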
key.converter.schema.registry.enable
- Configuration option: key.converter.schema.registry.enable
- Since: 0.1.0
- Default value: none
- Type: BOOLEAN
- Importance: MEDIUM
The key converter schema registry enablement flag. If ‘true’, the key converter schema registry is enabled; if ‘false’, it is not enabled. If not set, the key converter schema registry is enabled when the schema.registry.enable parameter is enabled.
key.converter.schema.registry.url
- Configuration option: key.converter.schema.registry.url
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: A valid URL. URL scheme must be: ‘https’.
- Importance: MEDIUM
Schema registry URL for key converters. If not specified the ‘schema.registry.url’ value will be used.
max.poll.records
- Configuration option: max.poll.records
- Since: 0.1.0
- Default value: 500
- Type: INT
- Valid values: [1,…]
- Importance: MEDIUM
The maximum number of records to return in a single poll.
schema.registry.enable
- Configuration option: schema.registry.enable
- Since: 0.1.0
- Default value: none
- Type: BOOLEAN
- Importance: MEDIUM
The schema registry enablement flag. If ‘true’, the schema registry is enabled; if ‘false’, it is not enabled. If not set, the registry is enabled when the schema.registry.url parameter is set.
schema.registry.url
- Configuration option: schema.registry.url
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: A valid URL. URL scheme must be: ‘https’.
- Importance: MEDIUM
The default schema registry URL.
tasks.max
- Configuration option: tasks.max
- Since: Kafka 0.9.0.0
- Default value: 1
- Type: INT
- Valid values: [1,…]
- Importance: HIGH
Maximum number of tasks to use for this connector.
topic
- Configuration option: topic
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: non-empty string
- Importance: MEDIUM
The name of the topic to write records to.
value.converter.schema.registry.enable
- Configuration option: value.converter.schema.registry.enable
- Since: 0.1.0
- Default value: none
- Type: BOOLEAN
- Importance: MEDIUM
The value converter schema registry enablement flag. If ‘true’, the value converter schema registry is enabled; if ‘false’, it is not enabled. If not set, the value converter schema registry is enabled when the schema.registry.enable parameter is enabled.
value.converter.schema.registry.url
- Configuration option: value.converter.schema.registry.url
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: A valid URL. URL scheme must be: ‘https’.
- Importance: MEDIUM
Schema registry URL for value converters. If not specified the ‘schema.registry.url’ value will be used.
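The fallback rules described for the schema registry options can be illustrated with a short sketch. The URLs are hypothetical placeholders, and the comments restate the behavior described in the option descriptions rather than adding new guarantees.

```properties
# Default registry URL (the scheme must be https). Because
# schema.registry.enable is not set, the registry is enabled
# by the presence of this URL.
schema.registry.url=https://registry.example.com

# key.converter.schema.registry.url is not set, so the key converter
# uses the schema.registry.url value.

# The value converter overrides the default with its own registry URL.
value.converter.schema.registry.url=https://value-registry.example.com
```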