|
import os
|
|
from multiprocessing import Pool
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from chainner_ext import ResizeFilter, resize
|
|
from cv2.typing import MatLike
|
|
from PIL import Image
|
|
from tqdm import tqdm
|
|
|
|
DRY_RUN = False
|
|
|
|
|
|
def split_file_path(path: Path | str) -> tuple[Path, str, str]:
|
|
"""
|
|
Returns the base directory, file name, and extension of the given file path.
|
|
"""
|
|
base, ext = os.path.splitext(path)
|
|
dirname, basename = os.path.split(base)
|
|
return Path(dirname), basename, ext
|
|
|
|
|
|
def _read_cv_from_path(path: str) -> MatLike:
|
|
img = None
|
|
try:
|
|
img = cv2.imdecode(np.fromfile(path, dtype=np.uint8), cv2.IMREAD_COLOR)
|
|
except Exception:
|
|
pass
|
|
|
|
if img is None:
|
|
try:
|
|
img = cv2.imread(str(path), cv2.IMREAD_COLOR)
|
|
except Exception as e:
|
|
raise RuntimeError(
|
|
f'Error reading image image from path "{path}". Image may be corrupt.'
|
|
) from e
|
|
|
|
if img is None:
|
|
raise RuntimeError(
|
|
f'Error reading image image from path "{path}". Image may be corrupt.'
|
|
)
|
|
|
|
return img
|
|
|
|
|
|
def custom_resize(image: MatLike, size_ratio: float) -> np.ndarray:
|
|
h, w = image.shape[:2]
|
|
pil_image = Image.fromarray(image)
|
|
|
|
new_image = np.array(pil_image)
|
|
new_image = new_image.astype(np.float32) / 255.0
|
|
new_image = resize(
|
|
new_image,
|
|
(round(w * size_ratio), round(h * size_ratio)),
|
|
ResizeFilter.CubicCatrom,
|
|
False,
|
|
)
|
|
new_image = (new_image * 255).astype(np.uint8)
|
|
|
|
pil_image = Image.fromarray(new_image)
|
|
return np.array(pil_image)
|
|
|
|
|
|
def downscale_image(args: tuple[str, str, float]) -> None:
|
|
input_path, output_path, size_ratio = args
|
|
if not DRY_RUN:
|
|
image = _read_cv_from_path(input_path)
|
|
image = custom_resize(image, size_ratio)
|
|
|
|
cv2.imwrite(output_path, image)
|
|
else:
|
|
print(output_path)
|
|
|
|
|
|
def process_images(
|
|
input_directory: str, output_directory: str, size_ratio: float
|
|
) -> None:
|
|
if not os.path.exists(output_directory):
|
|
os.makedirs(output_directory)
|
|
|
|
image_paths = []
|
|
|
|
for root, _, files in os.walk(input_directory):
|
|
image_files = []
|
|
for file in files:
|
|
image_files.append(file)
|
|
|
|
selected_files = image_files
|
|
|
|
for file in selected_files:
|
|
input_image_path = os.path.join(root, file)
|
|
output_subdirectory = os.path.relpath(root, input_directory)
|
|
output_subdirectory_path = os.path.join(
|
|
output_directory, output_subdirectory
|
|
)
|
|
output_image_path = os.path.join(output_subdirectory_path, file)
|
|
|
|
if not DRY_RUN and not os.path.exists(output_subdirectory_path):
|
|
os.makedirs(output_subdirectory_path)
|
|
|
|
image_paths.append((input_image_path, output_image_path, size_ratio))
|
|
|
|
with Pool(processes=8) as pool, tqdm(total=len(image_paths)) as pbar:
|
|
for _ in pool.imap_unordered(downscale_image, image_paths):
|
|
pbar.update(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
scale = 4
|
|
scale_str = f"x{scale}"
|
|
size_ratio = 1 / scale
|
|
|
|
input_directory = r"C:\path\to\input\dir"
|
|
output_directory = r"C:\path\to\output\dir"
|
|
process_images(input_directory, output_directory, size_ratio)
|
|
|