BigQuery connector configuration options
All the configuration options are listed below in alphabetical order.
allBQFieldsNullable
-
Configuration option: allBQFieldsNullable
-
Default value: none
-
Type: BOOLEAN
-
Importance: LOW
If true, no fields in any produced BigQuery schema will be REQUIRED. All non-nullable Avro fields will be translated as NULLABLE (or REPEATED, for arrays).
allowBigQueryRequiredFieldRelaxation
-
Configuration option: allowBigQueryRequiredFieldRelaxation
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
If true, fields in BigQuery Schema can be changed from REQUIRED to NULLABLE
allowNewBigQueryFields
-
Configuration option: allowNewBigQueryFields
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
If true, new fields can be added to BigQuery tables during subsequent schema updates
allowSchemaUnionization
-
Configuration option: allowSchemaUnionization
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates
autoCreateBucket
-
Configuration option: autoCreateBucket
-
Default value: true
-
Type: BOOLEAN
-
Importance: MEDIUM
Whether to automatically create the given bucket, if it does not exist. Only relevant if enableBatchLoad is configured.
autoCreateTables
-
Configuration option: autoCreateTables
-
Default value: true
-
Type: BOOLEAN
-
Importance: HIGH
Automatically create BigQuery tables if they don't already exist
avroDataCacheSize
-
Configuration option: avroDataCacheSize
-
Default value: 100
-
Type: INT
-
Valid values: [0,…]
-
Importance: LOW
The size of the cache to use when converting schemas from Avro to Kafka Connect
batchLoadIntervalSec
-
Configuration option: batchLoadIntervalSec
-
Default value: 120
-
Type: INT
-
Importance: LOW
The interval, in seconds, in which to attempt to run GCS to BQ load jobs. Only relevant if enableBatchLoad is configured.
bigQueryMessageTimePartitioning
-
Configuration option: bigQueryMessageTimePartitioning
-
Default value: none
-
Type: BOOLEAN
-
Importance: HIGH
Whether or not to use the message time when inserting records. Default uses the connector processing time.
bigQueryPartitionDecorator
-
Configuration option: bigQueryPartitionDecorator
-
Default value: true
-
Type: BOOLEAN
-
Importance: HIGH
-
Options that influence if or how this option can be used:
- useStorageWriteApi
Whether or not to append partition decorator to BigQuery table name when inserting records. Default is true. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration set for bigQueryPartitionDecorator). Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts.
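As a rough illustration of the decorated name, the following Python sketch builds the table$yyyyMMdd form for a day-partitioned table. The helper `decorated_table_name` is hypothetical and is not part of the connector; it only mirrors the naming pattern described above.

```python
from datetime import date


def decorated_table_name(table: str, day: date, use_decorator: bool = True) -> str:
    """Return table$yyyyMMdd when the decorator is enabled, else the raw table name."""
    if not use_decorator:
        # Mirrors bigQueryPartitionDecorator=false: the raw table name is used.
        return table
    return f"{table}${day.strftime('%Y%m%d')}"


print(decorated_table_name("events", date(2024, 3, 9)))
print(decorated_table_name("events", date(2024, 3, 9), use_decorator=False))
```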
bigQueryRetry
-
Configuration option: bigQueryRetry
-
Default value: none
-
Type: INT
-
Valid values: [0,…]
-
Importance: MEDIUM
The number of retry attempts that will be made per BigQuery request that fails with a backend error or a quota exceeded error. For GCS batch loads (see enableBatchLoad), this is an upper bound: retries may be cut short if the exponential backoff (starting at bigQueryRetryWait) exceeds the retry budget. For other writers (direct BigQuery, Storage API), it is used as a fixed retry count.
bigQueryRetryWait
-
Configuration option: bigQueryRetryWait
-
Default value: 1000
-
Type: LONG
-
Valid values: [0,…]
-
Importance: MEDIUM
The minimum amount of time, in milliseconds, to wait between BigQuery backend or quota exceeded error retry attempts. For GCS batch loads (see enableBatchLoad), this is the base delay for exponential backoff with jitter (capped at 10s). For other writers, it is a constant wait time between retries.
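The two wait strategies can be sketched in Python as follows. This is only an approximation under stated assumptions: the doubling factor and the way jitter is drawn are guesses, the retry budget is not modeled, and the function names are hypothetical. Only the 10s cap and the base delay come from the description above.

```python
import random

MAX_BACKOFF_MS = 10_000  # the 10s cap mentioned in the description


def batch_load_wait_ms(attempt: int, retry_wait_ms: int, rng: random.Random) -> float:
    """GCS batch load: exponential backoff with jitter, capped at 10s.

    Assumptions: the delay doubles per attempt, and jitter is a uniform draw
    between the base delay and the capped exponential delay.
    """
    upper = min(retry_wait_ms * (2 ** attempt), MAX_BACKOFF_MS)
    lower = min(retry_wait_ms, upper)
    return rng.uniform(lower, upper)


def constant_wait_ms(attempt: int, retry_wait_ms: int) -> int:
    """Other writers: the same wait before every retry, whatever the attempt."""
    return retry_wait_ms


rng = random.Random(0)
for attempt in range(5):
    print(attempt, round(batch_load_wait_ms(attempt, 1000, rng)), constant_wait_ms(attempt, 1000))
```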
clusteringPartitionFieldNames
-
Configuration option: clusteringPartitionFieldNames
-
Default value: none
-
Type: LIST
-
Valid values: Up to four clustering field names
-
Importance: LOW
List of fields on which data should be clustered in BigQuery, separated by commas
commitInterval
-
Configuration option: commitInterval
-
Default value: 60
-
Type: INT
-
Valid values: [15,…,14400]
-
Importance: LOW
-
Options that influence if or how this option can be used:
- useStorageWriteApi
The interval, in seconds, in which to attempt to commit streamed records.
convertDebeziumTimestampToInteger
-
Configuration option: convertDebeziumTimestampToInteger
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
convertDebeziumVariableScaleDecimal
Deprecated since 2.8.0: Use variableScaleDecimalHandlingMode instead.
-
Configuration option: convertDebeziumVariableScaleDecimal
-
Default value: none
-
Type: BOOLEAN
-
Importance: LOW
convertDoubleSpecialValues
-
Configuration option: convertDoubleSpecialValues
-
Default value: none
-
Type: BOOLEAN
-
Importance: LOW
Whether to convert +Infinity to Double.MAX_VALUE, and -Infinity and NaN to Double.MIN_VALUE, so that these values can be written to BigQuery.
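The mapping described above can be sketched in Python as shown below. The function name `convert_special` is hypothetical. One point worth noting: in Java, Double.MIN_VALUE is the smallest positive double (about 4.9e-324), not the most negative one, so under this mapping -Infinity and NaN end up as a tiny positive number.

```python
import math
import sys

JAVA_DOUBLE_MAX_VALUE = sys.float_info.max  # same value as Java's Double.MAX_VALUE
JAVA_DOUBLE_MIN_VALUE = 5e-324              # smallest positive double, as Java's Double.MIN_VALUE


def convert_special(value: float) -> float:
    """Map +Infinity, -Infinity and NaN to finite doubles; pass other values through."""
    if value == math.inf:
        return JAVA_DOUBLE_MAX_VALUE
    if value == -math.inf or math.isnan(value):
        return JAVA_DOUBLE_MIN_VALUE
    return value


for v in (math.inf, -math.inf, math.nan, 1.5):
    print(v, "->", convert_special(v))
```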
decimalHandlingMode
-
Configuration option: decimalHandlingMode
-
Default value: FLOAT
-
Type: STRING
-
Valid values: RECORD, FLOAT, NUMERIC, BIGNUMERIC
-
Importance: MEDIUM
Specifies the conversion strategy for org.apache.kafka.connect.data.Decimal fields.
defaultDataset
-
Configuration option: defaultDataset
-
Default value: none
-
Type: STRING
-
Importance: HIGH
The default dataset to be used
deleteEnabled
-
Configuration option: deleteEnabled
-
Default value: none
-
Type: BOOLEAN
-
Importance: LOW
-
Options that this option influences:
-
intermediateTableSuffix
-
useStorageWriteApi
-
mergeIntervalMs
-
kafkaKeyFieldName
Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete will be performed when a record with a null value (i.e., a tombstone record) is read.
enableBatchLoad
-
Configuration option: enableBatchLoad
-
Default value: none
-
Type: LIST
-
Importance: LOW
The sublist of topics to be batch loaded through GCS.
enableBatchMode
-
Configuration option: enableBatchMode
-
Default value: none
-
Type: BOOLEAN
-
Importance: LOW
-
Options that influence if or how this option can be used:
- useStorageWriteApi
Use Google's New Storage Write API with batch mode
enableRetries
-
Configuration option: enableRetries
-
Default value: true
-
Type: BOOLEAN
-
Importance: MEDIUM
gcsBucketName
-
Configuration option: gcsBucketName
-
Default value: none
-
Type: STRING
-
Importance: HIGH
The name of the bucket in which GCS blobs used to batch load to BigQuery should be located. Only relevant if enableBatchLoad is configured.
gcsFolderName
-
Configuration option: gcsFolderName
-
Default value: none
-
Type: STRING
-
Importance: MEDIUM
The name of the folder under the bucket in which GCS blobs used to batch load to BigQuery should be located. Only relevant if enableBatchLoad is configured.
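For orientation, the batch-load options described on this page might be combined as in the following Python sketch, which prints a JSON fragment. All values (topic names, bucket, folder) are hypothetical; only the option names and the documented defaults come from this page, and the fragment is not a complete connector configuration.

```python
import json

# Hypothetical values throughout; only the option names come from this page.
batch_load_fragment = {
    "enableBatchLoad": "orders,payments",       # topics to batch load through GCS
    "gcsBucketName": "example-staging-bucket",  # bucket holding the staged blobs
    "gcsFolderName": "kafka-connect",           # folder under that bucket
    "autoCreateBucket": "true",                 # documented default
    "batchLoadIntervalSec": "120",              # documented default
}

print(json.dumps(batch_load_fragment, indent=2))
```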
intermediateTableSuffix
-
Configuration option: intermediateTableSuffix
-
Default value: tmp
-
Type: STRING
-
Valid values: non-empty string
-
Importance: LOW
-
Options that influence if or how this option can be used:
-
deleteEnabled
-
upsertEnabled
A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table, but their names will always start with the name of the destination table, followed by this suffix, and possibly followed by an additional suffix.
kafkaDataFieldName
-
Configuration option: kafkaDataFieldName
-
Default value: none
-
Type: STRING
-
Valid values: non-empty string
-
Importance: LOW
The name of the field for Kafka data. Defaults to null, in which case the Kafka data field is not included.
kafkaKeyFieldName
-
Configuration option: kafkaKeyFieldName
-
Default value: none
-
Type: STRING
-
Valid values: non-empty string
-
Importance: LOW
-
Options that influence if or how this option can be used:
-
deleteEnabled
-
upsertEnabled
The name of the field for the Kafka key. Defaults to null, in which case the Kafka key field is not included.
keySource
-
Configuration option: keySource
-
Default value: FILE
-
Type: STRING
-
Valid values: [FILE, JSON, APPLICATION_DEFAULT]
-
Importance: MEDIUM
Determines whether the keyfile config is the path to the credentials JSON file or the raw JSON of the key itself. If set to APPLICATION_DEFAULT, the keyfile should not be provided and the connector will use any GCP application default credentials that it can find on the Connect worker for authentication.
keyfile
-
Configuration option: keyfile
-
Default value: none
-
Type: PASSWORD
-
Importance: MEDIUM
The file containing a JSON key with BigQuery service account credentials
max.retries
-
Configuration option: max.retries
-
Default value: 10
-
Type: INT
-
Valid values: [1,…]
-
Importance: MEDIUM
The maximum number of times to retry on retriable errors before failing the task.
mergeIntervalMs
-
Configuration option: mergeIntervalMs
-
Default value: 60000
-
Type: LONG
-
Valid values: Either -1 to disable or a value of at least 10000 to enable
-
Importance: LOW
-
Options that influence if or how this option can be used:
-
deleteEnabled
-
upsertEnabled
How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing; otherwise the value should be at least 10000 (10 seconds). Either mergeIntervalMs or mergeRecordsThreshold, or both, must be enabled.
mergeRecordsThreshold
-
Configuration option: mergeRecordsThreshold
-
Default value: -1
-
Type: LONG
-
Valid values: Either a positive integer or -1 to disable throughput-based merging
-
Importance: LOW
How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing. Either mergeIntervalMs or mergeRecordsThreshold, or both, must be enabled.
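The constraints on the two merge settings can be checked with a small Python sketch like the one below. The function `validate_merge_settings` is hypothetical and only restates the documented valid values; the connector's own validation and error messages may differ.

```python
def validate_merge_settings(merge_interval_ms: int = 60000,
                            merge_records_threshold: int = -1) -> None:
    """Raise ValueError if the pair violates the documented constraints."""
    # mergeIntervalMs: either -1 (disabled) or at least 10000.
    if merge_interval_ms != -1 and merge_interval_ms < 10000:
        raise ValueError("mergeIntervalMs must be -1 or at least 10000")
    # mergeRecordsThreshold: either -1 (disabled) or a positive integer.
    if merge_records_threshold != -1 and merge_records_threshold < 1:
        raise ValueError("mergeRecordsThreshold must be -1 or a positive integer")
    # At least one of the two flush triggers must stay enabled.
    if merge_interval_ms == -1 and merge_records_threshold == -1:
        raise ValueError("enable mergeIntervalMs, mergeRecordsThreshold, or both")


validate_merge_settings()                    # documented defaults: time-based only
validate_merge_settings(-1, 5000)            # count-based only
validate_merge_settings(30000, 5000)         # both triggers enabled
```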
partitionExpirationMs
-
Configuration option: partitionExpirationMs
-
Default value: none
-
Type: LONG
-
Valid values: if set the value must be at least 1
-
Importance: LOW
The amount of time, in milliseconds, after which partitions should be deleted from the tables this connector creates. If this field is set, all data in partitions in this connector's tables that are older than the specified partition expiration time will be permanently deleted. Existing tables will not be altered to use this partition expiration time.
preserveKafkaTopicPartitionOffset
-
Configuration option: preserveKafkaTopicPartitionOffset
-
Since: 2.8.0
-
Default value: none
-
Type: BOOLEAN
-
Importance: LOW
If true and Kafka 3.6 or higher is in use, the connector uses the original topic, partition, and offset values as they were before any message transformation occurred.
project
-
Configuration option: project
-
Default value: none
-
Type: STRING
-
Importance: HIGH
The BigQuery project to write to
queueSize
-
Configuration option: queueSize
-
Default value: -1
-
Type: LONG
-
Valid values: [-1,…]
-
Importance: HIGH
The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics will be resumed once a flush is requested or the size of the queue drops under half of the maximum size.
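The pause and resume behavior described above can be modeled with a short Python sketch. The class `QueueGate` is hypothetical and is not the connector's implementation; in particular, whether pausing triggers at or strictly above the maximum is an assumption here.

```python
class QueueGate:
    """Toy model of the pause/resume rule for the write queue."""

    def __init__(self, max_size: int):
        self.max_size = max_size  # -1 means no maximum size
        self.paused = False

    def on_size_change(self, size: int) -> bool:
        """Update the paused state for the current queue size and return it."""
        if self.max_size == -1:
            return self.paused  # unlimited queue: topics are never paused
        if not self.paused and size >= self.max_size:  # assumption: ">=" rather than ">"
            self.paused = True
        elif self.paused and size < self.max_size / 2:
            self.paused = False  # resume once under half of the maximum
        return self.paused

    def on_flush(self) -> None:
        """A requested flush resumes all topics."""
        self.paused = False


gate = QueueGate(100)
for size in (99, 120, 60, 49):
    print(size, gate.on_size_change(size))
```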
sanitizeFieldNames
-
Configuration option: sanitizeFieldNames
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
Whether to automatically sanitize field names before using them as field names in BigQuery. BigQuery specifies that field names can only contain letters, numbers, and underscores. The sanitizer replaces invalid symbols with underscores. If a field name starts with a digit, the sanitizer adds an underscore in front of it. Note: the fields a.b and a_b have the same name after sanitizing, which might cause a key duplication error.
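The sanitizing rules above can be approximated with the following Python sketch. The function `sanitize_field_name` is hypothetical and treats "letters" as ASCII letters, which is an assumption; it is meant to show the a.b / a_b collision rather than to reproduce the connector's exact behavior.

```python
import re


def sanitize_field_name(name: str) -> str:
    """Replace invalid characters with underscores and guard a leading digit."""
    # Assumption: only ASCII letters, digits and underscores count as valid.
    sanitized = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if sanitized and sanitized[0].isdigit():
        sanitized = "_" + sanitized
    return sanitized


for raw in ("a.b", "a_b", "1st-place"):
    print(raw, "->", sanitize_field_name(raw))
```

Because a.b and a_b both map to a_b, a record that contains both fields would produce two columns with the same name.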
sanitizeTopics
-
Configuration option: sanitizeTopics
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
Whether to automatically sanitize topic names before using them as table names; if not enabled topic names will be used directly as table names
schemaRetriever
-
Configuration option: schemaRetriever
-
Default value: class com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
-
Type: CLASS
-
Importance: MEDIUM
A class that can be used for automatically creating tables and/or updating schemas
threadPoolSize
-
Configuration option: threadPoolSize
-
Default value: 10
-
Type: INT
-
Valid values: [1,…]
-
Importance: MEDIUM
The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.
timePartitioningType
-
Configuration option: timePartitioningType
-
Default value: DAY
-
Type: STRING
-
Valid values: day, hour, month, year, none
-
Importance: LOW
The time partitioning type to use when creating tables, or ‘NONE’ to create non-partitioned tables. Existing tables will not be altered to use this partitioning type.
timestampPartitionFieldName
-
Configuration option: timestampPartitionFieldName
-
Default value: none
-
Type: STRING
-
Valid values: non-empty string
-
Importance: LOW
The name of the field in the value that contains the timestamp to partition by in BigQuery; setting it enables timestamp partitioning for each table. Leave this option blank to enable ingestion time partitioning for each table.
topic2TableMap
-
Configuration option: topic2TableMap
-
Default value: none
-
Type: STRING
-
Valid values: A list of comma separated values comprising topic:table pairs.
-
Importance: LOW
Map of topics to tables (optional). Format: comma-separated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,... Note that the topic name should not be modified using a regex SMT while using this option. Also note that SANITIZE_TOPICS_CONFIG is ignored if this option is set. Lastly, if the map doesn't contain the topic for a record, a table with the same name as the topic is created.
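To make the format and the fallback concrete, here is a minimal Python sketch that parses a topic2TableMap value and resolves the table for a topic. Both function names are hypothetical, and the handling of whitespace and malformed pairs is an assumption rather than the connector's documented behavior.

```python
from typing import Dict


def parse_topic2table_map(value: str) -> Dict[str, str]:
    """Parse '<topic-1>:<table-1>,<topic-2>:<table-2>' into a dict."""
    mapping: Dict[str, str] = {}
    for pair in value.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate empty entries such as a trailing comma
        topic, sep, table = pair.partition(":")
        if not sep or not topic.strip() or not table.strip():
            raise ValueError(f"expected topic:table, got {pair!r}")
        mapping[topic.strip()] = table.strip()
    return mapping


def table_for_topic(topic: str, mapping: Dict[str, str]) -> str:
    """Unmapped topics fall back to a table named after the topic."""
    return mapping.get(topic, topic)


mapping = parse_topic2table_map("orders:orders_raw,payments:payments_raw")
print(table_for_topic("orders", mapping))
print(table_for_topic("refunds", mapping))
```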
Topics
-
Configuration option: topics
-
Default value: none
-
Type: LIST
-
Importance: HIGH
List of topics to consume, separated by commas
Topics regex
-
Configuration option: topics.regex
-
Default value: none
-
Type: STRING
-
Importance: HIGH
Regular expression giving topics to consume. Under the hood, the regex is compiled to a java.util.regex.Pattern. Only one of topics or topics.regex should be specified.
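A pre-flight check for the "only one of" rule could look like the Python sketch below. The function `check_topic_selection` is hypothetical and only flags the case where both options are set; it does not validate the regular expression itself, since Python's `re` syntax differs from java.util.regex.Pattern.

```python
def check_topic_selection(config: dict) -> None:
    """Raise ValueError when both topics and topics.regex are set."""
    if config.get("topics") and config.get("topics.regex"):
        raise ValueError("set either 'topics' or 'topics.regex', not both")


check_topic_selection({"topics": "orders,payments"})
check_topic_selection({"topics.regex": "orders-.*"})
```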
upsertEnabled
-
Configuration option: upsertEnabled
-
Default value: none
-
Type: BOOLEAN
-
Importance: LOW
-
Options that this option influences:
-
intermediateTableSuffix
-
useStorageWriteApi
-
mergeIntervalMs
-
kafkaKeyFieldName
Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching will be performed based on the contents of record keys.
useCredentialsProjectId
-
Configuration option: useCredentialsProjectId
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
Use the quotaProjectId from the credentials when available.
useStorageWriteApi
-
Configuration option: useStorageWriteApi
-
Default value: none
-
Type: BOOLEAN
-
Importance: MEDIUM
-
Options that influence if or how this option can be used:
-
deleteEnabled
-
upsertEnabled
-
Options that this option influences:
-
commitInterval
-
enableBatchMode
-
bigQueryPartitionDecorator
(Beta feature: use with caution) Use Google's new Storage Write API for data streaming. Not available for upsert/delete mode.
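Since the Storage Write API is not available in upsert/delete mode, a configuration can be checked for that combination before deployment. The Python sketch below is one way to do so; the helper names are hypothetical, and treating the string "true" (any case) as enabled is an assumption about how the values are written.

```python
def as_bool(value) -> bool:
    """Interpret True or the string 'true' (any case) as enabled."""
    return str(value).strip().lower() == "true"


def storage_write_api_conflicts(config: dict) -> list:
    """Return the upsert/delete options that conflict with useStorageWriteApi."""
    if not as_bool(config.get("useStorageWriteApi", False)):
        return []
    return [name for name in ("upsertEnabled", "deleteEnabled")
            if as_bool(config.get(name, False))]


print(storage_write_api_conflicts({"useStorageWriteApi": "true", "upsertEnabled": "true"}))
print(storage_write_api_conflicts({"useStorageWriteApi": "true", "commitInterval": "60"}))
```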
variableScaleDecimalHandlingMode
-
Configuration option: variableScaleDecimalHandlingMode
-
Since: 2.7.0
-
Default value: RECORD
-
Type: STRING
-
Valid values: RECORD, FLOAT, NUMERIC, BIGNUMERIC
-
Importance: MEDIUM
Specifies the conversion strategy for io.debezium.data.VariableScaleDecimal fields.