Source code for maestral.main

"""This module defines the main API which is exposed to the CLI or GUI."""

from __future__ import annotations

import asyncio
import difflib
import gc
import logging
import mimetypes

# system imports
import os
import os.path as osp
import random
import shutil
import sqlite3
import tempfile
import time
from asyncio import AbstractEventLoop, Future
from datetime import datetime, timezone
from typing import Any, Collection, Iterator, Sequence

# external imports
import requests
from packaging.version import Version
from watchdog.events import DirDeletedEvent, FileDeletedEvent

try:
    from systemd import journal
except ImportError:
    journal = None

# local imports
from . import __version__
from .client import DropboxClient
from .config import MaestralConfig, MaestralState, validate_config_name
from .constants import (
    CONNECTING,
    DEFAULT_CONFIG_NAME,
    GITHUB_RELEASES_API,
    IDLE,
    IS_LINUX,
    IS_MACOS,
    PAUSED,
    FileStatus,
)
from .core import (
    FileMetadata,
    FullAccount,
    LinkAccessLevel,
    LinkAudience,
    Metadata,
    PersonalSpaceUsage,
    SharedLinkMetadata,
    UpdateCheckResult,
)
from .database.core import Database
from .errorhandling import CONNECTION_ERRORS, convert_api_errors
from .exceptions import (
    BusyError,
    KeyringAccessError,
    MaestralApiError,
    NoDropboxDirError,
    NotFoundError,
    NotLinkedError,
    UnsupportedFileTypeForDiff,
    UpdateCheckError,
)
from .keyring import CredentialStorage
from .logging import (
    LOG_FMT_SHORT,
    AwaitableHandler,
    CachedHandler,
    scoped_logger,
    setup_logging,
)
from .manager import SyncManager
from .models import SyncErrorEntry, SyncEvent, SyncStatus
from .notify import MaestralDesktopNotifier
from .sync import SyncDirection, SyncEngine, pf_repr
from .utils import get_newer_version
from .utils.appdirs import get_cache_path, get_data_path
from .utils.path import (
    delete,
    is_child,
    is_equal_or_child,
    isdir,
    normalize,
    to_existing_unnormalized_path,
)

__all__ = ["Maestral"]


def _sql_add_column(db: Database, table: str, column: str, affinity: str) -> None:
    try:
        db.execute(f"ALTER TABLE {table} ADD COLUMN {column} {affinity};")
    except sqlite3.OperationalError:
        # column already exists
        pass


def _sql_drop_table(db: Database, table: str) -> None:
    try:
        db.execute(f"DROP TABLE {table};")
    except sqlite3.OperationalError:
        # table does not exist
        pass


# ======================================================================================
# Main API
# ======================================================================================


