Source code for maestral.oauth

"""
This module is responsible for authorization and token store in the system keyring.
"""

# system imports
import logging
from threading import RLock
from typing import Optional
from datetime import datetime

# external imports
import requests
import keyring.backends
import keyring.backends.macOS
import keyring.backends.SecretService
import keyrings.alt.file
import keyring.backends.kwallet
from keyring.backend import KeyringBackend
from keyring.core import load_keyring
from keyring.errors import KeyringLocked, PasswordDeleteError, InitError
from dropbox.oauth import DropboxOAuth2FlowNoRedirect

# local imports
from .config import MaestralConfig, MaestralState
from .constants import DROPBOX_APP_KEY
from .errors import KeyringAccessError
from .utils import cli, exc_info_tuple


__all__ = ["OAuth2Session"]


supported_keyring_backends = (
    keyring.backends.macOS.Keyring,
    keyring.backends.SecretService.Keyring,
    keyring.backends.kwallet.DBusKeyring,
    keyring.backends.kwallet.DBusKeyringKWallet4,
    keyrings.alt.file.PlaintextKeyring,
)

CONNECTION_ERRORS = (
    requests.exceptions.Timeout,
    requests.exceptions.RetryError,
    requests.exceptions.ChunkedEncodingError,
    requests.exceptions.ConnectionError,
    ConnectionError,
)


