# Source: sisr2onnx/scripts/data_preparation/create_lrs_bicubic_downscale.py
# (Hugging Face file-viewer residue: uploaded by Zarxrax, commit 62dbcfb
# "Upload 823 files", verified, 3.53 kB.)
import os
from multiprocessing import Pool
from pathlib import Path
import cv2
import numpy as np
from chainner_ext import ResizeFilter, resize
from cv2.typing import MatLike
from PIL import Image
from tqdm import tqdm
# When True, downscale_image only prints each output path instead of reading,
# resizing, and writing the image, and process_images does not create output
# subdirectories. Leave as False to actually produce the downscaled files.
DRY_RUN = False
def split_file_path(path: Path | str) -> tuple[Path, str, str]:
"""
Returns the base directory, file name, and extension of the given file path.
"""
base, ext = os.path.splitext(path)
dirname, basename = os.path.split(base)
return Path(dirname), basename, ext
def _read_cv_from_path(path: str) -> MatLike:
    """
    Load the image at ``path`` as a 3-channel colour array.

    The file bytes are first read with ``np.fromfile`` and decoded with
    ``cv2.imdecode``; if that raises or yields ``None``, ``cv2.imread`` is
    tried as a fallback.

    Args:
        path: Location of the image file on disk.

    Returns:
        The decoded image.

    Raises:
        RuntimeError: If neither loader produces an image.
    """
    error_message = f'Error reading image from path "{path}". Image may be corrupt.'

    img = None
    decode_error: Exception | None = None
    try:
        # NOTE(review): presumably done this way so that paths cv2.imread
        # cannot open itself (e.g. non-ASCII paths on Windows) still load.
        img = cv2.imdecode(np.fromfile(path, dtype=np.uint8), cv2.IMREAD_COLOR)
    except Exception as e:
        # Deliberately best-effort: remember the failure and try cv2.imread.
        decode_error = e

    if img is None:
        try:
            img = cv2.imread(str(path), cv2.IMREAD_COLOR)
        except Exception as e:
            raise RuntimeError(error_message) from e

    if img is None:  # type: ignore
        # Both loaders failed; surface the first decoder's error (if any) as
        # the cause instead of discarding it.
        raise RuntimeError(error_message) from decode_error

    return img
def custom_resize(image: MatLike, size_ratio: float) -> np.ndarray:
    """
    Resize ``image`` by ``size_ratio`` with the Catmull-Rom cubic filter.

    Args:
        image: Image array whose first two axes are height and width. Values
            are divided by 255, so 8-bit data is assumed.
        size_ratio: Multiplier applied to both the width and the height.

    Returns:
        The resized image as a ``uint8`` array.
    """
    h, w = image.shape[:2]
    # Very small inputs could round down to a zero-sized axis; keep >= 1 pixel.
    target_size = (max(1, round(w * size_ratio)), max(1, round(h * size_ratio)))

    normalized = np.ascontiguousarray(image, dtype=np.float32) / 255.0
    # NOTE(review): the trailing False is presumably chainner_ext's
    # gamma-correction flag — confirm against the chainner_ext signature.
    resized = resize(normalized, target_size, ResizeFilter.CubicCatrom, False)

    # A cubic kernel has negative lobes, so results can overshoot [0, 1] near
    # sharp edges. Round to the nearest level and clip before the cast so that
    # out-of-range values saturate instead of wrapping around in uint8.
    return np.clip(np.rint(resized * 255.0), 0, 255).astype(np.uint8)
def downscale_image(args: tuple[str, str, float]) -> None:
    """
    Downscale one image and write the result (worker function for the pool).

    Args:
        args: ``(input_path, output_path, size_ratio)`` packed into a single
            tuple so the function can be used with ``Pool.imap_unordered``.

    Raises:
        RuntimeError: If the input cannot be read or the output cannot be
            written.
    """
    input_path, output_path, size_ratio = args

    if DRY_RUN:
        # Dry run: only report where the result would have been written.
        print(output_path)
        return

    image = custom_resize(_read_cv_from_path(input_path), size_ratio)

    # cv2.imwrite signals failure through its boolean return value; without
    # this check a failed write would go unnoticed.
    if not cv2.imwrite(output_path, image):
        raise RuntimeError(f'Error writing image to path "{output_path}".')
def process_images(
    input_directory: str,
    output_directory: str,
    size_ratio: float,
    num_processes: int = 8,
) -> None:
    """
    Downscale every file under ``input_directory`` into ``output_directory``.

    The input tree is walked recursively and its directory layout is mirrored
    under ``output_directory``. Work is distributed over a process pool and
    progress is shown with a tqdm bar.

    Args:
        input_directory: Root directory to scan for input files.
        output_directory: Root directory that receives the downscaled files.
        size_ratio: Multiplier applied to each image's width and height.
        num_processes: Number of worker processes in the pool.
    """
    # Respect DRY_RUN for the top-level directory too, and let makedirs
    # tolerate an existing directory instead of a racy exists() check.
    if not DRY_RUN:
        os.makedirs(output_directory, exist_ok=True)

    # NOTE: every file found is queued; nothing here filters by extension.
    image_paths: list[tuple[str, str, float]] = []
    for root, _, files in os.walk(input_directory):
        if not files:
            continue

        # The mirrored output directory is the same for all files in `root`,
        # so compute and create it once per directory rather than per file.
        output_subdirectory_path = os.path.join(
            output_directory, os.path.relpath(root, input_directory)
        )
        if not DRY_RUN:
            os.makedirs(output_subdirectory_path, exist_ok=True)

        image_paths.extend(
            (
                os.path.join(root, file),
                os.path.join(output_subdirectory_path, file),
                size_ratio,
            )
            for file in files
        )

    with Pool(processes=num_processes) as pool, tqdm(
        total=len(image_paths)
    ) as pbar:
        for _ in pool.imap_unordered(downscale_image, image_paths):
            pbar.update(1)
if __name__ == "__main__":
    # Downscale factor: outputs are 1/scale the width and height of the inputs.
    scale = 4
    size_ratio = 1 / scale

    # Placeholder paths; edit these before running the script.
    input_directory = r"C:\path\to\input\dir"
    output_directory = r"C:\path\to\output\dir"

    process_images(input_directory, output_directory, size_ratio)