Source Connector Configuration

The AMQP source connector reads AMQP messages from a queue and writes them to Kafka. It formats the messages as described in the architecture documentation.

NOTE: The key and value sent to Kafka are strings. Only the String converter is supported for the key, and only the JSON converter or the String converter is supported for the value. No other converters are supported.

NOTE: At this time, the Aiven source connector for AMQP on Apache Kafka supports only queue-type messages. This means that messages are distributed across all the tasks. In effect, the maximum number of Kafka Connect tasks is the number of messages that can be processed simultaneously.
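
The converter restriction in the first note can be checked before a configuration is submitted. The following is a minimal sketch in Python, not part of the connector: the converter class names are the standard Kafka Connect ones, while the helper name `check_converters` and the sample configuration are illustrative assumptions.

```python
# Standard Kafka Connect converter class names.
STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter"
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"

# Per the note above: String only for the key, String or JSON for the value.
SUPPORTED_KEY_CONVERTERS = {STRING_CONVERTER}
SUPPORTED_VALUE_CONVERTERS = {STRING_CONVERTER, JSON_CONVERTER}


def check_converters(config):
    """Return a list of problems found in the converter settings.

    Hypothetical helper. Only converters that are explicitly set are
    checked; an unset converter is inherited from the worker configuration.
    """
    problems = []
    key = config.get("key.converter")
    value = config.get("value.converter")
    if key is not None and key not in SUPPORTED_KEY_CONVERTERS:
        problems.append("unsupported key.converter: " + key)
    if value is not None and value not in SUPPORTED_VALUE_CONVERTERS:
        problems.append("unsupported value.converter: " + value)
    return problems


# Sample configuration (illustrative values only).
example = {
    "key.converter": STRING_CONVERTER,
    "value.converter": JSON_CONVERTER,
    "tasks.max": "4",
}
print(check_converters(example))
```

With `tasks.max` set to 4 as in this sample, at most four messages would be processed at the same time, as the second note describes.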

Configuration

Apache Kafka has produced a user's guide that describes how to run Kafka Connect. That documentation describes the Connect worker configuration. The Apache Kafka project has also produced a good description of how Kafka Connect resumes after a failure or stoppage.

Configuration options

All the configuration options are listed below in alphabetical order.

amqp.address

  • Configuration option: amqp.address

  • Since: 0.1.0

  • Default value: none

  • Type: STRING

  • Valid values: non-empty string without ISO control characters

  • Importance: MEDIUM

    The address (topic) to listen to.

amqp.host

  • Configuration option: amqp.host

  • Since: 0.1.0

  • Default value: none

  • Type: STRING

  • Importance: MEDIUM

    The host address for the AMQP service.

amqp.password

  • Configuration option: amqp.password

  • Since: 0.1.0

  • Default value: none

  • Type: PASSWORD

  • Valid values: non-empty string without ISO control characters

  • Importance: MEDIUM

    The password for the user to log into the AMQP server.

amqp.port

  • Configuration option: amqp.port

  • Since: 0.1.0

  • Default value: 5672

  • Type: INT

  • Valid values: [1,…,65534]

  • Importance: MEDIUM

    The port for the AMQP server.

amqp.user

  • Configuration option: amqp.user

  • Since: 0.1.0

  • Default value: none

  • Type: STRING

  • Valid values: non-empty string without ISO control characters

  • Importance: MEDIUM

    The user to log into the AMQP server.
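
The valid-value rules for the AMQP options above can be sketched as a small pre-flight check. This is a hedged illustration, not the connector's own validation: the function names are hypothetical, and "ISO control characters" is assumed to mean the code points U+0000 to U+001F and U+007F to U+009F (the ranges used by Java's `Character.isISOControl`).

```python
def has_iso_control(text):
    """True if text contains a character in U+0000-U+001F or U+007F-U+009F.

    Assumption: this mirrors Java's Character.isISOControl.
    """
    return any(ord(ch) <= 0x1F or 0x7F <= ord(ch) <= 0x9F for ch in text)


def validate_amqp_settings(config):
    """Return the names of AMQP options whose values break the listed rules.

    Hypothetical helper. Options that are not set are skipped, because the
    reference lists no default for them.
    """
    errors = []
    for name in ("amqp.address", "amqp.user", "amqp.password"):
        value = config.get(name)
        if value is None:
            continue
        # Rule: non-empty string without ISO control characters.
        if value == "" or has_iso_control(value):
            errors.append(name)
    # Rule: port in [1, 65534], default 5672.
    port = int(config.get("amqp.port", 5672))
    if not 1 <= port <= 65534:
        errors.append("amqp.port")
    return errors


# Sample settings; host, address, and credentials are placeholders.
settings = {
    "amqp.host": "broker.example.com",
    "amqp.address": "orders",
    "amqp.user": "connect",
    "amqp.password": "secret",
}
print(validate_amqp_settings(settings))
```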

data.compression.type

  • Configuration option: data.compression.type

  • Since: 0.1.0

  • Default value: NONE

  • Type: STRING

  • Valid values: (case insensitive) [ZSTD, GZIP, NONE, SNAPPY]

  • Importance: MEDIUM

    The compression to use when reading or writing data streams from the data storage. Note: different backends define the data stream differently. Please check the documentation.

errors.tolerance

  • Configuration option: errors.tolerance

  • Since: 0.1.0

  • Default value: NONE

  • Type: STRING

  • Valid values: (case insensitive) [ALL, NONE]

  • Importance: MEDIUM

    Indicates to the connector what level of exceptions is allowed before the connector stops.
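
Both data.compression.type and errors.tolerance accept their values case-insensitively. A minimal sketch of what that means in practice follows; the helper name `normalize_enum` is hypothetical, and returning the upper-case form is only a choice made for this example.

```python
# Valid values as listed in this reference.
VALID_VALUES = {
    "data.compression.type": {"ZSTD", "GZIP", "NONE", "SNAPPY"},
    "errors.tolerance": {"ALL", "NONE"},
}


def normalize_enum(name, value):
    """Match value against the valid values for name, ignoring case.

    Hypothetical helper; raises ValueError for an unlisted value.
    """
    candidate = value.upper()
    if candidate not in VALID_VALUES[name]:
        raise ValueError(
            "%s: %r is not one of %s" % (name, value, sorted(VALID_VALUES[name]))
        )
    return candidate


print(normalize_enum("data.compression.type", "gzip"))
print(normalize_enum("errors.tolerance", "All"))
```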

kafka.retry.backoff.ms

  • Configuration option: kafka.retry.backoff.ms

  • Since: 0.1.0

  • Default value: none

  • Type: LONG

  • Valid values: [0 Milliseconds,…,1.0 Days]

  • Importance: MEDIUM

    The retry backoff in milliseconds. This setting tells Kafka Connect how long to wait before it retries delivering a message batch or performing recovery after a transient exception. The maximum value is 1 day (86400000 milliseconds).
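
The upper bound is simply one day expressed in milliseconds. As a small worked sketch (the function name is hypothetical and this is not the connector's own check):

```python
# One day in milliseconds: 24 h * 60 min * 60 s * 1000 ms.
MAX_RETRY_BACKOFF_MS = 24 * 60 * 60 * 1000


def validate_retry_backoff_ms(value):
    """Parse value and check that it lies in [0, 1 day] (hypothetical helper)."""
    backoff_ms = int(value)
    if not 0 <= backoff_ms <= MAX_RETRY_BACKOFF_MS:
        raise ValueError(
            "kafka.retry.backoff.ms must be between 0 and %d" % MAX_RETRY_BACKOFF_MS
        )
    return backoff_ms


print(MAX_RETRY_BACKOFF_MS)
print(validate_retry_backoff_ms("5000"))
```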

key.converter.schema.registry.enable

  • Configuration option: key.converter.schema.registry.enable

  • Since: 0.1.0

  • Default value: none

  • Type: BOOLEAN

  • Importance: MEDIUM

    The key converter schema registry enablement flag. If ‘true’, the key converter schema registry is enabled; if ‘false’, it is not enabled. If not set, the key converter schema registry is enabled if the schema.registry.enable parameter is enabled.

key.converter.schema.registry.url

  • Configuration option: key.converter.schema.registry.url

  • Since: 0.1.0

  • Default value: none

  • Type: STRING

  • Valid values: A valid URL. URL scheme must be: ‘https’.

  • Importance: MEDIUM

    Schema registry URL for key converters. If not specified, the ‘schema.registry.url’ value is used.

max.poll.records

  • Configuration option: max.poll.records

  • Since: 0.1.0

  • Default value: 500

  • Type: INT

  • Valid values: [1,…]

  • Importance: MEDIUM

    The maximum number of records returned by a single poll.

schema.registry.enable

  • Configuration option: schema.registry.enable

  • Since: 0.1.0

  • Default value: none

  • Type: BOOLEAN

  • Importance: MEDIUM

    The schema registry enablement flag. If ‘true’, the schema registry is enabled; if ‘false’, it is not enabled. If not set, the registry is enabled if the schema.registry.url parameter is set.

schema.registry.url

  • Configuration option: schema.registry.url

  • Since: 0.1.0

  • Default value: none

  • Type: STRING

  • Valid values: A valid URL. URL scheme must be: ‘https’.

  • Importance: MEDIUM

    The default schema registry URL.

tasks.max

  • Configuration option: tasks.max

  • Since: Kafka 0.9.0.0

  • Default value: 1

  • Type: INT

  • Valid values: [1,…]

  • Importance: HIGH

    Maximum number of tasks to use for this connector.

topic

  • Configuration option: topic

  • Since: 0.1.0

  • Default value: none

  • Type: STRING

  • Valid values: non-empty string

  • Importance: MEDIUM

    The name of the topic to write records to.

value.converter.schema.registry.enable

  • Configuration option: value.converter.schema.registry.enable

  • Since: 0.1.0

  • Default value: none

  • Type: BOOLEAN

  • Importance: MEDIUM

    The value converter schema registry enablement flag. If ‘true’, the value converter schema registry is enabled; if ‘false’, it is not enabled. If not set, the value converter schema registry is enabled if the schema.registry.enable parameter is enabled.

value.converter.schema.registry.url

  • Configuration option: value.converter.schema.registry.url

  • Since: 0.1.0

  • Default value: none

  • Type: STRING

  • Valid values: A valid URL. URL scheme must be: ‘https’.

  • Importance: MEDIUM

    Schema registry URL for value converters. If not specified, the ‘schema.registry.url’ value is used.
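
The fallback rules for the six schema registry options can be sketched as follows. This is an illustration of the rules as described in this reference, not the connector's implementation: the function name is hypothetical, the registry URL is a placeholder, and it is assumed that the converter-level flags fall back to the effective (resolved) value of schema.registry.enable.

```python
def resolve_schema_registry(config):
    """Resolve the effective schema registry settings for key and value.

    Hypothetical helper. Booleans are Python bools; unset options are
    simply absent from the dict.
    """
    default_url = config.get("schema.registry.url")
    default_enabled = config.get("schema.registry.enable")
    if default_enabled is None:
        # Not set: enabled only if schema.registry.url is set.
        default_enabled = default_url is not None

    resolved = {}
    for side in ("key", "value"):
        prefix = side + ".converter.schema.registry."
        enabled = config.get(prefix + "enable")
        if enabled is None:
            # Not set: follow the (resolved) schema.registry.enable value.
            enabled = default_enabled
        # Not set: fall back to schema.registry.url.
        url = config.get(prefix + "url", default_url)
        resolved[side] = {"enabled": enabled, "url": url}
    return resolved


result = resolve_schema_registry({
    "schema.registry.url": "https://registry.example.com",
    "value.converter.schema.registry.enable": False,
})
print(result["key"])
print(result["value"])
```

In this sample the key converter inherits both the URL and the enabled state from the general options, while the value converter inherits the URL but is explicitly disabled.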