203 lines
4.9 KiB
Python
203 lines
4.9 KiB
Python
"""
|
|
Author: Lukas Heiligenbrunner
|
|
Matr.Nr.: K12104785
|
|
Exercise 2
|
|
"""
|
|
|
|
import hashlib
import os.path
from enum import Enum
from glob import glob
from typing import Optional, Set, TextIO

import PIL
from PIL import Image, ImageStat
|
|
|
|
|
|
def mkdirIfNotExists(path: str) -> None:
    """Create the directory *path* (including missing parents) if it is absent.

    Prints a short notice before a directory is created. Nothing happens when
    the directory already exists or when *path* is the empty string.

    :param path: directory to create; ``""`` refers to the current directory.
    """
    # os.path.dirname("file.log") yields "", and os.makedirs("") raises
    # FileNotFoundError -- the current directory needs no creation.
    if not path or os.path.isdir(path):
        return

    print(f"creating folder: {path}")
    # exist_ok guards against the directory appearing between check and creation
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
def validSuffix(file: str) -> bool:
    """Tell whether *file* ends in one of the accepted JPEG suffixes.

    Only the four exact spellings below are accepted; mixed-case variants such
    as ``.Jpg`` are rejected.
    """
    accepted = (".jpg", ".jpeg", ".JPG", ".JPEG")
    return any(file.endswith(suffix) for suffix in accepted)
|
|
|
|
|
|
def validFileSize(file: str) -> bool:
    """Tell whether the file at *file* is smaller than 250 000 bytes."""
    size_in_bytes = os.stat(file).st_size
    return size_in_bytes < 250_000
|
|
|
|
|
|
class ErrorCodes(Enum):
    """Reasons for rejecting an input file.

    The numeric value of a member is what ends up in the log file.
    """

    # file name does not end in .jpg / .jpeg / .JPG / .JPEG
    Extension = 1
    # file is 250 000 bytes or larger
    FileSize = 2
    # file could not be opened as an image
    InvalidImage = 3
    # not an RGB JPEG, or width/height not larger than 96 pixels
    ImageShape = 4
    # at least one colour band has a variance of zero
    VarianceLZero = 5
    # pixel data equals that of an image that was already copied
    Duplicate = 6
|
|
|
|
|
|
def log(filename: str, errorCode: ErrorCodes):
    """Print a rejected file and the reason for its rejection to stdout."""
    message = "[{}] :: {}".format(filename, errorCode)
    print(message)
|
|
|
|
|
|
class Singleton(type):
    """Metaclass that hands out one shared instance per class.

    The first call of a class builds the instance; every later call returns
    that same object and ignores the arguments it was given.
    """

    # one cache for all classes using this metaclass, keyed by the class
    _instances = {}

    def __call__(cls, *args, **kwargs):
        cache = cls._instances
        if cls in cache:
            return cache[cls]

        instance = super().__call__(*args, **kwargs)
        cache[cls] = instance
        return instance
|
|
|
|
|
|
class FileLogger(metaclass=Singleton):
    """Shared writer for the rejection log file.

    Because of the Singleton metaclass every ``FileLogger()`` call returns the
    same object. Call :meth:`init` before logging and :meth:`close` when done.
    """

    # handle of the open log file; None before init() and after close()
    file: Optional[TextIO] = None

    def init(self, log_file_path: str):
        """Open *log_file_path* in append mode for subsequent :meth:`log` calls.

        :param log_file_path: path of the log file; created if missing.
        """
        # the instance is shared, so a handle from an earlier init() may still be open
        self.close()
        # newline='' switches off newline translation; without it the explicit
        # "\r\n" written by log() would turn into "\r\r\n" on Windows
        self.file = open(log_file_path, 'a', newline='')

    def close(self):
        """Close the log file; does nothing if no file is open."""
        if self.file is not None:
            self.file.close()
            self.file = None

    def log(self, filename: str, errorcode: ErrorCodes):
        """Append one ``<basename>;<code>`` record for a rejected file.

        :param filename: path of the rejected file; only its base name is written.
        :param errorcode: reason for the rejection; its numeric value is written.
        """
        # records are terminated with an explicit CRLF
        self.file.write(f"{os.path.basename(filename)};{errorcode.value}\r\n")
|
|
|
|
|
|
def openImage(path: str) -> Optional[PIL.Image.Image]:
    """Open *path* with Pillow and fully decode it.

    :param path: path of the image file.
    :return: the loaded image, or ``None`` if the file cannot be opened or its
        pixel data cannot be decoded.
    """
    # OSError also covers FileNotFoundError, PermissionError and
    # IsADirectoryError, so unreadable paths are reported instead of crashing
    read_errors = (OSError, PIL.UnidentifiedImageError, ValueError, TypeError)

    try:
        img = Image.open(path)
    except read_errors:
        return None

    # Image.open() only identifies the file; pixel data is decoded lazily.
    # Decode now so that a truncated or corrupt file is detected here rather
    # than raising later while the image is being analysed or copied.
    try:
        img.load()
    except read_errors:
        img.close()
        return None

    return img
|
|
|
|
|
|
def validImageShape(img: PIL.Image.Image) -> bool:
    """Tell whether *img* is an RGB JPEG larger than 96 pixels in both dimensions.

    The checks run in this order and stop at the first failure: colour mode,
    number and order of the bands, file format, height, width.
    """
    return (
        img.mode == "RGB"
        and img.getbands() == ('R', 'G', 'B')
        and img.format == "JPEG"
        and img.height > 96
        and img.width > 96
    )
