sweeper/sweeper/sweeper.py

126 lines
3.6 KiB
Python
Raw Normal View History

2014-01-27 06:31:40 +00:00
#!/usr/bin/env python
# Author: Darko Poljak <darko.poljak@gmail.com>
# License: GPLv3
"""Sweeper.

Usage: sweeper.py [options] [<directory>...]

Arguments:
    <directory>  directory path to scan for files

Options:
    -h, --help                                show this screen
    -b <blocksize>, --block-size=<blocksize>  size of block used when reading file's
                                              content [default: 4096]
    -d <hashalg>, --digest-alg=<hashalg>      secure hash algorithm [default: md5]
    -a <action>, --action=<action>            action on duplicate files
                                              (print, remove, move) [default: print]
    -m <directory>, --move=<directory>        move duplicate files to directory
                                              (used with move action) [default: ./dups]
"""
# NOTE: the docstring above is not just documentation -- it is handed to
# docopt(__doc__) and parsed as the command-line grammar.  Keep the blank
# line after the usage pattern, keep option lines indented, and keep at
# least two spaces between an option spec and its description; otherwise
# docopt reads description words as option arguments and loses the
# "[default: ...]" values.
# Module metadata.
__author__ = 'Darko Poljak <darko.poljak@gmail.com>'
__version__ = '0.1.0'
__license__ = 'GPLv3'
# Public API: the names exported by a star-import of this module.
__all__ = [
'file_dups', 'rm_file_dups', 'mv_file_dups', 'do_with_file_dups'
]
import sys
import hashlib
import os
from collections import defaultdict
def _do_encode(buf):
    """Return *buf* unchanged.

    The implementation is the same on Python 2 and Python 3, so it is
    defined once instead of once per interpreter branch.
    """
    return buf


# Use ">= 3" rather than "== 3" so that any later major version takes the
# modern branch instead of falling through to the Python 2 API.
if sys.version_info[0] >= 3:
    def _dict_iter_items(d):
        """Return an iterable over the ``(key, value)`` pairs of *d*."""
        return d.items()
else:
    def _dict_iter_items(d):
        """Return an iterator over the ``(key, value)`` pairs of *d*."""
        return d.iteritems()
def _filehash(filepath, hashalg, block_size):
    """Return the hexadecimal digest of the content of *filepath*.

    The file is opened in binary mode and fed to the hash object in
    chunks of *block_size* bytes, so the whole file is never held in
    memory at once.

    Args:
        filepath: path of the file to hash.
        hashalg: algorithm name passed to ``hashlib.new``.
        block_size: number of bytes requested per read; must not be 0.

    Raises:
        ValueError: if *block_size* is 0.
    """
    # A zero-sized read returns b'' immediately, which is also the
    # end-of-file sentinel below: every file would then hash to the digest
    # of empty input and all files would look like duplicates.
    if block_size == 0:
        raise ValueError('block_size must not be 0')
    md = hashlib.new(hashalg)
    with open(filepath, "rb") as f:
        # iter(callable, sentinel) keeps reading until read() returns b''.
        for buf in iter(lambda: f.read(block_size), b''):
            md.update(buf)
    return md.hexdigest()
def file_dups(topdirs=['./'], hashalg='md5', block_size=4096):
    """Find files with identical content under the given directories.

    Every directory in *topdirs* is walked recursively and each file is
    hashed with *hashalg*, reading *block_size* bytes at a time.

    Args:
        topdirs: iterable of directory paths to scan (never modified).
        hashalg: digest algorithm name.
        block_size: read size in bytes used while hashing.

    Returns:
        A dict mapping hex digest -> list of file paths, containing only
        digests shared by more than one file.
    """
    dups = defaultdict(list)
    # Real paths already hashed.  The same physical file can be reached
    # more than once (overlapping entries in topdirs, or a symlink to a
    # file inside the tree); it must not be reported as a duplicate of
    # itself, because the remove/move actions would then act on the only
    # real copy.
    seen = set()
    for topdir in topdirs:
        for dirpath, _dirnames, filenames in os.walk(topdir):
            for fname in filenames:
                fpath = os.path.join(dirpath, fname)
                real = os.path.realpath(fpath)
                if real in seen:
                    continue
                seen.add(real)
                hexmd = _filehash(fpath, hashalg, block_size)
                dups[hexmd].append(fpath)
    # Keep only digests that occurred for at least two distinct files.
    return {k: v for k, v in _dict_iter_items(dups) if len(v) > 1}
