"""Fileson class to manipulate Fileson databases."""
import json, os, time, re
from datetime import datetime
from typing import Any, Tuple, Generator
from logdict import LogDict
from hash import sha_file
# Speed up scanning with scandir in Python 3.5 (or PIP package)
try: from os import scandir
except ImportError: from scandir import scandir
def scantree(path: str, skip=lambda x: False) -> Generator[os.DirEntry, None, None]:
    """Recursively yield DirEntry objects for given directory.

    Args:
        path (str): Directory whose contents are walked. The directory
            itself is not yielded.
        skip (callable): Predicate called with each entry's path. Entries
            for which it returns a true value are neither yielded nor
            descended into. Defaults to skipping nothing.

    Yields:
        os.DirEntry: Every non-skipped entry, a directory always before
        its own contents. Symbolic links to directories are yielded but
        not followed.
    """
    # The context manager closes the scandir iterator (and the directory
    # handle behind it) even when the consumer stops iterating early or an
    # exception interrupts the walk.
    with scandir(path) as entries:
        for e in entries:
            if skip(e.path):
                continue
            yield e  # the entry itself
            if e.is_dir(follow_symlinks=False):
                yield from scantree(e.path, skip)
def gmt_str(mtime: int = None) -> str:
    """Format an epoch timestamp as ``YYYY-MM-DD HH:MM:SS`` in GMT.

    Args:
        mtime (int): Seconds since the epoch, such as ``st_mtime``. When
            left as ``None``, :func:`time.gmtime` uses the current time.

    Returns:
        str: The formatted timestamp.
    """
    broken_down = time.gmtime(mtime)  # struct_time expressed in UTC
    return time.strftime('%Y-%m-%d %H:%M:%S', broken_down)
def gmt_epoch(mtime: str) -> int:
    """Parse a ``YYYY-MM-DD HH:MM:SS`` GMT timestamp into epoch seconds.

    Args:
        mtime (str): Timestamp text in exactly that layout.

    Returns:
        int: Whole seconds elapsed since 1970-01-01 00:00:00 GMT
        (negative for earlier moments).

    Raises:
        ValueError: If ``mtime`` does not match the expected layout.
    """
    unix_origin = datetime(1970, 1, 1)
    parsed = datetime.strptime(mtime, '%Y-%m-%d %H:%M:%S')
    # Both values are naive datetimes, so the difference is a plain
    # timedelta with no timezone adjustment applied.
    elapsed = parsed - unix_origin
    return int(elapsed.total_seconds())
