|
from typing import Dict, List, Optional, Union |
|
from transformers.models.qwen2_vl.image_processing_qwen2_vl import Qwen2VLImageProcessor, smart_resize |
|
from transformers.image_transforms import ( |
|
convert_to_rgb, |
|
resize, |
|
to_channel_dimension_format, |
|
) |
|
from transformers.image_utils import ( |
|
OPENAI_CLIP_MEAN, |
|
OPENAI_CLIP_STD, |
|
ChannelDimension, |
|
ImageInput, |
|
PILImageResampling, |
|
VideoInput, |
|
get_image_size, |
|
infer_channel_dimension_format, |
|
is_scaled_image, |
|
make_batched_videos, |
|
make_flat_list_of_images, |
|
make_list_of_images, |
|
to_numpy_array, |
|
valid_images, |
|
validate_preprocess_arguments,) |
|
|
|
from transformers.utils import TensorType, logging |
|
import numpy as np |
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
class Qwen2VLImageProcessorExport(Qwen2VLImageProcessor): |
|
|
|
|
|
def _preprocess( |
|
self, |
|
images: Union[ImageInput, VideoInput], |
|
do_resize: bool = None, |
|
resample: PILImageResampling = None, |
|
do_rescale: bool = None, |
|
rescale_factor: float = None, |
|
do_normalize: bool = None, |
|
image_mean: Optional[Union[float, List[float]]] = None, |
|
image_std: Optional[Union[float, List[float]]] = None, |
|
do_convert_rgb: bool = None, |
|
data_format: Optional[ChannelDimension] = ChannelDimension.FIRST, |
|
input_data_format: Optional[Union[str, ChannelDimension]] = None, |
|
): |
|
""" |
|
Preprocess an image or batch of images. Copy of the `preprocess` method from `CLIPImageProcessor`. |
|
|
|
Args: |
|
images (`ImageInput`): |
|
Image or batch of images to preprocess. Expects pixel values ranging from 0 to 255. If pixel values range from 0 to 1, set `do_rescale=False`. |
|
vision_info (`List[Dict]`, *optional*): |
|
Optional list of dictionaries containing additional information about vision inputs. |
|
do_resize (`bool`, *optional*, defaults to `self.do_resize`): |
|
Whether to resize the image. |
|
resample (`PILImageResampling`, *optional*, defaults to `self.resample`): |
|
Resampling filter to use if resizing the image. This can be one of the `PILImageResampling` enums. |
|
do_rescale (`bool`, *optional*, defaults to `self.do_rescale`): |
|
Whether to rescale the image. |
|
rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`): |
|
Scale factor to use if rescaling the image. |
|
do_normalize (`bool`, *optional*, defaults to `self.do_normalize`): |
|
Whether to normalize the image. |
|
image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`): |
|
Mean to use if normalizing the image. Can be a float or a list of floats corresponding to the number of channels in the image. |
|
image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`): |
|
Standard deviation to use if normalizing the image. Can be a float or a list of floats corresponding to the number of channels in the image. |
|
do_convert_rgb (`bool`, *optional*, defaults to `self.do_convert_rgb`): |
|
Whether to convert the image to RGB. |
|
data_format (`ChannelDimension`, *optional*, defaults to `ChannelDimension.FIRST`): |
|
The channel dimension format for the output image. Can be one of: |
|
- `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. |
|
- `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. |
|
- Unset: Use the channel dimension format of the input image. |
|
input_data_format (`ChannelDimension` or `str`, *optional*): |
|
The channel dimension format for the input image. Can be one of: |
|
- `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. |
|
- `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. |
|
- `"none"` or `ChannelDimension.NONE`: image in (height, width) format. - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. |
|
""" |
|
images = make_list_of_images(images) |
|
|
|
if do_convert_rgb: |
|
images = [convert_to_rgb(image) for image in images] |
|
|
|
|
|
images = [to_numpy_array(image) for image in images] |
|
|
|
if do_rescale and is_scaled_image(images[0]): |
|
logger.warning_once( |
|
"It looks like you are trying to rescale already rescaled images. If the input" |
|
" images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again." |
|
) |
|
if input_data_format is None: |
|
|
|
input_data_format = infer_channel_dimension_format(images[0]) |
|
|
|
height, width = get_image_size(images[0], channel_dim=input_data_format) |
|
resized_height, resized_width = height, width |
|
processed_images = [] |
|
for image in images: |
|
if do_resize: |
|
resized_height, resized_width = smart_resize( |
|
height, |
|
width, |
|
factor=self.patch_size * self.merge_size, |
|
min_pixels=self.min_pixels, |
|
max_pixels=self.max_pixels, |
|
) |
|
image = resize( |
|
image, size=(resized_height, resized_width), resample=resample, input_data_format=input_data_format |
|
) |
|
|
|
if do_rescale: |
|
image = self.rescale(image, scale=rescale_factor, input_data_format=input_data_format) |
|
|
|
if do_normalize: |
|
image = self.normalize( |
|
image=image, mean=image_mean, std=image_std, input_data_format=input_data_format |
|
) |
|
|
|
image = to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format) |
|
processed_images.append(image) |
|
|
|
patches = np.array(processed_images) |
|
if data_format == ChannelDimension.LAST: |
|
patches = patches.transpose(0, 3, 1, 2) |
|
if patches.shape[0] % self.temporal_patch_size != 0: |
|
repeats = np.repeat(patches[-1][np.newaxis], self.temporal_patch_size - 1, axis=0) |
|
patches = np.concatenate([patches, repeats], axis=0) |
|
channel = patches.shape[1] |
|
grid_t = patches.shape[0] // self.temporal_patch_size |
|
grid_h, grid_w = resized_height // self.patch_size, resized_width // self.patch_size |
|
patches = patches.reshape( |
|
grid_t, |
|
self.temporal_patch_size, |
|
channel, |
|
grid_h // self.merge_size, |
|
self.merge_size, |
|
self.patch_size, |
|
grid_w // self.merge_size, |
|
self.merge_size, |
|
self.patch_size, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
patches = patches.transpose(0, 3, 6, 4, 7, 1,5,8, 2) |
|
flatten_patches = patches.reshape( |
|
grid_t, grid_h * grid_w, self.temporal_patch_size * self.patch_size * self.patch_size, channel |
|
) |
|
return flatten_patches, (grid_t, grid_h, grid_w) |