[docs] class Maestral: """The public API All methods and properties return objects or raise exceptions which can safely be serialized, i.e., pure Python types. The only exception are instances of :exc:`maestral.exceptions.MaestralApiError`: they need to be registered explicitly with the serpent serializer which is used for communication to frontends. Sync errors and fatal errors which occur in the sync threads can be read with the properties :attr:`sync_errors` and :attr:`fatal_errors`, respectively. :Example: First create an instance with a new config_name. In this example, we choose "private" to sync a private Dropbox account. Then link the created config to an existing Dropbox account and set up the local Dropbox folder. If successful, invoke :meth:`start_sync` to start syncing. >>> from maestral.main import Maestral >>> m = Maestral(config_name='private') >>> url = m.get_auth_url() # get token from Dropbox website >>> print(f'Please go to {url} to retrieve a Dropbox authorization token.') >>> token = input('Enter auth token: ') >>> res = m.link(token) >>> if res == 0: ... m.create_dropbox_directory('~/Dropbox (Private)') ... m.start_sync() :param config_name: Name of maestral configuration to run. Must not contain any whitespace. If the given config file does exist, it will be created. :param log_to_stderr: If ``True``, Maestral will print log messages to stderr. When started as a systemd services, this can result in duplicate log messages in the systemd journal. Defaults to ``False``. :param event_loop: Event loop to use for any features that require an asyncio event loop. If not given, those features will be disabled. This currently only affects desktop notifications. :param shutdown_future: Feature to set a result when shutdown is complete. Used to inform the caller if an API client calls :method:`shutdown_daemon`. The event loop associated with the Future must be the same as ``event_loop``. """ _external_log_handlers: Sequence[logging.Handler] _log_handler_status_longpoll: AwaitableHandler _log_handler_info_cache: CachedHandler _log_handler_error_cache: CachedHandler def __init__( self, config_name: str = DEFAULT_CONFIG_NAME, log_to_stderr: bool = False, event_loop: AbstractEventLoop | None = None, shutdown_future: Future[bool] | None = None, ) -> None: # Check system compatibility. self._check_system_compatibility() self._loop = event_loop self._config_name = validate_config_name(config_name) self._conf = MaestralConfig(self.config_name) self._state = MaestralState(self.config_name) self._logger = scoped_logger(__name__, self.config_name)
[docs] self.cred_storage = CredentialStorage(self.config_name)
# Set up logging. self._log_to_stderr = log_to_stderr self._root_logger = scoped_logger("maestral", self.config_name) self._root_logger.setLevel(min(self.log_level, logging.INFO)) self._root_logger.handlers.clear() self._setup_logging_external() self._setup_logging_internal() # Run update scripts after init of loggers and config / state. self._check_and_run_post_update_scripts() # Set up desktop notifier using event loop. self._dn: MaestralDesktopNotifier | None = None if self._loop: self._dn = MaestralDesktopNotifier(self._config_name, self._loop) # Set up sync infrastructure.
[docs] self.client = DropboxClient( self.config_name, self.cred_storage, bandwidth_limit_up=self.bandwidth_limit_up, bandwidth_limit_down=self.bandwidth_limit_down, )
[docs] self.sync = SyncEngine(self.client, self._dn)
[docs] self.manager = SyncManager(self.sync, self._dn)
# Create a future which will return once `shutdown_daemon` is called. # This can be used by an event loop to wait until maestral has been stopped. if shutdown_future and not shutdown_future.get_loop() is self._loop: raise RuntimeError("'shutdown_future' must use the passed event loop.")
[docs] self.shutdown_future = shutdown_future
@staticmethod def _check_system_compatibility() -> None: if not (IS_MACOS or IS_LINUX): raise RuntimeError("Only macOS and Linux are supported") def _setup_logging_external(self) -> None: """ Sets up logging to external channels: * Log files. * The systemd journal, if started by systemd. * The systemd notify status, if started by systemd. * Stderr, if requested. """ self._external_log_handlers = setup_logging( self.config_name, stderr=self._log_to_stderr ) def _setup_logging_internal(self) -> None: """Sets up logging to internal info and error caches.""" # Log to cached handlers for status and error APIs. self._log_handler_info_cache = CachedHandler(maxlen=1) self._log_handler_info_cache.setFormatter(LOG_FMT_SHORT) self._log_handler_info_cache.setLevel(logging.INFO) self._root_logger.addHandler(self._log_handler_info_cache) self._log_handler_error_cache = CachedHandler() self._log_handler_error_cache.setFormatter(LOG_FMT_SHORT) self._log_handler_error_cache.setLevel(logging.ERROR) self._root_logger.addHandler(self._log_handler_error_cache) self._log_handler_status_longpoll = AwaitableHandler(max_unblock_per_second=1) self._log_handler_status_longpoll.setFormatter(LOG_FMT_SHORT) self._log_handler_status_longpoll.setLevel(logging.INFO) self._root_logger.addHandler(self._log_handler_status_longpoll) @property
[docs] def version(self) -> str: """Returns the current Maestral version.""" return __version__
[docs] def get_auth_url(self) -> str: """ Returns a URL to authorize access to a Dropbox account. To link a Dropbox account, retrieve an authorization code from the URL and link Maestral by calling :meth:`link` with the provided code. :returns: URL to retrieve an authorization code. """ return self.client.get_auth_url()
# ==== Methods to access config and saved state ==================================== @property
[docs] def config_name(self) -> str: """The selected configuration.""" return self._config_name
[docs] def set_conf(self, section: str, name: str, value: Any) -> None: """ Sets a configuration option. :param section: Name of section in config file. :param name: Name of config option. :param value: Config value. May be any type accepted by :obj:`ast.literal_eval`. """ self._conf.set(section, name, value)
[docs] def get_conf(self, section: str, name: str) -> Any: """ Gets a configuration option. :param section: Name of section in config file. :param name: Name of config option. :returns: Config value. May be any type accepted by :obj:`ast.literal_eval`. """ return self._conf.get(section, name)
[docs] def set_state(self, section: str, name: str, value: Any) -> None: """ Sets a state value. :param section: Name of section in state file. :param name: Name of state variable. :param value: State value. May be any type accepted by :obj:`ast.literal_eval`. """ self._state.set(section, name, value)
[docs] def get_state(self, section: str, name: str) -> Any: """ Gets a state value. :param section: Name of section in state file. :param name: Name of state variable. :returns: State value. May be any type accepted by :obj:`ast.literal_eval`. """ return self._state.get(section, name)
# ==== Getters / setters for config with side effects ============================== @property
[docs] def dropbox_path(self) -> str: """ Returns the path to the local Dropbox folder (read only). This will be an empty string if not Dropbox folder has been set up yet. Use :meth:`create_dropbox_directory` or :meth:`move_dropbox_directory` to set or change the Dropbox directory location instead. :raises NotLinkedError: if no Dropbox account is linked. """ if self.pending_link: return "" else: return self.sync.dropbox_path
@property
[docs] def excluded_items(self) -> set[str]: """ The list of files and folders excluded by selective sync. Any changes to this list will be applied immediately if we have already performed the initial sync. Paths which have been added to the list will be deleted from the local drive and paths which have been removed will be downloaded. If the initial was not yet performed, changes will only be saved for the initial sync. Use :meth:`exclude_item` and :meth:`include_item` to add or remove individual items from selective sync. """ if self.pending_link: return set() else: return self.sync.excluded_items
@excluded_items.setter def excluded_items(self, items: Collection[str]) -> None: """Setter: excluded_items""" excluded_items = self.sync.clean_excluded_items_list(items) old_excluded_items = self.excluded_items added_excluded_items = excluded_items - old_excluded_items added_included_items = old_excluded_items - excluded_items has_changes = len(added_excluded_items) > 0 or len(added_included_items) > 0 if has_changes: if self.sync.sync_lock.acquire(blocking=False): try: self.sync.excluded_items = excluded_items if self.pending_first_download: return # Apply changes. for path in added_excluded_items: self._logger.info("Excluded %s", path) self._remove_after_excluded(path) for path in added_included_items: if not self.sync.is_excluded_by_user(path): self._logger.info("Included %s", path) self.manager.download_queue.put(path) self._logger.info(IDLE) finally: self.sync.sync_lock.release() else: raise BusyError( "Cannot set excluded items", "Please try again when idle." ) @property
[docs] def log_level(self) -> int: """Log level for log files, stderr and the systemd journal.""" return self._conf.get("app", "log_level")
@log_level.setter def log_level(self, level: int) -> None: """Setter: log_level.""" self._root_logger.setLevel(min(level, logging.INFO)) for handler in self._external_log_handlers: handler.setLevel(level) self._conf.set("app", "log_level", level) @property
[docs] def notification_snooze(self) -> float: """Snooze time for desktop notifications in minutes. Defaults to 0 if notifications are not snoozed.""" if not self._dn: raise RuntimeError("Desktop notifications require an event loop") return self._dn.snoozed
@notification_snooze.setter def notification_snooze(self, minutes: float) -> None: """Setter: notification_snooze.""" if not self._dn: raise RuntimeError("Desktop notifications require an event loop") self._dn.snoozed = minutes @property
[docs] def notification_level(self) -> int: """Level for desktop notifications. See :mod:`notify` for level definitions.""" if not self._dn: raise RuntimeError("Desktop notifications require an event loop") return self._dn.notify_level
@notification_level.setter def notification_level(self, level: int) -> None: """Setter: notification_level.""" if not self._dn: raise RuntimeError("Desktop notifications require an event loop") self._dn.notify_level = level @property
[docs] def bandwidth_limit_down(self) -> float: """Maximum download bandwidth to use in bytes per second.""" return self._conf.get("app", "bandwidth_limit_down")
@bandwidth_limit_down.setter def bandwidth_limit_down(self, value: float) -> None: """Setter: bandwidth_limit_down.""" self.client.bandwidth_limit_down = value self._conf.set("app", "bandwidth_limit_down", value) @property
[docs] def bandwidth_limit_up(self) -> float: """Maximum download bandwidth to use in bytes per second.""" return self._conf.get("app", "bandwidth_limit_up")
@bandwidth_limit_up.setter def bandwidth_limit_up(self, value: float) -> None: """Setter: bandwidth_limit_up.""" self.client.bandwidth_limit_up = value self._conf.set("app", "bandwidth_limit_up", value) # ==== State information ==========================================================
[docs] def status_change_longpoll(self, timeout: float | None = 60) -> bool: """ Blocks until there is a change in status or until a timeout occurs. This method can be used by frontends to wait for status changes without constant polling. Status changes are for example transitions from syncing to idle or vice-versa, new errors, or connection status changes. Will unblock at most once per second. :param timeout: Maximum time to block before returning, even if there is no status change. :returns: Whether there was a status change within the timeout. .. versionadded:: 1.3.0 """ return self._log_handler_status_longpoll.wait_for_emit(timeout)
@property @property
[docs] def pending_dropbox_folder(self) -> bool: """Whether a local Dropbox directory has been configured (read only). This will not check if the configured directory actually exists, starting the sync may still raise a :exc:`maestral.exceptions.NoDropboxDirError`.""" return not self.sync.dropbox_path
@property
[docs] def pending_first_download(self) -> bool: """Whether the initial download has already started (read only).""" return self.sync.local_cursor == 0 or self.sync.remote_cursor == ""
@property
[docs] def paused(self) -> bool: """Whether syncing is paused by the user (read only). Use :meth:`start_sync` and :meth:`stop_sync` to start and stop syncing, respectively.""" return not self.manager.autostart.is_set() and not self.sync.busy()
@property
[docs] def running(self) -> bool: """Whether sync threads are running (read only). This is similar to :attr:`paused` but also returns False if syncing is paused because we cannot connect to Dropbox servers..""" return self.manager.running.is_set() or self.sync.busy()
@property
[docs] def connected(self) -> bool: """Whether Dropbox servers can be reached (read only).""" if self.pending_link: return False else: return self.manager.connected
@property
[docs] def status(self) -> str: """The last status message (read only). This can be displayed as information to the user but should not be relied on otherwise.""" if self.paused: return PAUSED elif not self.connected: return CONNECTING else: return self._log_handler_info_cache.get_last_message()
@property
[docs] def sync_errors(self) -> list[SyncErrorEntry]: """ A list of current sync errors as dicts (read only). This list is populated by the sync threads. The following keys will always be present but may contain empty values: "type", "inherits", "title", "traceback", "title", "message", "local_path", "dbx_path". :raises NotLinkedError: if no Dropbox account is linked. """ return self.sync.sync_errors
@property
[docs] def fatal_errors(self) -> list[MaestralApiError]: """ Returns a list of fatal errors as dicts (read only). This does not include lost internet connections or file sync errors which only emit warnings and are tracked and cleared separately. Errors listed here must be acted upon for Maestral to continue syncing. The following keys will always be present but may contain empty values: "type", "inherits", "title", "traceback", "title", and "message".s This list is populated from all log messages with level ERROR or higher that have ``exc_info`` attached. """ errors: list[MaestralApiError] = [] for r in self._log_handler_error_cache.cached_records: if r.exc_info: err = r.exc_info[1] if isinstance(err, MaestralApiError): errors.append(err) return errors
[docs] def clear_fatal_errors(self) -> None: """ Manually clears all fatal errors. This should be used after they have been resolved by the user through the GUI or CLI. """ self._log_handler_error_cache.clear()
@property
[docs] def account_profile_pic_path(self) -> str: """ The path of the current account's profile picture (read only). There may not be an actual file at that path if the user did not set a profile picture or the picture has not yet been downloaded. """ return get_cache_path("maestral", f"{self._config_name}_profile_pic.jpeg")
[docs] def get_file_status(self, local_path: str) -> str: """ Returns the sync status of a file or folder. The returned status is recursive for folders. * "uploading" if any file inside the folder is being uploaded. * "downloading" if any file inside the folder is being downloaded. * "error" if any item inside the folder failed to sync and none are currently being uploaded or downloaded. * "up to date" if all items are successfully synced. * "unwatched" if syncing is paused or for items outside the Dropbox directory. .. versionadded:: 1.4.4 Recursive behavior. Previous versions would return "up to date" for a folder, even if some contained files would be syncing. :param local_path: Path to file on the local drive. May be relative to the current working directory. :returns: String indicating the sync status. Can be 'uploading', 'downloading', 'up to date', 'error', or 'unwatched'. """ if not self.running: return FileStatus.Unwatched.value local_path = osp.realpath(local_path) try: dbx_path_cased = self.sync.to_dbx_path(local_path) except ValueError: return FileStatus.Unwatched.value # Find any sync activity for the local path. node = self.sync.activity.get_node(dbx_path_cased) if not node: # Always return synced for the root folder in the absense of sync activity. if dbx_path_cased == "/": return FileStatus.Synced.value # Check if the path is in our index. If yes, it is fully synced, otherwise # it is unwatched. if self.sync.get_index_entry_for_local_path(local_path): return FileStatus.Synced.value return FileStatus.Unwatched.value # Return effective status of item and its children. Syncing items take # precedence over Failed which take precedence over Synced. Note that Up and # Down are mutually exclusive because they are performed in alternating cycles. file_status = FileStatus.Synced for event in node.sync_events: if event.status is SyncStatus.Syncing: if event.direction is SyncDirection.Up: return FileStatus.Uploading.value elif event.direction is SyncDirection.Down: return FileStatus.Downloading.value elif event.status is SyncStatus.Failed: file_status = FileStatus.Error return file_status.value
[docs] def get_activity(self, limit: int | None = 100) -> list[SyncEvent]: """ Returns the current upload / download activity. :param limit: Maximum number of items to return. If None, all entries will be returned. :returns: A lists of all sync events currently queued for or being uploaded or downloaded with the events the furthest up in the queue coming first. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() return list(self.sync.activity.sync_events)[:limit]
[docs] def get_history( self, dbx_path: str | None = None, limit: int | None = 100 ) -> list[SyncEvent]: """ Returns the historic upload / download activity. Up to 1,000 sync events are kept in the database. Any events which occurred before the interval specified by the ``keep_history`` config value are discarded. :param dbx_path: If given, show sync history for the specified Dropbox path only. :param limit: Maximum number of items to return. If None, all entries will be returned. :returns: A lists of all sync events since ``keep_history`` sorted by time with the oldest event first. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() events = self.sync.get_history(dbx_path=dbx_path) return events[-limit:] if limit else events
[docs] def get_account_info(self) -> FullAccount: """ Returns the account information from Dropbox and returns it as a dictionary. :returns: Dropbox account information. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() return self.client.get_account_info()
[docs] def get_space_usage(self) -> PersonalSpaceUsage: """ Gets the space usage from Dropbox and returns it as a dictionary. :returns: Dropbox space usage information. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() return self.client.get_space_usage()
# ==== Control methods for front ends ==============================================
[docs] def get_profile_pic(self) -> str | None: """ Attempts to download the user's profile picture from Dropbox. The picture is saved in Maestral's cache directory for retrieval when there is no internet connection. Check :attr:`account_profile_pic_path` for cached profile pics. :returns: Path to saved profile picture or ``None`` if no profile picture was downloaded. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() account_info = self.client.get_account_info() if account_info.profile_photo_url: with convert_api_errors(): res = requests.get(account_info.profile_photo_url) with open(self.account_profile_pic_path, "wb") as f: f.write(res.content) return self.account_profile_pic_path else: self._delete_old_profile_pics() return None
[docs] def get_metadata(self, dbx_path: str) -> Metadata | None: """ Returns metadata for a file or folder on Dropbox. :param dbx_path: Path to file or folder on Dropbox. :returns: Dropbox item metadata as dict. See :class:`dropbox.files.Metadata` for keys and values. :raises NotFoundError: if there is nothing at the given path. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() return self.client.get_metadata(dbx_path)
[docs] def list_folder( self, dbx_path: str, recursive: bool = False, include_deleted: bool = False, include_mounted_folders: bool = True, include_non_downloadable_files: bool = False, ) -> list[Metadata]: """ List all items inside the folder given by ``dbx_path``. Keyword arguments are passed on the Dropbox API call :meth:`client.DropboxClient.list_folder`. :param dbx_path: Path to folder on Dropbox. :param recursive: If true, the list folder operation will be applied recursively to all subfolders and the response will contain contents of all subfolders. :param include_deleted: If true, the results will include entries for files and folders that used to exist but were deleted. :param bool include_mounted_folders: If true, the results will include entries under mounted folders which includes app folder, shared folder and team folder. :param bool include_non_downloadable_files: If true, include files that are not downloadable, i.e. Google Docs. :raises NotFoundError: if there is nothing at the given path. :raises NotAFolderError: if the given path refers to a file. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() res = self.client.list_folder( dbx_path, recursive=recursive, include_deleted=include_deleted, include_mounted_folders=include_mounted_folders, include_non_downloadable_files=include_non_downloadable_files, ) return res.entries
[docs] def list_folder_iterator( self, dbx_path: str, recursive: bool = False, include_deleted: bool = False, include_mounted_folders: bool = True, limit: int | None = None, include_non_downloadable_files: bool = False, ) -> Iterator[list[Metadata]]: """ Returns an iterator over items inside the folder given by ``dbx_path``. Keyword arguments are passed on the client call :meth:`client.DropboxClient.list_folder_iterator`. Each iteration will yield a list of approximately 500 entries, depending on the number of entries returned by an individual API call. :param dbx_path: Path to folder on Dropbox. :param recursive: If true, the list folder operation will be applied recursively to all subfolders and the response will contain contents of all subfolders. :param include_deleted: If true, the results will include entries for files and folders that used to exist but were deleted. :param bool include_mounted_folders: If true, the results will include entries under mounted folders which includes app folder, shared folder and team folder. :param Nullable[int] limit: The maximum number of results to return per request. Note: This is an approximate number and there can be slightly more entries returned in some cases. :param bool include_non_downloadable_files: If true, include files that are not downloadable, i.e. Google Docs. :returns: Iterator over list of Dropbox item metadata as dicts. See :class:`dropbox.files.Metadata` for keys and values. :raises NotFoundError: if there is nothing at the given path. :raises NotAFolderError: if the given path refers to a file. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() res_iter = self.client.list_folder_iterator( dbx_path, recursive=recursive, include_deleted=include_deleted, include_mounted_folders=include_mounted_folders, limit=limit, include_non_downloadable_files=include_non_downloadable_files, ) for res in res_iter: yield res.entries del res gc.collect()
[docs] def list_revisions(self, dbx_path: str, limit: int = 10) -> list[FileMetadata]: """ List revisions of old files at the given path ``dbx_path``. This will also return revisions if the file has already been deleted. :param dbx_path: Path to file on Dropbox. :param limit: Maximum number of revisions to list. :returns: List of Dropbox file metadata as dicts. See :class:`dropbox.files.Metadata` for keys and values. :raises NotFoundError: if there never was a file at the given path. :raises IsAFolderError: if the given path refers to a folder :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. """ self._check_linked() return self.client.list_revisions(dbx_path, limit=limit)
[docs] def get_file_diff(self, old_rev: str, new_rev: str | None = None) -> list[str]: """ Compare to revisions of a text file using Python's difflib. The versions will be downloaded to temporary files. If new_rev is None, the old revision will be compared to the corresponding local file, if any. :param old_rev: Identifier of old revision. :param new_rev: Identifier of new revision. :returns: Diff as a list of strings (lines). :raises UnsupportedFileTypeForDiff: if file type is not supported. :raises UnsupportedFileTypeForDiff: if file content could not be decoded. :raises MaestralApiError: if file could not be read for any other reason. """ def str_from_date(d: datetime) -> str: """Convert 'client_modified' metadata to string in local timezone""" tz_date = d.replace(tzinfo=timezone.utc).astimezone() return tz_date.strftime("%d %b %Y at %H:%M") def download_rev(rev: str) -> tuple[list[str], FileMetadata]: """ Download a rev to a tmp file, read it and return the content + metadata. """ with tempfile.NamedTemporaryFile(mode="w+") as f: md = self.client.download(f"rev:{rev}", f.name) # Read from the file. try: with convert_api_errors(md.path_display, f.name): content = f.readlines() except UnicodeDecodeError: raise UnsupportedFileTypeForDiff( "Failed to decode the file", "Only UTF-8 plain text files are currently supported.", ) return content, md # Get the metadata for old_rev before attempting to download. This is used # to guess the file type and fail early for unsupported files. md_old = self.client.get_metadata(f"rev:{old_rev}", include_deleted=True) if md_old is None: raise NotFoundError( f"Could not a file with revision {old_rev}", "Use 'list_revisions' to list past revisions of a file.", ) dbx_path = self.sync.correct_case(md_old.path_display) local_path = self.sync.to_local_path(md_old.path_display) # Check if a diff is possible. # If mime is None, proceed because most files without # an extension are just text files. mime, _ = mimetypes.guess_type(dbx_path) if mime is not None and not mime.startswith("text/"): raise UnsupportedFileTypeForDiff( f"Bad file type: '{mime}'", "Only files of type 'text/*' are supported." ) if new_rev: content_new, md_new = download_rev(new_rev) date_str_new = str_from_date(md_new.client_modified) else: # Use the local file if new_rev is None. new_rev = "local version" try: with convert_api_errors(dbx_path=dbx_path, local_path=local_path): mtime = time.localtime(osp.getmtime(local_path)) date_str_new = time.strftime("%d %b %Y at %H:%M", mtime) with open(local_path) as f: content_new = f.readlines() except UnicodeDecodeError: raise UnsupportedFileTypeForDiff( "Failed to decode the file", "Only UTF-8 plain text files are currently supported.", ) content_old, md_old = download_rev(old_rev) date_str_old = str_from_date(md_old.client_modified) return list( difflib.unified_diff( content_old, content_new, fromfile=f"{dbx_path} ({old_rev})", tofile=f"{dbx_path} ({new_rev})", fromfiledate=date_str_old, tofiledate=date_str_new, ) )
[docs] def restore(self, dbx_path: str, rev: str) -> FileMetadata: """ Restore an old revision of a file. :param dbx_path: The path to save the restored file. :param rev: The revision to restore. Old revisions can be listed with :meth:`list_revisions`. :returns: Metadata of the returned file. See :class:`dropbox.files.FileMetadata` for keys and values. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. """ self._check_linked() self._logger.info(f"Restoring '{dbx_path} to {rev}'") res = self.client.restore(dbx_path, rev) self._logger.info(f"Restored '{dbx_path} to {rev}'") self._logger.info(IDLE) return res
def _delete_old_profile_pics(self) -> None: for file in os.listdir(get_cache_path("maestral")): if file.startswith(f"{self._config_name}_profile_pic"): try: os.unlink(osp.join(get_cache_path("maestral"), file)) except OSError: pass
[docs] def rebuild_index(self) -> None: """ Rebuilds the rev file by comparing remote with local files and updating rev numbers from the Dropbox server. Files are compared by their content hashes and conflicting copies are created if the contents differ. File changes during the rebuild process will be queued and uploaded once rebuilding has completed. Rebuilding will be performed asynchronously and errors can be accessed through :attr:`sync_errors` or :attr:`maestral_errors`. :raises NotLinkedError: if no Dropbox account is linked. :raises NoDropboxDirError: if local Dropbox folder is not set up. """ self._check_linked() self._check_dropbox_dir() self.manager.rebuild_index()
[docs] def start_sync(self) -> None: """ Creates syncing threads and starts syncing. :raises NotLinkedError: if no Dropbox account is linked. :raises NoDropboxDirError: if local Dropbox folder is not set up. """ self._check_linked() self._check_dropbox_dir() self.manager.start()
[docs] def stop_sync(self) -> None: """ Stops all syncing threads if running. Call :meth:`start_sync` to restart syncing. """ self.manager.stop()
[docs] def reset_sync_state(self) -> None: """ Resets the sync index and state. Only call this to clean up leftover state information if a Dropbox was improperly unlinked (e.g., auth token has been manually deleted). Otherwise, leave state management to Maestral. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() self.manager.reset_sync_state()
[docs] def exclude_items(self, *dbx_paths: str) -> None: """ Excludes files or folders from sync and deletes it locally. It is safe to call this method with items which have already been excluded. :param dbx_paths: Dropbox paths of items to exclude. :raises NotFoundError: if there is nothing at the given path. :raises ConnectionError: if the connection to Dropbox fails. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. :raises NoDropboxDirError: if local Dropbox folder is not set up. """ self._check_linked() self._check_dropbox_dir() dbx_paths_lower = {normalize(p.rstrip("/")) for p in dbx_paths} # ---- input cleanup ----------------------------------------------------------- excluded_items_old = self.sync.excluded_items excluded_items_new = self.sync.clean_excluded_items_list( excluded_items_old | dbx_paths_lower ) excluded_items_added = excluded_items_new - excluded_items_old if len(excluded_items_added) == 0: return # ---- apply changes ----------------------------------------------------------- if self.sync.sync_lock.acquire(blocking=False): try: self.sync.excluded_items = excluded_items_new for dbx_path_lower in excluded_items_added: self._remove_after_excluded(dbx_path_lower) self._logger.info("Excluded %s", dbx_path_lower) self._logger.info(IDLE) finally: self.sync.sync_lock.release() else: raise BusyError("Cannot add excluded items", "Please try again when idle.")
def _remove_after_excluded(self, dbx_path_lower: str) -> None: # Perform housekeeping. self.sync.remove_node_from_index(dbx_path_lower) self.sync.clear_sync_errors_for_path(dbx_path_lower, recursive=True) # Remove item from local drive. local_path_uncased = f"{self.dropbox_path}{dbx_path_lower}" try: local_path = to_existing_unnormalized_path(local_path_uncased) except FileNotFoundError: return event_cls = DirDeletedEvent if isdir(local_path) else FileDeletedEvent with self.manager.sync.fs_events.ignore(event_cls(local_path)): delete(local_path)
[docs] def include_items(self, *dbx_paths: str) -> None: """ Includes files or folders in sync and downloads it in the background. It is safe to call this method with items which have already been included, they will not be downloaded again. If the path lies inside an excluded folder, all its immediate parents will be included. Other children of the excluded folder will remain excluded. If any children of dbx_path were excluded, they will now be included. Any downloads will be carried out by the sync threads. Errors during the download can be accessed through :attr:`sync_errors` or :attr:`maestral_errors`. :param dbx_paths: Dropbox paths of items to include. :raises NotFoundError: if there is nothing at the given path. :raises DropboxAuthError: in case of an invalid access token. :raises DropboxServerError: for internal Dropbox errors. :raises ConnectionError: if the connection to Dropbox fails. :raises NotLinkedError: if no Dropbox account is linked. :raises NoDropboxDirError: if local Dropbox folder is not set up. """ self._check_linked() self._check_dropbox_dir() dbx_paths_lower = {normalize(p.rstrip("/")) for p in dbx_paths} # ---- input validation -------------------------------------------------------- excluded_items = self.sync.excluded_items newly_included_items = { p for p in dbx_paths_lower if self.sync.is_excluded_by_user(p) } if len(newly_included_items) == 0: return # ---- update excluded items list ---------------------------------------------- excluded_items.difference_update(newly_included_items) # Find parent folders that should also be newly included. for dbx_path_lower in newly_included_items.copy(): for folder in excluded_items.copy(): # Include all parents which are required to download dbx_path_lower. if is_child(dbx_path_lower, folder): # Remove parent folder from excluded list. excluded_items.discard(folder) # Re-add their children (except parents of dbx_path_lower). for res in self.client.list_folder_iterator(folder): for entry in res.entries: if not is_equal_or_child(dbx_path_lower, entry.path_lower): excluded_items.add(entry.path_lower) # Add the parent to the download list. newly_included_items.add(folder) # Include all children of dbx_path_lower. if is_child(folder, dbx_path_lower): excluded_items.discard(folder) # ---- download items from Dropbox --------------------------------------------- if self.sync.sync_lock.acquire(blocking=False): self._logger.debug("Excluded items old: %s", pf_repr(self.excluded_items)) self._logger.debug("Excluded items new: %s", pf_repr(excluded_items)) try: self.sync.excluded_items = excluded_items for dbx_path_lower in newly_included_items: self._logger.info("Included '%s'", dbx_path_lower) self.manager.download_queue.put(dbx_path_lower) finally: self.sync.sync_lock.release() else: raise BusyError("Cannot include item", "Please try again when idle.")
[docs] def excluded_status(self, dbx_path: str) -> str: """ Returns 'excluded', 'partially excluded' or 'included'. This function will not check if the item actually exists on Dropbox. :param dbx_path: Path to item on Dropbox. :returns: Excluded status. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() dbx_path_lower = normalize(dbx_path.rstrip("/")) if any(is_equal_or_child(dbx_path_lower, f) for f in self.sync.excluded_items): return "excluded" elif any(is_child(f, dbx_path_lower) for f in self.sync.excluded_items): return "partially excluded" else: return "included"
[docs] def move_dropbox_directory(self, new_path: str) -> None: """ Sets the local Dropbox directory. This moves all local files to the new location and resumes syncing afterwards. :param new_path: Full path to local Dropbox folder. "~" will be expanded to the user's home directory. :raises OSError: if moving the directory fails. :raises NotLinkedError: if no Dropbox account is linked. :raises NoDropboxDirError: if local Dropbox folder is not set up. """ self._check_linked() self._check_dropbox_dir() self._logger.info("Moving Dropbox folder...") old_path = self.sync.dropbox_path new_path = osp.realpath(osp.expanduser(new_path)) try: if osp.samefile(old_path, new_path): self._logger.info(f'Dropbox folder moved to "{new_path}"') return except FileNotFoundError: pass if osp.exists(new_path): raise FileExistsError(f'Path "{new_path}" already exists.') # Pause syncing. was_syncing = self.running self.stop_sync() if osp.isdir(old_path): # Will also create ancestors of new_path if required. shutil.move(old_path, new_path) else: os.makedirs(new_path) # Update config file and client. self.sync.dropbox_path = new_path self._logger.info(f'Dropbox folder moved to "{new_path}"') # Resume syncing. if was_syncing: self.start_sync()
[docs] def create_dropbox_directory(self, path: str) -> None: """ Creates a new Dropbox directory. Only call this during setup. :param path: Full path to local Dropbox folder. "~" will be expanded to the user's home directory. :raises OSError: if creation fails. :raises NotLinkedError: if no Dropbox account is linked. """ self._check_linked() # Pause syncing. resume = False if self.running: self.stop_sync() resume = True # Perform housekeeping. path = osp.realpath(osp.expanduser(path)) self.manager.reset_sync_state() # Create new folder. os.makedirs(path, exist_ok=True) # Update config file and client. self.sync.dropbox_path = path # Resume syncing. if resume: self.start_sync()
# ==== Utility methods for front ends ==============================================
[docs] def to_local_path(self, dbx_path: str) -> str: """ Converts a path relative to the Dropbox folder to a correctly cased local file system path. :param dbx_path: Path relative to Dropbox root. :returns: Corresponding path on local hard drive. :raises NotLinkedError: if no Dropbox account is linked. :raises NoDropboxDirError: if local Dropbox folder is not set up. """ self._check_linked() self._check_dropbox_dir() return self.sync.to_local_path(dbx_path)
[docs] def check_for_updates(self) -> UpdateCheckResult: """ Checks if an update is available. :returns: A dictionary with information about the latest release with the fields 'update_available' (bool), 'latest_release' (str), 'release_notes' (str) and 'error' (str or None). :raises UpdateCheckError: if checking for an update fails. """ current_version = __version__.lstrip("v") update_release_notes = "" try: resp = requests.get(GITHUB_RELEASES_API) resp.raise_for_status() data = resp.json() releases = [] release_notes = [] # Remove? The GitHub API already returns sorted entries. data.sort(key=lambda x: Version(x["tag_name"]), reverse=True) for item in data: v = item["tag_name"].lstrip("v") if not Version(v).is_prerelease: releases.append(v) release_notes.append("### {tag_name}\n\n{body}".format(**item)) new_version = get_newer_version(current_version, releases) if new_version: # closest_release == current_version if current_version appears in the # release list. Otherwise closest_release < current_version closest_release = next( v for v in releases if Version(v) <= Version(current_version) ) closest_release_idx = releases.index(closest_release) update_release_notes_list = release_notes[0:closest_release_idx] update_release_notes = "\n".join(update_release_notes_list) except CONNECTION_ERRORS: raise UpdateCheckError( "Could not check for updates", "No internet connection. Please try again later.", ) except Exception as e: raise UpdateCheckError( "Could not check for updates", f"Unable to retrieve information: {e}", ) return UpdateCheckResult( update_available=bool(new_version), latest_release=new_version or current_version, release_notes=update_release_notes, )
[docs] def shutdown_daemon(self) -> None: """ Stop syncing and notify anyone monitoring ``shutdown_future`` that we are done. """ self.stop_sync() if self.shutdown_future and self._loop and self._loop.is_running(): self._loop.call_soon_threadsafe(self.shutdown_future.set_result, True) self._logger.info("Shutting down")
# ==== Verifiers =================================================================== def _check_linked(self) -> None: if not self.client.linked: raise NotLinkedError( "No Dropbox account linked", "Please link an account using the GUI or CLI.", ) def _check_dropbox_dir(self) -> None: if self.pending_dropbox_folder: raise NoDropboxDirError( "No local Dropbox directory", "Please set up a local Dropbox directory using the GUI or CLI.", ) # ==== Housekeeping on update ===================================================== def _check_and_run_post_update_scripts(self) -> None: """ Runs post-update scripts if necessary. """ updated_from = self._state.get("app", "updated_scripts_completed") if Version(updated_from) < Version("1.2.1"): self._update_from_pre_v1_2_1() if Version(updated_from) < Version("1.3.2"): self._update_from_pre_v1_3_2() if Version(updated_from) < Version("1.4.8"): self._update_from_pre_v1_4_8() if Version(updated_from) < Version("1.6.0.dev0"): self._update_from_pre_v1_6_0() self._state.set("app", "updated_scripts_completed", __version__) self._conf.remove_deprecated_options() self._state.remove_deprecated_options() def _update_from_pre_v1_2_1(self) -> None: raise RuntimeError("Cannot upgrade from version before v1.2.1") def _update_from_pre_v1_3_2(self) -> None: if self._conf.get("app", "keyring") == "keyring.backends.OS_X.Keyring": self._logger.info("Migrating keyring after update from pre v1.3.2") self._conf.set("app", "keyring", "keyring.backends.macOS.Keyring") def _update_from_pre_v1_4_8(self) -> None: # Migrate config and state keys to new sections. self._logger.info("Migrating config after update from pre v1.4.8") mapping = { "path": {"old": "main", "new": "sync"}, "excluded_items": {"old": "main", "new": "sync"}, "keyring": {"old": "app", "new": "auth"}, "account_id": {"old": "account", "new": "auth"}, } for key, sections in mapping.items(): if self._conf.has_option(sections["old"], key): value = self._conf.get(sections["old"], key) self._conf.set(sections["new"], key, value) self._logger.info("Migrating state after update from pre v1.4.8") mapping = { "token_access_type": {"old": "account", "new": "auth"}, } for key, sections in mapping.items(): if self._state.has_option(sections["old"], key): value = self._state.get(sections["old"], key) self._state.set(sections["new"], key, value) def _update_from_pre_v1_6_0(self) -> None: self._logger.info("Scheduling reindex after update from pre v1.6.0") db_path = get_data_path("maestral", f"{self.config_name}.db") connection = sqlite3.connect(db_path, check_same_thread=False) db = Database(connection) _sql_drop_table(db, "hash_cache") _sql_drop_table(db, "'index'") _sql_drop_table(db, "'history'") self._state.reset_to_defaults("sync") db.close() # ==== Periodic async jobs ========================================================= def __repr__(self) -> str: email = self._state.get("account", "email") account_type = self._state.get("account", "type") return ( f"<{self.__class__.__name__}(config={self._config_name!r}, " f"account=({email!r}, {account_type!r}))>" )
async def sleep_rand(target: float, jitter: float = 60) -> None: await asyncio.sleep(target + random.random() * jitter)