|
|
|
|
|
|
def validVariance(img: PIL.Image.Image) -> bool:
    """Tell whether every band of *img* has a variance greater than zero."""
    statistics = ImageStat.Stat(img)
    # a variance is never negative, so the smallest one decides the outcome
    lowest = min(statistics.var)
    return lowest > 0.0
|
|
|
|
|
|
class FileCopy(metaclass=Singleton):
    """Shared helper that writes accepted images to the output directory.

    Output files are numbered consecutively starting at 1. Images whose pixel
    data was already written are detected by an MD5 digest and skipped.
    Call :meth:`init` before the first :meth:`copyFile`.
    """

    # printf-style conversion (without the leading '%') applied to the index
    format: str
    # directory that receives the copied images
    out_dir: str

    # MD5 hex digests of the pixel data of all images copied so far
    hashmap: Set[str]
    # number used for the next output file
    idx: int

    def init(self, out_dir: str, format: str):
        """Reset the copier for a new run.

        :param out_dir: directory that receives the copied images.
        :param format: printf-style conversion for the file number, e.g. ``"06d"``.
        """
        self.format = format
        self.out_dir = out_dir
        self.idx = 1
        # a set gives constant-time duplicate lookups
        self.hashmap = set()

    def copyFile(self, img: PIL.Image.Image) -> bool:
        """Write the pixel data of *img* as the next numbered JPEG file.

        :param img: image to copy.
        :return: ``True`` if the image was written, ``False`` if an image with
            the same pixel data was copied before.
        """
        # tobytes() is Pillow's public accessor for the raw pixel buffer; it
        # avoids calling a dunder directly and needs no NumPy
        pixels = img.tobytes()
        hexhash = hashlib.md5(pixels).hexdigest()
        if hexhash in self.hashmap:
            # element already copied
            return False
        self.hashmap.add(hexhash)

        # apply format string
        filename = f"%{self.format}" % self.idx
        self.idx += 1

        # re-create the image from the bare pixel data so that only the pixels
        # (and none of the source file's metadata) are written
        im = PIL.Image.frombytes(img.mode, img.size, pixels)
        try:
            im.save(os.path.join(self.out_dir, f"{filename}.jpg"), format="jpeg")
        finally:
            im.close()
        return True
|
|
|
|
|
|
def _reject(f: str, code: ErrorCodes) -> bool:
    """Report *f* as rejected with *code* on stdout and in the log file; returns False."""
    log(f, code)
    FileLogger().log(f, code)
    return False


def processFile(f: str) -> bool:
    """Validate one file and copy it to the output directory if it passes.

    The checks run in this order and the first failure is reported: suffix,
    file size, readable as image, image shape, variance, duplicate.

    :param f: path of the file to process.
    :return: ``True`` if the image was copied, ``False`` if it was rejected.
    """
    # check suffix
    if not validSuffix(f):
        return _reject(f, ErrorCodes.Extension)

    # check file size
    if not validFileSize(f):
        return _reject(f, ErrorCodes.FileSize)

    img = openImage(f)
    if img is None:
        return _reject(f, ErrorCodes.InvalidImage)

    # from here on the image is open; release it on every exit path, not only
    # after a successful copy
    try:
        if not validImageShape(img):
            return _reject(f, ErrorCodes.ImageShape)

        if not validVariance(img):
            return _reject(f, ErrorCodes.VarianceLZero)

        # copy the image data; fails if the same pixels were copied before
        if not FileCopy().copyFile(img):
            return _reject(f, ErrorCodes.Duplicate)

        return True
    finally:
        img.close()
|
|
|
|
|
|
def validate_images(input_dir: str, output_dir: str, log_file: str, formatter: str = "05d") -> int:
    """Validate all files below *input_dir* and copy the valid images.

    Files are processed in sorted path order. Rejected files are recorded in
    *log_file*; accepted images are written to *output_dir* with consecutive
    numbers as file names.

    :param input_dir: directory that is searched recursively for files.
    :param output_dir: directory for the copied images; created if missing.
    :param log_file: path of the log file; its directory is created if missing.
    :param formatter: printf-style conversion for the output file number.
    :return: number of images that were copied.
    """
    # make sure the output directory and the log file's directory exist
    mkdirIfNotExists(output_dir)
    logdir = os.path.dirname(log_file)
    mkdirIfNotExists(logdir)

    FileLogger().init(log_file)
    try:
        FileCopy().init(output_dir, formatter)

        # '*' instead of '*.*': files without a dot in their name must be
        # examined (and rejected for their suffix) too. The isfile filter drops
        # the directories that the recursive pattern also matches.
        candidates = glob(os.path.join(input_dir, "**", "*"), recursive=True)
        files = sorted(path for path in candidates if os.path.isfile(path))

        succcounter = 0
        for f in files:
            if processFile(f):
                succcounter += 1
        return succcounter
    finally:
        # close the log even if processing a file raised
        FileLogger().close()
|
|
|
|
|
|
# Script entry point: validate the images below unittest/unittest_input_0 and
# print how many of them were copied.
if __name__ == '__main__':
    copied = validate_images(
        "unittest/unittest_input_0",
        "unittest/outputs/unittest_input_0",
        "unittest/outputs/unittest_input_0.log",
        "06d",
    )
    print(copied)
|