Fork me on GitHub

Configuration options

All the configuration options are listed below in alphabetical order.

data.compression.type

  • Configuration option: data.compression.type

  • Since: 0.2.0

  • Default value: NONE

  • Type: STRING

  • Valid values: (case insensitive) [ZSTD, GZIP, NONE, SNAPPY]

  • Importance: MEDIUM

    The compression to use to read/write data streams from the data storage. Note: different backends define the data stream differently. Please check the documentation

kafka.retry.backoff.ms

  • Configuration option: kafka.retry.backoff.ms

  • Since: 0.2.0

  • Default value: none

  • Type: LONG

  • Valid values: [0 Milliseconds,…,1.0 Days]

  • Importance: MEDIUM

    The retry backoff in milliseconds. This config is used to notify Kafka Connect to retry delivering a message batch or performing recovery in case of transient exceptions. Maximum value is 1.0 Days (86400000 Milliseconds)

key.converter.schema.registry.enable

  • Configuration option: key.converter.schema.registry.enable

  • Since: 0.2.0

  • Default value: none

  • Type: BOOLEAN

  • Importance: MEDIUM

    The key converter schema registry enablement flag. If ‘true’ the key converter schema registry will be enabled, if ‘false’ the registry will not be enabled. If not set the key converter schema registry will be enabled if the schema.registry.enable parameter enabled.

key.converter.schema.registry.url

  • Configuration option: key.converter.schema.registry.url

  • Since: 0.2.0

  • Default value: none

  • Type: STRING

  • Valid values: A valid URL. URL scheme must be: ‘https’.

  • Importance: MEDIUM

    Schema registry URL for key converters. If not specified the ‘schema.registry.url’ value will be used.

salesforce.api.version

  • Configuration option: salesforce.api.version

  • Since: 0.2.0

  • Default value: v65.0

  • Type: STRING

  • Valid values: non-empty string

  • Importance: MEDIUM

    API version of the Salesforce API to use when communicating with Salesforce, default value is v65.0

salesforce.bulk.api.sink.object

  • Configuration option: salesforce.bulk.api.sink.object

  • Since: 0.2.0

  • Default value: none

  • Type: STRING

  • Importance: MEDIUM

    The destination object type that will be written to Salesforce (such as Account or Contact)

salesforce.client.id

  • Configuration option: salesforce.client.id

  • Since: 0.2.0

  • Default value: none

  • Type: PASSWORD

  • Importance: MEDIUM

    Salesforce client id that is used to authenticate over oauth with the api.

salesforce.client.secret

  • Configuration option: salesforce.client.secret

  • Since: 0.2.0

  • Default value: none

  • Type: PASSWORD

  • Importance: MEDIUM

    Salesforce client secret that is used to authenticate over oauth with the api.

salesforce.max.records

  • Configuration option: salesforce.max.records

  • Since: 0.2.0

  • Default value: 50000

  • Type: INT

  • Valid values: [1,…,150000]

  • Importance: MEDIUM

    Salesforce default maximum number of records to retrieve from the Bulk API. Must be at least 100 and at most 100000, default value is 50000

salesforce.max.retries

  • Configuration option: salesforce.max.retries

  • Since: 0.2.0

  • Default value: 3

  • Type: INT

  • Valid values: [1,…,5]

  • Importance: MEDIUM

    Salesforce default maximum number of retries against API. Must be at least 1 and at most 5, default value is 3

salesforce.oauth.uri

  • Configuration option: salesforce.oauth.uri

  • Since: 0.2.0

  • Default value: none

  • Type: STRING

  • Valid values: non-empty string

  • Importance: MEDIUM

    Salesforce oauth uri that is used to authenticate over oauth with the api, this is a uri specific to your organization and domain supplied by Salesforce.

salesforce.soql.query.wait

  • Configuration option: salesforce.soql.query.wait

  • Since: 0.2.0

  • Default value: 300

  • Type: LONG

  • Valid values: [1,…,604800]

  • Importance: MEDIUM

    salesforce.soql.query.wait allows a user to configure the minimum time in seconds between re-executing the same SOQL query against the API the default value is 300 seconds. Minimum 1 second delay and a maximum of 604800 seconds or one week.

salesforce.status.check.wait

  • Configuration option: salesforce.status.check.wait

  • Since: 0.2.0

  • Default value: 5

  • Type: LONG

  • Valid values: [5,…,3600]

  • Importance: MEDIUM

    salesforce.status.check.wait allows a user to configure the time in seconds between individual api calls to check the status of a bulk job. Each SOQL query requires 1 or many calls to see if the job is ready to be processed, this configuration allows the user to reduce or increase the number of calls made to the api to check the status. 300 seconds. Minimum 5 seconds and maximum 3600 seconds or one hour.

