
Find Duplicate Files

The current script is hardwired to search for duplicate files whose names end with .mp4 or .zip, but this is easily changed. The findups() procedure takes a path to search from and a compiled regular expression (or any object with a .match() method that takes a filename and returns a truthy value for names that should be considered).
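For instance, here is a minimal sketch of calling findups() directly with either kind of matcher (the SuffixMatcher class and the /data/media path below are illustrative, not part of the script):

import re

class SuffixMatcher:
    "any object with a .match() method will do"
    def __init__(self, *suffixes):
        self.suffixes = tuple(suffixes)
    def match(self, name):
        # str.endswith accepts a tuple of suffixes
        return name.lower().endswith(self.suffixes)

findups("/data/media", re.compile(r".*\.(mp4|zip)$"))  # compiled regexp
findups("/data/media", SuffixMatcher(".mp4", ".zip"))  # duck-typed matcher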

Usage is then

findups /path/to/search/from

(I figure it is easy enough to copy this script and change the line defining the regexp, so it isn't worth writing a command-line parsing function, and the accompanying documentation, for a command-line argument that specifies the search pattern.)
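For example, assuming the script is saved as findups.py and made executable, searching two directories looks like:

./findups.py ~/Videos ~/Downloads

Each path given on the command line is searched in turn, and every set of duplicates found is written to its own dup file in a fresh temporary directory.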

Script source:

#!/usr/bin/env python3

import sys,os,os.path,re,collections,hashlib
import tempfile

# Find duplicate files
class F:
    def __init__(self,s,n):
        self.s = s # size of file in bytes
        self.n = n # pathname of file
        self.bb = None # first block of file (read lazily)
        self.hh = None # sha256 hash of file (computed lazily)
        self.bs = 1<<16 # 64k block size
    def b(self):
        if self.bb is None:
            # read and cache the first block, closing the file promptly
            with open(self.n,"rb") as f:
                self.bb = f.read(self.bs)
        return self.bb
    def h(self):
        if self.hh is None:
            h = hashlib.sha256()
            try:
                with open(self.n,"rb") as f:
                    while True:
                        data = f.read(self.bs)
                        if not data:
                            break
                        h.update(data)
                self.hh = h.hexdigest()
            except FileNotFoundError:
                self.hh = "File Not Found"
            except PermissionError:
                self.hh = "Permission Error"
        return self.hh
    def __repr__(self):
        t = "File<size={},name={}>".format(self.s,self.n)
        if self.hh is not None:
            # splice the hash in before the closing ">"
            t = t[:-1] + ",hash={}>".format(self.hh)
        return t
def mkf(fn):
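    "create an F for pathname fn, or None if the file is inaccessible"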
    try:
        s = os.path.getsize(fn)
        rfn = os.path.realpath(fn)
        f = F(s,rfn)
        return f
    except FileNotFoundError:
        print("File not found: {}".format(fn))
        return None
    except PermissionError:
        print("Permission Error: {}".format(fn))
        return None
def blockdict(fs):
    "compute dictionary from files where keys are first 64k of each file"
    d = collections.defaultdict(list)
    for f in fs:
        b = f.b()
        d[b].append(f)
    return d
def hashdict(fs):
    "compute dictionary from files where keys are sha256 hashes of each file"
    d = collections.defaultdict(list)
    for f in fs:
        h = f.h()
        d[h].append(f)
    return d

# Approach is, for each matching file,
# put it in a dictionary(list) with key as size.
# For elements of the dict where list length > 1,
#    get the first 64k, put in a dictionary with these blocks as keys.
#    For elements of this block dict where list length > 1,
#        get the sha256 hash of each file;
#        if the sha256 hashes are equal, report a duplicate.
# Duplicates are written under a mkdtemp directory, one file per
# duplicate set (dup000000.txt, dup000001.txt, ...), with the
# duplicate files listed one per line.
def findups(searchroot,fnre):
    # For a dictionary of files by size
    size_to_files = collections.defaultdict(list)
    sizes_with_mults = set()
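    # only sizes that occur more than once can contain duplicates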
    for rt,ds,fs in os.walk(searchroot):
        for fn in fs:
            ffn = os.path.join(rt,fn)
            if fnre.match(fn):
                f = mkf(ffn)
                if f is not None:
                    size_to_files[f.s].append(f)
                    if len(size_to_files[f.s]) > 1:
                        sizes_with_mults.add(f.s)
                else:
                    print("Failed to mkf for {}".format(ffn))
    # Now process sizes with mults
    dupix = 0
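    # each run writes its duplicate lists into a fresh temporary directory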
    dupdir = tempfile.mkdtemp()
    for s in sizes_with_mults:
        fs = size_to_files[s]
        blocks_to_files = collections.defaultdict(list)
        blocks_with_mults = set()
        for f in fs:
            b = f.b()
            blocks_to_files[b].append(f)
            if len(blocks_to_files[b]) > 1:
                blocks_with_mults.add(b)
        # Now if there are blocks with mults, process these
        for b in blocks_with_mults:
            fs = blocks_to_files[b]
            hashes_to_files = collections.defaultdict(list)
            hashes_with_mults = set()
            for f in fs:
                print("Hashing: {}",f.n)
                h = f.h()
                hashes_to_files[h].append(f)
                if len(hashes_to_files[h]) > 1:
                    hashes_with_mults.add(h)
            # Now write out duplist
            for h in hashes_with_mults:
                dfn = os.path.join(dupdir,"dup{:06d}.txt".format(dupix))
                print("Dup filename: {}".format(dfn))
                with open(dfn,"wt") as df:
                    df.write("\n".join(f.n for f in hashes_to_files[h]))
                dupix += 1

# hardwired for now; the trailing $ keeps names like foo.mp4.part from matching
fnpat = re.compile(r".*\.(mp4|zip)$")
for s in sys.argv[1:]:
    print("Finding in: {}".format(s))
    findups(s,fnpat)
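
As noted above, pointing the script at other file types is just a matter of editing the fnpat line. A sketch (the JPEG/PNG pattern below is an example, not part of the script):

fnpat = re.compile(r".*\.(jpe?g|png)$", re.IGNORECASE)

Passing re.IGNORECASE also picks up uppercase extensions such as .MP4 or .ZIP, which the hardwired pattern as written does not.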