Source code for simple_dvc.util_fsspec

"""
fsspec wrappers that should make working with S3 / the local file system
seamless.

TODO:
    Someone must have already implemented this somewhere. Find that to
    either use directly or as a reference.

Note:
    * While under development needs to be synced between

    ~/code/simple_dvc/simple_dvc/util_fsspec.py

    AND

    ~/code/watch/watch/utils/util_fsspec.py

    Might move to kwutil later


Look into:
    https://github.com/fsspec/universal_pathlib
    https://pypi.org/project/pathlibfs/
"""
from os.path import join
import pathlib
import ubelt as ub
import os
import fsspec

# Shared fsspec no-op callback instance. FSPath.copy substitutes it when the
# caller does not pass a ``callback``.
NOOP_CALLBACK = fsspec.callbacks.NoOpCallback()


class FSPath(str):
    """
    Provide a pathlib.Path-like way of interacting with fsspec.

    This has a few notable differences with pathlib.Path. We inherit from
    :class:`str` because :class:`pathlib.Path` semantics can break the
    protocol section of URIs. This means we have to use :mod:`os.path`
    functions to implement things like :meth:`FSPath.relative_to` and
    :meth:`FSPath.joinpath` (which behave differently than pathlib).

    Every instance carries the fsspec filesystem it operates on in the
    ``fs`` attribute. Methods that derive a new path from this one pass the
    same ``fs`` along.

    Note:
        Not all of the fsspec / pathlib operations are currently implemented,
        add as needed.

    Example:
        >>> cwd = FSPath.coerce('.')
        >>> print(cwd)
        >>> print(cwd.fs)
    """

    # Final subclasses must define this as a string to be passed to
    # fsspec.filesystem(__protocol__)
    __protocol__ = NotImplemented

    # Doesn't seem like a method a path-like should have, maybe we want a
    # manager path.
    # @classmethod
    # def available_protocols(cls):
    #     return fsspec.available_protocols()

    @classmethod
    def _new_fs(cls, **kwargs):
        """
        Create a new filesystem instance based on __protocol__

        Args:
            **kwargs: forwarded to :func:`fsspec.filesystem`.
        """
        return fsspec.filesystem(cls.__protocol__, **kwargs)

    @classmethod
    def _current_fs(cls, **kwargs):
        """
        The "default" FileSystem object. Get the most recent filesystem with
        this protocol, or create a new one with defaults.

        Note:
            ``kwargs`` is accepted but currently unused.

        Returns:
            AbstractFileSystem
        """
        fs_cls = fsspec.get_filesystem_class(cls.__protocol__)
        return fs_cls.current()

    def __new__(cls, path, *, fs=None):
        # Note: the value of the string is set in the __new__ method because
        # strings are immutable. So we dont need to call super or anything.
        # The first argument will be the value of the string.
        if fs is None:
            # Lazy creation of a new fs
            fs = cls._current_fs()
        self = str.__new__(cls, path)
        self.fs = fs
        return self

    @classmethod
    def coerce(cls, path):
        """
        Determine which backend to use automatically

        Args:
            path (str | os.PathLike | FSPath): the path to wrap. An existing
                :class:`FSPath` is returned unchanged so that its class and
                its ``fs`` are preserved.

        Returns:
            FSPath: an :class:`S3Path`, :class:`SSHPath` or
            :class:`LocalPath` depending on the prefix of ``path``.

        Example:
            >>> path2 = FSPath.coerce('/local/path')
            >>> print(f'path2={path2}')
            >>> assert path2.is_local()
            >>> # xdoctest: +REQUIRES(module:s3fs)
            >>> path1 = FSPath.coerce('s3://demo_bucket')
            >>> print(f'path1={path1}')
            >>> assert path1.is_remote()
        """
        if isinstance(path, FSPath):
            # Re-dispatching on the string value would lose a custom fs and
            # could misclassify paths whose text has no protocol prefix
            # (e.g. an SSHPath only stores the host-side path).
            return path
        # Accept pathlib-style objects in addition to plain strings.
        path = os.fspath(path)
        if path.startswith('s3:'):
            self = S3Path(path)
        elif path.startswith('ssh:'):
            self = SSHPath.coerce(path)
        # elif path.startswith('/vsis3/'):
        #     # convert gdal virtual filesystems to s3 paths?
        #     self = S3Path('s3://' + path[7:])
        else:
            self = LocalPath(path)
        return self

    def relative_to(self, other):
        """
        Return this path relative to ``other`` (via :func:`os.path.relpath`).
        """
        return self.__class__(os.path.relpath(self, other), fs=self.fs)

    def is_remote(self):
        """True if this is an instance of :class:`RemotePath`."""
        return isinstance(self, RemotePath)

    def is_local(self):
        """True if this is an instance of :class:`LocalPath`."""
        return isinstance(self, LocalPath)

    def is_link(self):
        """
        True if the filesystem reports this path as a symbolic link.

        Filesystems that do not provide an ``islink`` method are treated as
        having no links.
        """
        islink = getattr(self.fs, 'islink', None)
        if islink is None:
            return False
        return bool(islink(self))

    def open(self, mode='rb', block_size=None, cache_options=None,
             compression=None):
        """
        Open this path with ``self.fs.open``; all arguments are forwarded.

        Example:
            >>> dpath = LocalPath.appdir('simple_dvc/fsspec/tests/open').ensuredir()
            >>> fpath = dpath / 'file.txt'
            >>> file = fpath.open(mode='w')
            >>> file.write('hello world')
            >>> file.close()
            >>> assert fpath.read_text() == fpath.open('r').read()
        """
        return self.fs.open(self, mode=mode, block_size=block_size,
                            cache_options=cache_options,
                            compression=compression)

    def ls(self, detail=False, **kwargs):
        """List the contents of this path with ``self.fs.ls``."""
        return self.fs.ls(self, detail=detail, **kwargs)

    def touch(self, truncate=False, **kwargs):
        """Call ``self.fs.touch`` on this path. Returns None."""
        self.fs.touch(self, truncate=truncate, **kwargs)

    def move(self, path1, path2, recursive='auto', maxdepth=None, **kwargs):
        """
        Move this path to ``path2`` with ``self.fs.move``.

        Args:
            path1: unused; kept only so existing positional callers work.
            path2: destination path.
            recursive (bool | str): if 'auto', set to ``self.is_dir()``.
            maxdepth (int | None): forwarded to ``self.fs.move``.
        """
        if recursive == 'auto':
            recursive = self.is_dir()
        self.fs.move(self, path2, recursive=recursive, maxdepth=maxdepth,
                     **kwargs)

    def delete(self, recursive='auto', maxdepth=True):
        """
        Deletes this file or this directory (and all of its contents)

        Unlike fs.delete, this will not error if the file doesnt exist. See
        :func:`FSPath.rm` if you want standard error-ing behavior.

        Args:
            recursive (bool | str): if 'auto', set to ``self.is_dir()``.
            maxdepth (int | None | bool): depth limit forwarded to the
                filesystem. The default ``True`` means "no limit".
        """
        if recursive == 'auto':
            recursive = self.is_dir()
        if maxdepth is True:
            # True compares equal to 1, which a filesystem may interpret as a
            # depth limit of one level. Translate the default to "unlimited".
            maxdepth = None
        try:
            return self.fs.delete(self, recursive=recursive,
                                  maxdepth=maxdepth)
        except FileNotFoundError:
            # Deliberate best-effort: a missing path is not an error here.
            return None

    def rm(self, recursive='auto', maxdepth=True):
        """
        Deletes this file or this directory (and all of its contents)

        Args:
            recursive (bool | str): if 'auto', set to ``self.is_dir()``.
            maxdepth (int | None | bool): depth limit forwarded to the
                filesystem. The default ``True`` means "no limit".
        """
        if recursive == 'auto':
            recursive = self.is_dir()
        if maxdepth is True:
            # See FSPath.delete: avoid True being read as a depth of 1.
            maxdepth = None
        return self.fs.rm(self, recursive=recursive, maxdepth=maxdepth)

    def mkdir(self, create_parents=True, **kwargs):
        """
        Note:
            does nothing on some filesystems (e.g. S3)
        """
        return self.fs.mkdir(self, create_parents=create_parents, **kwargs)

    def stat(self):
        """Return ``self.fs.stat`` for this path."""
        return self.fs.stat(self)

    def is_dir(self):
        """Return ``self.fs.isdir`` for this path."""
        return self.fs.isdir(self)

    def is_file(self):
        """Return ``self.fs.isfile`` for this path."""
        return self.fs.isfile(self)

    def exists(self):
        """Return ``self.fs.exists`` for this path."""
        return self.fs.exists(self)

    def write_text(self, value, **kwargs):
        """Write ``value`` to this path with ``self.fs.write_text``."""
        return self.fs.write_text(self, value, **kwargs)

    def read_text(self, **kwargs):
        """Read this path with ``self.fs.read_text``."""
        return self.fs.read_text(self, **kwargs)

    def walk(self, include_protocol='auto', **kwargs):
        """
        Walk the directory tree rooted at this path using ``self.fs.walk``.

        Args:
            include_protocol (bool | str): if True, each yielded root is
                passed through ``self.fs.unstrip_protocol``. If 'auto', this
                is enabled for remote paths.
            **kwargs: forwarded to ``self.fs.walk``.

        Yields:
            Tuple[Self, List[str], List[str]] - root, dir names, file names
        """
        if include_protocol == 'auto':
            include_protocol = self.is_remote()
        for root, dnames, fnames in self.fs.walk(self, **kwargs):
            if include_protocol:
                root = self.fs.unstrip_protocol(root)
            yield self.__class__(root, fs=self.fs), dnames, fnames

    @property
    def parent(self):
        """The directory name of this path (same class and ``fs``)."""
        return self.__class__(os.path.dirname(self), fs=self.fs)

    @property
    def name(self):
        """The final component of this path."""
        return os.path.basename(self)

    @property
    def stem(self):
        """The name without its last extension."""
        return os.path.splitext(self.name)[0]

    @property
    def suffix(self):
        """The last extension of the name, including the leading dot."""
        return os.path.splitext(self.name)[1]

    @property
    def suffixes(self):
        """
        The dot-separated pieces of the name after the first one.

        Note:
            Unlike :mod:`pathlib` the returned items do not include the
            leading dot.
        """
        return self.name.split('.')[1:]

    @property
    def parts(self):
        """This path split on ``self.fs.sep``."""
        return self.split(self.fs.sep)

    def copy(self, dst, recursive='auto', maxdepth=None, on_error=None,
             callback=None, verbose=1, idempotent=True, overwrite=False,
             **kwargs):
        """
        Copies this file or directory to dst

        Abstracts fsspec copy / put / get.

        If dst ends with a "/", it will be assumed to be a directory, and
        target files will go within.

        Unlike fsspec, this attempts to be idempotent.

        Args:
            dst (FSPath | str | os.PathLike): location to copy to. Values
                that are not already an :class:`FSPath` are converted with
                :meth:`FSPath.coerce`.

            recursive (bool | str):
                If 'auto' (the default), attempt to determine if this is a
                directory or a file. Set to True if it is a directory and
                False otherwise. If you know what this is beforehand, you can
                set it explicitly to be more efficient.

            maxdepth (int | None): only makes sense when recursive is True

            callback (None | callable): for put / get cases

            on_error (str): either "raise", "ignore". Only applicable in the
                "copy" case.

            verbose (int): if truthy, print the source and destination.

            idempotent (bool):
                if False, use standard fsspec behavior, otherwise attempt to
                be idempotent.

            overwrite (bool):
                if True, overwrite existing data instead of erroring.
                Defaults to False. Not implemented yet.

        Raises:
            NotImplementedError: if ``overwrite`` is truthy.
            IOError: if this is a local path that does not exist.
            TypeError: if the source / destination combination is
                unsupported.

        Note:
            There are different functions depending on if we are going from
            remote->remote (copy), local->remote (put), or remote->local (get)

        References:
            https://filesystem-spec.readthedocs.io/en/latest/copying.html
        """
        if not isinstance(dst, FSPath):
            # pathlib.Path does not support ``dst.parent + '/'`` and plain
            # strings have no ``exists``. Normalize so every branch below
            # can rely on the FSPath API.
            dst = FSPath.coerce(os.fspath(dst))

        if recursive == 'auto':
            # On S3, asking if something is a dir can give permission
            # issues, whereas asking if something is a file seems ok.
            recursive = not self.is_file()

        if callback is None:
            callback = NOOP_CALLBACK

        commonkw = {
            'recursive': recursive,
            'maxdepth': maxdepth,
            **kwargs,
        }

        if overwrite:
            raise NotImplementedError

        if verbose:
            print(f'Copy {self} -> {dst}')

        # HANDLE SPECIAL CASE WHERE FSSPEC IS NOT IDEMPOTENT
        if idempotent:
            if recursive and dst.exists():
                dst = dst.parent + '/'

        if isinstance(self, LocalPath):
            if not self.exists():
                raise IOError(f'{self} does not exist')
            if isinstance(dst, RemotePath):
                # TODO: test if we are an empty directory and fail because
                # generally we cant copy an empty directory to a remote.
                # TODO: handle FileExistsError once overwrite is supported.
                if recursive:
                    return dst.fs.put(self, dst, **commonkw,
                                      callback=callback)
                return dst.fs.put_file(self, dst, callback=callback)
            elif isinstance(dst, LocalPath):
                return self.fs.copy(self, dst, **commonkw, callback=callback)
            else:
                raise TypeError(type(dst))
        elif isinstance(self, RemotePath):
            if isinstance(dst, RemotePath):
                return self.fs.copy(self, dst, **commonkw, on_error=on_error)
            elif isinstance(dst, LocalPath):
                if recursive:
                    return self.fs.get(self, dst, **commonkw,
                                       callback=callback)
                # Using put on an s3 bucket seems like it fails when
                # directories have tight permissions, but put file
                # seems ok. Need to ensure that we are actually just
                # using a file in this instance.
                return self.fs.get_file(self, dst, callback=callback)
            else:
                raise TypeError(type(dst))
        else:
            raise TypeError(type(self))

    def joinpath(self, *others):
        """Join ``others`` onto this path with :func:`os.path.join`."""
        return self.__class__(join(self, *others), fs=self.fs)

    def __truediv__(self, other):
        return self.__class__(join(self, other), fs=self.fs)

    def __add__(self, other):
        """
        Returns a new string starting with this fspath representation.
        """
        return self.__class__(str(self) + other, fs=self.fs)

    def __radd__(self, other):
        """
        Returns a new string ending with this fspath representation.
        """
        return self.__class__(other + str(self), fs=self.fs)

    def tree(self, max_files=100, dirblocklist=None, show_nfiles='auto',
             return_text=False, return_tree=True, pathstyle='name',
             max_depth=None, with_type=False, abs_root_label=True,
             colors=not ub.NO_COLOR):
        """
        Filesystem tree representation

        Like the unix util tree, but allow writing numbers of files per
        directory when given -d option

        Ported from xdev.misc.tree_repr

        TODO:
            instead of building the networkx structure and then waiting to
            display everything, build and display simultaneously. Will
            require using a modified version of write_network_text

        Args:
            max_files (int | None) :
                maximum files to print before suppressing a directory

            dirblocklist (None | object):
                directory name patterns to skip; coerced to a glob
                MultiPattern.

            show_nfiles (bool | str):
                if True prefix directory labels with their file count. If
                'auto', only do so for directories whose files were
                suppressed.

            return_text (bool): if True return the text instead of printing

            return_tree (bool): if True return the tree

            pathstyle (str): can be rel, name, or abs

            max_depth (int | None): maximum depth to descend

            with_type (bool): if True prefix labels with type letters
                (L=link, B=broken, F=file, D=directory)

            abs_root_label (bool): if True force the root to always be
                absolute

            colors (bool): if True use rich

        Returns:
            dict: may contain the keys 'text' and 'tree'.

        Raises:
            KeyError: if ``pathstyle`` is not one of the known styles.
        """
        import io
        import networkx as nx
        from cmd_queue.util.util_networkx import write_network_text
        from kwutil.util_pattern import MultiPattern

        # tree = nx.OrderedDiGraph()
        tree = nx.DiGraph()

        if dirblocklist is not None:
            dirblocklist = MultiPattern.coerce(dirblocklist, hint='glob')

        def _make_label(p, force_abs=False):
            # Build the (optionally rich-markup colored) label for one node.
            if force_abs:
                pathrep = p
            elif pathstyle == 'rel':
                pathrep = p.relative_to(self)
            elif pathstyle == 'name':
                pathrep = p.name
            elif pathstyle == 'abs':
                pathrep = p
            else:
                raise KeyError(pathstyle)
            # Work with a plain string so concatenation below does not
            # construct intermediate FSPath objects.
            pathrep = str(pathrep)

            types = []
            islink = p.is_link()
            isdir = p.is_dir()
            isfile = p.is_file()

            isbroken = False
            scolor = ''
            tcolor = ''
            L_scolor = ''
            L_tcolor = ''

            if islink:
                if colors:
                    L_scolor = '[cyan]'
                    L_tcolor = '[/cyan]'
                types.append('L')
                if not isfile and not isdir:
                    isbroken = True

            if isbroken:
                if colors:
                    L_scolor = '[red]'
                    L_tcolor = '[/red]'
                types.append('B')

            if isfile:
                if colors:
                    scolor = '[reset]'
                    tcolor = '[/reset]'
                    if os.access(p, os.X_OK):
                        scolor = '[green]'
                        tcolor = '[/green]'
                types.append('F')

            if isdir:
                if colors:
                    # Only some subclasses define ``absolute``; fall back to
                    # the path itself otherwise.
                    make_abs = getattr(p, 'absolute', None)
                    link_target = make_abs() if callable(make_abs) else p
                    scolor = f'[blue][link={link_target}]'
                    tcolor = '[/link][/blue]'
                types.append('D')

            if islink:
                target = os.readlink(p)
                pathrep = (L_scolor + pathrep + L_tcolor + ' -> ' +
                           scolor + target + tcolor)
            else:
                pathrep = scolor + pathrep + tcolor

            if with_type:
                typelabel = ''.join(types)
                return f'({typelabel}) ' + pathrep
            else:
                return pathrep

        # TODO: rectify with "find"
        # The depth of the top of the walk is taken from the first yielded
        # root rather than from ``self``: the filesystem may normalize the
        # path (e.g. make it absolute), which changes the separator count.
        start_depth = None
        for root, dnames, fnames in self.walk():
            curr_depth = root.count(self.fs.sep)
            if start_depth is None:
                start_depth = curr_depth

            if max_depth is not None:
                if (curr_depth - start_depth) >= max_depth:
                    del dnames[:]

            if dirblocklist is not None:
                dnames[:] = [
                    dname for dname in dnames
                    if not dirblocklist.match(dname)]
            dnames[:] = sorted(dnames)

            tree.add_node(root)

            too_many_files = (
                max_files is not None and len(fnames) >= max_files)
            if show_nfiles == 'auto':
                show_nfiles_ = too_many_files
            else:
                show_nfiles_ = show_nfiles

            num_files = len(fnames)
            if show_nfiles_:
                prefix = '[ {} ] '.format(num_files)
            else:
                prefix = ''

            force_abs = abs_root_label and curr_depth == start_depth
            label = '{}{}'.format(prefix, _make_label(root, force_abs))
            tree.nodes[root]['label'] = label

            if not too_many_files:
                for fname in fnames:
                    fpath = root / fname
                    tree.add_node(fpath)
                    tree.nodes[fpath]['label'] = _make_label(fpath)
                    tree.add_edge(root, fpath)

            for dname in dnames:
                dpath = root / dname
                tree.add_node(dpath)
                tree.nodes[dpath]['label'] = _make_label(dpath)
                tree.add_edge(root, dpath)

        file = io.StringIO()
        write_network_text(tree, file)
        file.seek(0)
        text = file.read()

        info = {}
        if return_text:
            info['text'] = text
        else:
            if colors:
                from rich import print as rprint
                rprint(text)
            else:
                print(text)
        if return_tree:
            info['tree'] = tree
        return info
