Python API Documentation


class rohmu.BaseTransfer(prefix: Optional[str], notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, ensure_object_store_available: bool = True)
rohmu.get_class_for_notifier(notifier_config: Dict[str, Any]) Type[Notifier]
rohmu.get_class_for_storage_driver(storage_driver: StorageDriver) Type[BaseTransfer[Any]]
rohmu.get_class_for_transfer(obj_store: Dict[str, Any]) Type[BaseTransfer[Any]]
rohmu.get_notifier(notifier_config: Dict[str, Any]) Notifier
rohmu.get_transfer_from_model(model: StorageModelT, notifier: Optional[Notifier] = None, ensure_object_store_available: bool = True) BaseTransfer[StorageModelT]
rohmu.get_transfer_model(storage_config: Dict[str, Any]) StorageModel
rohmu.get_transfer(storage_config: Dict[str, Any], ensure_object_store_available: bool = True) BaseTransfer[Any]
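
As a rough illustration of how these factory functions might be used, the sketch below builds a configuration dictionary for the local driver and hands it to get_transfer. The key names follow the LocalObjectStorageConfig fields listed later in this reference; the helper names local_storage_config and open_local_transfer are hypothetical, and the directory is a placeholder.

```python
from typing import Any, Dict, Optional


def local_storage_config(directory: str, prefix: Optional[str] = None) -> Dict[str, Any]:
    """Build a config dict for the "local" storage driver (hypothetical helper)."""
    config: Dict[str, Any] = {"storage_type": "local", "directory": directory}
    if prefix is not None:
        config["prefix"] = prefix
    return config


def open_local_transfer(directory: str) -> Any:
    """Create a transfer object from the config; requires rohmu to be installed."""
    # Imported lazily so that local_storage_config() works without rohmu.
    from rohmu import get_transfer

    return get_transfer(local_storage_config(directory))
```

The same dictionary shape should also be accepted by get_transfer_model, which returns the validated model instead of a transfer object.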
class rohmu.Notifier

This interface allows external code to be notified about object changes.

class rohmu.ProxyInfo(*, host: str, port: int, type: ProxyType, user: Optional[str] = None, password: Optional[str] = None)
rohmu.S3AddressingStyle = <enum 'S3AddressingStyle'>

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

    >>> Color.RED
    <Color.RED: 1>
  • value lookup:

    >>> Color(1)
    <Color.RED: 1>
  • name lookup:

    >>> Color['RED']
    <Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

class rohmu.StorageDriver(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)
class rohmu.StorageModel(*, storage_type: StorageDriver, statsd_info: Optional[StatsdConfig] = None)

Delta Backups

class 'Path', missing_ok: 'bool' = True)
pydantic model

JSON-encodable progress meter of sorts

Show JSON schema
{
   "title": "Progress",
   "description": "JSON-encodable progress meter of sorts",
   "type": "object",
   "properties": {
      "handled": {
         "title": "Handled",
         "default": 0,
         "type": "integer"
      },
      "failed": {
         "title": "Failed",
         "default": 0,
         "type": "integer"
      },
      "total": {
         "title": "Total",
         "default": 0,
         "type": "integer"
      },
      "final": {
         "title": "Final",
         "default": false,
         "type": "boolean"
      }
   },
   "additionalProperties": false
}

  • extra: str = forbid

  • use_enum_values: bool = True

  • validate_all: bool = True

  • validate_assignment: bool = True

field failed: int = 0
field final: bool = False
field handled: int = 0
field total: int = 0
add_fail(n: int = 1, *, info: str = 'add_fail') None
add_success(n: int = 1, *, info: str = 'add_success') None
add_total(n: int) None
done() None
download_success(size: int) None
classmethod merge(progresses: Sequence[Progress]) Progress
progress_metrics() ProgressMetrics
start(n: int) None

Optional ‘first’ step, just for logic handling state (e.g. no progress object reuse desired)

upload_failure(hexdigest: str) None
upload_missing(hexdigest: str) None
upload_success(hexdigest: str) None
property finished_failed: bool
property finished_successfully: bool
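
The lifecycle implied by these fields and methods can be sketched with a small stand-in class. This is not the pydantic model itself: ProgressSketch is a hypothetical name, and the rules used for finished_successfully, finished_failed and merge are assumptions inferred from the member names rather than documented behaviour.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class ProgressSketch:
    """Stand-in mirroring the documented fields; not the real model."""

    handled: int = 0
    failed: int = 0
    total: int = 0
    final: bool = False

    def start(self, n: int) -> None:
        # Assumed: reset the counters and set the expected total.
        self.handled = 0
        self.failed = 0
        self.total = n
        self.final = False

    def add_success(self, n: int = 1) -> None:
        self.handled += n

    def add_fail(self, n: int = 1) -> None:
        self.failed += n

    def done(self) -> None:
        self.final = True

    @property
    def finished_successfully(self) -> bool:
        # Assumed rule: final, no failures, and everything handled.
        return self.final and not self.failed and self.handled == self.total

    @property
    def finished_failed(self) -> bool:
        return self.final and not self.finished_successfully

    @classmethod
    def merge(cls, progresses: Sequence["ProgressSketch"]) -> "ProgressSketch":
        # Assumed: counters are summed; the result is final only if all parts are.
        return cls(
            handled=sum(p.handled for p in progresses),
            failed=sum(p.failed for p in progresses),
            total=sum(p.total for p in progresses),
            final=all(p.final for p in progresses),
        )
```

A caller would typically call start(), report each item with add_success() or add_fail(), and finish with done() before checking the two properties.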
class, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)
pydantic model

This class represents something that is to be stored in the object storage.

size is provided mainly to allow for even loading of nodes in case the same hexdigest is available from multiple nodes.

Show JSON schema
{
   "title": "SnapshotHash",
   "description": "This class represents something that is to be stored in the object storage.\n\nsize is provided mainly to allow for even loading of nodes in case\nsame hexdigest is available from multiple nodes.",
   "type": "object",
   "properties": {
      "hexdigest": {
         "title": "Hexdigest",
         "type": "string"
      },
      "size": {
         "title": "Size",
         "type": "integer"
      }
   },
   "required": [
      "hexdigest",
      "size"
   ],
   "additionalProperties": false
}

  • extra: str = forbid

  • use_enum_values: bool = True

  • validate_all: bool = True

  • validate_assignment: bool = True

field hexdigest: str [Required]
field size: int [Required]
int, new_value: Optional[int] = None, *, total: Optional[int] = None) bool

Make reporting sparser and sparser as values grow larger: report roughly every 1.1**N, or, if the total is known, report every percent.
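
One way to read this rule is sketched below. The function name should_report and the parameter name old_value are hypothetical (the original signature is only partly preserved above), and the exact thresholds are an assumption: with a known total a report is due when the whole-percent value changes, otherwise when the value moves into a new 1.1**N band.

```python
import math
from typing import Optional


def should_report(old_value: int, new_value: Optional[int] = None, *, total: Optional[int] = None) -> bool:
    """Return True when moving from old_value to new_value merits a report."""
    if new_value is None:
        new_value = old_value + 1  # assumed default: a single step forward
    if total:
        # Known total: report whenever the whole-percent value changes.
        return old_value * 100 // total != new_value * 100 // total

    def band(value: int) -> int:
        # Index N of the 1.1**N band the value falls into; -1 for non-positive values.
        return int(math.log(value, 1.1)) if value > 0 else -1

    return band(old_value) != band(new_value)
```

With this shape, small counters report on almost every step, while a counter around 1000 only reports about once per hundred increments.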

class*, src: StrOrPathLike, dst: StrOrPathLike, globs: list[str], src_iterate_func: Optional[Callable[[], Iterable[Union[BackupPath, str, Path]]]] = None, parallel: int = 1, min_delta_file_size: int = 0)

Snapshotter keeps track of files on disk, and their hashes.

The hash on disk MAY change, which may require a subsequent incremental snapshot and/or ignoring the files that have changed.

The only output visible to the outside is the root object’s hash, plus the list of other hashes corresponding to the files referred to in the file list contained in the root object.

Note that any call to the public API MUST be made with snapshotter.lock held. Snapshotter is a process-wide utility shared across operations and possibly used from multiple threads, and its single-operation-only mode is not exactly flawless (starting a new operation while an old one is still running is an intentional feature, but the new operation should eventually replace the old one). The lock itself might not need to be built into Snapshotter, but having it there makes it possible to assert its state during public API calls.
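
The locking convention can be illustrated with a generic pattern. LockedTool below is a hypothetical stand-in, not Snapshotter itself; it only shows how a built-in lock lets a public method assert that the caller followed the rule.

```python
import threading


class LockedTool:
    """Hypothetical shared utility whose public API requires its lock to be held."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.calls = 0

    def snapshot(self) -> int:
        # Lock.locked() only says that someone holds the lock, not that the
        # current thread does; it still catches the "forgot the lock" mistake.
        assert self.lock.locked(), "public API called without holding the lock"
        self.calls += 1
        return self.calls


tool = LockedTool()
with tool.lock:
    result = tool.snapshot()
```

Calling tool.snapshot() outside the with block trips the assertion, which is the kind of check a built-in lock makes possible.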


class rohmu.notifier.interface.Notifier

This interface allows external code to be notified about object changes.

close() None

Method used to clean resources of the notifier, if any.

object_copied(key: str, size: Optional[int], metadata: Optional[dict[str, str]]) None

Called when an object is copied.

abstract object_created(key: str, size: Optional[int], metadata: Optional[dict[str, str]]) None

Called when an object is created.

abstract object_deleted(key: str) None

Called when an object is deleted.

Note: This may be called with each individual object as a side-effect of delete_tree, for drivers that do not support the higher level operation.

abstract tree_deleted(key: str) None

May be called when a tree is deleted.

Note: Not every driver supports this operation. For those that do not, objects are listed and deleted individually, and object_deleted is called for each object instead.
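
A custom notifier might look roughly like the following. To keep the sketch runnable without rohmu installed, it defines a local stand-in base class (NotifierSketch, a hypothetical name) with the three abstract methods listed above; real code would subclass rohmu.notifier.interface.Notifier instead. RecordingNotifier is likewise hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Optional


class NotifierSketch(ABC):
    """Local stand-in with the abstract methods listed above (not rohmu's class)."""

    def close(self) -> None:
        """Nothing to clean up by default."""

    @abstractmethod
    def object_created(self, key: str, size: Optional[int], metadata: Optional[dict[str, str]]) -> None:
        ...

    @abstractmethod
    def object_deleted(self, key: str) -> None:
        ...

    @abstractmethod
    def tree_deleted(self, key: str) -> None:
        ...


class RecordingNotifier(NotifierSketch):
    """Keeps every notification in memory, e.g. for inspection in tests."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str]] = []

    def object_created(self, key: str, size: Optional[int], metadata: Optional[dict[str, str]]) -> None:
        self.events.append(("created", key))

    def object_deleted(self, key: str) -> None:
        self.events.append(("deleted", key))

    def tree_deleted(self, key: str) -> None:
        self.events.append(("tree_deleted", key))
```

Because drivers without tree deletion fall back to per-object calls, a notifier like this should be prepared to see either one tree_deleted event or a series of object_deleted events for the same delete_tree call.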

HTTP Notifier

class rohmu.notifier.http.BackgroundHTTPNotifier(url: str, stop_event_check_timeout: float = 5, session: Optional[Session] = None)
close() None

Method used to clean resources of the notifier, if any.

object_created(key: str, size: Optional[int], metadata: Optional[dict[str, str]]) None

Called when an object is created.

object_deleted(key: str) None

Called when an object is deleted.

Note: This may be called with each individual object as a side-effect of delete_tree, for drivers that do not support the higher level operation.

tree_deleted(key: str) None

May be called when a tree is deleted.

Note: Not every driver supports this operation. For those that do not, objects are listed and deleted individually, and object_deleted is called for each object instead.

class rohmu.notifier.http.HTTPNotifyJob(url: 'str', json: 'str')
class rohmu.notifier.http.Operation(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Logger Notifier

class rohmu.notifier.logger.LoggerNotifier(log: Logger)
object_created(key: str, size: Optional[int], metadata: Optional[dict[str, str]]) None

Called when an object is created.

object_deleted(key: str) None

Called when an object is deleted.

Note: This may be called with each individual object as a side-effect of delete_tree, for drivers that do not support the higher level operation.

tree_deleted(key: str) None

May be called when a tree is deleted.

Note: Not every driver supports this operation. For those that do not, objects are listed and deleted individually, and object_deleted is called for each object instead.

Null Notifier

class rohmu.notifier.null.NullNotifier

Empty implementation.

Used by default if configuration is missing to avoid None checks

object_created(key: str, size: Optional[int], metadata: Optional[dict[str, str]]) None

Called when an object is created.

object_deleted(key: str) None

Called when an object is deleted.

Note: This may be called with each individual object as a side-effect of delete_tree, for drivers that do not support the higher level operation.

tree_deleted(key: str) None

May be called when a tree is deleted.

Note: Not every driver supports this operation. For those that do not, objects are listed and deleted individually, and object_deleted is called for each object instead.

Object Storages


pydantic model rohmu.object_storage.config.AzureObjectStorageConfig

Show JSON schema
   "title": "AzureObjectStorageConfig",
   "type": "object",
   "properties": {
      "storage_type": {
         "title": "Storage Type",
         "default": "azure",
         "enum": [
         "type": "string"
      "statsd_info": {
         "$ref": "#/definitions/StatsdConfig"
      "bucket_name": {
         "title": "Bucket Name",
         "type": "string"
      "account_name": {
         "title": "Account Name",
         "type": "string"
      "account_key": {
         "title": "Account Key",
         "type": "string"
      "sas_token": {
         "title": "Sas Token",
         "type": "string"
      "prefix": {
         "title": "Prefix",
         "type": "string"
      "is_secure": {
         "title": "Is Secure",
         "default": true,
         "type": "boolean"
      "host": {
         "title": "Host",
         "type": "string"
      "port": {
         "title": "Port",
         "type": "integer"
      "azure_cloud": {
         "title": "Azure Cloud",
         "type": "string"
      "proxy_info": {
         "$ref": "#/definitions/ProxyInfo"
   "required": [
   "definitions": {
      "MessageFormat": {
         "title": "MessageFormat",
         "description": "An enumeration.",
         "enum": [
         "type": "string"
      "StatsdConfig": {
         "title": "StatsdConfig",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "default": "",
               "type": "string"
            "port": {
               "title": "Port",
               "default": 8125,
               "type": "integer"
            "message_format": {
               "default": "telegraf",
               "allOf": [
                     "$ref": "#/definitions/MessageFormat"
            "tags": {
               "title": "Tags",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "anyOf": [
                        "type": "integer"
                        "type": "string"
            "operation_map": {
               "title": "Operation Map",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "type": "string"
         "additionalProperties": false
      "ProxyType": {
         "title": "ProxyType",
         "description": "An enumeration.",
         "enum": [
         "type": "string"
      "ProxyInfo": {
         "title": "ProxyInfo",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "type": "string"
            "port": {
               "title": "Port",
               "type": "integer"
            "type": {
               "$ref": "#/definitions/ProxyType"
            "user": {
               "title": "User",
               "type": "string"
            "pass": {
               "title": "Pass",
               "type": "string"
         "required": [
         "additionalProperties": false

  • arbitrary_types_allowed: bool = True

  • extra_forbid: bool = True

  • use_enum_values: bool = True

field account_key: Optional[str] = None
field account_name: str [Required]
field azure_cloud: Optional[str] = None
field bucket_name: Optional[str] = None
field host: Optional[str] = None
field is_secure: bool = True
field port: Optional[int] = None
field prefix: Optional[str] = None
field proxy_info: Optional[ProxyInfo] = None
field sas_token: Optional[str] = None
field statsd_info: Optional[StatsdConfig] = None
field storage_type: Literal[StorageDriver.azure] = StorageDriver.azure
validator host_and_port_must_be_set_together  »  all fields
validator valid_azure_cloud_endpoint  »  azure_cloud
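
Judging by its name, host_and_port_must_be_set_together rejects configurations where only one of host and port is given. A minimal standalone sketch of such a check follows; the function name check_host_and_port and the error message are hypothetical, and the actual validator may differ in detail.

```python
from typing import Optional


def check_host_and_port(host: Optional[str], port: Optional[int]) -> None:
    """Raise ValueError unless host and port are both set or both unset."""
    if (host is None) != (port is None):
        raise ValueError("host and port must be set together")
```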
class str, account_name: str, account_key: Optional[str] = None, sas_token: Optional[str] = None, prefix: Optional[str] = None, is_secure: bool = True, host: Optional[str] = None, port: Optional[int] = None, azure_cloud: Optional[str] = None, proxy_info: Optional[dict[str, Union[str, int]]] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, ensure_object_store_available: bool = True)
close() None

Release all resources associated with the Transfer object.

copy_file(*, source_key: str, destination_key: str, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any) None

Performs a remote copy from the source key to the destination key. The key must identify a file; trees cannot be copied with this method. If no metadata is given, the existing metadata is copied.

create_object_store_if_needed() None

Create the backing object store if it’s needed (e.g. creating directories, buckets, etc.).

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.

get_contents_to_fileobj(key: str, fileobj_to_store_to: BinaryIO, *, byte_range: Optional[Tuple[int, int]] = None, progress_callback: Optional[Callable[[int, int], None]] = None) Dict[str, Any]

Like get_contents_to_file() but writes to an open file-like object.

get_file_size(key: str) int

Returns an int indicating the size of the file in bytes

verify_object_storage() None

Perform read-only operations to verify the backing object store is available and accessible.

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.
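
The byte_range and progress_callback parameters of get_contents_to_fileobj can be exercised with a small helper such as the one below. read_range is a hypothetical helper that works with any object exposing the documented method; InMemoryTransfer is a toy stand-in used only to make the sketch runnable. Treating both ends of byte_range as inclusive, and the values passed to the callback, are assumptions.

```python
import io
from typing import Any, Dict, List, Tuple


def read_range(transfer: Any, key: str, start: int, end: int) -> Tuple[bytes, List[Tuple[int, int]]]:
    """Fetch part of an object into memory and record the progress callbacks."""
    buffer = io.BytesIO()
    progress: List[Tuple[int, int]] = []

    def on_progress(first: int, second: int) -> None:
        # The meaning of the two ints is not documented here; just record them.
        progress.append((first, second))

    transfer.get_contents_to_fileobj(key, buffer, byte_range=(start, end), progress_callback=on_progress)
    return buffer.getvalue(), progress


class InMemoryTransfer:
    """Toy stand-in used only to exercise the helper; not a rohmu class."""

    def __init__(self, objects: Dict[str, bytes]) -> None:
        self.objects = objects

    def get_contents_to_fileobj(self, key, fileobj_to_store_to, *, byte_range=None, progress_callback=None):
        data = self.objects[key]
        if byte_range is not None:
            # Assumption: both ends of byte_range are inclusive.
            data = data[byte_range[0] : byte_range[1] + 1]
        fileobj_to_store_to.write(data)
        if progress_callback is not None:
            progress_callback(len(data), len(data))
        return {}
```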


pydantic model rohmu.object_storage.config.GoogleObjectStorageConfig

Show JSON schema
   "title": "GoogleObjectStorageConfig",
   "type": "object",
   "properties": {
      "storage_type": {
         "title": "Storage Type",
         "default": "google",
         "enum": [
         "type": "string"
      "statsd_info": {
         "$ref": "#/definitions/StatsdConfig"
      "project_id": {
         "title": "Project Id",
         "type": "string"
      "bucket_name": {
         "title": "Bucket Name",
         "type": "string"
      "credential_file": {
         "title": "Credential File",
         "type": "string",
         "format": "path"
      "credentials": {
         "title": "Credentials",
         "type": "object"
      "proxy_info": {
         "$ref": "#/definitions/ProxyInfo"
      "prefix": {
         "title": "Prefix",
         "type": "string"
   "required": [
   "definitions": {
      "MessageFormat": {
         "title": "MessageFormat",
         "description": "An enumeration.",
         "enum": [
         "type": "string"
      "StatsdConfig": {
         "title": "StatsdConfig",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "default": "",
               "type": "string"
            "port": {
               "title": "Port",
               "default": 8125,
               "type": "integer"
            "message_format": {
               "default": "telegraf",
               "allOf": [
                     "$ref": "#/definitions/MessageFormat"
            "tags": {
               "title": "Tags",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "anyOf": [
                        "type": "integer"
                        "type": "string"
            "operation_map": {
               "title": "Operation Map",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "type": "string"
         "additionalProperties": false
      "ProxyType": {
         "title": "ProxyType",
         "description": "An enumeration.",
         "enum": [
         "type": "string"
      "ProxyInfo": {
         "title": "ProxyInfo",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "type": "string"
            "port": {
               "title": "Port",
               "type": "integer"
            "type": {
               "$ref": "#/definitions/ProxyType"
            "user": {
               "title": "User",
               "type": "string"
            "pass": {
               "title": "Pass",
               "type": "string"
         "required": [
         "additionalProperties": false

  • arbitrary_types_allowed: bool = True

  • extra_forbid: bool = True

  • use_enum_values: bool = True

field bucket_name: Optional[str] = None
field credential_file: Optional[Path] = None
field credentials: Optional[Dict[str, Any]] = None
field prefix: Optional[str] = None
field project_id: str [Required]
field proxy_info: Optional[ProxyInfo] = None
field statsd_info: Optional[StatsdConfig] = None
field storage_type: Literal[StorageDriver.google] = StorageDriver.google
class str, bucket_name: str, credential_file: Optional[TextIO] = None, credentials: Optional[dict[str, Any]] = None, prefix: Optional[str] = None, proxy_info: Optional[dict[str, Union[str, int]]] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, ensure_object_store_available: bool = True)
close() None

Release all resources associated with the Transfer object.

copy_file(*, source_key: str, destination_key: str, metadata: Optional[Dict[str, Any]] = None, **_kwargs: Any) None

Performs a remote copy from the source key to the destination key. The key must identify a file; trees cannot be copied with this method. If no metadata is given, the existing metadata is copied.

create_object_store_if_needed() None

Create the backing object store if it’s needed (e.g. creating directories, buckets, etc.).

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.

get_contents_to_fileobj(key: str, fileobj_to_store_to: BinaryIO, *, byte_range: Optional[Tuple[int, int]] = None, progress_callback: Optional[Callable[[int, int], None]] = None) Dict[str, Any]

Like get_contents_to_file() but writes to an open file-like object.

get_file_size(key: str) int

Returns an int indicating the size of the file in bytes

get_or_create_bucket(bucket_name: str) str

Deprecated: use create_object_store_if_needed() instead

verify_object_storage() None

Perform read-only operations to verify the backing object store is available and accessible.

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.

class BinaryIO, *, chunk_size: int, mime_type: str, name: str)

Support streaming arbitrary amount of data from non-seekable object supporting read method.

chunksize() int

Chunk size for resumable uploads.


Returns: Chunk size in bytes.

getbytes(begin: int, length: int) bytes

Get bytes from the media.

Parameters:

  • begin – int, offset from beginning of file.

  • length – int, number of bytes to read, starting at begin.

Returns: A string of bytes read. May be shorter than length if EOF was reached first.

has_stream() bool

Does the underlying upload support a streaming interface.

Streaming means it is an io.IOBase subclass that supports seek, i.e. seekable() returns True.


Returns: True if the call to stream() will return an instance of a seekable io.IOBase subclass.

mimetype() str

Mime type of the body.


Returns: Mime type.

peek() None

Try to top up some data into _next_chunk.

resumable() bool

Whether this upload is resumable.


Returns: True if resumable upload or False.

size() Optional[int]

Size of upload.


Returns: Size of the body, or None if the size is unknown.

stream() BinaryIO

A stream interface to the data being uploaded.


Returns: The returned value is an io.IOBase subclass that supports seek, i.e. seekable() returns True.
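
The core difficulty with a non-seekable source is that read() may return fewer bytes than requested, so a chunk has to be topped up until it is full or the stream ends. The generator below is a simplified sketch of that idea only; iter_chunks is a hypothetical helper and does not reproduce the class above, which additionally has to serve getbytes() calls at given offsets.

```python
import io
from typing import BinaryIO, Iterator


def iter_chunks(stream: BinaryIO, chunk_size: int) -> Iterator[bytes]:
    """Yield chunk_size-byte pieces from a read()-only stream; the last piece may be shorter."""
    while True:
        parts = []
        remaining = chunk_size
        # read() may return fewer bytes than requested, so keep reading until
        # the chunk is full or the stream reports EOF with an empty result.
        while remaining > 0:
            data = stream.read(remaining)
            if not data:
                break
            parts.append(data)
            remaining -= len(data)
        if not parts:
            return
        yield b"".join(parts)
        if remaining > 0:
            return  # EOF was reached inside this chunk


chunks = list(iter_chunks(io.BytesIO(b"abcdefg"), 3))
```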

class BinaryIO, request: HttpRequest, chunksize: int = 52428800, *, byte_range: tuple[int, int])

This class is mostly a copy of the googleapiclient’s MediaIOBaseDownload class, but with the addition of the support for fetching a specific byte_range.

next_chunk() tuple[googleapiclient.http.MediaDownloadProgress, bool]

Get the next chunk of the download.


Returns: The value of done will be True when the media has been fully downloaded or the total size of the media is unknown.

Return type: (status, done)

Raises:

  • googleapiclient.errors.HttpError if the response was not a 2xx (or a 416 is received and the file is empty)

  • httplib2.HttpLib2Error if a transport error has occurred.
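
Downloading a specific byte_range in chunks amounts to splitting the range into sub-ranges of at most chunksize bytes and requesting each one with an HTTP Range header. The sketch below shows only that arithmetic; chunk_ranges and range_header are hypothetical helpers, and treating byte_range as inclusive on both ends is an assumption (HTTP Range values themselves are inclusive).

```python
from typing import Iterator, Tuple


def chunk_ranges(byte_range: Tuple[int, int], chunksize: int) -> Iterator[Tuple[int, int]]:
    """Split an inclusive (first, last) range into inclusive sub-ranges of at most chunksize bytes."""
    first, last = byte_range
    start = first
    while start <= last:
        end = min(start + chunksize - 1, last)
        yield start, end
        start = end + 1


def range_header(start: int, end: int) -> str:
    """Format an HTTP Range header value; both ends are inclusive."""
    return f"bytes={start}-{end}"
```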


pydantic model rohmu.object_storage.config.LocalObjectStorageConfig

Show JSON schema
   "title": "LocalObjectStorageConfig",
   "type": "object",
   "properties": {
      "storage_type": {
         "title": "Storage Type",
         "default": "local",
         "enum": [
         "type": "string"
      "statsd_info": {
         "$ref": "#/definitions/StatsdConfig"
      "directory": {
         "title": "Directory",
         "type": "string",
         "format": "path"
      "prefix": {
         "title": "Prefix",
         "type": "string"
   "required": [
   "definitions": {
      "MessageFormat": {
         "title": "MessageFormat",
         "description": "An enumeration.",
         "enum": [
         "type": "string"
      "StatsdConfig": {
         "title": "StatsdConfig",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "default": "",
               "type": "string"
            "port": {
               "title": "Port",
               "default": 8125,
               "type": "integer"
            "message_format": {
               "default": "telegraf",
               "allOf": [
                     "$ref": "#/definitions/MessageFormat"
            "tags": {
               "title": "Tags",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "anyOf": [
                        "type": "integer"
                        "type": "string"
            "operation_map": {
               "title": "Operation Map",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "type": "string"
         "additionalProperties": false

  • arbitrary_types_allowed: bool = True

  • extra_forbid: bool = True

  • use_enum_values: bool = True

field directory: Path [Required]
field prefix: Optional[str] = None
field statsd_info: Optional[StatsdConfig] = None
field storage_type: Literal[StorageDriver.local] = StorageDriver.local
class rohmu.object_storage.local.LocalTransfer(directory: Union[str, Path], prefix: Optional[str] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, ensure_object_store_available: bool = True)
copy_file(*, source_key: str, destination_key: str, metadata: Optional[Dict[str, Any]] = None, **_kwargs: Any) None

Performs a remote copy from the source key to the destination key. The key must identify a file; trees cannot be copied with this method. If no metadata is given, the existing metadata is copied.

create_object_store_if_needed() None

No-op as there’s no need to create the directory ahead of time.

delete_tree(key: str) None

Delete all keys under given root key. Basic implementation works by just listing all available keys and deleting them individually but storage providers can implement more efficient logic.

get_contents_to_fileobj(key: str, fileobj_to_store_to: BinaryIO, *, byte_range: Optional[Tuple[int, int]] = None, progress_callback: Optional[Callable[[int, int], None]] = None) Dict[str, Any]

Like get_contents_to_file() but writes to an open file-like object.

get_file_size(key: str) int

Returns an int indicating the size of the file in bytes

verify_object_storage() None

No-op as there’s no need to check for the existence of the directory at setup time.


pydantic model rohmu.object_storage.config.S3ObjectStorageConfig

Show JSON schema
   "title": "S3ObjectStorageConfig",
   "type": "object",
   "properties": {
      "storage_type": {
         "title": "Storage Type",
         "default": "s3",
         "enum": [
         "type": "string"
      "statsd_info": {
         "$ref": "#/definitions/StatsdConfig"
      "region": {
         "title": "Region",
         "type": "string"
      "bucket_name": {
         "title": "Bucket Name",
         "type": "string"
      "aws_access_key_id": {
         "title": "Aws Access Key Id",
         "type": "string"
      "aws_secret_access_key": {
         "title": "Aws Secret Access Key",
         "type": "string"
      "prefix": {
         "title": "Prefix",
         "type": "string"
      "host": {
         "title": "Host",
         "type": "string"
      "port": {
         "title": "Port",
         "type": "string"
      "addressing_style": {
         "default": "path",
         "allOf": [
               "$ref": "#/definitions/S3AddressingStyle"
      "is_secure": {
         "title": "Is Secure",
         "default": false,
         "type": "boolean"
      "is_verify_tls": {
         "title": "Is Verify Tls",
         "default": false,
         "type": "boolean"
      "cert_path": {
         "title": "Cert Path",
         "type": "string",
         "format": "path"
      "segment_size": {
         "title": "Segment Size",
         "default": 40894464,
         "type": "integer"
      "encrypted": {
         "title": "Encrypted",
         "default": false,
         "type": "boolean"
      "proxy_info": {
         "$ref": "#/definitions/ProxyInfo"
      "connect_timeout": {
         "title": "Connect Timeout",
         "type": "string"
      "read_timeout": {
         "title": "Read Timeout",
         "type": "string"
      "aws_session_token": {
         "title": "Aws Session Token",
         "type": "string"
      "use_dualstack_endpoint": {
         "title": "Use Dualstack Endpoint",
         "default": true,
         "type": "boolean"
   "required": [
   "definitions": {
      "MessageFormat": {
         "title": "MessageFormat",
         "description": "An enumeration.",
         "enum": [
         "type": "string"
      "StatsdConfig": {
         "title": "StatsdConfig",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "default": "",
               "type": "string"
            "port": {
               "title": "Port",
               "default": 8125,
               "type": "integer"
            "message_format": {
               "default": "telegraf",
               "allOf": [
                     "$ref": "#/definitions/MessageFormat"
            "tags": {
               "title": "Tags",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "anyOf": [
                        "type": "integer"
                        "type": "string"
            "operation_map": {
               "title": "Operation Map",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "type": "string"
         "additionalProperties": false
      "S3AddressingStyle": {
         "title": "S3AddressingStyle",
         "description": "An enumeration.",
         "enum": [
      "ProxyType": {
         "title": "ProxyType",
         "description": "An enumeration.",
         "enum": [
         "type": "string"
      "ProxyInfo": {
         "title": "ProxyInfo",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "type": "string"
            "port": {
               "title": "Port",
               "type": "integer"
            "type": {
               "$ref": "#/definitions/ProxyType"
            "user": {
               "title": "User",
               "type": "string"
            "pass": {
               "title": "Pass",
               "type": "string"
         "required": [
         "additionalProperties": false

  • arbitrary_types_allowed: bool = True

  • extra_forbid: bool = True

  • use_enum_values: bool = True

field addressing_style: S3AddressingStyle = S3AddressingStyle.path
field aws_access_key_id: Optional[str] = None
field aws_secret_access_key: Optional[str] = None
field aws_session_token: Optional[str] = None
field bucket_name: Optional[str] = None
field cert_path: Optional[Path] = None
field connect_timeout: Optional[str] = None
field encrypted: bool = False
field host: Optional[str] = None
field is_secure: bool = False
field is_verify_tls: bool = False
field port: Optional[str] = None
field prefix: Optional[str] = None
field proxy_info: Optional[ProxyInfo] = None
field read_timeout: Optional[str] = None
field region: str [Required]
field segment_size: int = 40894464
field statsd_info: Optional[StatsdConfig] = None
field storage_type: Literal[StorageDriver.s3] = StorageDriver.s3
field use_dualstack_endpoint: Optional[bool] = True
validator validate_is_verify_tls_and_cert_path  »  all fields
class rohmu.object_storage.s3.S3Transfer(region: str, bucket_name: str, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, prefix: Optional[str] = None, host: Optional[str] = None, port: Optional[int] = None, addressing_style: S3AddressingStyle = S3AddressingStyle.path, is_secure: bool = False, is_verify_tls: bool = False, cert_path: Optional[Path] = None, segment_size: int = 40894464, encrypted: bool = False, proxy_info: Optional[dict[str, Union[str, int]]] = None, connect_timeout: Optional[float] = None, read_timeout: Optional[float] = None, notifier: Optional[Notifier] = None, aws_session_token: Optional[str] = None, use_dualstack_endpoint: Optional[bool] = True, statsd_info: Optional[StatsdConfig] = None, ensure_object_store_available: bool = True)
close() None

Release all resources associated with the Transfer object.

copy_file(*, source_key: str, destination_key: str, metadata: Optional[Dict[str, Any]] = None, **_kwargs: Any) None

Performs a remote copy from the source key to the destination key. The key must identify a file; trees cannot be copied with this method. If no metadata is given, the existing metadata is copied.

create_object_store_if_needed() None

Create the backing object store if it’s needed (e.g. creating directories, buckets, etc.).

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.

delete_keys(keys: Collection[str]) None

Delete specified keys

get_contents_to_fileobj(key: str, fileobj_to_store_to: BinaryIO, *, byte_range: Optional[Tuple[int, int]] = None, progress_callback: Optional[Callable[[int, int], None]] = None) Dict[str, Any]

Like get_contents_to_file() but writes to an open file-like object.

get_file_size(key: str) int

Returns an int indicating the size of the file in bytes

upload_concurrent_chunk(upload: ConcurrentUpload, chunk_number: int, fd: BinaryIO, upload_progress_fn: Optional[Callable[[int], None]] = None) None

Synchronously uploads a chunk. Returns an ETag for the uploaded chunk. This method is thread-safe, so you can call it concurrently from multiple threads to upload different chunks. What happens if multiple threads try to upload the same chunk_number concurrently is unspecified.
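
Since the method is described as thread-safe for distinct chunk numbers, a caller could fan the chunks out over a thread pool, as sketched below. upload_chunks is a hypothetical helper, FakeTransfer is a toy stand-in that only mimics the documented signature, and numbering the chunks from 1 is an assumption. How the ConcurrentUpload object is created and completed is outside this sketch.

```python
import io
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, BinaryIO, Dict, List


class FakeTransfer:
    """Toy stand-in exposing only upload_concurrent_chunk; not a rohmu class."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.chunks: Dict[int, bytes] = {}

    def upload_concurrent_chunk(self, upload: Any, chunk_number: int, fd: BinaryIO, upload_progress_fn=None) -> None:
        data = fd.read()
        with self._lock:
            self.chunks[chunk_number] = data
        if upload_progress_fn is not None:
            upload_progress_fn(len(data))


def upload_chunks(transfer: Any, upload: Any, chunks: List[bytes], workers: int = 4) -> None:
    """Upload each chunk from a worker thread, giving every chunk a distinct number."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(transfer.upload_concurrent_chunk, upload, number, io.BytesIO(data))
            for number, data in enumerate(chunks, start=1)
        ]
        for future in futures:
            future.result()  # re-raise any exception from the worker
```

Giving each chunk its own number avoids the unspecified case where two threads upload the same chunk_number at once.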

verify_object_storage() None

Perform read-only operations to verify the backing object store is available and accessible.

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.


pydantic model rohmu.object_storage.config.SFTPObjectStorageConfig

JSON schema:
{
   "title": "SFTPObjectStorageConfig",
   "type": "object",
   "properties": {
      "storage_type": {
         "title": "Storage Type",
         "default": "sftp",
         "enum": [
            "sftp"
         ],
         "type": "string"
      },
      "statsd_info": {
         "$ref": "#/definitions/StatsdConfig"
      },
      "server": {
         "title": "Server",
         "type": "string"
      },
      "port": {
         "title": "Port",
         "type": "integer"
      },
      "username": {
         "title": "Username",
         "type": "string"
      },
      "password": {
         "title": "Password",
         "type": "string"
      },
      "private_key": {
         "title": "Private Key",
         "type": "string"
      },
      "prefix": {
         "title": "Prefix",
         "type": "string"
      }
   },
   "required": [
      "server",
      "port",
      "username"
   ],
   "definitions": {
      "MessageFormat": {
         "title": "MessageFormat",
         "description": "An enumeration.",
         "enum": [
            "datadog",
            "telegraf"
         ],
         "type": "string"
      },
      "StatsdConfig": {
         "title": "StatsdConfig",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "default": "",
               "type": "string"
            },
            "port": {
               "title": "Port",
               "default": 8125,
               "type": "integer"
            },
            "message_format": {
               "default": "telegraf",
               "allOf": [
                  {
                     "$ref": "#/definitions/MessageFormat"
                  }
               ]
            },
            "tags": {
               "title": "Tags",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "anyOf": [
                     {
                        "type": "integer"
                     },
                     {
                        "type": "string"
                     }
                  ]
               }
            },
            "operation_map": {
               "title": "Operation Map",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "type": "string"
               }
            }
         },
         "additionalProperties": false
      }
   }
}

  • arbitrary_types_allowed: bool = True

  • extra_forbid: bool = True

  • use_enum_values: bool = True

field password: Optional[str] = None
field port: int [Required]
field prefix: Optional[str] = None
field private_key: Optional[str] = None
field server: str [Required]
field statsd_info: Optional[StatsdConfig] = None
field storage_type: Literal[StorageDriver.sftp] = StorageDriver.sftp
field username: str [Required]
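As an illustration of the field list above, an SFTP storage configuration dictionary might look like the sketch below. The host name, user name, password, and prefix are placeholder values, and `missing_sftp_fields` is a hypothetical helper that only checks the three fields marked [Required]; it is not part of rohmu and does not replace the model's own validation.

```python
# Required keys taken from the field list above; all other fields are optional.
SFTP_REQUIRED_FIELDS = ("server", "port", "username")


def missing_sftp_fields(config):
    """Return the required SFTP field names that are absent from config."""
    return [name for name in SFTP_REQUIRED_FIELDS if name not in config]


example_config = {
    "storage_type": "sftp",
    "server": "sftp.example.com",  # placeholder host
    "port": 22,
    "username": "backup",          # placeholder user
    "password": "change-me",       # placeholder secret
    "prefix": "backups/",          # placeholder prefix
}
```

A dictionary of this shape is the kind of input that functions such as rohmu.get_transfer(storage_config) accept.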
class rohmu.object_storage.sftp.SFTPTransfer(server: str, port: int, username: str, password: Optional[str] = None, private_key: Optional[str] = None, prefix: Optional[str] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, ensure_object_store_available: bool = True)
copy_file(*, source_key: str, destination_key: str, metadata: Optional[Dict[str, Any]] = None, **_kwargs: Any) None

Performs a remote copy from the source key name to the destination key name. The key must identify a file; trees cannot be copied with this method. If no metadata is given, the existing metadata is copied.

create_object_store_if_needed() None

No-op as it’s not applicable to SFTP transfers

get_contents_to_fileobj(key: str, fileobj_to_store_to: BinaryIO, *, byte_range: Optional[Tuple[int, int]] = None, progress_callback: Optional[Callable[[int, int], None]] = None) Dict[str, Any]

Like get_contents_to_file() but writes to an open file-like object.

get_file_size(key: str) int

Returns the size of the file in bytes.

verify_object_storage() None

No-op for now. Eventually, the SFTP connection could be tested here instead of in the constructor.


pydantic model rohmu.object_storage.config.SwiftObjectStorageConfig

JSON schema:
{
   "title": "SwiftObjectStorageConfig",
   "type": "object",
   "properties": {
      "storage_type": {
         "title": "Storage Type",
         "default": "swift",
         "enum": [
            "swift"
         ],
         "type": "string"
      },
      "statsd_info": {
         "$ref": "#/definitions/StatsdConfig"
      },
      "user": {
         "title": "User",
         "type": "string"
      },
      "key": {
         "title": "Key",
         "type": "string"
      },
      "container_name": {
         "title": "Container Name",
         "type": "string"
      },
      "auth_url": {
         "title": "Auth Url",
         "type": "string"
      },
      "auth_version": {
         "title": "Auth Version",
         "default": "2.0",
         "type": "string"
      },
      "tenant_name": {
         "title": "Tenant Name",
         "type": "string"
      },
      "segment_size": {
         "title": "Segment Size",
         "default": 3221225472,
         "type": "integer"
      },
      "region_name": {
         "title": "Region Name",
         "type": "string"
      },
      "user_id": {
         "title": "User Id",
         "type": "string"
      },
      "user_domain_id": {
         "title": "User Domain Id",
         "type": "string"
      },
      "user_domain_name": {
         "title": "User Domain Name",
         "type": "string"
      },
      "tenant_id": {
         "title": "Tenant Id",
         "type": "string"
      },
      "project_id": {
         "title": "Project Id",
         "type": "string"
      },
      "project_name": {
         "title": "Project Name",
         "type": "string"
      },
      "project_domain_id": {
         "title": "Project Domain Id",
         "type": "string"
      },
      "project_domain_name": {
         "title": "Project Domain Name",
         "type": "string"
      },
      "service_type": {
         "title": "Service Type",
         "type": "string"
      },
      "endpoint_type": {
         "title": "Endpoint Type",
         "type": "string"
      },
      "prefix": {
         "title": "Prefix",
         "type": "string"
      }
   },
   "required": [
      "user",
      "key",
      "container_name",
      "auth_url"
   ],
   "definitions": {
      "MessageFormat": {
         "title": "MessageFormat",
         "description": "An enumeration.",
         "enum": [
            "datadog",
            "telegraf"
         ],
         "type": "string"
      },
      "StatsdConfig": {
         "title": "StatsdConfig",
         "type": "object",
         "properties": {
            "host": {
               "title": "Host",
               "default": "",
               "type": "string"
            },
            "port": {
               "title": "Port",
               "default": 8125,
               "type": "integer"
            },
            "message_format": {
               "default": "telegraf",
               "allOf": [
                  {
                     "$ref": "#/definitions/MessageFormat"
                  }
               ]
            },
            "tags": {
               "title": "Tags",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "anyOf": [
                     {
                        "type": "integer"
                     },
                     {
                        "type": "string"
                     }
                  ]
               }
            },
            "operation_map": {
               "title": "Operation Map",
               "default": {},
               "type": "object",
               "additionalProperties": {
                  "type": "string"
               }
            }
         },
         "additionalProperties": false
      }
   }
}

  • arbitrary_types_allowed: bool = True

  • extra_forbid: bool = True

  • use_enum_values: bool = True

field auth_url: str [Required]
field auth_version: str = '2.0'
field container_name: str [Required]
field endpoint_type: Optional[str] = None
field key: str [Required]
field prefix: Optional[str] = None
field project_domain_id: Optional[str] = None
field project_domain_name: Optional[str] = None
field project_id: Optional[str] = None
field project_name: Optional[str] = None
field region_name: Optional[str] = None
field segment_size: int = 3221225472
field service_type: Optional[str] = None
field statsd_info: Optional[StatsdConfig] = None
field storage_type: Literal[StorageDriver.swift] = StorageDriver.swift
field tenant_id: Optional[str] = None
field tenant_name: Optional[str] = None
field user: str [Required]
field user_domain_id: Optional[str] = None
field user_domain_name: Optional[str] = None
field user_id: Optional[str] = None
class rohmu.object_storage.swift.SwiftTransfer(*, user: str, key: str, container_name: str, auth_url: str, auth_version: str = '2.0', tenant_name: Optional[str] = None, prefix: Optional[str] = None, segment_size: int = 3221225472, region_name: Optional[str] = None, user_id: Optional[str] = None, user_domain_id: Optional[str] = None, user_domain_name: Optional[str] = None, tenant_id: Optional[str] = None, project_id: Optional[str] = None, project_name: Optional[str] = None, project_domain_id: Optional[str] = None, project_domain_name: Optional[str] = None, service_type: Optional[str] = None, endpoint_type: Optional[str] = None, notifier: Optional[Notifier] = None, statsd_info: Optional[StatsdConfig] = None, ensure_object_store_available: bool = True)
copy_file(*, source_key: str, destination_key: str, metadata: Optional[Dict[str, Any]] = None, **_kwargs: Any) None

Performs a remote copy from the source key name to the destination key name. The key must identify a file; trees cannot be copied with this method. If no metadata is given, the existing metadata is copied.

create_object_store_if_needed() None

Create the backing object store if it’s needed (e.g. creating directories, buckets, etc.).

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.

get_contents_to_fileobj(key: str, fileobj_to_store_to: BinaryIO, *, byte_range: Optional[Tuple[int, int]] = None, progress_callback: Optional[Callable[[int, int], None]] = None) Dict[str, Any]

Like get_contents_to_file() but writes to an open file-like object.

get_file_size(key: str) int

Returns the size of the file in bytes.

verify_object_storage() None

Perform read-only operations to verify the backing object store is available and accessible.

Raise Rohmu-specific error TransferObjectStoreInitializationError to abstract away implementation-specific details.


Rohmu - exception classes

exception rohmu.errors.ConcurrentUploadError

A generic error related to concurrent uploads

exception rohmu.errors.Error

Generic exception

exception rohmu.errors.FileNotFoundFromStorageError

File not found from remote storage

exception rohmu.errors.InvalidByteRangeError

Error specifying a content-range in a request

exception rohmu.errors.InvalidConfigurationError

Invalid configuration

exception rohmu.errors.InvalidTransferError

You tried to access a transfer object that you already returned to the pool

exception rohmu.errors.LocalFileIsRemoteFileError

File transfer operation source and destination point to the same file

exception rohmu.errors.MaybeRecoverableError

An error that may be recoverable

exception rohmu.errors.MissingLibraryError

Missing dependency library

exception rohmu.errors.StorageError

Storage exception

exception rohmu.errors.TransferObjectStoreInitializationError

Raised when a transient network or permission issue does not allow us to validate access to the object store

exception rohmu.errors.TransferObjectStoreMissingError

Raised when we know for sure the bucket is missing

exception rohmu.errors.TransferObjectStorePermissionError

Raised when a permission issue does not allow us to validate access to the object store

exception rohmu.errors.UninitializedError

Error trying to access an uninitialized resource
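The three TransferObjectStore* exceptions above let a caller tell a missing store apart from a permission problem or a possibly transient failure. The sketch below shows one way to branch on them. To stay self-contained it defines local stand-in exception classes with the same names; real code would import them from rohmu.errors instead. `prepare_store` and its return strings are hypothetical, and the assumption that verify_object_storage() can raise the more specific errors is not confirmed by the text above.

```python
# Local stand-ins so the sketch runs without rohmu installed (assumption:
# real code would use the classes of the same names from rohmu.errors).
class TransferObjectStoreInitializationError(Exception):
    pass


class TransferObjectStoreMissingError(Exception):
    pass


class TransferObjectStorePermissionError(Exception):
    pass


def prepare_store(transfer):
    """Verify the store and create it only when it is known to be missing."""
    try:
        transfer.verify_object_storage()
        return "available"
    # The most specific cases are listed first so they still match if the
    # real classes turn out to derive from the generic initialization error.
    except TransferObjectStoreMissingError:
        transfer.create_object_store_if_needed()
        return "created"
    except TransferObjectStorePermissionError:
        return "permission-denied"
    except TransferObjectStoreInitializationError:
        return "retry-later"
```

Returning a status string keeps the sketch easy to test; an application would more likely log the error, retry with a delay, or re-raise.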