Aiven source connector for Salesforce on Apache Kafka Configuration
Authentication
To make the connector work, a user has to specify Salesforce client credentials to allow it to connect to Salesforce:
- Client Credentials are the only supported credentials at the moment.
  - See the Salesforce documentation on configuring the Client Credentials flow.
- Specify the client credentials using the configuration options:
  - salesforce.client.secret for the Salesforce client secret
  - salesforce.client.id for the Salesforce client ID
- Configure your Salesforce URLs:
  - salesforce.oauth.uri is the URI the connector authenticates against. Often this will be “https://MyCompany.my.salesforce.com/services/oauth2/token”.
    - More details can be found on the Salesforce website.
  - salesforce.uri is the API the connector queries against and is often “https://MyCompany.my.salesforce.com”.
- The version of the Bulk API to use is also selectable:
  - salesforce.api.version can be set to select a particular version, e.g. ‘v66.0’ or ‘v60.0’.
    - By default, version v65.0 is selected.
Notes
- Define your query
  - salesforce.bulk.api.queries
    - Queries are defined using the SOQL language and are subject to SOQL syntax and case-sensitivity restrictions.
    - Every query must include the system field LastModifiedDate in the SELECT statement.
    - The WHERE clause is optional but must not contain LastModifiedDate, as the connector uses this field internally.
    - The ORDER BY clause must not be used, as the connector uses it internally to order the query results.
    - Multiple queries are supported.
    - Example query: SELECT Id, Name, LastModifiedDate FROM Account;
    - It is possible to get duplicate entries. In particular, if a field that is not in your SELECT statement is updated, LastModifiedDate is also updated and you will receive a new event for that entry.
    - The connector does not support subqueries as part of your Bulk API query.
- Some important configuration options control how often the connector queries the Bulk API
  - salesforce.soql.query.wait
    - How long to wait between querying a Salesforce object.
    - The default is 300 seconds, but if this data does not change very often we recommend extending this delay to help preserve your API limits.
  - salesforce.status.check.wait
    - How long to wait after submitting a query to Salesforce before checking whether the bulk query is ready for consumption.
    - The default is 5 seconds. If you are expecting a large amount of data, you may need to extend this time to reduce the number of API calls.
  - salesforce.max.records
    - Defines how many records per page the Bulk API should return. With larger objects this may need to be decreased so that data is returned from the API in smaller chunks.
    - The default is 50,000 records.
- This connector supports just one task. This helps prevent timing issues and also prevents the connector from using up the API calls available to your account too quickly.
  - See further details on Salesforce allocations and limits for the Bulk API 2.0.
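As a rough illustration of how the wait settings above affect API usage, the sketch below estimates an upper bound on Bulk API calls per day. The function name, the cost model (one submit call plus repeated status checks per run) and the average job duration parameter are assumptions made for illustration; calls to fetch result pages or to authenticate are not modelled, so treat the result as an order-of-magnitude guide only.

```python
import math


def estimate_daily_calls(num_queries, query_wait_s, status_check_wait_s, avg_job_duration_s):
    """Rough upper bound on Bulk API calls per day (hypothetical cost model)."""
    # Each query is re-executed at most once per query_wait_s seconds.
    runs_per_day = 86_400 // query_wait_s
    # Assumption: the job status is polled every status_check_wait_s seconds
    # until the job finishes, with at least one status check per run.
    status_checks = max(1, math.ceil(avg_job_duration_s / status_check_wait_s))
    # One call to submit the job, plus the status checks.
    return num_queries * runs_per_day * (1 + status_checks)


if __name__ == "__main__":
    # Two queries, hourly re-execution, status checked every 2 minutes,
    # jobs assumed to take about 5 minutes.
    print(estimate_daily_calls(2, 3600, 120, 300))
```

Under this model, lengthening either wait reduces the estimate, which is why both settings are worth tuning before raising concerns about API limits.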
Configuration
Apache Kafka has produced a user guide that describes how to run Kafka Connect. That documentation describes the Connect worker configuration. It also includes a good description of how Kafka Connect resumes after a failure or stoppage.
Configuration options
All the configuration options are listed below in alphabetical order.
errors.tolerance
- Configuration option: errors.tolerance
- Since: 0.1.0
- Default value: NONE
- Type: STRING
- Valid values: (case insensitive) [ALL, NONE]
- Importance: MEDIUM
Indicates to the connector what level of exceptions are allowed before the connector stops.
key.converter.schema.registry.enable
- Configuration option: key.converter.schema.registry.enable
- Since: 0.1.0
- Default value: none
- Type: BOOLEAN
- Importance: MEDIUM
The key converter schema registry enablement flag. If ‘true’, the key converter schema registry is enabled; if ‘false’, it is not. If not set, the key converter schema registry is enabled when the schema.registry.enable parameter is enabled.
key.converter.schema.registry.url
- Configuration option: key.converter.schema.registry.url
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: A valid URL. URL scheme must be: ‘https’.
- Importance: MEDIUM
Schema registry URL for key converters. If not specified the ‘schema.registry.url’ value will be used.
max.poll.records
- Configuration option: max.poll.records
- Since: 0.1.0
- Default value: 500
- Type: INT
- Valid values: [1,…]
- Importance: MEDIUM
The maximum number of records to return in a single poll.
native.start.key
- Configuration option: native.start.key
- Since: 0.1.0
- Default value: none
- Type: STRING
- Importance: MEDIUM
An identifier that tells the source connector which key to start processing from. On a restart, the connector also begins reading messages from this point.
ring.buffer.size
- Configuration option: ring.buffer.size
- Since: 0.1.0
- Default value: 1000
- Type: INT
- Valid values: [1,…]
- Importance: MEDIUM
The number of storage keys to store in the ring buffer.
salesforce.api.version
- Configuration option: salesforce.api.version
- Since: 0.1.0
- Default value: v65.0
- Type: STRING
- Valid values: non-empty string
- Importance: MEDIUM
The version of the Salesforce API to use when communicating with Salesforce. The default value is v65.0.
salesforce.bulk.api.queries
- Configuration option: salesforce.bulk.api.queries
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: A valid SOQL query. The SELECT statement must include LastModifiedDate, and the WHERE clause must not reference LastModifiedDate.
- Importance: MEDIUM
Salesforce Bulk API queries are used to query for large amounts of data using SOQL. A query typically looks like `SELECT Id,Name,LastModifiedDate FROM Account` or, when querying multiple objects, `SELECT Id,Name, LastModifiedDate FROM Account ; SELECT Id, FirstName, Name, LastModifiedDate FROM Contact; SELECT LastName__c, FirstName__c, PhoneNumber__c FROM Phone_Book__b`
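The query rules stated in this document (LastModifiedDate in the SELECT list, no LastModifiedDate in the WHERE clause, no ORDER BY, no subqueries, multiple queries separated by semicolons) can be checked before deploying a configuration. The following is a minimal sketch, not the connector's own validation: the function names are hypothetical, the parsing is a simple regular-expression approximation of SOQL, and field names are compared case-insensitively as an assumption.

```python
import re


def split_queries(value):
    """Split a salesforce.bulk.api.queries value into individual queries."""
    return [q.strip() for q in value.split(";") if q.strip()]


def check_query(query):
    """Return a list of rule violations for one query (empty if none found)."""
    # Subqueries are rejected first because they would confuse the simple parsing below.
    if re.search(r"\(\s*select\b", query, re.IGNORECASE):
        return ["subqueries are not supported"]

    match = re.match(r"\s*select\s+(.*?)\s+from\s+(.*)$", query, re.IGNORECASE | re.DOTALL)
    if not match:
        return ["not a SELECT ... FROM ... query"]

    fields, rest = match.group(1), match.group(2)
    problems = []

    names = {name.strip().lower() for name in fields.split(",")}
    if "lastmodifieddate" not in names:
        problems.append("SELECT must include LastModifiedDate")

    where = re.search(r"\bwhere\b(.*)", rest, re.IGNORECASE | re.DOTALL)
    if where and re.search(r"\blastmodifieddate\b", where.group(1), re.IGNORECASE):
        problems.append("WHERE must not reference LastModifiedDate")

    if re.search(r"\border\s+by\b", rest, re.IGNORECASE):
        problems.append("ORDER BY is not allowed")

    return problems


if __name__ == "__main__":
    value = "SELECT Id,Name,LastModifiedDate FROM Account ; SELECT Id FROM Contact ORDER BY Id;"
    for q in split_queries(value):
        print(q, "->", check_query(q) or "ok")
```

A check like this only catches the restrictions listed here; Salesforce itself remains the authority on whether a query is valid SOQL.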
salesforce.client.id
- Configuration option: salesforce.client.id
- Since: 0.1.0
- Default value: none
- Type: PASSWORD
- Importance: MEDIUM
Salesforce client ID that is used to authenticate over OAuth with the API.
salesforce.client.secret
- Configuration option: salesforce.client.secret
- Since: 0.1.0
- Default value: none
- Type: PASSWORD
- Importance: MEDIUM
Salesforce client secret that is used to authenticate over OAuth with the API.
salesforce.max.records
- Configuration option: salesforce.max.records
- Since: 0.1.0
- Default value: 50000
- Type: INT
- Valid values: [1,…,150000]
- Importance: MEDIUM
Salesforce default maximum number of records to retrieve from the Bulk API. Must be at least 100 and at most 100000, default value is 50000
salesforce.max.retries
- Configuration option: salesforce.max.retries
- Since: 0.1.0
- Default value: 3
- Type: INT
- Valid values: [1,…,5]
- Importance: MEDIUM
Salesforce default maximum number of retries against API. Must be at least 1 and at most 5, default value is 3
salesforce.oauth.uri
- Configuration option: salesforce.oauth.uri
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: non-empty string
- Importance: MEDIUM
Salesforce OAuth URI that is used to authenticate over OAuth with the API. This URI is specific to your organization and domain and is supplied by Salesforce.
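To check a client ID, client secret and OAuth URI independently of the connector, a standard OAuth 2.0 client credentials token request can be built by hand. The sketch below only constructs the request and does not send it; the function name and the placeholder values are hypothetical, and it is not a description of how the connector itself performs authentication.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_token_request(oauth_uri, client_id, client_secret):
    """Build (but do not send) an OAuth 2.0 client credentials token request."""
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("ascii")
    return Request(
        oauth_uri,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


if __name__ == "__main__":
    # Placeholder values; substitute your own salesforce.oauth.uri and credentials.
    request = build_token_request(
        "https://MyCompany.my.salesforce.com/services/oauth2/token",
        "YOUR_CLIENT_ID",
        "YOUR_CLIENT_SECRET",
    )
    print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` would send it; a successful OAuth 2.0 token response is a JSON document containing an `access_token` field.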
salesforce.soql.query.wait
- Configuration option: salesforce.soql.query.wait
- Since: 0.1.0
- Default value: 300
- Type: LONG
- Valid values: [1,…,604800]
- Importance: MEDIUM
salesforce.soql.query.wait allows a user to configure the minimum time in seconds between re-executing the same SOQL query against the API. The default value is 300 seconds. The minimum is 1 second and the maximum is 604800 seconds (one week).
salesforce.status.check.wait
- Configuration option: salesforce.status.check.wait
- Since: 0.1.0
- Default value: 5
- Type: LONG
- Valid values: [5,…,3600]
- Importance: MEDIUM
salesforce.status.check.wait allows a user to configure the time in seconds between individual API calls that check the status of a bulk job. Each SOQL query requires one or more calls to see whether the job is ready to be processed; this setting lets the user reduce or increase the number of status calls made to the API. The default value is 5 seconds. The minimum is 5 seconds and the maximum is 3600 seconds (one hour).
salesforce.uri
- Configuration option: salesforce.uri
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: non-empty string
- Importance: MEDIUM
Salesforce domain URI that is used to query the Bulk API. This URI is specific to your organization and domain and is supplied by Salesforce.
schema.registry.enable
- Configuration option: schema.registry.enable
- Since: 0.1.0
- Default value: none
- Type: BOOLEAN
- Importance: MEDIUM
The schema registry enablement flag. If ‘true’ the schema registry will be enabled, if ‘false’ the registry will not be enabled. If not set the registry will be enabled if the schema.registry.url parameter is set.
schema.registry.url
- Configuration option: schema.registry.url
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: A valid URL. URL scheme must be: ‘https’.
- Importance: MEDIUM
The default schema registry URL.
tasks.max
- Configuration option: tasks.max
- Since: Kafka 0.9.0.0
- Default value: 1
- Type: INT
- Valid values: [1,…]
- Importance: HIGH
Maximum number of tasks to use for this connector.
topic
- Configuration option: topic
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: non-empty string
- Importance: MEDIUM
The name of the topic to write records to.
topic.prefix
- Configuration option: topic.prefix
- Since: 0.1.0
- Default value: none
- Type: STRING
- Importance: HIGH
topic.prefix is required to determine which topic events are written to. The prefix is used as `<topic.prefix>.<api_name>.<object_name>`; for example, if it is set to history, events from the Account table from the Bulk API will be produced to history.bulkApi.Account.
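The naming pattern can be illustrated with a short sketch. The helper names are hypothetical, the `bulkApi` segment is taken from the example above, and deriving the object name from the word after FROM is an assumption about how a query maps to a topic rather than documented connector behaviour.

```python
import re


def object_name(query):
    """Return the object named after FROM, or None if the query has no FROM."""
    match = re.search(r"\bfrom\s+(\w+)", query, re.IGNORECASE)
    return match.group(1) if match else None


def topic_name(prefix, obj, api_name="bulkApi"):
    """Compose <topic.prefix>.<api_name>.<object_name>."""
    return f"{prefix}.{api_name}.{obj}"


if __name__ == "__main__":
    query = "SELECT Id, Name, LastModifiedDate FROM Account"
    print(topic_name("history", object_name(query)))
```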
value.converter.schema.registry.enable
- Configuration option: value.converter.schema.registry.enable
- Since: 0.1.0
- Default value: none
- Type: BOOLEAN
- Importance: MEDIUM
The value converter schema registry enablement flag. If ‘true’, the value converter schema registry is enabled; if ‘false’, it is not. If not set, the value converter schema registry is enabled when the schema.registry.enable parameter is enabled.
value.converter.schema.registry.url
- Configuration option: value.converter.schema.registry.url
- Since: 0.1.0
- Default value: none
- Type: STRING
- Valid values: A valid URL. URL scheme must be: ‘https’.
- Importance: MEDIUM
Schema registry URL for value converters. If not specified the ‘schema.registry.url’ value will be used.
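The fallback behaviour described for the schema registry options can be summarised in a short sketch. This is one reading of the descriptions above, not the connector's implementation: the function names are hypothetical, and it assumes that an unset converter flag follows the resolved value of schema.registry.enable, which in turn follows whether schema.registry.url is set.

```python
def _flag(props, key):
    """Return True/False for an explicitly set boolean option, or None if unset."""
    raw = props.get(key)
    if raw is None:
        return None
    return raw.strip().lower() == "true"


def resolve_registry(props, role):
    """Resolve (enabled, url) for role 'key' or 'value' using the documented fallbacks."""
    base_url = props.get("schema.registry.url")

    # schema.registry.enable: if not set, enabled when schema.registry.url is set.
    base_enabled = _flag(props, "schema.registry.enable")
    if base_enabled is None:
        base_enabled = base_url is not None

    # <role>.converter.schema.registry.enable: if not set, follow schema.registry.enable.
    enabled = _flag(props, f"{role}.converter.schema.registry.enable")
    if enabled is None:
        enabled = base_enabled

    # <role>.converter.schema.registry.url: if not set, use schema.registry.url.
    url = props.get(f"{role}.converter.schema.registry.url", base_url)
    return enabled, url


if __name__ == "__main__":
    props = {
        "schema.registry.url": "https://registry.example.com",
        "value.converter.schema.registry.enable": "false",
    }
    print(resolve_registry(props, "key"))
    print(resolve_registry(props, "value"))
```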
Example
A copy of this example configuration file is available for download.
### Standard connector configuration
## Fill in your values in these:
# The Java class for the connector
connector.class=io.aiven.kafka.connect.salesforce.source.SalesforceSourceConnector
# Number of worker tasks to run concurrently, only '1' is supported on this connector
tasks.max=1
# All data will be produced to topics that start with the prefix below; an example topic would be
# salesforce.test.bulkapi.Account
topic.prefix=salesforce.test
# The maximum number of times to retry a query or authentication request against the Bulk API before failing
salesforce.max.retries=3
# The version of the Salesforce API to use for all queries
salesforce.api.version=v65.0
# Salesforce Client Secret
salesforce.client.secret=YOUR_CLIENT_SECRET
# Salesforce Client Id
salesforce.client.id=YOUR_CLIENT_ID
# The Salesforce OAUTH Uri may be unique for your salesforce deployment
salesforce.oauth.uri=https://www.salesforce.com/services/oauth2/token
salesforce.bulk.api.queries=Select Id,Name,LastModifiedDate FROM Account;Select Id,LastModifiedDate,Department,Email,FirstName,LastName FROM Contact WHERE HasOptedOutOfEmail=False;
# Optional
# Wait at least an hour in between each query being called
# Time is in seconds
salesforce.soql.query.wait=3600
# When you have large Object tables it can take longer for the job to be processed by the API
# reduce the number of API calls by increasing the time between job status checks
# Time is in seconds
salesforce.status.check.wait=120
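Before deploying a configuration such as the one above, the numeric options can be checked against the ranges listed under "Valid values" in the option reference. The following is a minimal sketch with hypothetical helper names; it handles only simple key=value lines and is not a full properties-file parser or a substitute for the connector's own validation.

```python
# Ranges copied from the "Valid values" entries in the option reference.
RANGES = {
    "salesforce.max.records": (1, 150000),
    "salesforce.max.retries": (1, 5),
    "salesforce.soql.query.wait": (1, 604800),
    "salesforce.status.check.wait": (5, 3600),
}


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values such as SOQL queries may contain '='.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def out_of_range(props):
    """Return a message for each numeric option that is not an integer in range."""
    errors = []
    for key, (low, high) in RANGES.items():
        if key not in props:
            continue
        try:
            value = int(props[key])
        except ValueError:
            errors.append(f"{key}: '{props[key]}' is not an integer")
            continue
        if not low <= value <= high:
            errors.append(f"{key}: {value} is outside [{low}, {high}]")
    return errors


if __name__ == "__main__":
    sample = "salesforce.soql.query.wait=3600\nsalesforce.status.check.wait=2\n"
    print(out_of_range(parse_properties(sample)))
```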