def rm_file_dups(topdirs=['./'], hashalg='md5', block_size=4096):
    """Remove duplicate files, keeping the first file of every group.

    Args:
        topdirs: iterable of directory paths to scan.
        hashalg: digest algorithm name.
        block_size: read size in bytes used while hashing.
    """
    for files in do_with_file_dups(topdirs, hashalg, block_size):
        # Skip index 0 so one copy of the content survives, matching what
        # mv_file_dups does; deleting the whole group would destroy the
        # data altogether.
        for f in files[1:]:
            os.remove(f)
def _unique_dest(dest_dir, fname):
    """Return a path for *fname* inside *dest_dir* that does not exist yet."""
    target = os.path.join(dest_dir, fname)
    root, ext = os.path.splitext(fname)
    counter = 1
    # lexists() also sees dangling symlinks, which would still collide.
    while os.path.lexists(target):
        target = os.path.join(dest_dir, '%s.%d%s' % (root, counter, ext))
        counter += 1
    return target


def mv_file_dups(topdirs=['./'], hashalg='md5', block_size=4096,
                 dest_dir='dups'):
    """Move duplicate files into *dest_dir*, keeping the first of each group.

    Args:
        topdirs: iterable of directory paths to scan.
        hashalg: digest algorithm name.
        block_size: read size in bytes used while hashing.
        dest_dir: directory that receives the duplicates; created
            (including missing parents) when it does not exist.

    Raises:
        OSError: if *dest_dir* exists but is not a directory.
    """
    import shutil

    if not os.path.exists(dest_dir):
        # makedirs instead of mkdir so a nested destination works too.
        os.makedirs(dest_dir)
    if not os.path.isdir(dest_dir):
        raise OSError('%s is not a directory' % dest_dir)
    dest_real = os.path.realpath(dest_dir)
    for files in do_with_file_dups(topdirs, hashalg, block_size):
        # Index 0 stays where it is; only the remaining copies are moved.
        for f in files[1:]:
            # A duplicate that already lives in dest_dir (destination
            # inside the scanned tree) needs no move.
            if os.path.realpath(os.path.dirname(f)) == dest_real:
                continue
            # Duplicates frequently share a basename; moving straight
            # into the directory would fail on the second one, so pick a
            # free file name first.
            shutil.move(f, _unique_dest(dest_dir, os.path.basename(f)))
def do_with_file_dups(topdirs=['./'], hashalg='md5', block_size=4096):
    """Yield each group of duplicate files as a list of paths.

    Args:
        topdirs: iterable of directory paths to scan.
        hashalg: digest algorithm name.
        block_size: read size in bytes used while hashing.

    Yields:
        One list of file paths per digest that file_dups reports.
    """
    dups = file_dups(topdirs, hashalg, block_size)
    # values() exists on both Python 2 and Python 3 dicts; itervalues()
    # is Python 2 only and raises AttributeError on Python 3.
    for fpaths in dups.values():
        yield fpaths
def main(args):
    """Run the action selected by the parsed command-line arguments.

    Args:
        args: mapping of option names to values. The keys read are
            '<directory>', '--action', '--block-size', '--digest-alg'
            and '--move'. '--block-size' is replaced in place by its
            integer value.

    Exits with status 1 when the block size is not a positive integer or
    the action is not one of print, move, remove.
    """
    import json

    topdirs = args['<directory>'] or ['./']
    action = args['--action']
    hashalg = args['--digest-alg']

    try:
        bs = int(args['--block-size'])
        if bs <= 0:
            # A zero or negative block size cannot be used for chunked
            # reading; report it the same way as a non-numeric value.
            raise ValueError(bs)
    except (TypeError, ValueError):
        # TypeError covers a missing (None) value.
        print('Invalid block size "%s"' % args['--block-size'])
        sys.exit(1)
    args['--block-size'] = bs

    if action == 'print':
        dups = file_dups(topdirs, hashalg, bs)
        print(json.dumps(dict(dups), indent=4))
    elif action == 'move':
        mv_file_dups(topdirs, hashalg, bs, args['--move'])
    elif action == 'remove':
        rm_file_dups(topdirs, hashalg, bs)
    else:
        print('Invalid action "%s"' % action)
        # Signal the failure to the caller, as for an invalid block size.
        sys.exit(1)
# Script entry point: docopt is imported only when run as a program, so
# importing this module as a library does not require it.
if __name__ == '__main__':
    from docopt import docopt

    # Parse the command line against the module docstring and dispatch.
    main(docopt(__doc__))