Commit 0c04f67b authored by Darko Poljak's avatar Darko Poljak

Rewriten as class Sweeper with code improvements and optimizations.

parent 41cd0fe6
Pipeline #59 failed with stages
......@@ -10,28 +10,31 @@ Print duplicates
.. code:: python
from sweeper import file_dups
dups = file_dups(['images1', 'images2'])
from sweeper import Sweeper
swp = Sweeper(['images1', 'images2'])
dups = swp.file_dups()
print(dups)
Remove duplicate files
.. code:: python
from sweeper import rm_file_dups
rm_file_dups(['images'])
from sweeper import Sweeper
swp = Sweeper(['images1', 'images2'])
swp.rm()
Perform custom action
.. code:: python
from sweeper import iter_file_dups
for f, h, dups in iter_file_dups(['images']):
from sweeper import Sweeper
swp = Sweeper(['images'])
for f, h, dups in swp:
print('encountered {} which duplicates with already found duplicate files {} with hash {}'.format(f, dups, h))
As script::
python sweeper.py --help
python -m sweeper/sweeper --help
As installed console script::
......
* Play it safe and add byte by byte comparison option for hash dup files?
Or use one more, different, hash algorithm?
from __future__ import absolute_import
from .sweeper import file_dups, mv_file_dups, rm_file_dups, iter_file_dups
from .sweeper import Sweeper
__all__ = ['file_dups', 'mv_file_dups', 'rm_file_dups', 'iter_file_dups']
__all__ = ['Sweeper']
......@@ -57,21 +57,18 @@ Options:
from __future__ import print_function
__author__ = 'Darko Poljak <darko.poljak@gmail.com>'
__version__ = '0.6.0'
__version__ = '0.9.0'
__license__ = 'GPLv3'
__all__ = [
'file_dups', 'rm_file_dups', 'mv_file_dups', 'iter_file_dups'
]
__all__ = ['Sweeper']
import sys
import hashlib
import os
from collections import defaultdict
from functools import partial
import hashlib
DEF_HASHALGS = ['sha1']
# some differences in python versions
# we prefer iter methods
if sys.version_info[0] == 3:
......@@ -109,40 +106,7 @@ def _filehash(filepath, hashalg, block_size):
def _uniq_list(list_):
return set(list_)
def _gather_file_list(dirs):
'''Gather file paths in directory list dirs.
Return tuple (count, files) where count is files
list length and files is list of file paths in
specified directories.
'''
count = 0
files = []
for dir_ in dirs:
for dirpath, dirnames, filenames in os.walk(dir_):
count += len(filenames)
# replace fpath with realpath value (eliminate symbolic links)
files.extend([os.path.realpath(os.path.join(dirpath, fname))
for fname in filenames])
return (count, files)
# iter through file paths in files list
def _files_iter_from_list(files):
for fpath in files:
yield fpath
# iter through file paths by os.walking
def _files_iter_from_disk(topdirs):
for topdir in topdirs:
for dirpath, dirnames, filenames in os.walk(topdir):
for fname in filenames:
# replace fpath with realpath value (eliminate symbolic links)
fpath = os.path.realpath(os.path.join(dirpath, fname))
yield fpath
return list(set(list_))
def _fbequal(fpath1, fpath2):
......@@ -166,195 +130,10 @@ def _fbequal(fpath1, fpath2):
return False
def file_dups(topdirs=['./'], hashalgs=DEF_HASHALGS, block_size=4096, verbose=False,
safe_mode=False):
"""Find duplicate files in directory list. Return directory
with keys equal to file hash value and value as list of
file paths whose content is the same.
If safe_mode is true then you want to play safe: do byte
by byte comparison for hash duplicate files.
"""
dups = defaultdict(list)
# replace dir paths with realpath value (eliminate symbolic links)
for i in range(len(topdirs)):
topdirs[i] = os.path.realpath(topdirs[i])
if verbose:
if safe_mode:
print('safe mode is on')
print('gathering and counting files...', end='')
sys.stdout.flush()
count, files = _gather_file_list(topdirs)
current = 1
print(count)
_files_iter = partial(_files_iter_from_list, files)
else:
_files_iter = partial(_files_iter_from_disk, topdirs)
for fpath in _files_iter():
if verbose:
print('\rprocessing file {0}/{1}: calc hash'.format(current,
count),
end='')
sys.stdout.flush()
hexmds = [_filehash(fpath, h, block_size) for h in hashalgs]
hexmd = tuple(hexmds)
dup_files = dups[hexmd]
files_equals = False
if safe_mode:
if dup_files:
if verbose:
print('\rprocessing file {0}/{1}: byte cmp'.format(current,
count),
end='')
sys.stdout.flush()
for f in dup_files:
if _fbequal(f, fpath):
files_equals = True
break
if verbose and not files_equals:
print('\nsame hash value {} but not same bytes for file {}'
' with files {}'.format(hexmd, fpath, dup_files))
else: # when list is empty in safe mode
files_equals = True
else:
files_equals = True # when safe mode is off
if verbose:
current += 1
if files_equals:
dups[hexmd].append(fpath)
if verbose:
print('')
# make result dict with unique file paths list
result = {}
for k, v in _dict_iter_items(dups):
uniq_v = _uniq_list(v)
if len(uniq_v) > 1:
result[k] = uniq_v
return result
def iter_file_dups(topdirs=['./'], hashalgs=DEF_HASHALGS, block_size=4096,
safe_mode=False):
"""Find duplicate files in directory list.
Yield tuple of file path, hash tuple and list of duplicate files
as soon as duplicate file is found.
Newly found file is not included in the list at the yield time,
but is appended later before next yield.
This means that not all duplicate files are returned with any
return value. Same hash value and sublist could be returned later
if file with same content is found.
If safe_mode is true then you want to play safe: do byte
by byte comparison for hash duplicate files.
"""
# internaly, file dups dict is still maintained
dups = defaultdict(list)
# replace dir paths with realpath value (eliminate symbolic links)
for i in range(len(topdirs)):
topdirs[i] = os.path.realpath(topdirs[i])
_files_iter = partial(_files_iter_from_disk, topdirs)
for fpath in _files_iter():
hexmds = [_filehash(fpath, h, block_size) for h in hashalgs]
hexmd = tuple(hexmds)
dup_files = dups[hexmd]
# there were dup list elements (used for yield)
if safe_mode and dup_files:
# compare only with first file in dup_files
# all files in dup_files list are already content equal
files_equals = _fbequal(dup_files[0], fpath)
else: # when list is emtpy in safe mode or when safe mode is off
files_equals = True
if files_equals:
# yield only if current dup files list isn't empty
if dup_files:
yield (fpath, hexmd, dups[hexmd])
# finally append newly found file to dup list
dups[hexmd].append(fpath)
def _extract_files_for_action(topdirs, hashalgs, block_size, keep_prefix,
verbose, safe_mode):
for files in _dict_iter_values(file_dups(topdirs=topdirs,
hashalgs=hashalgs, block_size=block_size,
verbose=verbose, safe_mode=safe_mode)):
found = False
if keep_prefix:
result = []
for f in files:
if f.startswith(keep_prefix) and not found:
found = True
else:
result.append(f)
if not found:
result = files[1:]
yield (files, result)
def rm_file_dups(topdirs=['./'], hashalgs=DEF_HASHALGS, block_size=4096,
simulate=False, keep_prefix=None, verbose=False,
safe_mode=False):
"""Remove duplicate files found in specified directory list.
If keep_prefix is specified then first file with that path
prefix found is kept in the original directory.
Otherwise first file in list is kept in the original directory.
If simulate is True then only print the action, do not actually
perform it.
If safe_mode is true then do byte by byte comparison for
hash duplicate files.
"""
for dups, extracted in _extract_files_for_action(topdirs, hashalgs,
block_size, keep_prefix,
verbose, safe_mode):
if simulate or verbose:
print('found duplicates: \n{}'.format(dups))
for f in extracted:
if simulate or verbose:
print('rm {}'.format(f))
if not simulate:
os.remove(f)
def mv_file_dups(topdirs=['./'], hashalgs=DEF_HASHALGS, block_size=4096,
dest_dir='dups', simulate=False, keep_prefix=None,
verbose=False, safe_mode=False):
"""Move duplicate files found in specified directory list.
If keep_prefix is specified then first file with that path
prefix found is kept in the original directory.
Otherwise first file in list is kept in the original directory.
If simulate is True then only print the action, do not actually
perform it.
If safe_mode is true then do byte by byte comparison for
hash duplicate files.
"""
import shutil
if not os.path.exists(dest_dir):
if simulate:
print('mkdir {}'.format(dest_dir))
else:
os.mkdir(dest_dir)
elif not os.path.isdir(dest_dir):
errmsg = '{} is not a directory'.format(dest_dir)
if simulate:
print('would raise:', errmsg)
else:
raise OSError(errmsg)
for dups, extracted in _extract_files_for_action(topdirs, hashalgs,
block_size, keep_prefix,
verbose, safe_mode):
if simulate or verbose:
print('found duplicates: \n{}'.format(dups))
for f in extracted:
if simulate or verbose:
print('mv {0} to {1}'.format(f, dest_dir))
if not simulate:
shutil.move(f, dest_dir)
def _remap_keys_to_str(d):
'''Iterator that remaps dictionary keys to string in case keys are tuple
or list. Leave it unchanged otherwise.
Yields string key, value pairs.
'''
for k in _dict_iter_keys(d):
if isinstance(k, tuple) or isinstance(k, list):
......@@ -364,6 +143,214 @@ def _remap_keys_to_str(d):
yield (key, d[k])
def _gather_file_list(dirs):
'''Gather file paths in directory list dirs.
Return tuple (count, files) where count is files
list length and files is list of file paths in
specified directories.
'''
files = []
for dir_ in dirs:
for dirpath, dirnames, filenames in os.walk(dir_):
# replace fpath with realpath value (eliminate symbolic links)
files.extend([os.path.realpath(os.path.join(dirpath, fname))
for fname in filenames])
return files
class Sweeper(object):
DEF_HASHALGS = ['sha1']
def __init__(self, topdirs=['./'], hashalgs=DEF_HASHALGS,
block_size=4096, verbose=False, safe_mode=False):
# replace dir paths with realpath value (eliminate symbolic links)
self.topdirs = []
for i in range(len(topdirs)):
self.topdirs.append(os.path.realpath(topdirs[i]))
self.hashalgs = hashalgs
self.block_size = block_size
self.verbose = verbose
self.safe_mode = safe_mode
# iter through file paths in files list
def _files_iter_from_list(self, files):
return (fpath for fpath in files)
# iter through file paths by os.walking
def _files_iter_from_disk(self):
for topdir in self.topdirs:
for dirpath, dirnames, filenames in os.walk(topdir):
for fname in filenames:
# replace fpath with realpath value
# (eliminate symbolic links)
fpath = os.path.realpath(os.path.join(dirpath, fname))
yield fpath
def file_dups(self):
"""Find duplicate files in directory list. Return directory
with keys equal to file hash value and value as list of
file paths whose content is the same.
If safe_mode is true then you want to play safe: do byte
by byte comparison for hash duplicate files.
"""
dups = defaultdict(list)
if self.verbose:
if self.safe_mode:
print('safe mode is on')
print('gathering and counting files...', end='')
sys.stdout.flush()
files = _gather_file_list(self.topdirs)
count = len(files)
current = 1
print(count)
_files_iter = partial(self._files_iter_from_list, files)
else:
_files_iter = self._files_iter_from_disk
for fpath in _files_iter():
if self.verbose:
print('\rprocessing file {0}/{1}: calc hash'.format(current,
count),
end='')
sys.stdout.flush()
hexmds = [_filehash(fpath, h, self.block_size)
for h in self.hashalgs]
hexmd = tuple(hexmds)
dup_files = dups[hexmd]
files_equals = False
if self.safe_mode:
if dup_files:
if self.verbose:
print('\rprocessing file {0}/{1}: byte cmp'.format(
current, count), end='')
sys.stdout.flush()
for f in dup_files:
if _fbequal(f, fpath):
files_equals = True
break
if self.verbose and not files_equals:
print('\nsame hash value {} but not same bytes for'
' file {} with files {}'.format(
hexmd, fpath, dup_files))
else: # when list is empty in safe mode
files_equals = True
else:
files_equals = True # when safe mode is off
if self.verbose:
current += 1
if files_equals:
dups[hexmd].append(fpath)
if self.verbose:
print('')
# make result dict with unique file paths list
result = {}
for k, v in _dict_iter_items(dups):
uniq_v = _uniq_list(v)
if len(uniq_v) > 1:
result[k] = uniq_v
return result
def __iter__(self):
"""Find duplicate files in directory list.
Yield tuple of file path, hash tuple and list of duplicate files
as soon as duplicate file is found.
Newly found file is not included in the list at the yield time,
but is appended later before next yield.
This means that not all duplicate files are returned with any
return value. Same hash value and sublist could be returned later
if file with same content is found.
If safe_mode is true then you want to play safe: do byte
by byte comparison for hash duplicate files.
"""
# internaly, file dups dict is still maintained
dups = defaultdict(list)
_files_iter = self._files_iter_from_disk
for fpath in _files_iter():
hexmds = [_filehash(fpath, h, self.block_size)
for h in self.hashalgs]
hexmd = tuple(hexmds)
dup_files = dups[hexmd]
# there were dup list elements (used for yield)
if self.safe_mode and dup_files:
# compare only with first file in dup_files
# all files in dup_files list are already content equal
files_equals = _fbequal(dup_files[0], fpath)
else: # when list is emtpy in safe mode or when safe mode is off
files_equals = True
if files_equals:
# yield only if current dup files list isn't empty
if dup_files:
yield (fpath, hexmd, dups[hexmd])
# finally append newly found file to dup list
dups[hexmd].append(fpath)
def _extract_files_for_action(self, keep_prefix):
dups = self.file_dups()
for files in _dict_iter_values(dups):
found = False
if keep_prefix:
result = []
for f in files:
if f.startswith(keep_prefix) and not found:
found = True
else:
result.append(f)
if not found:
result = list(files)[1:]
yield (files, result)
def _do_action(self, simulate, keep_prefix, action, action_str):
for dups, extracted in self._extract_files_for_action(keep_prefix):
if simulate or self.verbose:
print('found duplicates: \n{}'.format(dups))
for f in extracted:
if simulate or self.verbose:
print(action_str.format(f))
if not simulate:
action(f)
def rm(self, simulate=False, keep_prefix=None):
"""Remove duplicate files found in specified directory list.
If keep_prefix is specified then first file with that path
prefix found is kept in the original directory.
Otherwise first file in list is kept in the original directory.
If simulate is True then only print the action, do not actually
perform it.
If safe_mode is true then do byte by byte comparison for
hash duplicate files.
"""
self._do_action(simulate, keep_prefix, os.remove, 'rm {}')
def mv(self, dest_dir='dups', simulate=False, keep_prefix=None):
"""Move duplicate files found in specified directory list.
If keep_prefix is specified then first file with that path
prefix found is kept in the original directory.
Otherwise first file in list is kept in the original directory.
If simulate is True then only print the action, do not actually
perform it.
If safe_mode is true then do byte by byte comparison for
hash duplicate files.
"""
import shutil
if not os.path.exists(dest_dir):
if simulate:
print('mkdir {}'.format(dest_dir))
else:
os.mkdir(dest_dir)
elif not os.path.isdir(dest_dir):
errmsg = '{} is not a directory'.format(dest_dir)
if simulate:
print('would raise:', errmsg)
else:
raise OSError(errmsg)
self._do_action(simulate, keep_prefix,
partial(shutil.move, dst=dest_dir),
'mv {0} to ' + dest_dir)
def main():
"""Main when used as script. See usage (--help).
"""
......@@ -398,12 +385,11 @@ def main():
dest_dir = args['--move']
safe_mode = args['--safe-mode']
sweeper = Sweeper(topdirs=topdirs, hashalgs=hashalgs,
block_size=block_size, verbose=verbose,
safe_mode=safe_mode)
if action == 'print' or action == 'pprint':
dups = file_dups(topdirs=topdirs,
hashalgs=hashalgs,
block_size=block_size,
verbose=verbose,
safe_mode=safe_mode)
dups = sweeper.file_dups()
# defaultdict(list) -> dict
spam = dict(dups)
if spam:
......@@ -417,20 +403,9 @@ def main():
print(json.dumps({k: v for k, v in _remap_keys_to_str(spam)},
indent=4))
elif action == 'move':
mv_file_dups(topdirs=topdirs, hashalgs=hashalgs,
block_size=block_size,
dest_dir=dest_dir,
simulate=simulate,
keep_prefix=keep_prefix,
verbose=verbose,
safe_mode=safe_mode)
sweeper.mv(dest_dir, simulate, keep_prefix)
elif action == 'remove':
rm_file_dups(topdirs=topdirs, hashalgs=hashalgs,
block_size=block_size,
simulate=simulate,
keep_prefix=keep_prefix,
verbose=verbose,
safe_mode=safe_mode)
sweeper.rm(simulate, keep_prefix)
else:
print('Invalid action "{}"'.format(action))
......
......@@ -3,7 +3,7 @@
# License: GPLv3
import unittest
from sweeper import file_dups, iter_file_dups
from sweeper import Sweeper
import os
mydir = os.path.dirname(os.path.realpath(__file__))
......@@ -11,7 +11,8 @@ mydir = os.path.dirname(os.path.realpath(__file__))
class TestSweeper(unittest.TestCase):
def test_file_dups_dups(self):
dups = file_dups([os.path.join(mydir, 'testfiles_dups')])
swp = Sweeper(topdirs=[os.path.join(mydir, 'testfiles_dups')])
dups = swp.file_dups()
dups_exist = False
for h, flist in dups.items():
if len(flist) > 1:
......@@ -19,24 +20,26 @@ class TestSweeper(unittest.TestCase):
self.assertTrue(dups_exist)
def test_file_dups_nodups(self):
dups = file_dups([os.path.join(mydir, 'testfiles_nodups')])
swp = Sweeper(topdirs=[os.path.join(mydir, 'testfiles_nodups')])
dups = swp.file_dups()
for h, flist in dups.items():
self.assertTrue(len(flist) == 1)
# does not actually test safe_mode, we would need to find
# hash collision
def test_file_dups_safe_mode(self):
dups = file_dups([os.path.join(mydir, 'testfiles_dups')],
safe_mode=True)
swp = Sweeper(topdirs=[os.path.join(mydir, 'testfiles_dups')],
safe_mode=True)
dups = swp.file_dups()
for h, flist in dups.items():
if len(flist) > 1:
dups_exist = True
self.assertTrue(dups_exist)
def test_iter_file_dups_dups(self):
it = iter_file_dups([os.path.join(mydir, 'testfiles_dups')])
swp = Sweeper(topdirs=[os.path.join(mydir, 'testfiles_dups')])
dups_exist = False
for x in it:
for x in swp:
dups_exist = True
filepath, h, dups = x
self.assertNotIn(filepath, dups)
......@@ -44,9 +47,9 @@ class TestSweeper(unittest.TestCase):
self.assertTrue(dups_exist)
def test_iter_file_dups_nodups(self):
it = iter_file_dups([os.path.join(mydir, 'testfiles_nodups')])
swp = Sweeper([os.path.join(mydir, 'testfiles_nodups')])
dups_exist = False
for x in it:
for x in swp:
dups_exist = True
break
self.assertFalse(dups_exist)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment