Source code for maestral.utils.path

"""
This module contains functions for common path operations.
"""

# system imports
import os
import os.path as osp
import errno
import shutil
import itertools
import unicodedata
from stat import S_ISDIR
from typing import List, Optional, Tuple, Callable, Iterator, Iterable, Union

# local imports
from .hashing import DropboxContentHasher


def _path_components(path: str) -> List[str]:
    components = path.strip(osp.sep).split(osp.sep)
    cleaned_components = [c for c in components if c]
    return cleaned_components


[docs]Path = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]
# ==== path relationships ==============================================================
[docs]def is_child(path: str, parent: str) -> bool: """ Checks if ``path`` semantically is inside ``parent``. Neither path needs to refer to an actual item on the drive. This function is case-sensitive. :param path: Item path. :param parent: Parent path. :returns: Whether ``path`` semantically lies inside ``parent``. """ parent = parent.rstrip(osp.sep) + osp.sep path = path.rstrip(osp.sep) return path.startswith(parent)
[docs]def is_equal_or_child(path: str, parent: str) -> bool: """ Checks if ``path`` semantically is inside ``parent`` or equals ``parent``. Neither path needs to refer to an actual item on the drive. This function is case-sensitive. :param path: Item path. :param parent: Parent path. :returns: ``True`` if ``path`` semantically lies inside ``parent`` or ``path == parent``. """ return is_child(path, parent) or path == parent
# ==== case sensitivity and normalization ==============================================
[docs]def normalize_case(string: str) -> str: """ Converts a string to lower case. Todo: Follow Python 2.5 / Dropbox conventions. :param string: Original string. :returns: Lowercase string. """ return string.lower()
[docs]def normalize_unicode(string: str) -> str: """ Normalizes a string to replace all decomposed unicode characters with their single character equivalents. :param string: Original string. :returns: Normalized string. """ return unicodedata.normalize("NFC", string)
[docs]def normalize(string: str) -> str: """ Replicates the path normalization performed by Dropbox servers. This typically only involves converting the path to lower case, with a few (undocumented) exceptions: * Unicode normalization: decomposed characters are converted to composed characters. * Lower casing of non-ascii characters: Dropbox uses the Python 2.5 behavior for conversion to lower case. This means that some cyrillic characters are incorrectly lower-cased. For example: "Ꙋ".lower() -> "Ꙋ" instead of "ꙋ" "ΣΣΣ".lower() -> "σσσ" instead of "σσς" * Trailing spaces are stripped from folder names. We do not perform this normalization here because the Dropbox API will raise sync errors for such names anyways. Note that calling :func:`normalize` on an already normalized path will return the unmodified input. Todo: Follow Python 2.5 / Dropbox conventions instead of Python 3 conventions. :param string: Original path. :returns: Normalized path. """ return normalize_case(normalize_unicode(string))
[docs]def is_fs_case_sensitive(path: str) -> bool: """ Checks if ``path`` lies on a partition with a case-sensitive file system. :param path: Path to check. :returns: Whether ``path`` lies on a partition with a case-sensitive file system. """ if path == osp.pathsep: raise ValueError(f"Cannot check '{osp.pathsep}'") if path.islower(): check_path = path.upper() else: check_path = path.lower() if exists(path) and not exists(check_path): return True else: return not osp.samefile(path, check_path)
[docs]def equivalent_path_candidates( path: str, root: str = osp.sep, norm_func: Callable = normalize, ) -> List[str]: """ Given a "normalized" path using an injective (one-directional) normalization function, this method returns a list of matching un-normalized local paths. If no such local path exists, the normalized path itself is returned. If a local path can be followed up to a certain parent in the hierarchy, it will be taken and the remaining normalized path will be appended. :Example: Assume the normalization function is ``str.lower()``. If a root directory contains two folders "/parent/subfolder/child" and "/parent/Subfolder/child", two matches will be returned for "path = /parent/subfolder/child/file.txt". :param path: Normalized path relative to ``root``. :param root: Parent directory to search in. There are significant performance improvements if a root directory with a small tree is given. :param norm_func: Normalization function to use. Defaults to :func:`normalize`. :returns: Candidates for correctly cased local paths. """ path = path.lstrip(osp.sep) if path == "": return [root] components = _path_components(path) n_components_root = len(_path_components(root)) candidates = {-1: [root]} for root, dirs, files in os.walk(root): n_components_current_root = len(_path_components(root)) depth = n_components_current_root - n_components_root all_dirs = dirs.copy() all_files = files.copy() dirs.clear() files.clear() if depth >= len(components): # Current path is too deep to be match, skip it. continue dirname_normalized = norm_func(components[depth]) for dirname in all_dirs: if norm_func(dirname) == dirname_normalized: dirs.append(dirname) if depth + 1 == len(components): # Any matching entries must be direct children of root: check files. for filename in all_files: if norm_func(filename) == dirname_normalized: files.append(filename) new_candidates = [osp.join(root, name) for name in itertools.chain(dirs, files)] if new_candidates: candidates[depth] = candidates.get(depth, []) + new_candidates i_max = max(candidates.keys()) best_candidates = candidates[i_max] local_paths = [osp.join(path, *components[i_max + 1 :]) for path in best_candidates] return local_paths
[docs]def denormalize_path(path: str, root: str = osp.sep) -> str: """ Returns a denormalized version of the given path as far as corresponding nodes with the same normalization exist in the given root directory. If multiple matches are found, only one is returned. If ``path`` does not exist in root ``root`` or ``root`` does not exist, the return value will be ``os.path.join(root, path)``. :param path: Original path relative to ``root``. :param root: Parent directory to search in. There are significant performance improvements if a root directory with a small tree is given. :returns: Absolute and cased version of given path. """ candidates = equivalent_path_candidates(path, root) return candidates[0]
[docs]def to_existing_unnormalized_path(path: str, root: str = osp.sep) -> str: """ Returns a cased version of the given path if corresponding nodes (with arbitrary casing) exist in the given root directory. If multiple matches are found, only one is returned. :param path: Original path relative to ``root``. :param root: Parent directory to search in. There are significant performance improvements if a root directory with a small tree is given. :returns: Absolute and cased version of given path. :raises FileNotFoundError: if ``path`` does not exist in root ``root`` or ``root`` itself does not exist. """ candidates = equivalent_path_candidates(path, root) for candidate in candidates: if exists(candidate): return candidate raise FileNotFoundError(f'No matches with different casing found in "{root}"')
[docs]def normalized_path_exists(path: str, root: str = osp.sep) -> bool: """ Checks if a ``path`` exists in given ``root`` directory, similar to ``os.path.exists`` but case-insensitive. Normalisation is performed as by Dropbox servers (lower case and unicode normalisation). :param path: Path relative to ``root``. :param root: Directory where we will look for ``path``. There are significant performance improvements if a root directory with a small tree is given. :returns: Whether an arbitrarily cased version of ``path`` exists. """ candidates = equivalent_path_candidates(path, root) for c in candidates: if exists(c): return True return False
[docs]def generate_cc_name(path: str, suffix: str = "conflicting copy") -> str: """ Generates a path for a conflicting copy of ``path``. The file name is created by inserting the given ``suffix`` between the filename and the extension. For instance: "my_file.txt" -> "my_file (conflicting copy).txt" If a file with the resulting path already exists (case-insensitive!), we additionally append an integer number, for instance: "my_file.txt" -> "my_file (conflicting copy 1).txt" :param path: Original path name. :param suffix: Suffix to use. Defaults to "conflicting copy". :returns: New path. """ dirname, basename = osp.split(path) filename, ext = osp.splitext(basename) i = 0 cc_candidate = f"{filename} ({suffix}){ext}" while normalized_path_exists(cc_candidate, dirname): i += 1 cc_candidate = f"{filename} ({suffix} {i}){ext}" return osp.join(dirname, cc_candidate)
# ==== higher level file operations ====================================================
[docs]def delete(path: str, raise_error: bool = False) -> Optional[OSError]: """ Deletes a file or folder at ``path``. Symlinks will not be followed. :param path: Path of item to delete. :param raise_error: Whether to raise errors or return them. :returns: Any caught exception during the deletion. """ err = None try: shutil.rmtree(path) # Will raise OSError when it finds a symlink. except OSError: try: os.unlink(path) except OSError as e: err = e if raise_error and err: raise err else: return err
[docs]def move( src_path: str, dest_path: str, raise_error: bool = False, preserve_dest_permissions=False, ) -> Optional[OSError]: """ Moves a file or folder from ``src_path`` to ``dest_path``. If either the source or the destination path no longer exist, this function does nothing. Any other exceptions are either raised or returned if ``raise_error`` is False. :param src_path: Path of item to move. :param dest_path: Destination path. Any existing file at this path will be replaced by the move. Any existing **empty** folder will be replaced if the source is also a folder. :param raise_error: Whether to raise errors or return them. :param preserve_dest_permissions: Whether to apply the permissions of the source path to the destination path. Permissions will not be set recursively. :returns: Any caught exception during the move. """ err: Optional[OSError] = None orig_mode: Optional[int] = None if preserve_dest_permissions: # save dest permissions try: orig_mode = os.stat(dest_path, follow_symlinks=False).st_mode & 0o777 except FileNotFoundError: pass try: shutil.move(src_path, dest_path) except FileNotFoundError: # do nothing of source or dest path no longer exist pass except OSError as exc: err = exc else: if orig_mode: # reapply dest permissions try: os.chmod(dest_path, orig_mode) except OSError: pass if raise_error and err: raise err else: return err
[docs]def walk( root: Union[str, os.PathLike], listdir: Callable[ [Union[str, "os.PathLike[str]"]], Iterable[os.DirEntry] ] = os.scandir, ) -> Iterator[Tuple[str, os.stat_result]]: """ Iterates recursively over the content of a folder. :param root: Root folder to walk. :param listdir: Function to call to get the folder content. :returns: Iterator over (path, stat) results. """ for entry in listdir(root): try: path = entry.path stat = entry.stat(follow_symlinks=False) yield path, stat if S_ISDIR(stat.st_mode): yield from walk(entry.path, listdir=listdir) except OSError as exc: # Directory may have been deleted between finding it in the directory # list of its parent and trying to list its contents. If this # happens we treat it as empty. Likewise, if the directory was replaced # with a file of the same name (less likely, but possible), it will be # treated as empty. if exc.errno in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL): return else: raise
# ==== miscellaneous utilities =========================================================
[docs]def content_hash( local_path: str, chunk_size: int = 65536 ) -> Tuple[Optional[str], Optional[float]]: """ Computes content hash of a local file. :param local_path: Absolute path on local drive. :param chunk_size: Size of chunks to hash in bytes. :returns: Content hash to compare with Dropbox's content hash and mtime just before the hash was computed. """ hasher = DropboxContentHasher() try: mtime = os.stat(local_path, follow_symlinks=False).st_mtime try: with open(local_path, "rb", opener=opener_no_symlink) as f: while True: chunk = f.read(chunk_size) if len(chunk) == 0: break hasher.update(chunk) except IsADirectoryError: return "folder", mtime except OSError as exc: if exc.errno == errno.ELOOP: hasher.update(b"") # use empty file for symlinks else: raise exc return str(hasher.hexdigest()), mtime except FileNotFoundError: return None, None except NotADirectoryError: # a parent directory in the path refers to a file instead of a folder return None, None finally: del hasher
[docs]def fs_max_lengths_for_path(path: str = "/") -> Tuple[int, int]: """ Return the maximum length of file names and paths allowed on a file system. :param path: Path to check. This can be specified because different paths may be residing on different file systems. If the given path does not exist, the first existing parent directory in the tree be taken. :returns: Tuple giving the maximum file name and total path lengths. """ path = osp.abspath(path) dirname = osp.dirname(path) while True: try: max_char_path = os.pathconf(dirname, "PC_PATH_MAX") max_char_name = os.pathconf(dirname, "PC_NAME_MAX") return max_char_path, max_char_name except (FileNotFoundError, NotADirectoryError): dirname = osp.dirname(dirname) except ValueError: raise RuntimeError("Cannot get file length limits.") except OSError as exc: if exc.errno == errno.EINVAL: raise RuntimeError("Cannot get file length limits.") else: dirname = "/"
# ==== symlink-proof os methods ======================================================== def _get_stats_no_symlink(path: Path) -> Optional[os.stat_result]: try: return os.stat(path, follow_symlinks=False) except (FileNotFoundError, NotADirectoryError): return None
[docs]def exists(path: Path) -> bool: """Returns whether an item exists at the path. Returns True for symlinks.""" return _get_stats_no_symlink(path) is not None
[docs]def isfile(path: Path) -> bool: """Returns whether a file exists at the path. Returns True for symlinks.""" stat = _get_stats_no_symlink(path) if stat is None: return False else: return not S_ISDIR(stat.st_mode)
[docs]def isdir(path: Path) -> bool: """Returns whether a folder exists at the path. Returns False for symlinks.""" stat = _get_stats_no_symlink(path) if stat is None: return False else: return S_ISDIR(stat.st_mode)
[docs]def getsize(path: Path) -> int: """Returns the size. Returns False for symlinks.""" stat = os.stat(path, follow_symlinks=False) return stat.st_size