Source code for simple_dvc.registery

"""
A port of the DVC registry from geowatch.


CommandLine:
    sdvc registery list --hardware=ssd --tags=phase2_data
    sdvc registery list --hardware=hdd --tags=phase2_data
    sdvc registery list --hardware=auto --tags=phase2_data
"""
import ubelt as ub
import warnings
import os


class DataRegistry:
    """
    Provide a quick way of storing and querying for machine specific paths

    Rows are persisted in a :mod:`shelve` database located at
    ``registry_fpath``. Each row is a dictionary with the keys ``name``,
    ``path``, ``priority``, ``hardware`` and ``tags``.

    Example:
        >>> from simple_dvc.registery import *  # NOQA
        >>> self = DataRegistry()
        >>> self.read()
        >>> test_dpath = ub.Path.appdir('sdvc/tests/dvc_registry').ensuredir()
        >>> repo1 = (test_dpath / 'repo1').ensuredir()
        >>> repo2 = (test_dpath / 'repo2').ensuredir()
        >>> repo_hdd2 = (test_dpath / 'repo2-hdd').ensuredir()
        >>> repo_ssd2 = (test_dpath / 'repo2-ssd').ensuredir()
        >>> repo_ffs2 = (test_dpath / 'repo2-ffs').ensuredir()
        >>> #
        >>> self.add('repo1', path=repo1, tags='data_phase1')
        >>> self.add('repo2', path=repo2, tags='expt_phase1')
        >>> self.add('repo_hdd2', path=repo_hdd2, hardware='hdd', tags='expt_phase1')
        >>> self.add('repo_ffs2', path=repo_ffs2, hardware='ffs', tags='expt_phase1', priority=10)
        >>> self.add('repo_ssd2', path=repo_ssd2, hardware='ssd', tags='expt_phase1')
        >>> print(self.pandas())
        >>> print(ub.urepr(self.read()))
        >>> self.query(tags='expt_phase1')
        >>> self.query()
    """

    def __init__(self, registry_fpath=None):
        """
        Args:
            registry_fpath (str | PathLike | None):
                location of the shelf file. If None, a file inside the
                "watch" application config directory is used.
        """
        if registry_fpath is None:
            # Default location that allows for backwards compat with
            # watch / geowatch.
            watch_config_dpath = ub.Path.appdir(type='config', appname='watch')
            registry_dpath = (watch_config_dpath / 'registry').ensuredir()
            registry_fpath = registry_dpath / 'watch_dvc_registry.shelf'

        self.registry_fpath = registry_fpath
        # Optional row attributes and their default values.
        self._default_attributes = ub.udict({
            'priority': None,
            'hardware': None,
            'tags': None,
        })
        # TODO: just use default and filter NoParams
        # All keywords that ``query`` accepts as row filters.
        self._expected_attrs = {
            'name': ub.NoParam,
            'path': ub.NoParam,
        } | self._default_attributes

    @staticmethod
    def _sorted_by_priority(rows):
        """
        Return ``rows`` ordered from highest to lowest priority.

        Rows with a missing or None priority are treated as ``-inf``. The
        ascending sort is reversed by slicing (not ``reverse=True``) so the
        relative order of tied rows stays the same as it always has been.
        """
        def _priority_key(row):
            priority = row.get('priority', None)
            return priority if priority is not None else -float('inf')
        return sorted(rows, key=_priority_key)[::-1]

    def pandas(self, **kwargs):
        """
        Return the result of :func:`query` as a pandas DataFrame.

        When there is at least one row, an ``exists`` column is added that
        indicates if each row's path exists on disk.
        """
        import pandas as pd
        df = pd.DataFrame(self.query(**kwargs))
        if len(df):
            df['exists'] = df['path'].apply(lambda p: ub.Path(p).exists())
        return df

    def list(self, **kwargs):
        """
        Print the table produced by :func:`pandas`. Returns None.
        """
        from rich import print
        print(self.pandas(**kwargs).to_string())

    def _open(self):
        """
        Open and return the shelf that backs this registry.

        The caller is responsible for closing the returned shelf.
        """
        import shelve
        shelf = shelve.open(os.fspath(self.registry_fpath))
        return shelf

    def add(self, name, path, **kwargs):
        """
        Register a row, overwriting any existing row with the same name.

        Args:
            name (str): key used to store the row
            path (str | PathLike): stored as an absolute path string
            **kwargs: any of ``priority``, ``hardware``, ``tags``. If
                ``hardware='auto'``, the hardware type is looked up with
                ``watch.utils.util_hardware.disk_info_of_path``.

        Raises:
            ValueError: if name or path is None, or on an unknown keyword.
        """
        if name is None:
            raise ValueError('Must specify a name')
        if path is None:
            raise ValueError('Must specify a path')
        unknown = ub.udict(kwargs) - self._default_attributes
        if unknown:
            raise ValueError(f'Unknown kwargs={unknown}')

        path = ub.Path(path).absolute()

        if 'hardware' in kwargs:
            if kwargs['hardware'] == 'auto':
                from watch.utils import util_hardware
                info = util_hardware.disk_info_of_path(path)
                if 'hwtype' in info:
                    kwargs['hardware'] = info['hwtype']
                else:
                    print('unable to automatically determine hardware type')
                    kwargs.pop('hardware')

        row = ub.udict({'name': name, 'path': os.fspath(path)}) | self._default_attributes
        row |= (kwargs & row)
        with self._open() as shelf:
            shelf[name] = row

    def set(self, name, path=None, **kwargs):
        """
        Set an attribute of a row

        Only the values that are not None are written; every other attribute
        keeps the value of the existing row. (Consequently an attribute
        cannot be reset to None with this method.)

        Args:
            name (str): name of an already registered row
            path (str | PathLike | None): new path, stored as an absolute
                path string in the same way as :func:`add`.
            **kwargs: any of ``priority``, ``hardware``, ``tags``.

        Raises:
            ValueError: if name is None, or on an unknown keyword.
            KeyError: if no row called ``name`` is registered.
        """
        if name is None:
            raise ValueError('Must specify a name')
        unknown = ub.udict(kwargs) - self._default_attributes
        if unknown:
            raise ValueError(f'Unknown kwargs={unknown}')

        if path is not None:
            # Normalize like ``add`` so stored rows are consistent.
            path = os.fspath(ub.Path(path).absolute())

        row = ub.udict({'name': name, 'path': path}) | self._default_attributes
        row |= (kwargs & row)
        with self._open() as shelf:
            if name not in shelf:
                raise KeyError(f'No registered row named {name!r}')
            existing = shelf[name]
            # Fill every unspecified attribute from the stored row.
            row |= {
                key: existing.get(key, None)
                for key, value in row.items()
                if value is None
            }
            shelf[name] = row

    def remove(self, name):
        """
        Remove the row registered as ``name``.

        A failure to remove the row (e.g. the name is not registered) emits
        a warning instead of raising.

        Raises:
            ValueError: if name is None.
        """
        if name is None:
            raise ValueError('Must specify a name')
        with self._open() as shelf:
            try:
                shelf.pop(name)
            except Exception as ex:
                warnings.warn('Unable to access shelf: {}'.format(ex))

    def read(self):
        """
        Read every known row.

        Returns:
            List[dict]: the hard coded fallback rows followed by the rows
                stored in the shelf, ordered from highest to lowest priority
                (rows without a priority come last). If the shelf values
                cannot be read, a warning is emitted and only the rows
                gathered so far are returned.
        """
        # Hard coded fallback candidate DVC paths
        hardcoded_paths = [
            # {'name': 'namek', 'hardware': 'hdd', 'tags': 'phase1', 'path': ub.Path('/media/joncrall/raid/home/joncrall/data/dvc-repos/smart_watch_dvc')},
            # {'name': 'ooo', 'hardware': 'hdd', 'tags': 'phase1', 'path': ub.Path('/media/joncrall/raid/dvc-repos/smart_watch_dvc')},
            # {'name': 'rutgers', 'hardware': None, 'tags': 'phase1', 'path': ub.Path('/media/native/data/data/smart_watch_dvc')},
            # {'name': 'uky', 'hardware': None, 'tags': 'phase1', 'path': ub.Path('/localdisk0/SCRATCH/watch/ben/smart_watch_dvc')},
            # {'name': 'purri', 'hardware': None, 'tags': 'phase1', 'path': ub.Path('/data4/datasets/smart_watch_dvc')},
            # {'name': 'crall-ssd', 'hardware': 'ssd', 'tags': 'phase1_data', 'path': ub.Path('~/data/dvc-repos/smart_watch_dvc-ssd').expand()},
            # {'name': 'crall-hdd', 'hardware': 'hdd', 'tags': 'phase1_data', 'path': ub.Path('~/data/dvc-repos/smart_watch_dvc-hdd').expand()},
            # {'name': 'phase1_standard', 'hardware': None, 'tags': 'phase1_data', 'path': ub.Path('~/data/dvc-repos/smart_watch_dvc').expand()},

            {'name': 'hack_data_hdd', 'hardware': 'hdd', 'tags': 'phase2_data', 'path': ub.Path('~/data/dvc-repos/smart_data_dvc').expand()},
            {'name': 'hack_expt_hdd', 'hardware': 'hdd', 'tags': 'phase2_expt', 'path': ub.Path('~/data/dvc-repos/smart_expt_dvc').expand()},

            {'name': 'hack_data_ssd', 'hardware': 'ssd', 'tags': 'phase2_data', 'path': ub.Path('~/data/dvc-repos/smart_data_dvc-ssd').expand()},
            {'name': 'hack_expt_ssd', 'hardware': 'ssd', 'tags': 'phase2_expt', 'path': ub.Path('~/data/dvc-repos/smart_expt_dvc-ssd').expand()},

            {'name': 'hack_data2', 'tags': 'phase2_data', 'path': ub.Path('~/data/smart_data_dvc/').expand()},
            {'name': 'hack_expt2', 'tags': 'phase2_expt', 'path': ub.Path('~/data/smart_expt_dvc/').expand()},
        ]

        # registry_rows = [row for row in hardcoded_paths if row['path'].exists()]
        registry_rows = hardcoded_paths.copy()
        with self._open() as shelf:
            try:
                registry_rows += list(shelf.values())
            except Exception as ex:
                warnings.warn('Unable to access shelf: {}'.format(ex))

        return self._sorted_by_priority(registry_rows)

    def query(self, must_exist=False, **kwargs):
        """
        Find the rows that match the given attribute values.

        Args:
            must_exist (bool): if True, drop rows whose path does not exist.
            **kwargs: any of ``name``, ``path``, ``priority``, ``hardware``,
                ``tags``. A row matches when it equals every given value.
                Values that are None are ignored. ``hardware='auto'`` is
                special: it does not filter, instead the priority of the
                returned ssd rows is boosted.

        Returns:
            List[dict]: matching rows from highest to lowest priority.

        Raises:
            ValueError: on an unexpected keyword.
        """
        unexpected = ub.udict(kwargs) - self._expected_attrs
        if unexpected:
            raise ValueError(
                'Unexpected query keywords: {}. Valid keywords are {}'.format(
                    ub.urepr(list(unexpected.keys()), nl=0),
                    ub.urepr(list(self._expected_attrs.keys()), nl=0),
                ))
        query = ub.udict({k: v for k, v in kwargs.items() if v is not None})

        ENABLE_EXPERIMENTAL_SPECIAL_QUERY_LOGIC = 1
        if ENABLE_EXPERIMENTAL_SPECIAL_QUERY_LOGIC:
            special_query = {}
            if query.get('hardware', None) == 'auto':
                # Remove from the equality filter; handled after matching.
                special_query['hardware'] = query.pop('hardware')

        results = []
        candidate_rows = self.read()
        for row in candidate_rows:
            if query:
                relevant = ub.udict(row).subdict(query, default=None)
                flag = relevant == query
            else:
                flag = True

            if must_exist:
                if not ub.Path(row['path']).exists():
                    flag = False

            if flag:
                results.append(row)

        if ENABLE_EXPERIMENTAL_SPECIAL_QUERY_LOGIC:
            if special_query.get('hardware') == 'auto':
                # Make SSDs have higher priority than everything else
                hardware_to_results = ub.group_items(results, lambda x: x.get('hardware', None))
                hardware_to_max_priority = ub.udict()
                for hardware, subs in hardware_to_results.items():
                    hardware_to_max_priority[hardware] = max([s.get('priority', 0) or 0 for s in subs])
                non_ssd_priority = max(1, 1, *(hardware_to_max_priority - {'ssd'}).values())
                min_ssd_priority = min(0, 0, *(hardware_to_max_priority & {'ssd'}).values())
                for row in hardware_to_results.get('ssd', []):
                    row['priority'] = (row.get('priority', 0) or 0) - min_ssd_priority + non_ssd_priority * 2
                # print('hardware_to_results = {}'.format(ub.urepr(hardware_to_results, nl=2)))

        HACK_JONS_REMOTE_PATTERN = 1
        if HACK_JONS_REMOTE_PATTERN:
            # If we can detect the remote pattern that jon likes (where remote
            # machines are mounted via sshfs in the $HOME/remote/$REMOTENAME
            # directory and the localmachine $HOME is symlinked to via
            # $HOME/remote/$HOSTNAME) then use that version of the paths so its
            # easier to work across multiple machines.
            import platform
            host = platform.node()
            for row in results:
                path = ub.Path(row['path'])
                if path.exists() and f'remote/{host}' not in str(path):
                    remote_base = ub.Path(f'~/remote/{host}').expand()
                    remote_alt = path.shrinkuser(home=remote_base)
                    if remote_alt.exists():
                        row['path'] = os.fspath(remote_alt)

        return self._sorted_by_priority(results)

    def _report_find_failure(self, kwargs):
        """
        Print the registry contents and the failed query to aid debugging.
        """
        # Note: ``list`` prints the table itself and returns None, so its
        # result must not be passed to print.
        print('Error in DataRegistry.find. Listing existing data...')
        self.list()
        print('Error in DataRegistry.find. Listing query results...')
        self.list(**kwargs)
        print('... for query kwargs = {}'.format(ub.urepr(kwargs, nl=1)))

    def find(self, on_error="raise", envvar=None, **kwargs):
        """
        Return the path of the highest priority row matching a query.

        Args:
            on_error (str): if "raise", an Exception is raised when no
                path is found, otherwise None is returned.
            envvar (str | None): name of an environment variable. If it is
                set to a non-empty value and no ``name`` is given in
                ``kwargs``, its value is used as the only candidate path
                instead of querying the registry.
            **kwargs: forwarded to :func:`query`. ``must_exist`` defaults to
                True here: candidates that do not exist are discarded.

        Returns:
            ub.Path | None: the first suitable path.

        Raises:
            Exception: if nothing is found and ``on_error == "raise"``.
        """
        name = kwargs.get('name', None)
        if envvar is not None:
            environ_dvc_dpath = os.environ.get(envvar, "")
        else:
            environ_dvc_dpath = None

        if environ_dvc_dpath and name is None:
            results = [ub.Path(environ_dvc_dpath)]
        else:
            results = [ub.Path(r['path']) for r in self.query(**kwargs)]

        if not results:
            if on_error == "raise":
                self._report_find_failure(kwargs)
                raise Exception('No suitable data directory found')
            return None

        if kwargs.get('must_exist', True):
            results = [found for found in results if found.exists()]

        if not results:
            if on_error == "raise":
                self._report_find_failure(kwargs)
                raise Exception('No existing data directory found')
            return None
        return results[0]
