AMQP Source Connector Architecture Notes
The Aiven source connector for AMQP on Apache Kafka connects to an AMQP provider and reads AMQP messages using the protonj2 client from the Apache QPID™ messaging library. It relies on the Aiven Kafka Connector Framework to do most of the heavy lifting.
At this time, the Aiven source connector for AMQP on Apache Kafka supports only queue-type messages. This means that Apache QPID distributes the inbound messages across all the listeners. In effect, the number of Kafka Connect tasks becomes the number of messages that can be processed simultaneously.
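The queue behavior described above can be illustrated with a small simulation. This is only a sketch using an in-memory queue and worker threads from the Java standard library, not the connector's code or the protonj2 API; the class name QueueDistributionSketch and the method drain are hypothetical. Each worker stands in for one Kafka Connect task, and each message is handed to exactly one worker.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation of queue-type delivery; not part of the connector. */
final class QueueDistributionSketch {

    private QueueDistributionSketch() { }

    /**
     * Drains the messages with taskCount concurrent workers and returns,
     * for each message, how many times it was delivered.
     */
    static Map<String, Integer> drain(List<String> messages, int taskCount)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        Map<String, Integer> deliveries = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        for (int task = 0; task < taskCount; task++) {
            pool.execute(() -> {
                String message;
                // poll() removes the head atomically, so a message goes to one worker only.
                while ((message = queue.poll()) != null) {
                    deliveries.merge(message, 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return deliveries;
    }

    public static void main(String[] args) throws InterruptedException {
        // Three workers share four messages; every count printed is 1.
        System.out.println(drain(List.of("m1", "m2", "m3", "m4"), 3));
    }
}
```

With three workers, at most three messages are in flight at once, which mirrors how the task count bounds simultaneous processing.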
The AMQP message provider contract does not require that a message have a unique key. However, the connector framework expects that each message will have a unique ID and that those IDs can be lexically ordered. The framework calls these IDs NativeKeys. To meet the native key requirement, we assign a ULID to every message that arrives in the system. We use the Sulky Java implementation of ULID from Jörn Huxhorn.
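To show why a ULID satisfies the lexical ordering requirement, here is a minimal standard-library sketch of the ULID layout: a 48-bit millisecond timestamp followed by 80 random bits, encoded as 26 Crockford Base32 characters. It is not the Sulky implementation the connector uses; the class UlidSketch and its generate method are hypothetical names chosen for illustration.

```java
import java.security.SecureRandom;
import java.util.Random;

/** Illustrative ULID generator; an assumption-laden sketch, not the Sulky library. */
final class UlidSketch {

    // Crockford Base32 alphabet (no I, L, O, U). Characters are in ascending
    // ASCII order, so string comparison matches numeric comparison.
    private static final char[] ALPHABET =
            "0123456789ABCDEFGHJKMNPQRSTVWXYZ".toCharArray();

    private UlidSketch() { }

    /** Encodes a 48-bit timestamp and 80 random bits as a 26-character string. */
    static String generate(long timestampMillis, Random random) {
        char[] out = new char[26];
        long ts = timestampMillis & 0xFFFFFFFFFFFFL; // keep the low 48 bits
        // Ten characters carry the timestamp, most significant bits first.
        for (int i = 9; i >= 0; i--) {
            out[i] = ALPHABET[(int) (ts & 0x1F)];
            ts >>>= 5;
        }
        // Sixteen characters carry 80 bits of randomness.
        for (int i = 10; i < 26; i++) {
            out[i] = ALPHABET[random.nextInt(32)];
        }
        return new String(out);
    }

    public static void main(String[] args) {
        Random random = new SecureRandom();
        long now = System.currentTimeMillis();
        String earlier = generate(now, random);
        String later = generate(now + 1, random);
        // IDs created in a later millisecond always sort after earlier ones.
        System.out.println(earlier.compareTo(later) < 0);
    }
}
```

Because the timestamp occupies the leading characters, IDs from different milliseconds sort in arrival order. In this sketch, two IDs created within the same millisecond are ordered only by their random part.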
Each message is parsed into a JSON document that contains all the components of the message. Each AMQP message creates one Kafka message. The extraction of the Kafka message body from the AMQP message occurs in the AmqpExtractor class.
Data Mapping
The JSON object generated from the message has a number of properties. If a property listed below is not set in the message, it will not be present in the JSON object. The potential properties are:
absoluteExpiry: The absolute time for the expiration of the message.
annotations: Conceptually a map of strings to objects. The JSON serialization of this is a standard JSON map serialization.
body: The body of the message may be a List of objects, a byte array, or a single object. Byte arrays are always encoded using Base64 encoding.
contentEncoding: The content-encoding property is used as a modifier to the content-type. When present, its value indicates what additional content encodings have been applied to the application-data, and thus what decoding mechanisms need to be applied in order to obtain the media-type referenced by the content-type header field.
contentType: The RFC-2046 MIME type for the message's application-data section (body). As per RFC-2046 this can contain a charset parameter defining the character encoding used: e.g., 'text/plain; charset="utf-8"'.
correlationId: An ID that correlates messages together.
creationTime: The absolute time for the creation of the message.
deliveryCount: The number of failed delivery attempts this message has been part of.
durable: The flag that indicates if the message is durable.
firstAcquirer: Whether this message has been acquired by another link previously.
footers: Conceptually a map of strings to objects. The JSON serialization of this is a standard JSON map serialization.
groupId: The group ID for the message.
groupSequence: The group sequence number for the message.
messageId: The message ID, if any was provided.
properties: Conceptually a map of strings to objects. The JSON serialization of this is a standard JSON map serialization.
replyTo: The suggested reply destination.
replyToGroup: The group to which replies should be sent.
subject: The subject of the message.
to: The intended destination of the message, if set.
userId: The ID of the user that created the message, if it was provided.
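The two mapping rules above (unset properties are omitted, byte array bodies are Base64 encoded) can be sketched as follows. This is a simplified illustration using only the Java standard library and a handful of the properties; it is not the AmqpExtractor code, and the class MessageMappingSketch and its toDocument method are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the property mapping rules; not the connector's extractor. */
final class MessageMappingSketch {

    private MessageMappingSketch() { }

    /** Builds the document for a few properties; null means "not set in the message". */
    static Map<String, Object> toDocument(String messageId, String subject,
                                          String contentType, Object body) {
        Map<String, Object> doc = new LinkedHashMap<>();
        // Byte arrays are always rendered as Base64 text; other bodies pass through.
        Object renderedBody = body instanceof byte[]
                ? Base64.getEncoder().encodeToString((byte[]) body)
                : body;
        putIfSet(doc, "body", renderedBody);
        putIfSet(doc, "contentType", contentType);
        putIfSet(doc, "messageId", messageId);
        putIfSet(doc, "subject", subject);
        return doc;
    }

    /** Adds the property only when the message actually carried a value. */
    private static void putIfSet(Map<String, Object> doc, String name, Object value) {
        if (value != null) {
            doc.put(name, value);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        // The subject is not set, so the resulting document has no "subject" entry.
        System.out.println(toDocument("id-1", null, "text/plain", payload));
    }
}
```

A consumer reading such a document should therefore test for the presence of a property rather than expect a null value.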
Data Serialization
Data is serialized using the Jackson JSON serialization libraries. The system can serialize any object that Jackson can natively serialize. In addition, custom serializers are used to serialize the AMQP native components so that they can be rendered into a JSON format that is readable outside the Kafka environment. The Kafka data is a JSON string. Using the JSON converter will return the expected JSON object; using a String converter will return the JSON string. All other converters will probably fail to work.