"""Fileson class to manipulate Fileson databases."""
import json
import os
import re
import time
from collections import defaultdict
from typing import Any, Generator, Optional, Tuple

from hash import sha_file
from logdict import LogDict
def gmt_str(mtime: Optional[float] = None) -> str:
    """Convert an ``st_mtime``-style timestamp to a GMT string.

    Args:
        mtime: Seconds since the epoch (int or float). When ``None``
            (the default) the current time is used, which is how
            :func:`time.gmtime` treats a missing argument.

    Returns:
        str: The timestamp formatted as ``YYYY-MM-DD HH:MM:SS`` in GMT.
    """
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(mtime))
class Fileson(LogDict):
    """File database with previous versions support based on LogDict.

    The file format is fully compatible so you can use :meth:`LogDict.create`
    to instantiate one. Special keys like :scan:, :checksum: are used for
    metadata, and the additional :meth:`files` and :meth:`dirs` methods
    expose certain types of contents. Also, :meth:`set` is used to implement
    "set if changed" functionality.
    """

    # Checksum name -> callable(path, file_entry) returning the checksum.
    # 'sha1fast' appends the file size to the quick hash.
    # NOTE(review): what `quick=True` samples is defined by hash.sha_file.
    summer = {
        'sha1': lambda p, f: sha_file(p),
        'sha1fast': lambda p, f: sha_file(p, quick=True) + str(f['size']),
    }

    @classmethod
    def load_or_scan(cls, db_or_dir: str, **kwargs) -> 'Fileson':
        """Load Fileson database or create one by scanning a directory.

        This basically calls :meth:`load` or creates a new
        instance and uses :meth:`scan` after it (passing kwargs).

        Args:
            db_or_dir (str): Database or directory name
            **kwargs: Forwarded to :meth:`scan` when `db_or_dir` is a
                directory; ignored when a database file is loaded.

        Returns:
            Fileson: New class instance
        """
        if os.path.isdir(db_or_dir):
            fs = cls()
            fs.scan(db_or_dir, **kwargs)
            return fs
        return cls.load(db_or_dir)

    @classmethod
    def load(cls, dbfile: str) -> 'Fileson':
        """Overloaded class method to support f.fson~1 history syntax.

        Args:
            dbfile (str): Database file name, optionally followed by
                ``~N`` to request the state N scans back.

        Returns:
            Fileson: The loaded database, or a slice of it ending at the
            requested ':scan:' marker when the ``~N`` suffix is present.
        """
        # Raw string avoids the invalid "\d" escape; fullmatch makes sure
        # "~N" is really a suffix and not something in the middle of a name.
        m = re.fullmatch(r'(.*)~(\d+)', dbfile)
        if not m:
            return super().load(dbfile)
        fs = super().load(m.group(1))
        # NOTE(review): exact cut-off semantics are defined by LogDict.slice;
        # the end marker is the ':scan:' value N-1 runs before the latest.
        end = (':scan:', fs[':scan:'] - int(m.group(2)) + 1)
        return fs.slice(None, end)

    def dirs(self) -> list:
        """Return paths to dirs (non-metadata entries without 'size')."""
        return [p for p in self if p[0] != ':' and 'size' not in self[p]]

    def files(self) -> list:
        """Return paths to files (non-metadata entries having 'size')."""
        return [p for p in self if p[0] != ':' and 'size' in self[p]]

    def set(self, key: Any, val: Any) -> bool:
        """Set key to val if there's a change, in which case return True."""
        if key in self and self[key] == val:
            return False
        self[key] = val  # change will be recorded by LogDict
        return True

    def scan(self, directory: str, **kwargs) -> None:
        r"""Scan a directory for objects or changes.

        Every invocation creates a new 'run', a version to Fileson
        database. Only changes need to be stored. You can then use
        for example :meth:`genItems` and pick only objects that
        were changed on a given run.

        Args:
            directory (str): Directory to scan
            \*\*kwargs: Optional settings:

                * ``checksum``: key of :attr:`summer` to compute for each
                  file, or None (default) for no checksums.
                * ``verbose``: int, default 0. At 1 or more a progress line
                  is printed each time another GiB has been processed;
                  above 1 every file whose checksum is not cached is
                  printed as well.
                * ``strict``: bool, default False. When true, cached
                  checksums are reused only for the same relative path;
                  otherwise a matching file name, modification time and
                  size is enough.

        Raises:
            KeyError: If a checksum must be computed and ``checksum`` is
                not a key of :attr:`summer`.
        """
        checksum = kwargs.get('checksum', None)
        verbose = kwargs.get('verbose', 0)
        strict = kwargs.get('strict', False)

        def make_key(path, entry):
            # Cache key for checksum reuse; basename only unless strict.
            name = path if strict else path.split(os.sep)[-1]
            return (name, entry['modified_gmt'], entry['size'])

        # Set metadata for run
        self[':scan:'] = self.get(':scan:', 0) + 1  # first in a scan!
        self[':directory:'] = directory
        self[':checksum:'] = checksum
        self[':date_gmt:'] = gmt_str()

        # Everything known before the walk is "missing" until seen again.
        known_files = self.files()
        missing = set(known_files) | set(self.dirs())

        # Previously computed checksums, so unchanged files are not re-read.
        ccache = {}
        if checksum:
            for p in known_files:
                f = self[p]
                if isinstance(f, dict) and checksum in f:
                    ccache[make_key(p, f)] = f[checksum]

        start_time = time.time()
        file_count, byte_count, next_g = 0, 0, 1

        for dir_name, _subdirs, file_list in os.walk(directory):
            p = os.path.relpath(dir_name, directory)
            self.set(p, {'modified_gmt': gmt_str(os.stat(dir_name).st_mtime)})
            missing.discard(p)

            for fname in file_list:
                fpath = os.path.join(dir_name, fname)
                p = os.path.relpath(fpath, directory)  # relative for csLookup
                s = os.stat(fpath)
                f = {'size': s.st_size, 'modified_gmt': gmt_str(s.st_mtime)}

                if checksum:
                    key = make_key(p, f)  # computed once per file
                    if verbose > 1 and key not in ccache:
                        print(checksum, p)
                    # Falsy cached values are recomputed, as before.
                    f[checksum] = (ccache.get(key)
                                   or Fileson.summer[checksum](fpath, f))

                self.set(p, f)
                missing.discard(p)

                if verbose >= 1:
                    file_count += 1
                    byte_count += f['size']
                    if byte_count > next_g * 2**30:
                        # Next report once the following GiB boundary passes.
                        next_g = byte_count // 2**30 + 1
                        elapsed = time.time() - start_time
                        print(file_count, 'files processed',
                              '%.1f G in %.2f s' % (byte_count/2**30, elapsed))

        # Mark missing elements as removed (if not already so)
        for p in missing:
            del self[p]