class LocalPath(FSPath):
    """
    Path on the local filesystem (fsspec protocol ``'file'``).

    Example:
        >>> dpath = ub.Path.appdir('simple_dvc/tests/util_fsspec/demo')
        >>> dpath.delete().ensuredir()
        >>> (dpath / 'file1.txt').write_text('data')
        >>> (dpath / 'dpath').ensuredir()
        >>> (dpath / 'dpath/file2.txt').write_text('data')
        >>> self = LocalPath(dpath).absolute()
        >>> print(f'self={self}')
        >>> print(self.ls())
        >>> info = self.tree()
        >>> fsspec_dpath = (dpath / 'dpath')
        >>> fsspec_fpath = (dpath / 'file1.txt')
        >>> pathlib_dpath = ub.Path(dpath / 'pathlib_dpath')
        >>> pathlib_fpath = ub.Path(dpath / 'pathlib_fpath')
        >>> assert not pathlib_dpath.exists()
        >>> assert not pathlib_fpath.exists()
        >>> fsspec_dpath.copy(pathlib_dpath)
        >>> fsspec_fpath.copy(pathlib_fpath)
        >>> assert pathlib_dpath.exists()
        >>> assert pathlib_fpath.exists()
    """
    __protocol__ = 'file'

    def ensuredir(self, mode=0o0777):
        """
        Create this directory (and any missing parents) if needed.

        Returns:
            LocalPath: this same object, so calls can be chained.
        """
        as_pathlib = pathlib.Path(self)
        as_pathlib.mkdir(mode=mode, parents=True, exist_ok=True)
        return self

    def absolute(self):
        """
        Return the :func:`os.path.abspath` form of this path, keeping ``fs``.
        """
        abs_text = os.path.abspath(self)
        return self.__class__(abs_text, fs=self.fs)

    @classmethod
    def appdir(cls, *args, **kw):
        """
        Wrap the result of ``ub.Path.appdir(*args, **kw)`` in this class.
        """
        app_dpath = ub.Path.appdir(*args, **kw)
        return cls(str(app_dpath))
