|
from transformers import AutoTokenizer, AutoModel, AutoImageProcessor |
|
from sentence_transformers import SentenceTransformer |
|
import torch |
|
import torch.nn.functional as F |
|
from PIL import Image |
|
import requests |
|
import os |
|
import json |
|
import math |
|
import re |
|
import pandas as pd |
|
import numpy as np |
|
from omeka_s_api_client import OmekaSClient,OmekaSClientError |
|
from typing import List, Dict, Any, Union |
|
import io |
|
from dotenv import load_dotenv |
|
|
|
|
|
load_dotenv(os.path.join(os.getcwd(), ".env")) |
|
HF_TOKEN = os.environ.get("HF_TOKEN") |
|
|
|
|
|
processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5") |
|
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True) |
|
|
|
|
|
text_model = SentenceTransformer("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True, token=HF_TOKEN) |
|
|
|
def image_url_to_pil(url: str, max_size=(512, 512)) -> Image: |
|
""" |
|
Ex usage : image_blobs = df["image_url"].apply(image_url_to_pil).tolist() |
|
""" |
|
response = requests.get(url, stream=True, timeout=5) |
|
response.raise_for_status() |
|
image = Image.open(io.BytesIO(response.content)).convert("RGB") |
|
image.thumbnail(max_size, Image.Resampling.LANCZOS) |
|
return image |
|
|
|
def generate_img_embed(images_urls, batch_size=20): |
|
"""Generate image embeddings in batches to manage memory usage. |
|
|
|
Args: |
|
images_urls (list): List of image URLs |
|
batch_size (int): Number of images to process at once |
|
""" |
|
all_embeddings = [] |
|
|
|
for i in range(0, len(images_urls), batch_size): |
|
batch_urls = images_urls[i:i + batch_size] |
|
images = [image_url_to_pil(image_url) for image_url in batch_urls] |
|
inputs = processor(images, return_tensors="pt") |
|
img_emb = vision_model(**inputs).last_hidden_state |
|
img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1) |
|
all_embeddings.append(img_embeddings.detach().numpy()) |
|
|
|
return np.vstack(all_embeddings) |
|
|
|
def generate_text_embed(sentences: List, batch_size=64): |
|
"""Generate text embeddings in batches to manage memory usage. |
|
|
|
Args: |
|
sentences (List): List of text strings to encode |
|
batch_size (int): Number of sentences to process at once |
|
""" |
|
all_embeddings = [] |
|
|
|
for i in range(0, len(sentences), batch_size): |
|
batch_sentences = sentences[i:i + batch_size] |
|
embeddings = text_model.encode(batch_sentences) |
|
all_embeddings.append(embeddings) |
|
|
|
return np.vstack(all_embeddings) |
|
|
|
def add_concatenated_text_field_exclude_keys(item_dict, keys_to_exclude=None, text_field_key="text", pair_separator=" - "): |
|
if not isinstance(item_dict, dict): |
|
raise TypeError("Input must be a dictionary.") |
|
if keys_to_exclude is None: |
|
keys_to_exclude = set() |
|
else: |
|
keys_to_exclude = set(keys_to_exclude) |
|
|
|
|
|
keys_to_exclude.add(text_field_key) |
|
|
|
formatted_pairs = [] |
|
for key, value in item_dict.items(): |
|
|
|
if key in keys_to_exclude: |
|
continue |
|
|
|
|
|
is_empty_or_invalid = False |
|
if value is None: is_empty_or_invalid = True |
|
elif isinstance(value, float) and math.isnan(value): is_empty_or_invalid = True |
|
elif isinstance(value, (str, list, tuple, dict)) and len(value) == 0: is_empty_or_invalid = True |
|
|
|
|
|
if not is_empty_or_invalid: |
|
formatted_pairs.append(f"{str(key)}: {str(value)}") |
|
|
|
concatenated_text = f"search_document: {pair_separator.join(formatted_pairs)}" |
|
item_dict[text_field_key] = concatenated_text |
|
return item_dict |
|
|
|
def prepare_df_atlas(df: pd.DataFrame, id_col='id', images_col='images_urls'): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if id_col not in df.columns: |
|
df[id_col] = [f'{i}' for i in range(len(df))] |
|
|
|
|
|
|
|
|
|
|
|
|
|
df[images_col] = df[images_col].apply(lambda x: x[0] if isinstance(x, list) else x) |
|
|
|
|
|
for col in df.columns: |
|
df[col] = df[col].astype(str) |
|
|
|
return df |
|
|
|
def remove_key_value_from_dict(list_of_dict, key_to_remove): |
|
new_list = [] |
|
for dictionary in list_of_dict: |
|
new_dict = dictionary.copy() |
|
if key_to_remove in new_dict: |
|
del new_dict[key_to_remove] |
|
new_list.append(new_dict) |
|
return new_list |
|
|
|
def remove_key_value_from_dict(input_dict, key_to_remove='text'): |
|
if not isinstance(input_dict, dict): |
|
raise TypeError("Input must be a dictionary.") |
|
|
|
if key_to_remove in input_dict: |
|
del input_dict[key_to_remove] |
|
|
|
return input_dict |