Configuration

Reference

pekko.kafka.consumer = {
    poll-interval = ${?PEKKO_KAFKA_CONSUMER_POLL_INTERVAL}
    poll-timeout = ${?PEKKO_KAFKA_CONSUMER_POLL_TIMEOUT}
    stop-timeout = ${?PEKKO_KAFKA_CONSUMER_STOP_TIMEOUT}
    close-timeout = ${?PEKKO_KAFKA_CONSUMER_CLOSE_TIMEOUT}
    commit-time-warning = ${?PEKKO_KAFKA_CONSUMER_COMMIT_TIME_WARNING}
    commit-refresh-interval = ${?PEKKO_KAFKA_CONSUMER_COMMIT_REFRESH_INTERVAL}
    use-dispatcher = ${?PEKKO_KAFKA_CONSUMER_USE_DISPATCHER}
    wait-close-partition = ${?PEKKO_KAFKA_CONSUMER_WAIT_CLOSE_PARTITION}
    position-timeout = ${?PEKKO_KAFKA_CONSUMER_POSITION_TIMEOUT}
    offset-for-times-timeout = ${?PEKKO_KAFKA_OFFSET_FOR_TIMES_TIMEOUT}
    metadata-request-timeout = ${?PEKKO_KAFKA_METADATA_REQUEST_TIMEOUT}
    eos-draining-check-interval = ${?PEKKO_KAFKA_CONSUMER_EOS_DRAINING_CHECK_INTERVAL}
    connection-checker = {
        enable = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_ENABLE}
        max-retries = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_MAX_RETRIES}
        backoff-factor = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_BACKOFF_FACTOR}
        check-interval = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_CHECK_INTERVAL}
    }
    partition-handler-warning = ${?PEKKO_KAFKA_CONSUMER_PARTITION_HANDLER_WARNING}
    offset-reset-protection = {
        enable = ${?PEKKO_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_ENABLE}
        offset-threshold = ${?PEKKO_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_OFFSET_THRESHOLD}
        time-threshold = ${?PEKKO_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_TIME_THRESHOLD}
    }
}
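
Each of the consumer keys above is an optional override: HOCON's ${?VAR} substitution only assigns a value when the named environment variable is defined, otherwise the defaults from the underlying pekko-connectors-kafka reference configuration remain in effect. As a minimal sketch (the values are illustrative, not recommendations), the same settings can also be overridden directly in an application.conf:

pekko.kafka.consumer {
    # equivalent to exporting PEKKO_KAFKA_CONSUMER_POLL_INTERVAL=50ms
    poll-interval = 50ms
    poll-timeout = 50ms
}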

pekko.kafka.committer = {
    max-batch = 100000
    max-batch = ${?PEKKO_KAFKA_COMMITTER_MAX_BATCH}
    max-interval = 1 hour
    max-interval = ${?PEKKO_KAFKA_COMMITTER_MAX_INTERVAL}
    parallelism = 10000
    parallelism = ${?PEKKO_KAFKA_COMMITTER_PARALLELISM}
}
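
The duplicated keys are intentional: in HOCON a later assignment wins, and an optional ${?VAR} substitution is a no-op when the variable is unset, so each environment variable only overrides the default when it is actually defined. A hypothetical deployment override in application.conf (values illustrative):

pekko.kafka.committer {
    max-batch = 50000      # replaces the default of 100000
    parallelism = 100      # replaces the default of 10000
}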

backup {
    kafka-group-id = ${?BACKUP_KAFKA_GROUP_ID}
    time-configuration = {
        type = chrono-unit-slice
        type = ${?BACKUP_TIME_CONFIGURATION_TYPE}
        chrono-unit = hours
        chrono-unit = ${?BACKUP_TIME_CONFIGURATION_CHRONO_UNIT}
        duration = 1 hour
        duration = ${?BACKUP_TIME_CONFIGURATION_DURATION}
    }
    commit-timeout-buffer-window = 10 seconds
    commit-timeout-buffer-window = ${?BACKUP_COMMIT_TIMEOUT_BUFFER}
}
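
A hypothetical application.conf for an hourly chrono-unit-slice backup is sketched below; the group id is an example, and the compression keys follow the fields described in the Explanation section rather than the reference block above:

backup {
    kafka-group-id = "guardian-example"           # example group id
    time-configuration = {
        type = chrono-unit-slice
        chrono-unit = hours
    }
    commit-timeout-buffer-window = 10 seconds
    # assumed layout, based on the compression fields described below
    compression = {
        type = gzip
        level = 6                                 # illustrative gzip level
    }
}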

Scala API doc: Backup

Explanation

  • pekko.kafka.consumer: See the Apache Pekko Connectors Kafka documentation
  • pekko.kafka.consumer.kafka-clients: See the Apache Pekko Connectors Kafka documentation
  • backup:
    • kafka-group-id: The group id for the Kafka consumer that is used in the restore tool
    • time-configuration: How to slice the persisted keys/files by time
      • type: The type of time configuration. Either period-from-first or chrono-unit-slice
        • period-from-first: Guardian will split up the backup keys/files according to the specified duration. The key/filename is derived from the timestamp of the first message received from the Kafka consumer, with each subsequent key/filename incremented by the configured duration. If Guardian is shut down, it terminates and completes the stream, with the final element in the JSON array being a null (see the sketch after this list).
          • This is done so it's possible to determine whether a backup was terminated by a shutdown of Guardian, and also because it's not really possible to resume using arbitrary durations.
        • chrono-unit-slice: Guardian will split up the backup keys/files according to the chrono-unit, which represents calendar intervals such as days and weeks. When using this setting it is therefore possible for Guardian to resume from a previously uncompleted backup.
      • duration: If the type is period-from-first then this determines the maximum period of time for each time slice.
      • chrono-unit: If the type is chrono-unit-slice then the chrono-unit determines the length of each time slice (e.g. hours, days).
    • commit-timeout-buffer-window: Guardian sets the commit timeout of the Kafka consumer based on the time-configuration, since Guardian commits cursors manually. The buffer is added on top of the time-configuration to give headroom for any theoretical delays.
    • compression: The compression format to use for the data being backed up. Note that changes to the compression configuration do not apply to any currently existing backups that still need to be completed, only to future new backups.
      • type: Which compression to use.
        • gzip: Standard Gzip compression
      • level: The level of compression to use
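
To illustrate the period-from-first termination behaviour mentioned above, a backup file cut short by a shutdown of Guardian ends with a null element in the JSON array. The shape of the record objects in this sketch is purely illustrative; only the trailing null is significant:

[
    { "offset": 0, "value": "..." },
    { "offset": 1, "value": "..." },
    null
]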