class RemotePath(FSPath):
    """
    Abstract base class shared by all remote filesystem paths.

    :meth:`FSPath.is_remote` tests for membership in this class.
    """
class S3Path(RemotePath):
    """
    The specific S3 remote filesystem.

    Control credentials with the environment variables: AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN.

    A single S3 filesystem is used by default. To work with different S3
    filesystems, create one explicitly and pass it in as the ``fs`` object.
    E.g.

        fs = S3Path._new_fs(profile='iarpa')
        self = S3Path('s3://kitware-smart-watch-data/', fs=fs)
        self.ls()

    Requirements:
        s3fs>=2023.6.0

    References:
        .. [S3FS_Docs] https://s3fs.readthedocs.io/en/latest/?badge=latest

    Example:
        >>> # xdoctest: +REQUIRES(module:s3fs)
        >>> fs = S3Path._new_fs()
    """
    __protocol__ = 's3'

    def _as_gdal_vsi(self):
        """
        Return this path rewritten with a ``/vsis3/`` prefix.

        The leading ``len(__protocol__) + 3`` characters (i.e. the
        ``s3://`` prefix) are dropped before prepending ``/vsis3/``.
        """
        num_prefix_chars = len(self.__protocol__) + 3
        remainder = self[num_prefix_chars:]
        return '/vsis3/' + remainder
