"""
Author: Lukas Heiligenbrunner
Matr.Nr.: K12104785
Exercise 2
"""

import os.path
from enum import Enum
from glob import glob
from typing import TextIO
import hashlib

import PIL
from PIL import Image, ImageStat


def validSuffix(file: str) -> bool:
    """Return True if *file* ends with one of the accepted JPEG suffixes.

    The check is case-sensitive: only the four spellings listed below pass.
    """
    return file.endswith((".jpg", ".jpeg", ".JPG", ".JPEG"))


class FileCopy(metaclass=Singleton):
    """Write validated images to the output directory, skipping duplicates.

    Call ``init`` once per run before the first ``copyFile``. Duplicates are
    recognised by the MD5 digest of the image's pixel array, so two files
    with identical pixel data count as the same image regardless of name.
    """
    format: str
    out_dir: str

    # digests of every image accepted so far; a set keeps the duplicate
    # check O(1) instead of scanning a list for each file
    hashmap: set
    idx: int

    def init(self, out_dir: str, format: str):
        """Reset the copier for a new run.

        :param out_dir: directory the numbered ``.jpg`` files are written to
        :param format: printf-style conversion without the leading ``%``
            (e.g. ``"06d"``), applied to the running index to build file names
        """
        self.format = format
        self.out_dir = out_dir
        self.idx = 1
        self.hashmap = set()

    def copyFile(self, img: PIL.Image.Image) -> bool:
        """Save *img* as the next numbered JPEG unless it was already copied.

        :param img: opened image to copy
        :return: True if the image was written, False if an image with the
            same pixel data has been copied before
        """
        # materialise the pixel data once; it is used for hashing and saving
        pixels = img.__array__()
        hexhash = hashlib.md5(pixels).hexdigest()
        if hexhash in self.hashmap:
            # element already copied
            return False
        self.hashmap.add(hexhash)

        # apply format string
        filename = f"%{self.format}" % self.idx
        self.idx += 1

        # write a fresh image built from the raw pixels only
        im = PIL.Image.fromarray(pixels)
        try:
            im.save(os.path.join(self.out_dir, f"{filename}.jpg"), format="jpeg")
        finally:
            im.close()
        return True


def _reject(f: str, errorcode) -> bool:
    """Report *f* as rejected with *errorcode* on both log channels; return False."""
    log(f, errorcode)
    FileLogger().log(f, errorcode)
    return False


def processFile(f: str) -> bool:
    """Validate one file and copy it to the output directory if it passes.

    Checks run in a fixed order (suffix, file size, readable image, image
    shape, variance, duplicate) and stop at the first failure, which is
    reported through ``log`` and ``FileLogger`` with the matching error code.

    :param f: path of the file to process
    :return: True if the image was copied, False if it was rejected
    """
    # cheap path-based checks first, before the file is opened
    if not validSuffix(f):
        return _reject(f, ErrorCodes.Extension)

    if not validFileSize(f):
        return _reject(f, ErrorCodes.FileSize)

    img = openImage(f)
    if img is None:
        return _reject(f, ErrorCodes.InvalidImage)

    # from here on the image is open: close it on every exit path,
    # not only after a successful copy
    try:
        if not validImageShape(img):
            return _reject(f, ErrorCodes.ImageShape)

        if not validVariance(img):
            return _reject(f, ErrorCodes.VarianceLZero)

        if not FileCopy().copyFile(img):
            return _reject(f, ErrorCodes.Duplicate)

        return True
    finally:
        img.close()
"""
Author -- Michael Widrich, Andreas Schörgenhumer
Contact -- schoergenhumer@ml.jku.at
Date -- 04.03.2022

###############################################################################

The following copyright statement applies to all code within this file.

Copyright statement:
This material, no matter whether in printed or electronic form,
may be used for personal and non-commercial educational use only.
Any reproduction of this manuscript, no matter whether as a whole or in parts,
no matter whether in printed or in electronic form, requires explicit prior
acceptance of the authors.

###############################################################################

Images taken from: https://pixabay.com/
"""

import hashlib
import os
import shutil
import sys
from glob import glob

import dill as pkl


def print_outs(outs, line_token="-"):
    """Print *outs* framed by two 40-character rules made of *line_token*.

    A trailing newline is added only when *outs* is not a string that
    already ends with one.
    """
    print(line_token * 40)
    print(outs, end="" if isinstance(outs, str) and outs.endswith("\n") else "\n")
    print(line_token * 40)


def _digest_files(paths):
    """Return one SHA-256 digest over the concatenated contents of *paths*."""
    hasher = hashlib.sha256()
    for path in paths:
        with open(path, "rb") as fh:
            hasher.update(fh.read())
    return hasher.digest()