def find_dvc_dpath(name=ub.NoParam, on_error="raise", **kwargs):
    """
    Return the location of the GEOWATCH DVC Data path if it exists and is in a
    "standard" location.

    This is a convenience wrapper that builds a default ``DataRegistry`` and
    calls its ``find`` method. ``name`` is only forwarded when it is given.

    NOTE: other team members can add their "standard" locations if they want.

    SeeAlso:
        sdvc

        WATCH_DATA_DPATH=$(geowatch_dvc)

        python -m watch.cli.find_dvc --hardware=hdd
        python -m watch.cli.find_dvc --hardware=ssd
    """
    find_kwargs = dict(kwargs)
    if name is not ub.NoParam:
        find_kwargs['name'] = name
    return DataRegistry().find(on_error=on_error, **find_kwargs)
import scriptconfig as scfg # NOQA
class CommonRegistryConfig(scfg.DataConfig):
    """
    Options shared by every ``dvc registery`` subcommand.
    """
    # Row identifier; may also be given as the first positional argument.
    name = scfg.Value(
        None,
        position=1,
        help='specify a name to query or store or remove',
    )

    # Optional row attributes.
    hardware = scfg.Value(
        None,
        help='Specify hdd, ssd, etc..., Setable and getable property',
    )
    priority = scfg.Value(
        None,
        help='Higher is more likely. Setable and getable property',
    )
    tags = scfg.Value(
        None,
        help='User note. Setable and queryable property',
    )

    # Location of the repository on disk.
    path = scfg.Value(
        None,
        help='The path to the dvc repo. Setable and queryable property',
    )
