sweeper/sweeper/sweeper.py

147 lines
4.4 KiB
Python

#!/usr/bin/env python
# Author: Darko Poljak <darko.poljak@gmail.com>
# License: GPLv3
"""Sweeper.
Usage: sweeper.py [options] [<directory>...]
Arguments:
<directory> directory path to scan for files
Options:
-h, --help show this screen
-b <blocksize>, --block-size=<blocksize> size of block used when reading file's
content [default: 4096]
-d <hashalg>, --digest-alg=<hashalg> secure hash algorithm [default: md5]
-a <action>, --action=<action> action on duplicate files
(print, remove, move) [default: print]
-m <directory>, --move=<directory> move duplicate files to directory
(used with move action) [default: ./dups]
"""
# Module metadata.
__author__ = 'Darko Poljak <darko.poljak@gmail.com>'
__version__ = '0.1.0'
__license__ = 'GPLv3'
# Public API: the only names exported by a star-import of this module.
__all__ = [
    'file_dups', 'rm_file_dups', 'mv_file_dups', 'iter_file_dups'
]
import sys
import hashlib
import os
from collections import defaultdict
# Small shims hiding the differences between Python 2 and Python 3.
def _do_encode(buf):
    """Return buf unchanged (same behavior on every Python version)."""
    return buf


if sys.version_info[0] == 3:
    def _dict_iter_items(d):
        """Return the (key, value) pairs of d (Python 3)."""
        return d.items()
else:
    def _dict_iter_items(d):
        """Return an iterator over the (key, value) pairs of d (Python 2)."""
        return d.iteritems()
def _filehash(filepath, hashalg, block_size):
    """Return the hex digest of the content of the file at filepath.

    The digest object is created with hashlib.new(hashalg). The file is
    opened in binary mode and consumed block_size bytes at a time, so the
    whole file is never held in memory at once.
    """
    digest = hashlib.new(hashalg)
    with open(filepath, "rb") as stream:
        while True:
            chunk = stream.read(block_size)
            # An empty read marks the end of the file.
            if chunk == b'':
                break
            digest.update(_do_encode(chunk))
    return digest.hexdigest()
def file_dups(topdirs=['./'], hashalg='md5', block_size=4096):
    """Find files with identical content under the given directories.

    Every directory in topdirs is walked recursively and each file found
    is hashed with hashalg, reading block_size bytes at a time.

    Return a dictionary mapping a hex digest to the list of paths of the
    files that produced it. Only digests shared by more than one file are
    included.
    """
    by_digest = defaultdict(list)
    for root in topdirs:
        for current_dir, _subdirs, names in os.walk(root):
            for name in names:
                path = os.path.join(current_dir, name)
                digest = _filehash(path, hashalg, block_size)
                by_digest[digest].append(path)

    # Keep only the groups that really contain duplicates.
    duplicates = {}
    for digest, paths in _dict_iter_items(by_digest):
        if len(paths) > 1:
            duplicates[digest] = paths
    return duplicates
def rm_file_dups(topdirs=['./'], hashalg='md5', block_size=4096):
    """Remove duplicate files found in specified directory list.

    Files are grouped by content using file_dups(topdirs, hashalg,
    block_size). For every group the first file in the list is kept and
    all the remaining files are deleted with os.remove.

    Raises whatever os.remove raises (e.g. OSError) when a file cannot be
    deleted; files removed before the failure stay removed.
    """
    # dict.values() is available on both Python 2 and Python 3.
    for files in file_dups(topdirs, hashalg, block_size).values():
        # files[0] is the copy that survives; delete only the others.
        for f in files[1:]:
            os.remove(f)
def mv_file_dups(topdirs=['./'], hashalg='md5', block_size=4096, dest_dir='dups'):
    """Move duplicate files found in specified directory list.

    Files are grouped by content using file_dups(topdirs, hashalg,
    block_size). For every group the first file in the list is kept in
    its original directory and all the remaining files are moved into
    dest_dir with shutil.move.

    dest_dir is created (including missing parent directories) when it
    does not exist. Raises OSError when dest_dir exists but is not a
    directory.
    """
    import shutil

    if not os.path.exists(dest_dir):
        # makedirs also handles a nested destination such as 'a/b/dups'.
        os.makedirs(dest_dir)
    if not os.path.isdir(dest_dir):
        raise OSError('%s is not a directory' % dest_dir)

    # dict.values() is available on both Python 2 and Python 3.
    for files in file_dups(topdirs, hashalg, block_size).values():
        # files[0] stays where it is; only the other copies are moved.
        for f in files[1:]:
            shutil.move(f, dest_dir)
def iter_file_dups(topdirs=['./'], hashalg='md5', block_size=4096):
    """Yield list of duplicate files when found in specified directory list.

    Each yielded item is a list of paths of files with identical content,
    as grouped by file_dups(topdirs, hashalg, block_size). The complete
    scan is performed before the first list is yielded.
    """
    dups = file_dups(topdirs, hashalg, block_size)
    # values() exists on both Python 2 and 3; itervalues() is Python 2 only.
    for fpaths in dups.values():
        yield fpaths
def main():
    """Main when used as script. See usage (--help).

    Parses the command line with docopt using the module docstring and
    runs the requested action (print, move or remove) on the duplicate
    files found in the given directories ('./' when none is given).

    Exits with status 1 when the block size is not a positive integer or
    when the action is unknown.
    """
    import json
    from docopt import docopt

    args = docopt(__doc__)

    topdirs = args['<directory>']
    if not topdirs:
        topdirs = ['./']

    action = args['--action']
    hashalg = args['--digest-alg']

    try:
        block_size = int(args['--block-size'])
    except ValueError:
        print('Invalid block size "%s"' % args['--block-size'])
        sys.exit(1)
    # A zero block size makes every read return b'' at once, so all files
    # would hash identically and be reported (or removed) as duplicates.
    if block_size <= 0:
        print('Invalid block size "%s"' % args['--block-size'])
        sys.exit(1)

    if action == 'print':
        dups = file_dups(topdirs, hashalg, block_size)
        print(json.dumps(dict(dups), indent=4))
    elif action == 'move':
        mv_file_dups(topdirs, hashalg, block_size, args['--move'])
    elif action == 'remove':
        rm_file_dups(topdirs, hashalg, block_size)
    else:
        print('Invalid action "%s"' % action)
        sys.exit(1)
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()