class Fileson(LogDict):
    """File database with previous versions support based on LogDict.

    The file format is fully compatible so you can use :meth:`LogDict.create`
    to instantiate one. Special keys like ``:scan:`` and ``:checksum:`` are
    used for metadata, and the additional :meth:`files` and :meth:`dirs`
    methods expose certain types of contents. Also, :meth:`set` implements
    "set if changed" functionality.
    """

    # Checksum name -> callable(path, entry) returning the checksum value
    # stored under that name in a file entry. 'entry' is the dict being
    # built for the file and already holds 'size' and 'modified_gmt'.
    summer = {
        'none': lambda p,f: None,
        'sha1': lambda p,f: sha_file(p),
        'sha1fast': lambda p,f: sha_file(p, quick=True)+str(f['size']),
    }

    @classmethod
    def load_or_scan(cls: 'Fileson', db_or_dir: str, **kwargs) -> 'Fileson':
        """Load Fileson database or create one by scanning a directory.

        This basically calls :meth:`load` or creates a new
        instance and uses :meth:`scan` after it (passing kwargs).

        Args:
            db_or_dir (str): Database or directory name
            **kwargs: Forwarded to :meth:`scan`; ignored when
                ``db_or_dir`` is not a directory.

        Returns:
            Fileson: New class instance
        """
        if os.path.isdir(db_or_dir):
            fs = cls()
            fs.scan(db_or_dir, **kwargs)
            return fs
        return cls.load(db_or_dir)

    @classmethod
    def load(cls: 'Fileson', dbfile: str) -> 'Fileson':
        """Overloaded class method to support f.fson~1 history syntax.

        Args:
            dbfile (str): Database file name, optionally followed by
                ``~N`` where N is a decimal number.

        Returns:
            Fileson: The loaded database. With a ``~N`` suffix, the result
            of ``slice(None, (':scan:', current - N + 1))``, which
            presumably is the state N scans back (confirm against
            ``LogDict.slice``).
        """
        # fullmatch: only a *trailing* ~N is history syntax. A '~<digits>'
        # in the middle of a name (e.g. 'dir~2/db.fson') is part of the path.
        m = re.fullmatch(r'(.*)~(\d+)', dbfile)
        if not m:
            return super(Fileson, cls).load(dbfile)
        fs = super(Fileson, cls).load(m.group(1))
        end = (':scan:', fs[':scan:'] - int(m.group(2)) + 1)
        return fs.slice(None, end)

    def dirs(self) -> list:
        """Return paths to dirs.

        These are all non-metadata keys whose entry has no 'size'. Since
        :meth:`scan` also stores symlinks without a 'size', symlink
        entries are included as well.
        """
        return [p for p in self if p[0] != ':' and 'size' not in self[p]]

    def files(self) -> list:
        """Return paths to files (non-metadata keys whose entry has a 'size')."""
        return [p for p in self if p[0] != ':' and 'size' in self[p]]

    def set(self, key: Any, val: Any) -> bool:
        """Set key to val if there's a change, in which case return True."""
        if key in self and self[key] == val:
            return False
        self[key] = val  # change will be recorded by LogDict
        return True

    def scan(self, directory: str, **kwargs) -> None:
        """Scan a directory for objects or changes.

        Every invocation creates a new 'run', a version to Fileson
        database. Only changes need to be stored. You can then use
        for example :meth:`genItems` (not defined in this class -- TODO
        confirm it exists) and pick only objects that were changed on a
        given run.

        Args:
            directory (str): Directory to scan
            **kwargs: Optional settings:

                * ``checksum`` (str): Key of :attr:`summer` to compute for
                  each file; falsy (default) stores no checksum.
                * ``verbose`` (int): 0 is silent, 1 prints progress,
                  above 1 also prints per-entry details.
                * ``skip`` (list): Substrings; any path containing one of
                  them is skipped.
                * ``strict`` (bool): When true, cached checksums are
                  reused only for the same relative path; otherwise a
                  matching file name, modification time and size suffice.

        Raises:
            KeyError: If ``checksum`` is not a key of :attr:`summer` and a
                file needs its checksum computed.
        """
        checksum = kwargs.get('checksum', None)
        verbose = kwargs.get('verbose', 0)
        skiplist = kwargs.get('skip', [])
        strict = kwargs.get('strict', False)

        def make_key(p, f):
            """Build the checksum-cache key for path p with entry f."""
            name = p if strict else p.split(os.sep)[-1]
            return (name, f['modified_gmt'], f['size'])

        # Set metadata for run
        self[':scan:'] = self.get(':scan:', 0) + 1  # first in a scan!
        self[':directory:'] = directory
        self[':checksum:'] = checksum
        self[':date_gmt:'] = gmt_str()

        known_files = self.files()

        # Remember checksums from earlier runs so unchanged files (same
        # key) do not have to be read again.
        ccache = {}
        if checksum:
            for p in known_files:
                f = self[p]
                if isinstance(f, dict) and checksum in f:
                    ccache[make_key(p, f)] = f[checksum]

        # Everything known before the walk; whatever is still here after
        # the walk no longer exists on disk.
        missing = set(known_files) | set(self.dirs())

        skip = lambda p: any(pat in p for pat in skiplist)

        startTime, fileCount, byteCount, seenG = time.time(), 0, 0, 0

        if verbose: print('Scanning', directory, 'skipping', skiplist)

        for e in scantree(directory, skip):
            p = os.path.relpath(e.path, directory)
            missing.discard(p)

            # Store symlink details
            if e.is_symlink():
                # A relative link target is relative to the directory that
                # holds the link, not to the current working directory.
                target = os.readlink(e.path)
                if not os.path.isabs(target):
                    target = os.path.join(os.path.dirname(e.path), target)
                # Get relative path to target
                relative = os.path.relpath(target, directory)
                try:
                    mtime = e.stat().st_mtime
                except OSError:
                    # Dangling link: the target cannot be stat'ed, so use
                    # the link's own timestamp instead of aborting.
                    mtime = e.stat(follow_symlinks=False).st_mtime
                self.set(p, { 'link': relative,
                    'modified_gmt': gmt_str(mtime) })
                if verbose > 1: print('Symlink', p, '->', self[p]['link'])
            # Process directories
            elif e.is_dir(follow_symlinks=False):
                self.set(p, { 'modified_gmt': gmt_str(e.stat().st_mtime) })
            # Should be a file
            else:
                st = e.stat()
                f = { 'size': st.st_size,
                    'modified_gmt': gmt_str(st.st_mtime) }
                if checksum:
                    key = make_key(p, f)
                    if verbose > 1 and key not in ccache:
                        print(checksum, p)
                    f[checksum] = ccache.get(key, None) or \
                        Fileson.summer[checksum](e.path, f)
                self.set(p, f)

                if verbose >= 1:
                    fileCount += 1
                    byteCount += f['size']
                    # Report each time another full GiB has been seen.
                    if byteCount // 2**30 > seenG:
                        seenG = byteCount // 2**30
                        secs = time.time() - startTime
                        print(f'{fileCount} files, {seenG:.2f} GiB in {secs}s')

        for p in missing:
            if verbose > 1: print('Removed missing entry', p)
            del self[p]  # remove elements not seen this time