class DVC_RegisteryCLI(scfg.ModalCLI):
    """
    Command line helper to find the path to the watch DVC repo

    CommandLine:
        # List currently known directories
        sdvc registery list

        # Add a new path (with optional hardware and tags)
        mkdir -p $HOME/tmp/datadir
        sdvc registery add --name=testdir --path=$HOME/tmp/datadir --hardware=hdd --tags=mytag

        # Lookup the newly registered path
        sdvc registery find --tags mytag

        # Remove the test entry
        sdvc registery remove testdir

    Example:
        ...
    """
    __command__ = 'registery'

    class Add(CommonRegistryConfig):
        """
        Register a new path (or overwrite / update an existing one)
        """
        __command__ = 'add'

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            """
            Parse the arguments and forward all of them to ``DataRegistry.add``.
            """
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            registry = DataRegistry()
            registry.add(**config)

    class List(CommonRegistryConfig):
        """
        List registered paths
        """
        __command__ = 'list'

        must_exist = scfg.Value(False, help='if True, filter to only directories that exist. Defaults to false except on "find", which is True.')

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            """
            Parse the arguments and print the rows that match them.
            """
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            config = dict(config)
            must_exist = config.pop('must_exist')
            registry = DataRegistry()
            registry.list(**config, must_exist=must_exist)

    class Remove(CommonRegistryConfig):
        """
        Remove a registered path
        """
        __command__ = 'remove'

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            """
            Parse the arguments and remove the row with the given name.
            """
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            registry = DataRegistry()
            # ``DataRegistry.remove`` only accepts ``name``. Forwarding the
            # entire config (which also holds hardware / priority / tags /
            # path) raises a TypeError.
            registry.remove(config['name'])

    class Set(CommonRegistryConfig):
        """
        Set properties of a registered path
        """
        __command__ = 'set'

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            """
            Parse the arguments and forward all of them to ``DataRegistry.set``.
            """
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            registry = DataRegistry()
            registry.set(**config)

    class Find(CommonRegistryConfig):
        """
        Search for a path registered via ``sdvc registery add``
        """
        __command__ = 'find'

        must_exist = scfg.Value(True, help='if True, filter to only directories that exist. Defaults to false except on "find", which is True.')

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            """
            Parse the arguments and print the best matching path.
            """
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            config = dict(config)
            must_exist = config.pop('must_exist')
            registry = DataRegistry()
            dpath = registry.find(**config, must_exist=must_exist)
            print(dpath)
if __name__ == '__main__':
    # CommandLine:
    #     python ~/code/simple_dvc/simple_dvc/registery.py list
    DVC_RegisteryCLI.main()