ex_file = "ex2.py"
full_points = 15
points = full_points
python = sys.executable

solutions_dir = os.path.join("unittest", "solutions")
outputs_dir = os.path.join("unittest", "outputs")

# Remove previous outputs folder
shutil.rmtree(outputs_dir, ignore_errors=True)

inputs = sorted(glob(os.path.join("unittest", "unittest_input_*"), recursive=True))
if not inputs:
    raise FileNotFoundError("Could not find unittest_input_* files")

# NOTE(review): unpickling executes arbitrary code; only acceptable because
# counts.pkl is a local file shipped together with this script.
with open(os.path.join(solutions_dir, "counts.pkl"), "rb") as f:
    sol_counts = pkl.load(f)

# every failed test costs the same share of the total
penalty = full_points / len(inputs)

for test_i, input_folder in enumerate(inputs):
    comment = ""
    fcall = ""

    # import inside the loop so that a broken submission fails every test
    # with a readable message instead of aborting the whole script
    try:
        from ex2 import validate_images

        proper_import = True
    except Exception as e:
        errs = e
        points -= penalty
        proper_import = False
    finally:
        # restore stdout in case the imported module rebound it
        sys.stdout.flush()
        sys.stdout = sys.__stdout__

    if proper_import:
        try:
            input_basename = os.path.basename(input_folder)
            output_dir = os.path.join(outputs_dir, input_basename)
            logfilepath = output_dir + ".log"
            formatter = "06d"
            # build the call description before calling, so it is still
            # reported when validate_images raises
            fcall = f'validate_images(\n\tinput_dir="{input_folder}",\n\toutput_dir="{output_dir}",\n\tlog_file="{logfilepath}",\n\tformatter="{formatter}"\n)'
            counts = validate_images(input_dir=input_folder, output_dir=output_dir, log_file=logfilepath,
                                     formatter=formatter)
            errs = ""

            try:
                with open(os.path.join(outputs_dir, f"{input_basename}.log"), "r") as lfh:
                    logfile = lfh.read()
            except FileNotFoundError:
                # two cases:
                # 1) no invalid files and thus no log file -> ok -> equal to empty tlogfile
                # 2) invalid files but no log file -> not ok -> will fail the comparison with tlogfile (below)
                logfile = ""
            with open(os.path.join(solutions_dir, f"{input_basename}.log"), "r") as lfh:
                # must replace the separator that was used when creating the solution files
                tlogfile = lfh.read().replace("\\", os.path.sep)

            files = sorted(glob(os.path.join(outputs_dir, input_basename, "**", "*"), recursive=True))
            tfiles = sorted(glob(os.path.join(solutions_dir, input_basename, "**", "*"), recursive=True))
            out_hash = _digest_files(files)
            sol_hash = _digest_files(tfiles)

            tcounts = sol_counts[input_basename]

            # only the first mismatch is reported and penalised
            if counts != tcounts:
                points -= penalty
                comment = f"Function should return {tcounts} but returned {counts}"
            elif [p.split(os.path.sep)[-2:] for p in files] != [p.split(os.path.sep)[-2:] for p in tfiles]:
                points -= penalty
                comment = "Contents of output directory do not match (see directory 'solutions')"
            elif out_hash != sol_hash:
                points -= penalty
                comment = "Hash value of the files in the output directory do not match (see directory 'solutions')"
            elif logfile != tlogfile:
                points -= penalty
                comment = "Contents of logfiles do not match (see directory 'solutions')"

        except Exception as e:
            errs = e
            points -= penalty
        finally:
            # restore stdout in case the tested function rebound it
            sys.stdout.flush()
            sys.stdout = sys.__stdout__

    print()
    print_outs(f"Test {test_i}", line_token="#")
    print("Function call:")
    print_outs(fcall)

    if errs:
        print("Some unexpected errors occurred:")
        print_outs(f"{type(errs).__name__}: {errs}")
    else:
        print("Notes:")
        print_outs("No issues found" if comment == "" else comment)

    # due to floating point calculations it could happen that we get -0 here
    if points < 0:
        assert abs(points) < 1e-7, f"points were {points} < 0: error when subtracting points?"
        points = abs(points)
    print(f"Current points: {points:.2f}")

print(f"\nEstimated points upon submission: {points:.2f} (out of {full_points:.2f})")
if points < full_points:
    print(f"Check the folder '{outputs_dir}' to see where your errors are")
else:
    shutil.rmtree(outputs_dir)
print("This is only an estimate, see 'Instructions for submitting homework' in Moodle "
      "for common mistakes that can still lead to 0 points.")