class SSHPath(RemotePath):
    """
    A path on a remote host accessed via the fsspec ``'ssh'`` protocol.

    The string value holds only the path on the host; the host itself is
    stored on the filesystem object (see :attr:`SSHPath.host`).

    Ignore:
        fs = SSHPath._new_fs(host='localhost')
        self = SSHPath('.', fs=fs)
        self.ls()

        self = SSHPath('misc/notes', fs=fs)
        self.tree()
    """
    __protocol__ = 'ssh'

    @property
    def host(self):
        """The ``host`` attribute of the underlying filesystem."""
        return self.fs.host

    @classmethod
    def coerce(cls, path):
        """
        Build an SSHPath from a ``<proto>://<host>/<path>`` URI.

        A new filesystem is created for the host named in the URI.

        Args:
            path (str | SSHPath): a URI such as ``ssh://host/some/path``.
                An existing :class:`SSHPath` is returned unchanged. A URI
                without a path component (``ssh://host``) maps to ``/``.

        Returns:
            SSHPath

        Raises:
            ValueError: if ``path`` has no ``://`` separator or no host.
        """
        if isinstance(path, cls):
            # Already bound to a filesystem; its text is host-relative and
            # cannot be re-parsed as a URI.
            return path
        _proto, sep, rest = path.partition('://')
        if not sep:
            raise ValueError(
                f'Expected a URI of the form ssh://host/path, got {path!r}')
        # Split only on the first "/" so the remote path may contain any
        # characters, and tolerate a URI that names just the host.
        host, _, _hostpath = rest.partition('/')
        if not host:
            raise ValueError(f'No host specified in {path!r}')
        hostpath = '/' + _hostpath
        fs = cls._new_fs(host=host)
        self = cls(hostpath, fs=fs)
        return self
class MemoryPath(FSPath):
    """
    A path backed by the fsspec ``'memory'`` protocol.

    Ignore:
        self = MemoryPath('/')
        self.ls()
        (self / 'file').write_text('data')
        self.ls()

        ref_mempath = MemoryPath('/')
        ref_mempath.ls()

        # The MemoryFileSystem is global for the entire program
        new_mempath = MemoryPath('/', fs=MemoryPath._new_fs())
        new_mempath.ls()

        # Show the entire contents of the memory filesystem
        new_mempath.fs.store
    """
    __protocol__ = 'memory'