[docs]class OAuth2Session: """Provides Dropbox OAuth flow and key store interface OAuth2Session provides OAuth 2 login and token store in the preferred system keyring. To authenticate with Dropbox, run :meth:`get_auth_url` first and direct the user to visit that URL and retrieve an auth token. Verify the provided auth token with :meth:`verify_auth_token` and save it in the system keyring together with the corresponding Dropbox ID by calling :meth:`save_creds`. Supported keyring backends are, in order of preference: * macOS Keychain * Any keyring implementing the SecretService Dbus specification * KWallet * Plain text storage When the auth flow is completed, a short-lived access token and a long-lived refresh token are generated. They can be accessed through the properties :attr:`access_token` and :attr:`refresh_token`. Only the long-lived refresh token will be saved in the system keychain for future sessions, the Dropbox SDK will use it to generate short-lived access tokens as needed. If the auth flow was previously completed before Dropbox migrated to short-lived tokens, the :attr:`token_access_type` will be 'legacy' and only a long-lived access token will be available. .. note:: Once the token has been stored with a keyring backend, that backend will be saved in the config file and remembered until the user unlinks the account. This module will therefore never switch keyring backends while linked. .. warning:: Unlike macOS Keychain, Gnome Keyring and KWallet do not support app-specific access to passwords. If the user unlocks those keyrings, we and any other application in the same user session get access to *all* saved passwords. :param config_name: Name of maestral config. :param app_key: Public key of the app, as registered with Dropbox. Used for the PKCE OAuth 2.0 flow. """ Success = 0 """Exit code for successful auth.""" InvalidToken = 1 """Exit code for invalid token.""" ConnectionFailed = 2 """Exit code for connection errors.""" default_token_access_type = "offline" _lock = RLock() def __init__(self, config_name: str, app_key: str = DROPBOX_APP_KEY) -> None: self._app_key = app_key self._config_name = config_name self._logger = logging.getLogger(__name__) self._conf = MaestralConfig(config_name) self._state = MaestralState(config_name) self._auth_flow = DropboxOAuth2FlowNoRedirect( self._app_key, use_pkce=True, token_access_type=self.default_token_access_type, ) self._account_id: Optional[str] = self._conf.get("auth", "account_id") or None self._token_access_type: Optional[str] = ( self._state.get("auth", "token_access_type") or None ) # defer keyring access until token requested by user self.loaded = False self._keyring: Optional[KeyringBackend] = None self._access_token: Optional[str] = None self._refresh_token: Optional[str] = None self._expires_at: Optional[datetime] = None @property def keyring(self) -> KeyringBackend: """ The keyring backend currently being used to store auth tokens. If set to None, the best viable backend will be determined automatically. """ if not self._keyring: self._keyring = self._get_keyring_backend() return self._keyring @keyring.setter def keyring(self, ring: Optional[KeyringBackend]) -> None: if not ring: self._conf.set("auth", "keyring", "automatic") else: self._conf.set( "auth", "keyring", f"{ring.__class__.__module__}.{ring.__class__.__name__}", ) self._keyring = ring def _get_keyring_backend(self) -> KeyringBackend: """ Returns the keyring backend currently used. If none is used because we are not yet linked, use the backend specified in the config file (if valid) or choose the most secure of the available and supported keyring backends. """ import keyring.backends keyring_class: str = self._conf.get("auth", "keyring").strip() if self._account_id and keyring_class != "automatic": # We are already linked and have a keyring set. Insist on using # the recorded backend. try: ring = load_keyring(keyring_class) except Exception as exc: # Bomb out with an exception. title = f"Cannot load keyring {keyring_class}" message = "Please relink Maestral to get a new access token." new_exc = KeyringAccessError(title, message).with_traceback( exc.__traceback__ ) self._logger.error(title, exc_info=exc_info_tuple(new_exc)) raise new_exc return ring else: # We are not yet linked. Try loading the preset or the preferred keyring # backend for the platform. try: ring = load_keyring(keyring_class) except Exception: # get preferred keyring backends for platform available_rings = keyring.backend.get_all_keyring() supported_rings = [ k for k in available_rings if isinstance(k, supported_keyring_backends) ] ring = max(supported_rings, key=lambda x: x.priority) self._conf.set( "auth", "keyring", f"{ring.__class__.__module__}.{ring.__class__.__name__}", ) return ring def _get_accessor(self) -> str: return f"config:{self._config_name}:{self._account_id}" def _migrate_keyring(self, account_id: str) -> None: token = self.keyring.get_password("Maestral", account_id) if token: self.keyring.set_password("Maestral", self._get_accessor(), token) self.keyring.delete_password("Maestral", account_id) @property def linked(self) -> bool: """Whether we have full auth credentials (read only).""" if self.account_id: legacy = self._token_access_type == "legacy" and self.access_token offline = self._token_access_type == "offline" and self.refresh_token if legacy or offline: return True return False @property def account_id(self) -> Optional[str]: """The account ID (read only). This call may block until the keyring is unlocked.""" return self._account_id @property def token_access_type(self) -> Optional[str]: """The type of access token (read only). If 'legacy', we have a long-lived access token. If 'offline', we have a short-lived access token with an expiry time and a long-lived refresh token to generate new access tokens. This call may block until the keyring is unlocked.""" with self._lock: if not self.loaded: self.load_token() return self._token_access_type @property def access_token(self) -> Optional[str]: """The access token (read only). This will always be set for a 'legacy' token. For an 'offline' token, this will only be set if we completed the auth flow in the current session. In case of an 'offline' token, use the refresh token to retrieve a short-lived access token through the Dropbox API instead. This call may block until the keyring is unlocked.""" with self._lock: if not self.loaded: self.load_token() return self._access_token @property def refresh_token(self) -> Optional[str]: """The refresh token (read only). This will only be set for an 'offline' token. This call may block until the keyring is unlocked.""" with self._lock: if not self.loaded: self.load_token() return self._refresh_token @property def access_token_expiration(self) -> Optional[datetime]: """The expiry time for the short-lived access token (read only). This will only be set for an 'offline' token and if we completed the flow during the current session.""" # this will only be set if we linked in the current session return self._expires_at
[docs] def load_token(self) -> None: """ Loads auth token from system keyring. This will be called automatically when accessing any of the properties :attr:`linked`, :attr:`access_token`, :attr:`refresh_token` or :attr:`token_access_type`. This call may block until the keyring is unlocked. :raises KeyringAccessError: if the system keyring is locked or otherwise cannot be accessed (for example if the app bundle signature has been invalidated). """ self._logger.debug(f"Using keyring: {self.keyring}") if not self._account_id: return try: self._migrate_keyring(self._account_id) token = self.keyring.get_password("Maestral", self._get_accessor()) access_type = self._state.get("auth", "token_access_type") if not access_type: # if no token type was saved, we linked with a version < 1.2.0 # default to legacy token access type access_type = "legacy" self._state.set("auth", "token_access_type", access_type) self.loaded = True if token: if access_type == "legacy": self._access_token = token elif access_type == "offline": self._refresh_token = token else: msg = "Invalid token access type in state file." err = RuntimeError("Invalid token access type in state file.") self._logger.error(msg, exc_info=exc_info_tuple(err)) raise err self._token_access_type = access_type except (KeyringLocked, InitError): title = "Could not load auth token" msg = f"{self.keyring.name} is locked. Please unlock the keyring and try again." new_exc = KeyringAccessError(title, msg) self._logger.error(title, exc_info=exc_info_tuple(new_exc)) raise new_exc except Exception as e: title = "Could not load auth token" new_exc = KeyringAccessError(title, e.args[0]) self._logger.error(title, exc_info=exc_info_tuple(new_exc)) raise new_exc
[docs] def get_auth_url(self) -> str: """ Retrieves an auth URL to start the OAuth2 implicit grant flow. :returns: Dropbox auth URL. """ authorize_url = self._auth_flow.start() return authorize_url
[docs] def verify_auth_token(self, code: str) -> int: """ If the user approves the app, they will be presented with a single usage "authorization code". Have the user copy/paste that authorization code into the app and then call this method to exchange it for a long-lived auth token. :param code: Ephemeral auth code. :returns: :attr:`Success`, :attr:`InvalidToken`, or :attr:`ConnectionFailed`. """ with self._lock: try: res = self._auth_flow.finish(code) self._access_token = res.access_token self._refresh_token = res.refresh_token self._expires_at = res.expires_at self._account_id = res.account_id self._token_access_type = self.default_token_access_type self.loaded = True return self.Success except requests.exceptions.HTTPError: return self.InvalidToken except CONNECTION_ERRORS: return self.ConnectionFailed
[docs] def save_creds(self) -> None: """ Saves the auth token to system keyring. Falls back to plain text storage if the user denies access to keyring. This should be called after :meth:`verify_auth_token` returned successfully. """ with self._lock: self._conf.set("auth", "account_id", self._account_id) self._state.set("auth", "token_access_type", self._token_access_type) if self._token_access_type == "offline": token = self.refresh_token else: token = self.access_token if not token: raise RuntimeError("No credentials set") try: self.keyring.set_password("Maestral", self._get_accessor(), token) if isinstance(self.keyring, keyrings.alt.file.PlaintextKeyring): cli.warn( "No supported keyring found, credentials stored in plain text" ) cli.ok("Credentials written") except Exception: # switch to plain text keyring if we cannot access preferred backend self.keyring = keyrings.alt.file.PlaintextKeyring() self.save_creds()
[docs] def delete_creds(self) -> None: """ Deletes auth token from system keyring. :raises KeyringAccessError: if the system keyring is locked or otherwise cannot be accessed (for example if the app bundle signature has been invalidated). """ with self._lock: if not self._account_id: # when keyring.delete_password is called without a username, # it may delete all passwords stored by Maestral on some backends return try: self.keyring.delete_password("Maestral", self._get_accessor()) cli.ok("Credentials removed") except (KeyringLocked, InitError): title = "Could not delete auth token" msg = f"{self.keyring.name} is locked. Please unlock the keyring and try again." exc = KeyringAccessError(title, msg) self._logger.error(title, exc_info=exc_info_tuple(exc)) raise exc except PasswordDeleteError as exc: # password does not exist in keyring self._logger.info(exc.args[0]) except Exception as e: title = "Could not delete auth token" new_exc = KeyringAccessError(title, e.args[0]) self._logger.error(title, exc_info=exc_info_tuple(new_exc)) raise new_exc self.keyring = None self._conf.set("auth", "account_id", "") self._state.set("auth", "token_access_type", "") self._account_id = None self._access_token = None self._refresh_token = None self._token_access_type = None
def __repr__(self) -> str: return ( f"<{self.__class__.__name__}(config={self._config_name!r}, " f"account_id={self._account_id})>" )