Source code for maestral.client

This modules contains the Dropbox API client. It wraps calls to the Dropbox Python SDK
and handles exceptions, chunked uploads or downloads, etc.

# system imports
import errno
import os
import os.path as osp
import time
import logging
import contextlib
from datetime import datetime, timezone
from typing import (

# external imports
import requests
from dropbox import (
from dropbox.common import PathRoot
from dropbox.stone_validators import ValidationError

# local imports
from . import __version__
from .oauth import OAuth2Session
from .errors import (
from .config import MaestralState
from .constants import DROPBOX_APP_KEY
from .utils import natural_size, chunks, clamp
from .utils.path import fs_max_lengths_for_path, opener_no_symlink

    from .database import SyncEvent

__all__ = [

LocalError = Union[MaestralApiError, OSError]
PaginationResultType = Union[sharing.ListSharedLinksResult, files.ListFolderResult]
FT = TypeVar("FT", bound=Callable[..., Any])

_major_minor_version = ".".join(__version__.split(".")[:2])
USER_AGENT = f"Maestral/v{_major_minor_version}"


[docs]@contextlib.contextmanager def convert_api_errors( dbx_path: Optional[str] = None, local_path: Optional[str] = None ) -> Iterator[None]: """ A context manager that catches and re-raises instances of :class:`OSError` and :class:`dropbox.exceptions.DropboxException` as :class:`maestral.errors.MaestralApiError` or :class:`ConnectionError`. :param dbx_path: Dropbox path associated with the error. :param local_path: Local path associated with the error. """ try: yield except (exceptions.DropboxException, ValidationError) as exc: raise dropbox_to_maestral_error(exc, dbx_path, local_path) # Catch connection errors first, they may inherit from OSError. except CONNECTION_ERRORS: raise DropboxConnectionError( "Cannot connect to Dropbox", "Please check you internet connection and try again later.", ) except OSError as exc: if exc.errno == errno.EPROTOTYPE: # Can occur on macOS, see raise DropboxConnectionError( "Cannot connect to Dropbox", "Please check you internet connection and try again later.", ) else: raise os_to_maestral_error(exc, dbx_path, local_path)
[docs]class DropboxClient: """Client for the Dropbox SDK This client defines basic methods to wrap Dropbox Python SDK calls, such as creating, moving, modifying and deleting files and folders on Dropbox and downloading files from Dropbox. All Dropbox SDK exceptions, OSErrors from the local file system API and connection errors will be caught and reraised as a subclass of :class:`maestral.errors.MaestralApiError`. This class can be used as a context manager to clean up any network resources from the API requests. :Example: >>> from maestral.client import DropboxClient >>> with DropboxClient("maestral") as client: ... res = client.list_folder("/") >>> print(res.entries) :param config_name: Name of config file and state file to use. :param timeout: Timeout for individual requests. Defaults to 100 sec if not given. :param session: Optional requests session to use. If not given, a new session will be created with :func:`dropbox.dropbox_client.create_session`. """ SDK_VERSION: str = "2.0" _dbx: Optional[Dropbox] def __init__( self, config_name: str, timeout: float = 100, session: Optional[requests.Session] = None, ) -> None: self.config_name = config_name self.auth = OAuth2Session(config_name) self._state = MaestralState(config_name) self._logger = logging.getLogger(__name__) self._timeout = timeout self._session = session or create_session() self._backoff_until = 0 self._dbx: Optional[Dropbox] = None self._dbx_base: Optional[Dropbox] = None self._cached_account_info: Optional[users.FullAccount] = None self._namespace_id = self._state.get("account", "path_root_nsid") self._is_team_space = self._state.get("account", "path_root_type") == "team" # ---- Linking API ----------------------------------------------------------------- @property def dbx_base(self) -> Dropbox: """The underlying Dropbox SDK instance without namespace headers.""" if not self.linked: raise NotLinkedError( "No auth token set", "Please link a Dropbox account first." ) return self._dbx_base @property def dbx(self) -> Dropbox: """The underlying Dropbox SDK instance with namespace headers.""" if not self.linked: raise NotLinkedError( "No auth token set", "Please link a Dropbox account first." ) return self._dbx @property def linked(self) -> bool: """ Indicates if the client is linked to a Dropbox account (read only). This will block until the user's keyring is unlocked to load the saved auth token. :raises KeyringAccessError: if keyring access fails. """ if self._dbx and self._dbx_base: return True elif self.auth.linked: # This will trigger keyring access on first call. if self.auth.token_access_type == "legacy": self._init_sdk_with_token(access_token=self.auth.access_token) else: self._init_sdk_with_token(refresh_token=self.auth.refresh_token) return True else: return False
[docs] def get_auth_url(self) -> str: """ Returns a URL to authorize access to a Dropbox account. To link a Dropbox account, retrieve an auth token from the URL and link Maestral by calling :meth:`link` with the provided token. :returns: URL to retrieve an OAuth token. """ return self.auth.get_auth_url()
def _init_sdk_with_token( self, refresh_token: Optional[str] = None, access_token: Optional[str] = None, access_token_expiration: Optional[datetime] = None, ) -> None: """ Sets the access tokens for the Dropbox API. This will create a new SDK instance with new tokens. :param refresh_token: Long-lived refresh token to generate new access tokens. :param access_token: Short-lived auth token. :param access_token_expiration: Expiry time of auth token. """ if refresh_token or access_token: # Initialise Dropbox SDK. self._dbx_base = Dropbox( oauth2_refresh_token=refresh_token, oauth2_access_token=access_token, oauth2_access_token_expiration=access_token_expiration, app_key=DROPBOX_APP_KEY, session=self._session, user_agent=USER_AGENT, timeout=self._timeout, ) # If namespace_id was given, use the corresponding namespace, otherwise # default to the home namespace. if self._namespace_id: root_path = PathRoot.root(self._namespace_id) self._dbx = self._dbx_base.with_path_root(root_path) else: self._dbx = self._dbx_base else: self._dbx_base = None self._dbx = None @property def account_info(self) -> users.FullAccount: """Returns cached account info. Use :meth:`get_account_info` to get the latest account info from Dropbox servers.""" if not self._cached_account_info: return self.get_account_info() else: return self._cached_account_info @property def namespace_id(self) -> str: """The namespace ID of the path root currently used by the DropboxClient. All file paths will be interpreted as relative to the root namespace. Use :meth:`update_path_root` to update the root namespace after the user joins or leaves a team with a Team Space.""" return self._namespace_id @property def is_team_space(self) -> bool: """Whether the user's Dropbox uses a Team Space. Use :meth:`update_path_root` to update the root namespace after the user joins or eaves a team with a Team Space.""" return self._is_team_space # ---- Session management ----------------------------------------------------------
[docs] def close(self) -> None: """Cleans up all resources like the request session/network connection.""" if self._dbx: self._dbx.close()
def __enter__(self) -> "DropboxClient": return self def __exit__(self, *args) -> None: self.close()
[docs] def clone( self, config_name: Optional[str] = None, timeout: Optional[float] = None, session: Optional[requests.Session] = None, ) -> "DropboxClient": """ Creates a new copy of the Dropbox client with the same defaults unless modified by arguments to :meth:`clone`. :param config_name: Name of config file and state file to use. :param timeout: Timeout for individual requests. :param session: Requests session to use. :returns: A new instance of DropboxClient. """ config_name = config_name or self.config_name timeout = timeout or self._timeout session = session or self._session client = self.__class__(config_name, timeout, session) if self._dbx: client._dbx = self._dbx.clone(session=session) if self._dbx_base: client._dbx_base = self._dbx_base.clone(session=session) return client
[docs] def clone_with_new_session(self) -> "DropboxClient": """ Creates a new copy of the Dropbox client with the same defaults but a new requests session. :returns: A new instance of DropboxClient. """ return self.clone(session=create_session())
[docs] def update_path_root(self, root_info: Optional[common.RootInfo] = None) -> None: """ Updates the root path for the Dropbox client. All files paths given as arguments to API calls such as :meth:`list_folder` or :meth:`get_metadata` will be interpreted as relative to the root path. All file paths returned by API calls, for instance in file metadata, will be relative to this root path. The root namespace will change when the user joins or leaves a Dropbox Team with Team Spaces. If this happens, API calls using the old root namespace will raise a :class:`PathRootError`. Use this method to update to the new root namespace. See and for more information on Dropbox Team namespaces and path root headers in API calls. .. note:: We don't automatically switch root namespaces because API users may want to take action when the path root has changed before making further API calls. Be prepared to handle :class:`PathRootError`s and act accordingly. :param root_info: Optional :class:`dropbox.common.RootInfo` describing the path root. If not given, the latest root info will be fetched from Dropbox servers. """ with convert_api_errors(): if not root_info: account_info = self.get_account_info() root_info = account_info.root_info root_nsid = root_info.root_namespace_id path_root = PathRoot.root(root_nsid) self._dbx = self.dbx_base.with_path_root(path_root) if isinstance(root_info, common.UserRootInfo): actual_root_type = "user" actual_home_path = "" elif isinstance(root_info, common.TeamRootInfo): actual_root_type = "team" actual_home_path = root_info.home_path else: raise MaestralApiError( "Unknown root namespace type", f"Got {root_info!r} but expected UserRootInfo or TeamRootInfo.", ) self._namespace_id = root_nsid self._is_team_space = actual_root_type == "team" self._state.set("account", "path_root_nsid", root_nsid) self._state.set("account", "path_root_type", actual_root_type) self._state.set("account", "home_path", actual_home_path) self._logger.debug("Path root type: %s", actual_root_type) self._logger.debug("Path root nsid: %s", root_info.root_namespace_id) self._logger.debug("User home path: %s", actual_home_path)
# ---- SDK wrappers ----------------------------------------------------------------
[docs] def get_account_info(self, dbid: Optional[str] = None) -> users.FullAccount: """ Gets current account information. :param dbid: Dropbox ID of account. If not given, will get the info of the currently linked account. :returns: Account info. """ with convert_api_errors(): if dbid: res = self.dbx_base.users_get_account(dbid) else: res = self.dbx_base.users_get_current_account() if not dbid: # Save our own account info to config. if res.account_type.is_basic(): account_type = "basic" elif res.account_type.is_business(): account_type = "business" elif res.account_type.is_pro(): account_type = "pro" else: account_type = "" self._state.set("account", "email", self._state.set("account", "display_name", self._state.set("account", "abbreviated_name", self._state.set("account", "type", account_type) if not self._namespace_id: home_nsid = res.root_info.home_namespace_id self._namespace_id = home_nsid self._state.set("account", "path_root_nsid", home_nsid) self._cached_account_info = res return res
[docs] def get_space_usage(self) -> users.SpaceUsage: """ :returns: The space usage of the currently linked account. """ with convert_api_errors(): res = self.dbx_base.users_get_space_usage() # Query space usage type. if res.allocation.is_team(): usage_type = "team" elif res.allocation.is_individual(): usage_type = "individual" else: usage_type = "" # Generate space usage string. if res.allocation.is_team(): used = res.allocation.get_team().used allocated = res.allocation.get_team().allocated else: used = res.used allocated = res.allocation.get_individual().allocated percent = used / allocated space_usage = f"{percent:.1%} of {natural_size(allocated)} used" # Save results to config. self._state.set("account", "usage", space_usage) self._state.set("account", "usage_type", usage_type) return res
[docs] def get_metadata(self, dbx_path: str, **kwargs) -> Optional[files.Metadata]: """ Gets metadata for an item on Dropbox or returns ``False`` if no metadata is available. Keyword arguments are passed on to Dropbox SDK files_get_metadata call. :param dbx_path: Path of folder on Dropbox. :param kwargs: Keyword arguments for Dropbox SDK files_get_metadata. :returns: Metadata of item at the given path or ``None`` if item cannot be found. """ try: with convert_api_errors(dbx_path=dbx_path): return self.dbx.files_get_metadata(dbx_path, **kwargs) except (NotFoundError, PathError): return None
[docs] def list_revisions( self, dbx_path: str, mode: str = "path", limit: int = 10 ) -> files.ListRevisionsResult: """ Lists all file revisions for the given file. :param dbx_path: Path to file on Dropbox. :param mode: Must be 'path' or 'id'. If 'id', specify the Dropbox file ID instead of the file path to get revisions across move and rename events. :param limit: Maximum number of revisions to list. :returns: File revision history. """ with convert_api_errors(dbx_path=dbx_path): mode = files.ListRevisionsMode(mode) return self.dbx.files_list_revisions(dbx_path, mode=mode, limit=limit)
[docs] def restore(self, dbx_path: str, rev: str) -> files.FileMetadata: """ Restore an old revision of a file. :param dbx_path: The path to save the restored file. :param rev: The revision to restore. Old revisions can be listed with :meth:`list_revisions`. :returns: Metadata of restored file. """ with convert_api_errors(dbx_path=dbx_path): return self.dbx.files_restore(dbx_path, rev)
[docs] def download( self, dbx_path: str, local_path: str, sync_event: Optional["SyncEvent"] = None, **kwargs, ) -> files.FileMetadata: """ Downloads a file from Dropbox to given local path. :param dbx_path: Path to file on Dropbox or rev number. :param local_path: Path to local download destination. :param sync_event: If given, the sync event will be updated with the number of downloaded bytes. :param kwargs: Keyword arguments for the Dropbox API files_download endpoint. :returns: Metadata of downloaded item. """ with convert_api_errors(dbx_path=dbx_path): dst_path_directory = osp.dirname(local_path) try: os.makedirs(dst_path_directory) except FileExistsError: pass md, http_resp = self.dbx.files_download(dbx_path, **kwargs) if md.symlink_info is not None: # Don't download but reproduce symlink locally. http_resp.close() try: os.unlink(local_path) except FileNotFoundError: pass os.symlink(, local_path) else: chunksize = 2 ** 13 with open(local_path, "wb", opener=opener_no_symlink) as f: with contextlib.closing(http_resp): for c in http_resp.iter_content(chunksize): f.write(c) if sync_event: sync_event.completed = f.tell() # Dropbox SDK provides naive datetime in UTC. client_mod = md.client_modified.replace(tzinfo=timezone.utc) server_mod = md.server_modified.replace(tzinfo=timezone.utc) # Enforce client_modified < server_modified. timestamp = min(client_mod.timestamp(), server_mod.timestamp(), time.time()) # Set mtime of downloaded file. os.utime(local_path, (time.time(), timestamp), follow_symlinks=False) return md
[docs] def upload( self, local_path: str, dbx_path: str, chunk_size: int = 5 * 10 ** 6, sync_event: Optional["SyncEvent"] = None, **kwargs, ) -> files.FileMetadata: """ Uploads local file to Dropbox. :param local_path: Path of local file to upload. :param dbx_path: Path to save file on Dropbox. :param kwargs: Keyword arguments for Dropbox SDK files_upload. :param chunk_size: Maximum size for individual uploads. If larger than 150 MB, it will be set to 150 MB. :param sync_event: If given, the sync event will be updated with the number of downloaded bytes. :returns: Metadata of uploaded file. """ chunk_size = clamp(chunk_size, 10 ** 5, 150 * 10 ** 6) with convert_api_errors(dbx_path=dbx_path, local_path=local_path): stat = os.lstat(local_path) # Dropbox SDK takes naive datetime in UTC/ mtime_dt = datetime.utcfromtimestamp(stat.st_mtime) if stat.st_size <= chunk_size: with open(local_path, "rb", opener=opener_no_symlink) as f: md = self.dbx.files_upload(, dbx_path, client_modified=mtime_dt, **kwargs ) if sync_event: sync_event.completed = f.tell() return md else: # Note: We currently do not support resuming interrupted uploads. # Dropbox keeps upload sessions open for 48h so this could be done in # the future. with open(local_path, "rb", opener=opener_no_symlink) as f: data = session_start = self.dbx.files_upload_session_start(data) uploaded = f.tell() cursor = files.UploadSessionCursor( session_id=session_start.session_id, offset=uploaded ) commit = files.CommitInfo( path=dbx_path, client_modified=mtime_dt, **kwargs ) if sync_event: sync_event.completed = uploaded while True: try: if stat.st_size - f.tell() <= chunk_size: # Finish upload session and return metadata. data = md = self.dbx.files_upload_session_finish( data, cursor, commit ) if sync_event: sync_event.completed = sync_event.size return md else: # Append to upload session. data = self.dbx.files_upload_session_append_v2(data, cursor) uploaded = f.tell() cursor.offset = uploaded if sync_event: sync_event.completed = uploaded except exceptions.DropboxException as exc: error = getattr(exc, "error", None) if ( isinstance(error, files.UploadSessionFinishError) and error.is_lookup_failed() ): lookup_error = error.get_lookup_failed() elif isinstance(error, files.UploadSessionLookupError): lookup_error = error else: raise exc if lookup_error.is_incorrect_offset(): # Reset position in file. offset_error = lookup_error.get_incorrect_offset() cursor.offset = f.tell() else: raise exc
[docs] def remove(self, dbx_path: str, **kwargs) -> files.Metadata: """ Removes a file / folder from Dropbox. :param dbx_path: Path to file on Dropbox. :param kwargs: Keyword arguments for the Dropbox API files_delete_v2 endpoint. :returns: Metadata of deleted item. """ with convert_api_errors(dbx_path=dbx_path): res = self.dbx.files_delete_v2(dbx_path, **kwargs) return res.metadata
[docs] def remove_batch( self, entries: List[Tuple[str, str]], batch_size: int = 900 ) -> List[Union[files.Metadata, MaestralApiError]]: """ Deletes multiple items on Dropbox in a batch job. :param entries: List of Dropbox paths and "rev"s to delete. If a "rev" is not None, the file will only be deleted if it matches the rev on Dropbox. This is not supported when deleting a folder. :param batch_size: Number of items to delete in each batch. Dropbox allows batches of up to 1,000 items. Larger values will be capped automatically. :returns: List of Metadata for deleted items or SyncErrors for failures. Results will be in the same order as the original input. """ batch_size = clamp(batch_size, 1, 1000) res_entries = [] result_list = [] # Up two ~ 1,000 entries allowed per batch: # for chunk in chunks(entries, n=batch_size): arg = [files.DeleteArg(e[0], e[1]) for e in chunk] with convert_api_errors(): res = self.dbx.files_delete_batch(arg) if res.is_complete(): batch_res = res.get_complete() res_entries.extend(batch_res.entries) elif res.is_async_job_id(): async_job_id = res.get_async_job_id() time.sleep(0.5) with convert_api_errors(): res = self.dbx.files_delete_batch_check(async_job_id) check_interval = round(len(chunk) / 100, 1) while res.is_in_progress(): time.sleep(check_interval) with convert_api_errors(): res = self.dbx.files_delete_batch_check(async_job_id) if res.is_complete(): batch_res = res.get_complete() res_entries.extend(batch_res.entries) elif res.is_failed(): error = res.get_failed() if error.is_too_many_write_operations(): title = "Could not delete items" text = ( "There are too many write operations happening in your " "Dropbox. Please try again later." ) raise SyncError(title, text) for i, entry in enumerate(res_entries): if entry.is_success(): result_list.append(entry.get_success().metadata) elif entry.is_failure(): exc = exceptions.ApiError( error=entry.get_failure(), user_message_text="", user_message_locale="", request_id="", ) sync_err = dropbox_to_maestral_error(exc, dbx_path=entries[i][0]) result_list.append(sync_err) return result_list
[docs] def move(self, dbx_path: str, new_path: str, **kwargs) -> files.Metadata: """ Moves / renames files or folders on Dropbox. :param dbx_path: Path to file/folder on Dropbox. :param new_path: New path on Dropbox to move to. :param kwargs: Keyword arguments for the Dropbox API files_move_v2 endpoint. :returns: Metadata of moved item. """ with convert_api_errors(dbx_path=new_path): res = self.dbx.files_move_v2( dbx_path, new_path, allow_shared_folder=True, allow_ownership_transfer=True, **kwargs, ) return res.metadata
[docs] def make_dir(self, dbx_path: str, **kwargs) -> files.FolderMetadata: """ Creates a folder on Dropbox. :param dbx_path: Path of Dropbox folder. :param kwargs: Keyword arguments for the Dropbox API files_create_folder_v2 endpoint. :returns: Metadata of created folder. """ with convert_api_errors(dbx_path=dbx_path): res = self.dbx.files_create_folder_v2(dbx_path, **kwargs) return res.metadata
[docs] def make_dir_batch( self, dbx_paths: List[str], batch_size: int = 900, **kwargs ) -> List[Union[files.Metadata, MaestralApiError]]: """ Creates multiple folders on Dropbox in a batch job. :param dbx_paths: List of dropbox folder paths. :param batch_size: Number of folders to create in each batch. Dropbox allows batches of up to 1,000 folders. Larger values will be capped automatically. :param kwargs: Keyword arguments for the Dropbox API files/create_folder_batch endpoint. :returns: List of Metadata for created folders or SyncError for failures. Entries will be in the same order as given paths. """ batch_size = clamp(batch_size, 1, 1000) entries = [] result_list = [] with convert_api_errors(): # Up two ~ 1,000 entries allowed per batch: # for chunk in chunks(dbx_paths, n=batch_size): res = self.dbx.files_create_folder_batch(chunk, **kwargs) if res.is_complete(): batch_res = res.get_complete() entries.extend(batch_res.entries) elif res.is_async_job_id(): async_job_id = res.get_async_job_id() time.sleep(0.5) res = self.dbx.files_create_folder_batch_check(async_job_id) check_interval = round(len(chunk) / 100, 1) while res.is_in_progress(): time.sleep(check_interval) res = self.dbx.files_create_folder_batch_check(async_job_id) if res.is_complete(): batch_res = res.get_complete() entries.extend(batch_res.entries) elif res.is_failed(): error = res.get_failed() if error.is_too_many_files(): res_list = self.make_dir_batch( chunk, batch_size=round(batch_size / 2), **kwargs ) result_list.extend(res_list) for i, entry in enumerate(entries): if entry.is_success(): result_list.append(entry.get_success().metadata) elif entry.is_failure(): exc = exceptions.ApiError( error=entry.get_failure(), user_message_text="", user_message_locale="", request_id="", ) sync_err = dropbox_to_maestral_error(exc, dbx_path=dbx_paths[i]) result_list.append(sync_err) return result_list
[docs] def share_dir(self, dbx_path: str, **kwargs) -> sharing.SharedFolderMetadata: """ Converts a Dropbox folder to a shared folder. Creates the folder if it does not exist. :param dbx_path: Path of Dropbox folder. :param kwargs: Keyword arguments for the Dropbox API files_create_folder_v2 endpoint. :returns: Metadata of shared folder. """ dbx_path = "" if dbx_path == "/" else dbx_path with convert_api_errors(dbx_path=dbx_path): res = self.dbx.sharing_share_folder(dbx_path, **kwargs) if res.is_complete(): return res.get_complete() elif res.is_async_job_id(): async_job_id = res.get_async_job_id() time.sleep(0.2) with convert_api_errors(dbx_path=dbx_path): job_status = self.dbx.sharing_check_share_job_status(async_job_id) while job_status.is_in_progress(): time.sleep(0.2) with convert_api_errors(dbx_path=dbx_path): job_status = self.dbx.sharing_check_share_job_status(async_job_id) if job_status.is_complete(): return job_status.get_complete() elif job_status.is_failed(): error = job_status.get_failed() exc = exceptions.ApiError( error=error, user_message_locale="", user_message_text="", request_id="", ) raise dropbox_to_maestral_error(exc)
[docs] def get_latest_cursor( self, dbx_path: str, include_non_downloadable_files: bool = False, **kwargs ) -> str: """ Gets the latest cursor for the given folder and subfolders. :param dbx_path: Path of folder on Dropbox. :param include_non_downloadable_files: If ``True``, files that cannot be downloaded (at the moment only G-suite files on Dropbox) will be included. :param kwargs: Additional keyword arguments for Dropbox API files/list_folder/get_latest_cursor endpoint. :returns: The latest cursor representing a state of a folder and its subfolders. """ dbx_path = "" if dbx_path == "/" else dbx_path with convert_api_errors(dbx_path=dbx_path): res = self.dbx.files_list_folder_get_latest_cursor( dbx_path, include_non_downloadable_files=include_non_downloadable_files, recursive=True, **kwargs, ) return res.cursor
[docs] def list_folder( self, dbx_path: str, max_retries_on_timeout: int = 4, include_non_downloadable_files: bool = False, **kwargs, ) -> files.ListFolderResult: """ Lists the contents of a folder on Dropbox. Similar to :meth:`list_folder_iterator` but returns all entries in a single :class:`dropbox.files.ListFolderResult` instance. :param dbx_path: Path of folder on Dropbox. :param max_retries_on_timeout: Number of times to try again if Dropbox servers do not respond within the timeout. Occasional timeouts may occur for very large Dropbox folders. :param include_non_downloadable_files: If ``True``, files that cannot be downloaded (at the moment only G-suite files on Dropbox) will be included. :param kwargs: Additional keyword arguments for Dropbox API files/list_folder endpoint. :returns: Content of given folder. """ iterator = self.list_folder_iterator( dbx_path, max_retries_on_timeout, include_non_downloadable_files, **kwargs, ) return self.flatten_results(list(iterator), attribute_name="entries")
[docs] def list_folder_iterator( self, dbx_path: str, max_retries_on_timeout: int = 4, include_non_downloadable_files: bool = False, **kwargs, ) -> Iterator[files.ListFolderResult]: """ Lists the contents of a folder on Dropbox. Returns an iterator yielding :class:`dropbox.files.ListFolderResult` instances. The number of entries returned in each iteration corresponds to the number of entries returned by a single Dropbox API call and will be typically around 500. :param dbx_path: Path of folder on Dropbox. :param max_retries_on_timeout: Number of times to try again if Dropbox servers do not respond within the timeout. Occasional timeouts may occur for very large Dropbox folders. :param include_non_downloadable_files: If ``True``, files that cannot be downloaded (at the moment only G-suite files on Dropbox) will be included. :param kwargs: Additional keyword arguments for the Dropbox API files/list_folder endpoint. :returns: Iterator over content of given folder. """ with convert_api_errors(dbx_path): dbx_path = "" if dbx_path == "/" else dbx_path res = self.dbx.files_list_folder( dbx_path, include_non_downloadable_files=include_non_downloadable_files, **kwargs, ) yield res while res.has_more: attempt = 0 while True: try: res = self.dbx.files_list_folder_continue(res.cursor) yield res break except requests.exceptions.ReadTimeout: attempt += 1 if attempt <= max_retries_on_timeout: time.sleep(5.0) else: raise
[docs] def wait_for_remote_changes(self, last_cursor: str, timeout: int = 40) -> bool: """ Waits for remote changes since ``last_cursor``. Call this method after starting the Dropbox client and periodically to get the latest updates. :param last_cursor: Last to cursor to compare for changes. :param timeout: Seconds to wait until timeout. Must be between 30 and 480. The Dropbox API will add a random jitter of up to 60 sec to this value. :returns: ``True`` if changes are available, ``False`` otherwise. """ if not 30 <= timeout <= 480: raise ValueError("Timeout must be in range [30, 480]") # Honour last request to back off. time_to_backoff = max(self._backoff_until - time.time(), 0) time.sleep(time_to_backoff) with convert_api_errors(): res = self.dbx.files_list_folder_longpoll(last_cursor, timeout=timeout) # Keep track of last longpoll, back off if requested by API. if res.backoff: self._logger.debug("Backoff requested for %s sec", res.backoff) self._backoff_until = time.time() + res.backoff + 5.0 else: self._backoff_until = 0 return res.changes
[docs] def list_remote_changes(self, last_cursor: str) -> files.ListFolderResult: """ Lists changes to remote Dropbox since ``last_cursor``. Same as :meth:`list_remote_changes_iterator` but fetches all changes first and returns a single :class:`dropbox.files.ListFolderResult`. This may be useful if you want to fetch all changes in advance before starting to process them. :param last_cursor: Last to cursor to compare for changes. :returns: Remote changes since given cursor. """ iterator = self.list_remote_changes_iterator(last_cursor) return self.flatten_results(list(iterator), attribute_name="entries")
[docs] def list_remote_changes_iterator( self, last_cursor: str ) -> Iterator[files.ListFolderResult]: """ Lists changes to the remote Dropbox since ``last_cursor``. Returns an iterator yielding :class:`dropbox.files.ListFolderResult` instances. The number of entries returned in each iteration corresponds to the number of entries returned by a single Dropbox API call and will be typically around 500. Call this after :meth:`wait_for_remote_changes` returns ``True``. :param last_cursor: Last to cursor to compare for changes. :returns: Iterator over remote changes since given cursor. """ with convert_api_errors(): result = self.dbx.files_list_folder_continue(last_cursor) yield result while result.has_more: result = self.dbx.files_list_folder_continue(result.cursor) yield result
[docs] @staticmethod def flatten_results( results: List[PaginationResultType], attribute_name: str ) -> PaginationResultType: """ Flattens a list of Dropbox API results from a pagination to a single result with the cursor of the last result in the list. :param results: List of :results to flatten. :param attribute_name: Name of attribute to flatten. :returns: Flattened result. """ all_entries = [] for result in results: all_entries += getattr(result, attribute_name) kwargs = { attribute_name: all_entries, "cursor": results[-1].cursor, "has_more": False, } result_cls = type(results[0]) results_flattened = result_cls(**kwargs) return results_flattened
# ==== Conversion functions to generate error messages and types =======================
[docs]def os_to_maestral_error( exc: OSError, dbx_path: Optional[str] = None, local_path: Optional[str] = None ) -> LocalError: """ Converts a :class:`OSError` to a :class:`maestral.errors.MaestralApiError` and tries to add a reasonably informative error title and message. :param exc: Original OSError. :param dbx_path: Dropbox path associated with the error. :param local_path: Local path associated with the error. :returns: Converted exception. """ title = "Could not sync file or folder" err_cls: Type[MaestralApiError] if isinstance(exc, PermissionError): err_cls = InsufficientPermissionsError # subclass of SyncError text = "Insufficient read or write permissions for this location." elif isinstance(exc, FileNotFoundError): err_cls = NotFoundError # subclass of SyncError text = "The given path does not exist." elif isinstance(exc, FileExistsError): err_cls = ConflictError # subclass of SyncError title = "Could not download file" text = "There already is an item at the given path." elif isinstance(exc, IsADirectoryError): err_cls = IsAFolderError # subclass of SyncError title = "Could not create local file" text = "The given path refers to a folder." elif isinstance(exc, NotADirectoryError): err_cls = NotAFolderError # subclass of SyncError title = "Could not create local folder" text = "The given path refers to a file." elif exc.errno == errno.ENAMETOOLONG: err_cls = PathError # subclass of SyncError title = "Could not create local file" try: max_name, max_path = fs_max_lengths_for_path(local_path or "/") except RuntimeError: text = "The file name or path is too long." else: text = ( "The file name is too long. File names and paths must be shorter " f"than {max_name} and {max_path} characters on your file system, " f"respectively." ) elif exc.errno == errno.EINVAL: err_cls = PathError # subclass of SyncError title = "Could not create local file" text = ( "The file name contains characters which are not allowed on your file " "system. This could be for instance a colon or a trailing period." ) elif exc.errno == errno.EFBIG: err_cls = FileSizeError # subclass of SyncError title = "Could not download file" text = "The file size too large." elif exc.errno == errno.ELOOP: err_cls = SymlinkError # subclass of SyncError title = "Cannot upload symlink" text = "Symlinks are not currently supported by the public Dropbox API." elif exc.errno == errno.ENOSPC: err_cls = InsufficientSpaceError # subclass of SyncError title = "Could not download file" text = "There is not enough space left on the selected drive." elif exc.errno == errno.ENOMEM: err_cls = OutOfMemoryError # subclass of MaestralApiError text = "Out of memory. Please reduce the number of memory consuming processes." elif exc.errno is not None: err_cls = FileReadError text = f"Could not access file. Errno {exc.errno}: {os.strerror(exc.errno)}." else: err_cls = MaestralApiError text = str(exc) local_path = local_path or exc.filename maestral_exc = err_cls(title, text, dbx_path=dbx_path, local_path=local_path) maestral_exc.__cause__ = exc return maestral_exc
[docs]def dropbox_to_maestral_error( exc: Union[exceptions.DropboxException, ValidationError, requests.HTTPError], dbx_path: Optional[str] = None, local_path: Optional[str] = None, ) -> MaestralApiError: """ Converts a Dropbox SDK exception to a :class:`maestral.errors.MaestralApiError` and tries to add a reasonably informative error title and message. :param exc: Dropbox SDK exception.. :param dbx_path: Dropbox path associated with the error. :param local_path: Local path associated with the error. :returns: Converted exception. """ title = "An unexpected error occurred" text = "Please contact the developer with the traceback information from the logs." err_cls = MaestralApiError # ---- Dropbox API Errors ---------------------------------------------------------- if isinstance(exc, exceptions.ApiError): error = exc.error if isinstance(error, files.RelocationError): title = "Could not move file or folder" if error.is_cant_copy_shared_folder(): text = "Shared folders can’t be copied." err_cls = SyncError elif error.is_cant_move_folder_into_itself(): text = "You cannot move a folder into itself." err_cls = ConflictError elif error.is_cant_move_shared_folder(): text = "You cannot move the shared folder to the given destination." err_cls = PathError elif error.is_cant_nest_shared_folder(): text = ( "Your move operation would result in nested shared folders. " "This is not allowed." ) err_cls = PathError elif error.is_cant_transfer_ownership(): text = ( "Your move operation would result in an ownership transfer. " "Maestral does not currently support this. Please carry out " "the move on the Dropbox website instead." ) err_cls = PathError elif error.is_duplicated_or_nested_paths(): text = ( "There are duplicated/nested paths among the target and " "destination folders." ) err_cls = PathError elif error.is_from_lookup(): lookup_error = error.get_from_lookup() text, err_cls = _get_lookup_error_msg(lookup_error) elif error.is_from_write(): write_error = error.get_from_write() text, err_cls = _get_write_error_msg(write_error) elif error.is_insufficient_quota(): text = ( "You do not have enough space on Dropbox to move " "or copy the files." ) err_cls = InsufficientSpaceError elif error.is_internal_error(): text = "Something went on Dropbox’s end. Please try again later." err_cls = DropboxServerError elif error.is_to(): to_error = error.get_to() text, err_cls = _get_write_error_msg(to_error) elif error.is_too_many_files(): text = ( "There are more than 10,000 files and folders in one " "request. Please try to move fewer items at once." ) err_cls = SyncError elif isinstance(error, (files.CreateFolderError, files.CreateFolderEntryError)): title = "Could not create folder" if error.is_path(): write_error = error.get_path() text, err_cls = _get_write_error_msg(write_error) elif isinstance(error, files.DeleteError): title = "Could not delete item" if error.is_path_lookup(): lookup_error = error.get_path_lookup() text, err_cls = _get_lookup_error_msg(lookup_error) elif error.is_path_write(): write_error = error.get_path_write() text, err_cls = _get_write_error_msg(write_error) elif error.is_too_many_files(): text = ( "There are more than 10,000 files and folders in one " "request. Please try to delete fewer items at once." ) err_cls = SyncError elif error.is_too_many_write_operations(): text = ( "There are too many write operations happening in your " "Dropbox. Please try again later." ) err_cls = SyncError elif isinstance(error, files.UploadError): title = "Could not upload file" if error.is_path(): write_error = error.get_path().reason # Returns UploadWriteFailed. text, err_cls = _get_write_error_msg(write_error) elif error.is_properties_error(): # Occurs only for programming error in maestral. text = "Invalid property group provided." elif isinstance(error, files.UploadSessionStartError): title = "Could not upload file" if error.is_concurrent_session_close_not_allowed(): # Occurs only for programming error in maestral. text = "Can not start a closed concurrent upload session." elif error.is_concurrent_session_data_not_allowed(): # Occurs only for programming error in maestral. text = ( "Uploading data not allowed when starting concurrent upload " "session." ) elif isinstance(error, files.UploadSessionFinishError): title = "Could not upload file" if error.is_lookup_failed(): session_lookup_error = error.get_lookup_failed() text, err_cls = _get_session_lookup_error_msg(session_lookup_error) elif error.is_path(): write_error = error.get_path() text, err_cls = _get_write_error_msg(write_error) elif error.is_properties_error(): # Occurs only for programming error in maestral. text = "Invalid property group provided." elif error.is_too_many_write_operations(): text = ( "There are too many write operations happening in your " "Dropbox. Please retry again later." ) err_cls = SyncError elif isinstance(error, files.UploadSessionLookupError): title = "Could not upload file" text, err_cls = _get_session_lookup_error_msg(error) elif isinstance(error, files.DownloadError): title = "Could not download file" if error.is_path(): lookup_error = error.get_path() text, err_cls = _get_lookup_error_msg(lookup_error) elif error.is_unsupported_file(): text = "This file type cannot be downloaded but must be exported." err_cls = UnsupportedFileError elif isinstance(error, files.ListFolderError): title = "Could not list folder contents" if error.is_path(): lookup_error = error.get_path() text, err_cls = _get_lookup_error_msg(lookup_error) elif isinstance(error, files.ListFolderContinueError): title = "Could not list folder contents" if error.is_path(): lookup_error = error.get_path() text, err_cls = _get_lookup_error_msg(lookup_error) elif error.is_reset(): text = ( "Dropbox has reset its sync state. Please rebuild " "Maestral's index to re-sync your Dropbox." ) err_cls = CursorResetError elif isinstance(error, files.ListFolderLongpollError): title = "Could not get Dropbox changes" if error.is_reset(): text = ( "Dropbox has reset its sync state. Please rebuild " "Maestral's index to re-sync your Dropbox." ) err_cls = CursorResetError elif isinstance(error, async_.PollError): title = "Could not get status of batch job" if error.is_internal_error(): text = ( "Something went wrong with the job on Dropbox’s end. Please " "verify on the Dropbox website if the job succeeded and try " "again if it failed." ) err_cls = DropboxServerError else: # Other tags include invalid_async_job_id. # Neither should occur in our SDK usage. pass elif isinstance(error, files.ListRevisionsError): title = "Could not list file revisions" if error.is_path(): lookup_error = error.get_path() text, err_cls = _get_lookup_error_msg(lookup_error) elif isinstance(error, files.RestoreError): title = "Could not restore file" if error.is_invalid_revision(): text = "Invalid revision." err_cls = NotFoundError elif error.is_path_lookup(): lookup_error = error.get_path_lookup() text, err_cls = _get_lookup_error_msg(lookup_error) elif error.is_path_write(): write_error = error.get_path_write() text, err_cls = _get_write_error_msg(write_error) elif error.is_in_progress(): title = "Restore in progress" text = "Please check again later if the restore completed" err_cls = SyncError elif isinstance(error, files.GetMetadataError): title = "Could not get metadata" if error.is_path(): lookup_error = error.get_path() text, err_cls = _get_lookup_error_msg(lookup_error) elif isinstance(error, users.GetAccountError): title = "Could not get account info" if error.is_no_account(): text = ( "An account with the given Dropbox ID does not " "exist or has been deleted" ) err_cls = InvalidDbidError elif isinstance(error, sharing.CreateSharedLinkWithSettingsError): title = "Could not create shared link" if error.is_access_denied(): text = "You do not have access to create shared links for this path." err_cls = InsufficientPermissionsError elif error.is_email_not_verified(): text = "Please verify you email address before creating shared links" err_cls = SharedLinkError elif error.is_path(): lookup_error = error.get_path() text, err_cls = _get_lookup_error_msg(lookup_error) elif error.is_settings_error(): settings_error = error.get_settings_error() err_cls = SharedLinkError if settings_error.is_invalid_settings(): text = "Please check if the settings are valid." elif settings_error.is_not_authorized(): text = "Basic accounts do not support passwords or expiry dates." elif error.is_shared_link_already_exists(): text = "The shared link already exists." err_cls = SharedLinkError elif isinstance(error, sharing.RevokeSharedLinkError): title = "Could not revoke shared link" if error.is_shared_link_malformed(): text = "The shared link is malformed." err_cls = SharedLinkError elif error.is_shared_link_not_found(): text = "The given link does not exist." err_cls = NotFoundError elif error.is_shared_link_access_denied(): text = "You do not have access to revoke the shared link." err_cls = InsufficientPermissionsError elif error.is_unsupported_link_type(): text = "The link type is not supported." err_cls = SharedLinkError elif isinstance(error, sharing.ListSharedLinksError): title = "Could not list shared links" if error.is_path(): lookup_error = error.get_path() text, err_cls = _get_lookup_error_msg(lookup_error) elif error.is_reset(): text = "Please try again later." err_cls = SharedLinkError elif isinstance(error, common.PathRootError): if error.is_no_permission(): text = "You don't have permission to access this namespace." err_cls = InsufficientPermissionsError elif error.is_invalid_root(): text = "Invalid root namespace." err_cls = SyncError elif isinstance(error, sharing.ShareFolderErrorBase): title = "Could not share folder" if error.is_email_unverified(): text = ( "You need to verify your email address before creating shared " "folders." ) err_cls = SyncError elif error.is_bad_path(): path_error = error.get_bad_path() text, err_cls = _get_bad_path_error_msg(path_error) elif error.is_team_policy_disallows_member_policy(): text = "Team policy does not allow sharing with the specified members." err_cls = InsufficientPermissionsError elif error.is_disallowed_shared_link_policy(): text = "Team policy does not allow creating the specified shared link." err_cls = InsufficientPermissionsError elif ( isinstance(error, sharing.ShareFolderError) and error.is_no_permission() ): text = "You don't have permissions to share this folder." err_cls = InsufficientPermissionsError # ---- Authentication errors ------------------------------------------------------- elif isinstance(exc, exceptions.AuthError): error = exc.error if isinstance(error, auth.AuthError): if error.is_expired_access_token(): err_cls = TokenExpiredError title = "Authentication error" text = ( "Maestral's access to your Dropbox has expired. Please relink " "to continue syncing." ) elif error.is_invalid_access_token(): err_cls = TokenRevokedError title = "Authentication error" text = ( "Maestral's access to your Dropbox has been revoked. Please " "relink to continue syncing." ) elif error.is_user_suspended(): err_cls = DropboxAuthError title = "Authentication error" text = "Your user account has been suspended." elif error.is_missing_scope(): scope_error = error.get_missing_scope() required_scope = scope_error.required_scope err_cls = InsufficientPermissionsError title = "Insufficient permissions" text = f"Performing this action requires the {required_scope} scope." else: # Other tags are invalid_select_admin, invalid_select_user, # route_access_denied. Neither should occur in our SDK # usage. pass else: err_cls = DropboxAuthError title = "Authentication error" text = "Please check if you can log in on the Dropbox website." # ---- Namespace Errors ------------------------------------------------------------ elif isinstance(exc, exceptions.PathRootError): error = exc.error err_cls = PathRootError title = "Invalid root namespace" if isinstance(error, common.PathRootError): if error.is_no_permission(): text = "You don't have permission to access this namespace." elif error.is_invalid_root(): text = "The given namespace does not exist." elif error.is_other(): text = "An unexpected error occurred with the given namespace." # ---- Bad input errors ------------------------------------------------------------ # Should only occur due to user input from console scripts. elif isinstance(exc, (exceptions.BadInputError, ValidationError)): err_cls = BadInputError title = "Bad input to API call" text = exc.message # ---- Internal Dropbox error ------------------------------------------------------ elif isinstance(exc, exceptions.InternalServerError): err_cls = DropboxServerError title = "Dropbox server error" text = ( "Something went wrong on Dropbox’s end. Please check on " "if their services are up and running and try again later." ) # ---- Errors which are passed through by the SDK ---------------------------------- elif isinstance(exc, exceptions.HttpError): text = exc.body maestral_exc = err_cls(title, text, dbx_path=dbx_path, local_path=local_path) maestral_exc.__cause__ = exc return maestral_exc
def _get_write_error_msg(write_error: files.WriteError) -> Tuple[str, Type[SyncError]]: text = "" err_cls = SyncError if write_error.is_conflict(): conflict = write_error.get_conflict() if conflict.is_file(): text = ( "Could not write to the target path because another file " "was in the way." ) err_cls = FileConflictError elif conflict.is_folder(): text = ( "Could not write to the target path because another folder " "was in the way." ) err_cls = FolderConflictError elif conflict.is_file_ancestor(): text = ( "Could not create parent folders because another file " "was in the way." ) err_cls = FileConflictError else: text = ( "Could not write to the target path because another file or " "folder was in the way." ) err_cls = ConflictError elif write_error.is_disallowed_name(): text = "Dropbox will not save the file or folder because of its name." err_cls = PathError elif write_error.is_insufficient_space(): text = "You do not have enough space on Dropbox to move or copy the files." err_cls = InsufficientSpaceError elif write_error.is_malformed_path(): text = ( "The destination path is invalid. Paths may not end with a slash or " "whitespace." ) err_cls = PathError elif write_error.is_no_write_permission(): text = "You do not have permissions to write to the target location." err_cls = InsufficientPermissionsError elif write_error.is_team_folder(): text = "You cannot move or delete team folders through Maestral." elif write_error.is_too_many_write_operations(): text = ( "There are too many write operations in your Dropbox. Please " "try again later." ) elif write_error.is_operation_suppressed(): text = "This file operation is not allowed at this path." return text, err_cls def _get_lookup_error_msg( lookup_error: files.LookupError, ) -> Tuple[str, Type[SyncError]]: text = "" err_cls = SyncError if lookup_error.is_malformed_path(): text = "The path is invalid. Paths may not end with a slash or whitespace." err_cls = PathError elif lookup_error.is_not_file(): text = "The given path refers to a folder." err_cls = IsAFolderError elif lookup_error.is_not_folder(): text = "The given path refers to a file." err_cls = NotAFolderError elif lookup_error.is_not_found(): text = "There is nothing at the given path." err_cls = NotFoundError elif lookup_error.is_restricted_content(): text = ( "The file cannot be transferred because the content is restricted. For " "example, sometimes there are legal restrictions due to copyright " "claims." ) err_cls = RestrictedContentError elif lookup_error.is_unsupported_content_type(): text = "This file type is currently not supported for syncing." err_cls = UnsupportedFileError elif lookup_error.is_locked(): text = "The given path is locked." err_cls = InsufficientPermissionsError return text, err_cls def _get_session_lookup_error_msg( session_lookup_error: files.UploadSessionLookupError, ) -> Tuple[str, Type[SyncError]]: text = "" err_cls = SyncError if session_lookup_error.is_closed(): # Occurs when trying to append data to a closed session. # This is caused by internal Maestral errors. pass elif session_lookup_error.is_incorrect_offset(): text = "A network error occurred during the upload session." elif session_lookup_error.is_not_closed(): # Occurs when trying to finish an open session. # This is caused by internal Maestral errors. pass elif session_lookup_error.is_not_found(): text = ( "The upload session ID was not found or has expired. " "Upload sessions are valid for 48 hours." ) elif session_lookup_error.is_too_large(): text = "You can only upload files up to 350 GB." err_cls = FileSizeError return text, err_cls def _get_bad_path_error_msg( path_error: sharing.SharePathError, ) -> Tuple[str, Type[SyncError]]: text = "" err_cls = SyncError if path_error.is_is_file(): text = "A file is at the specified path." err_cls = FileConflictError elif path_error.is_inside_shared_folder(): text = "Cannot share a folder inside a shared folder." elif path_error.is_contains_shared_folder(): text = "Cannot share a folder that contains a shared folder." elif path_error.is_contains_app_folder(): text = "Cannot share a folder that contains an app folder." elif path_error.is_contains_team_folder(): text = "Cannot share a folder that contains a team folder." elif path_error.is_is_app_folder(): text = "Cannot share app folders." elif path_error.is_inside_app_folder(): text = "Cannot share a folder inside an app folder." elif path_error.is_is_public_folder(): text = "A public folder can't be shared this way. Use a public link instead." elif path_error.is_inside_public_folder(): text = ( "A folder inside a public folder can't be shared this way. Use a public " "link instead." ) elif path_error.is_already_shared(): err_cls = FolderConflictError text = "The folder is already shared." elif path_error.is_invalid_path(): text = "The path is not valid." elif path_error.is_is_osx_package(): text = "Cannot share macOS packages." elif path_error.is_inside_osx_package(): text = "Cannot share folders inside macOS packages." elif path_error.is_is_vault(): text = "Cannot share the Vault folder." elif path_error.is_is_family(): text = "Cannot share the Family folder." return text, err_cls