File size: 3,526 Bytes
62dbcfb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os
from multiprocessing import Pool
from pathlib import Path

import cv2
import numpy as np
from chainner_ext import ResizeFilter, resize
from cv2.typing import MatLike
from PIL import Image
from tqdm import tqdm

DRY_RUN = False  # Set this to False when you want to actually copy the files


def split_file_path(path: Path | str) -> tuple[Path, str, str]:
    """

    Returns the base directory, file name, and extension of the given file path.

    """
    base, ext = os.path.splitext(path)
    dirname, basename = os.path.split(base)
    return Path(dirname), basename, ext


def _read_cv_from_path(path: str) -> MatLike:
    img = None
    try:
        img = cv2.imdecode(np.fromfile(path, dtype=np.uint8), cv2.IMREAD_COLOR)
    except Exception:
        pass

    if img is None:
        try:
            img = cv2.imread(str(path), cv2.IMREAD_COLOR)
        except Exception as e:
            raise RuntimeError(
                f'Error reading image image from path "{path}". Image may be corrupt.'
            ) from e

    if img is None:  # type: ignore
        raise RuntimeError(
            f'Error reading image image from path "{path}". Image may be corrupt.'
        )

    return img


def custom_resize(image: MatLike, size_ratio: float) -> np.ndarray:
    h, w = image.shape[:2]
    pil_image = Image.fromarray(image)

    new_image = np.array(pil_image)
    new_image = new_image.astype(np.float32) / 255.0
    new_image = resize(
        new_image,
        (round(w * size_ratio), round(h * size_ratio)),
        ResizeFilter.CubicCatrom,
        False,
    )
    new_image = (new_image * 255).astype(np.uint8)

    pil_image = Image.fromarray(new_image)
    return np.array(pil_image)


def downscale_image(args: tuple[str, str, float]) -> None:
    input_path, output_path, size_ratio = args
    if not DRY_RUN:
        image = _read_cv_from_path(input_path)
        image = custom_resize(image, size_ratio)

        cv2.imwrite(output_path, image)
    else:
        print(output_path)


def process_images(

    input_directory: str, output_directory: str, size_ratio: float

) -> None:
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    image_paths = []

    for root, _, files in os.walk(input_directory):
        image_files = []
        for file in files:
            image_files.append(file)

        selected_files = image_files

        for file in selected_files:
            input_image_path = os.path.join(root, file)
            output_subdirectory = os.path.relpath(root, input_directory)
            output_subdirectory_path = os.path.join(
                output_directory, output_subdirectory
            )
            output_image_path = os.path.join(output_subdirectory_path, file)

            if not DRY_RUN and not os.path.exists(output_subdirectory_path):
                os.makedirs(output_subdirectory_path)

            image_paths.append((input_image_path, output_image_path, size_ratio))

    with Pool(processes=8) as pool, tqdm(total=len(image_paths)) as pbar:
        for _ in pool.imap_unordered(downscale_image, image_paths):
            pbar.update(1)


if __name__ == "__main__":
    scale = 4
    scale_str = f"x{scale}"
    size_ratio = 1 / scale

    input_directory = r"C:\path\to\input\dir"
    output_directory = r"C:\path\to\output\dir"
    process_images(input_directory, output_directory, size_ratio)