Find Duplicate Files

The current script is hardwired to search for duplicate files whose names end with .mp4 or .zip, but this is easily changed. The findups() procedure takes a path to search from, and a compiled regular expression (or any object with a .match() method that takes a string and returns True or False).

Usage is then

findups /path/to/search/from
(I figure it is easy enough to copy this script and change the line defining the regexp, so it isn't worth writing a command-line parsing function, plus the accompanying documentation for the command-line arguments needed to specify the search pattern.)

Script source:

#!/usr/bin/env py

import sys,os,os.path,glob,re,functools,itertools,collections,hashlib
import tempfile,fnmatch

# Find duplicate files
class F:
    """A candidate file with lazily computed, cached content fingerprints.

    Attributes:
        s:  size of the file, as supplied by the caller.
        n:  pathname of the file.
        bb: cached first block of the file (None until b() is called).
        hh: cached SHA-256 hex digest of the file, or an error marker
            string (None until h() is called).
        bs: block size in bytes used for all reads (64k).
    """

    def __init__(self, s, n):
        self.s = s
        self.n = n  # pathname of file
        self.bb = None  # first block of file
        self.hh = None  # sha256 hash of file
        self.bs = 1 << 16  # 64k

    def b(self):
        """Return the first ``bs`` bytes of the file, reading them only once.

        Raises:
            OSError: if the file cannot be opened or read. Unlike h(),
                errors are not converted into a marker value.
        """
        if self.bb is None:
            # read first block; the context manager closes the handle promptly
            with open(self.n, "rb") as f:
                self.bb = f.read(self.bs)
        return self.bb

    def h(self):
        """Return the SHA-256 hex digest of the whole file, computed only once.

        The file is read in ``bs``-sized chunks so large files are never
        held in memory. If the file is missing or unreadable, the string
        "File Not Found" or "Permission Error" is cached and returned
        instead of a digest.
        """
        if self.hh is None:
            digest = hashlib.sha256()
            try:
                with open(self.n, "rb") as f:
                    # read() returns b"" at end of file, which stops the loop
                    for data in iter(lambda: f.read(self.bs), b""):
                        digest.update(data)
                self.hh = digest.hexdigest()
            except FileNotFoundError:
                self.hh = "File Not Found"
            except PermissionError:
                self.hh = "Permission Error"
        return self.hh

    def __repr__(self):
        t = "File<size={},name={}>".format(self.s, self.n)
        if self.hh is not None:
            # str is immutable: rebuild the text with the hash before the ">"
            t = t[:-1] + ",hash={}>".format(self.hh)
        return t
def mkf(fn):
    """Build an F record for the file at pathname *fn*.

    Returns:
        An F holding the file's size and its real (symlink-resolved) path,
        or None, after printing a message, when the file does not exist
        or may not be accessed.
    """
    try:
        s = os.path.getsize(fn)
        rfn = os.path.realpath(fn)
        return F(s, rfn)
    except FileNotFoundError:
        print("File not found: {}".format(fn))
        return None
    except PermissionError:
        print("Permission Error: {}".format(fn))
        return None
def blockdict(fs):
    """Compute dictionary from files where keys are first 64k of each file.

    Args:
        fs: iterable of objects with a ``b()`` method returning the first
            block of the file.

    Returns:
        A ``defaultdict(list)`` mapping each distinct first block to the
        list of files that start with it, in input order.
    """
    d = collections.defaultdict(list)
    for f in fs:
        # record the file under its key; without this the dict stays empty
        d[f.b()].append(f)
    return d
def hashdict(fs):
    """Compute dictionary from files where keys are sha256 hashes of each file.

    Args:
        fs: iterable of objects with an ``h()`` method returning the hash
            of the file.

    Returns:
        A ``defaultdict(list)`` mapping each distinct hash to the list of
        files that have it, in input order.
    """
    d = collections.defaultdict(list)
    for f in fs:
        # record the file under its key; without this the dict stays empty
        d[f.h()].append(f)
    return d

# Approach is, for each matching file,
# put in dictionary(list) with key as size
# for elements of dict where list length > 1,
#    get first 64k, put in dictionary with these as keys
#    for elements of this block dict, where list length > 1
#        get sha256 hash of file
#        if sha256 hash is equal, report duplicate
# mkdtemp dir, create one file in there for each set of duplicates,
# where the duplicate files are listed one per line, e.g. in dup000001.txt
def _duplicate_groups(files, key):
    """Group *files* by ``key(file)``; return only groups with 2+ members.

    Files whose key is None are left out of every group.
    """
    groups = collections.defaultdict(list)
    for f in files:
        k = key(f)
        if k is not None:
            groups[k].append(f)
    return [g for g in groups.values() if len(g) > 1]


def findups(searchroot,fnre):
    """Find duplicate files below *searchroot* and write one list per set.

    Candidates are narrowed in three increasingly expensive stages: same
    size, then same first block, then same sha256 hash. Each set of files
    with equal hashes is written, one pathname per line, to a file named
    ``dupNNNNNN.txt`` in a fresh temporary directory; the name of every
    list file is printed as it is created.

    Args:
        searchroot: directory to walk.
        fnre: compiled regular expression (or any object with a
            ``match()`` method) applied to each bare file name; only
            names for which it returns a truthy value are considered.
    """
    candidates = []
    seen = set()
    for rt, _dirs, fs in os.walk(searchroot):
        for fn in fs:
            if not fnre.match(fn):
                continue
            ffn = os.path.join(rt, fn)
            f = mkf(ffn)
            if f is None:
                print("Failed to mkf for {}".format(ffn))
                continue
            # mkf stores the real path, so a symlink and its target would
            # otherwise be reported as duplicates of each other.
            if f.n in seen:
                continue
            seen.add(f.n)
            candidates.append(f)

    def first_block(f):
        # A file may vanish or become unreadable after the walk: report
        # it and drop it instead of aborting the whole scan.
        try:
            return f.b()
        except FileNotFoundError:
            print("File not found: {}".format(f.n))
        except PermissionError:
            print("Permission Error: {}".format(f.n))
        return None

    def full_hash(f):
        print("Hashing: {}".format(f.n))
        return f.h()

    dupix = 0
    dupdir = tempfile.mkdtemp()
    for same_size in _duplicate_groups(candidates, lambda f: f.s):
        for same_block in _duplicate_groups(same_size, first_block):
            for same_hash in _duplicate_groups(same_block, full_hash):
                # Now write out duplist
                dfn = os.path.join(dupdir, "dup{:06d}.txt".format(dupix))
                print("Dup filename: {}".format(dfn))
                with open(dfn, "wt") as out:
                    out.write("\n".join(f.n for f in same_hash))
                dupix += 1

# hardwired for now: file names that end with .mp4 or .zip.
# The trailing "$" matters because match() only anchors at the start;
# without it a name such as "clip.mp4.part" would also be accepted.
fnpat = re.compile(r"^.*\.(mp4|zip)$")

# Only scan when run as a script, so importing this module has no side effects.
if __name__ == "__main__":
    for s in sys.argv[1:]:
        print("Finding in: {}".format(s))
        findups(s, fnpat)