salesforce.uri

  • Configuration option: salesforce.uri

  • Since: 0.2.0

  • Default value: none

  • Type: STRING

  • Valid values: non-empty string

  • Importance: MEDIUM

    Salesforce domain uri that is used to query the bulk api this is a uri specific to your organization and domain supplied by Salesforce.

schema.registry.enable

  • Configuration option: schema.registry.enable

  • Since: 0.2.0

  • Default value: none

  • Type: BOOLEAN

  • Importance: MEDIUM

    The schema registry enablement flag. If ‘true’ the schema registry will be enabled, if ‘false’ the registry will not be enabled. If not set the registry will be enabled if the schema.registry.url parameter is set.

schema.registry.url

  • Configuration option: schema.registry.url

  • Since: 0.2.0

  • Default value: none

  • Type: STRING

  • Valid values: A valid URL. URL scheme must be: ‘https’.

  • Importance: MEDIUM

    The default schema registry URL.

tasks.max

  • Configuration option: tasks.max

  • Since: Kafka 0.9.0.0

  • Default value: 1

  • Type: INT

  • Valid values: [1,…]

  • Importance: HIGH

    Maximum number of tasks to use for this connector.

topic.prefix

  • Configuration option: topic.prefix

  • Since: 0.2.0

  • Default value: none

  • Type: STRING

  • Importance: HIGH

    topic.prefix is a required to determine what topic to put the events onto, the prefix is used as `<topic.prefix>.<api_name>.<object_name>` for example if set to history then events from the Account table from the bulk api will be produced to history.bulkApi.Account.

value.converter.schema.registry.enable

  • Configuration option: value.converter.schema.registry.enable

  • Since: 0.2.0

  • Default value: none

  • Type: BOOLEAN

  • Importance: MEDIUM

    The value converter schema registry enablement flag. If ‘true’ the value converter schema registry will be enabled, if ‘false’ the registry will not be enabled. If not set the value converter schema registry will be enabled if the schema.registry.enable parameter enabled.

value.converter.schema.registry.url

  • Configuration option: value.converter.schema.registry.url

  • Since: 0.2.0

  • Default value: none

  • Type: STRING

  • Valid values: A valid URL. URL scheme must be: ‘https’.

  • Importance: MEDIUM

    Schema registry URL for value converters. If not specified the ‘schema.registry.url’ value will be used.

Example

A copy of this example configuration file is available for download.


# Copyright 2026 Aiven Oy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

# ---------------------------------------------------------------------------
# This is a an example of a basic sink connector configuration.

# For additional information:
# - https://kafka.apache.org/42/kafka-connect/
# - https://aiven-open.github.io/salesforce-connector-for-apache-kafka/sink/configuration.html

name=AccountSink
connector.class=io.aiven.kafka.connect.salesforce.sink.SalesforceSinkConnector
tasks.max=3

# Comma separated list of accounts to use (alternatively use topic.regex to specify a set)
topics=account

# How the messages in the Kafka topic are converted into Kafka Connect records
value.converter=org.apache.kafka.connect.json.JsonConverter

# How often to flush records to the Salesforce sink; a larger interval means fewer API calls but larger batches
offset.flush.interval.ms=60000
# The maximum time to wait for each flush to complete (default is 5000 ms); increase this for large batches or high Salesforce latency
offset.flush.timeout.ms=5000

# The maximum number of times to retry an insert or authentication request against the Bulk API before failing
max.retries=3

# The version of the Salesforce API to use for all queries
salesforce.api.version=v65.0

# The Salesforce client id (also known as consumer key) and secret.
# https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_eca.htm#sfdx_dev_auth_eca
salesforce.client.id=CONSUMER_KEY
salesforce.client.secret=CONSUMER_SECRET

# The Salesforce OAuth URI may be unique for your Salesforce deployment.

# Sandbox: salesforce.oauth.uri=https://test.salesforce.com/services/oauth2/token
# Org-specific domain: salesforce.oauth.uri=https://<your-domain>.my.salesforce.com/services/oauth2/token
# Developer edition: salesforce.oauth.uri=https://orgfarm-1234abc123-dev-ed.develop.my.salesforce.com/services/oauth2/token
# Production:
salesforce.oauth.uri=https://login.salesforce.com/services/oauth2/token

salesforce.uri=https://orgfarm-1234abc123-dev-ed.develop.my.salesforce.com