import aiofiles import asyncio import base64 import cv2 import fitz import glob import io import json import logging import os import pandas as pd import pytz import random import re import requests import shutil import streamlit as st import sys import time import torch import zipfile from audio_recorder_streamlit import audio_recorder from contextlib import redirect_stdout from dataclasses import dataclass from datetime import datetime from diffusers import StableDiffusionPipeline from io import BytesIO from moviepy.editor import VideoFileClip from openai import OpenAI from PIL import Image from PyPDF2 import PdfReader from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel from typing import Optional # Initialize OpenAI client client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID')) # Logging setup logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) log_records = [] class LogCaptureHandler(logging.Handler): def emit(self, record): log_records.append(record) logger.addHandler(LogCaptureHandler()) # Streamlit configuration st.set_page_config( page_title="AI Multimodal Titan πŸš€", page_icon="πŸ€–", layout="wide", initial_sidebar_state="expanded", menu_items={ 'Get Help': 'https://huggingface.co/awacke1', 'Report a Bug': 'https://huggingface.co/spaces/awacke1', 'About': "AI Multimodal Titan: PDFs, OCR, Image Gen, Audio/Video Processing, Code Execution, and More! 🌌" } ) # Session state initialization for key in ['history', 'builder', 'model_loaded', 'processing', 'asset_checkboxes', 'downloaded_pdfs', 'unique_counter', 'messages']: st.session_state.setdefault(key, [] if key in ['history', 'messages'] else {} if key in ['asset_checkboxes', 'downloaded_pdfs', 'processing'] else None if key == 'builder' else 0 if key == 'unique_counter' else False) st.session_state.setdefault('selected_model_type', "Causal LM") st.session_state.setdefault('selected_model', "None") st.session_state.setdefault('gallery_size', 2) st.session_state.setdefault('asset_gallery_container', st.sidebar.empty()) @dataclass class ModelConfig: name: str base_model: str size: str domain: Optional[str] = None model_type: str = "causal_lm" @property def model_path(self): return f"models/{self.name}" @dataclass class DiffusionConfig: name: str base_model: str size: str domain: Optional[str] = None @property def model_path(self): return f"diffusion_models/{self.name}" class ModelBuilder: def __init__(self): self.config = None self.model = None self.tokenizer = None def load_model(self, model_path: str, config: Optional[ModelConfig] = None): with st.spinner(f"Loading {model_path}... ⏳"): self.model = AutoModelForCausalLM.from_pretrained(model_path) self.tokenizer = AutoTokenizer.from_pretrained(model_path) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token if config: self.config = config self.model.to("cuda" if torch.cuda.is_available() else "cpu") st.success(f"Model loaded! πŸŽ‰") return self def save_model(self, path: str): with st.spinner("Saving model... πŸ’Ύ"): os.makedirs(os.path.dirname(path), exist_ok=True) self.model.save_pretrained(path) self.tokenizer.save_pretrained(path) st.success(f"Model saved at {path}! βœ…") class DiffusionBuilder: def __init__(self): self.config = None self.pipeline = None def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None): with st.spinner(f"Loading diffusion model {model_path}... ⏳"): self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu") if config: self.config = config st.success("Diffusion model loaded! 🎨") return self def save_model(self, path: str): with st.spinner("Saving diffusion model... πŸ’Ύ"): os.makedirs(os.path.dirname(path), exist_ok=True) self.pipeline.save_pretrained(path) st.success(f"Diffusion model saved at {path}! βœ…") def generate(self, prompt: str): return self.pipeline(prompt, num_inference_steps=20).images[0] def generate_filename(prompt, ext="png"): central = pytz.timezone('US/Central') safe_date_time = datetime.now(central).strftime("%m%d_%H%M") safe_prompt = re.sub(r'[<>:"/\\|?*]', '_', prompt)[:240] return f"{safe_date_time}_{safe_prompt}.{ext}" def get_download_link(file_path, mime_type="application/pdf", label="Download"): with open(file_path, "rb") as f: data = base64.b64encode(f.read()).decode() return f'{label}' def zip_directory(directory_path, zip_path): with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, _, files in os.walk(directory_path): for file in files: zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.dirname(directory_path))) def get_gallery_files(file_types=["png", "pdf", "md", "wav", "mp4"]): return sorted(list({f for ext in file_types for f in glob.glob(f"*.{ext}")})) def download_pdf(url, output_path): try: response = requests.get(url, stream=True, timeout=10) if response.status_code == 200: with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return True except requests.RequestException as e: logger.error(f"Failed to download {url}: {e}") return False async def process_pdf_snapshot(pdf_path, mode="single"): start_time = time.time() status = st.empty() status.text(f"Processing PDF Snapshot ({mode})... (0s)") try: doc = fitz.open(pdf_path) output_files = [] if mode == "single": page = doc[0] pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) output_file = generate_filename("single", "png") pix.save(output_file) output_files.append(output_file) elif mode == "double": if len(doc) >= 2: pix1 = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) pix2 = doc[1].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) img1 = Image.frombytes("RGB", [pix1.width, pix1.height], pix1.samples) img2 = Image.frombytes("RGB", [pix2.width, pix2.height], pix2.samples) combined_img = Image.new("RGB", (pix1.width + pix2.width, max(pix1.height, pix2.height))) combined_img.paste(img1, (0, 0)) combined_img.paste(img2, (pix1.width, 0)) output_file = generate_filename("double", "png") combined_img.save(output_file) output_files.append(output_file) elif mode == "allpages": for i in range(len(doc)): page = doc[i] pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) output_file = generate_filename(f"page_{i}", "png") pix.save(output_file) output_files.append(output_file) doc.close() elapsed = int(time.time() - start_time) status.text(f"PDF Snapshot ({mode}) completed in {elapsed}s!") return output_files except Exception as e: status.error(f"Failed to process PDF: {str(e)}") return [] async def process_ocr(image, output_file): start_time = time.time() status = st.empty() status.text("Processing GOT-OCR2_0... (0s)") tokenizer = AutoTokenizer.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True) model = AutoModel.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True, torch_dtype=torch.float32).to("cpu").eval() temp_file = generate_filename("temp", "png") image.save(temp_file) result = model.chat(tokenizer, temp_file, ocr_type='ocr') os.remove(temp_file) elapsed = int(time.time() - start_time) status.text(f"GOT-OCR2_0 completed in {elapsed}s!") async with aiofiles.open(output_file, "w") as f: await f.write(result) return result async def process_image_gen(prompt, output_file): start_time = time.time() status = st.empty() status.text("Processing Image Gen... (0s)") pipeline = st.session_state['builder'].pipeline if st.session_state.get('builder') and isinstance(st.session_state['builder'], DiffusionBuilder) else StableDiffusionPipeline.from_pretrained("OFA-Sys/small-stable-diffusion-v0", torch_dtype=torch.float32).to("cpu") gen_image = pipeline(prompt, num_inference_steps=20).images[0] elapsed = int(time.time() - start_time) status.text(f"Image Gen completed in {elapsed}s!") gen_image.save(output_file) return gen_image def process_image_with_prompt(image, prompt, model="gpt-4o-mini", detail="auto"): buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") messages = [{"role": "user", "content": [{"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_str}", "detail": detail}}]}] try: response = client.chat.completions.create(model=model, messages=messages, max_tokens=300) return response.choices[0].message.content except Exception as e: return f"Error processing image with GPT: {str(e)}" def process_text_with_prompt(text, prompt, model="gpt-4o-mini"): messages = [{"role": "user", "content": f"{prompt}\n\n{text}"}] try: response = client.chat.completions.create(model=model, messages=messages, max_tokens=300) return response.choices[0].message.content except Exception as e: return f"Error processing text with GPT: {str(e)}" def process_audio(audio_input, prompt): with open(audio_input, "rb") as file: transcription = client.audio.transcriptions.create(model="whisper-1", file=file) response = client.chat.completions.create(model="gpt-4o-mini", messages=[{"role": "user", "content": f"{prompt}\n\n{transcription.text}"}]) return transcription.text, response.choices[0].message.content def process_video(video_path, prompt): base64Frames, audio_path = process_video_frames(video_path) with open(video_path, "rb") as file: transcription = client.audio.transcriptions.create(model="whisper-1", file=file) messages = [{"role": "user", "content": ["These are the frames from the video.", *map(lambda x: {"type": "image_url", "image_url": {"url": f'data:image/jpg;base64,{x}', "detail": "low"}}, base64Frames), {"type": "text", "text": f"The audio transcription is: {transcription.text}\n\n{prompt}"}]}] response = client.chat.completions.create(model="gpt-4o-mini", messages=messages) return response.choices[0].message.content def process_video_frames(video_path, seconds_per_frame=2): base64Frames = [] base_video_path, _ = os.path.splitext(video_path) video = cv2.VideoCapture(video_path) total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) fps = video.get(cv2.CAP_PROP_FPS) frames_to_skip = int(fps * seconds_per_frame) curr_frame = 0 while curr_frame < total_frames - 1: video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame) success, frame = video.read() if not success: break _, buffer = cv2.imencode(".jpg", frame) base64Frames.append(base64.b64encode(buffer).decode("utf-8")) curr_frame += frames_to_skip video.release() audio_path = f"{base_video_path}.mp3" try: clip = VideoFileClip(video_path) clip.audio.write_audiofile(audio_path, bitrate="32k") clip.audio.close() clip.close() except: logger.info("No audio track found in video.") return base64Frames, audio_path def execute_code(code): buffer = io.StringIO() try: with redirect_stdout(buffer): exec(code, {}, {}) return buffer.getvalue(), None except Exception as e: return None, str(e) finally: buffer.close() # Sidebar st.sidebar.subheader("Gallery Settings") st.session_state['gallery_size'] = st.sidebar.slider("Gallery Size", 1, 10, st.session_state['gallery_size'], key="gallery_size_slider") # Tabs tabs = st.tabs(["Camera πŸ“·", "Download πŸ“₯", "OCR πŸ”", "Build 🌱", "Image Gen 🎨", "PDF πŸ“„", "Image πŸ–ΌοΈ", "Audio 🎡", "Video πŸŽ₯", "Code πŸ§‘β€πŸ’»", "Gallery πŸ“š"]) (tab_camera, tab_download, tab_ocr, tab_build, tab_imggen, tab_pdf, tab_image, tab_audio, tab_video, tab_code, tab_gallery) = tabs with tab_camera: st.header("Camera Snap πŸ“·") cols = st.columns(2) for i, cam_key in enumerate(["cam0", "cam1"]): with cols[i]: cam_img = st.camera_input(f"Take a picture - Cam {i}", key=cam_key) if cam_img: filename = generate_filename(f"cam{i}") with open(filename, "wb") as f: f.write(cam_img.getvalue()) st.session_state[f'cam{i}_file'] = filename st.session_state['history'].append(f"Snapshot from Cam {i}: {filename}") st.image(Image.open(filename), caption=f"Camera {i}", use_container_width=True) with tab_download: st.header("Download PDFs πŸ“₯") url_input = st.text_area("Enter PDF URLs (one per line)", height=200) if st.button("Download πŸ€–"): urls = url_input.strip().split("\n") progress_bar = st.progress(0) for idx, url in enumerate(urls): if url: output_path = generate_filename(url, "pdf") if download_pdf(url, output_path): st.session_state['downloaded_pdfs'][url] = output_path st.session_state['history'].append(f"Downloaded PDF: {output_path}") st.session_state['asset_checkboxes'][output_path] = True progress_bar.progress((idx + 1) / len(urls)) with tab_ocr: st.header("Test OCR πŸ”") all_files = get_gallery_files() if all_files: selected_file = st.selectbox("Select File", all_files, key="ocr_select") if selected_file and st.button("Run OCR πŸš€"): if selected_file.endswith('.png'): image = Image.open(selected_file) else: doc = fitz.open(selected_file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) doc.close() output_file = generate_filename("ocr_output", "txt") result = asyncio.run(process_ocr(image, output_file)) st.text_area("OCR Result", result, height=200) st.session_state['history'].append(f"OCR Test: {selected_file} -> {output_file}") with tab_build: st.header("Build Titan 🌱") model_type = st.selectbox("Model Type", ["Causal LM", "Diffusion"], key="build_type") base_model = st.selectbox("Select Model", ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"] if model_type == "Causal LM" else ["OFA-Sys/small-stable-diffusion-v0", "stabilityai/stable-diffusion-2-base"]) model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}") if st.button("Download Model ⬇️"): config = (ModelConfig if model_type == "Causal LM" else DiffusionConfig)(name=model_name, base_model=base_model, size="small") builder = ModelBuilder() if model_type == "Causal LM" else DiffusionBuilder() builder.load_model(base_model, config) builder.save_model(config.model_path) st.session_state['builder'] = builder st.session_state['model_loaded'] = True with tab_imggen: st.header("Test Image Gen 🎨") prompt = st.text_area("Prompt", "Generate a futuristic cityscape") if st.button("Run Image Gen πŸš€"): output_file = generate_filename("gen_output", "png") result = asyncio.run(process_image_gen(prompt, output_file)) st.image(result, caption="Generated Image", use_container_width=True) st.session_state['history'].append(f"Image Gen Test: {prompt} -> {output_file}") with tab_pdf: st.header("PDF Process πŸ“„") uploaded_pdfs = st.file_uploader("Upload PDFs", type=["pdf"], accept_multiple_files=True) view_mode = st.selectbox("View Mode", ["Single Page", "Double Page"], key="pdf_view_mode") if st.button("Process PDFs"): for pdf_file in uploaded_pdfs: pdf_path = generate_filename(pdf_file.name, "pdf") with open(pdf_path, "wb") as f: f.write(pdf_file.read()) snapshots = asyncio.run(process_pdf_snapshot(pdf_path, "double" if view_mode == "Double Page" else "single")) for snapshot in snapshots: st.image(Image.open(snapshot), caption=snapshot) text = process_image_with_prompt(Image.open(snapshot), "Extract the electronic text from image") st.text_area(f"Extracted Text from {snapshot}", text) code_prompt = f"Generate Python code based on this text:\n\n{text}" code = process_text_with_prompt(text, code_prompt) st.code(code, language="python") if st.button(f"Execute Code from {snapshot}"): output, error = execute_code(code) if error: st.error(f"Error: {error}") else: st.success(f"Output: {output or 'No output'}") with tab_image: st.header("Image Process πŸ–ΌοΈ") uploaded_images = st.file_uploader("Upload Images", type=["png", "jpg"], accept_multiple_files=True) prompt = st.text_input("Prompt", "Extract the electronic text from image") if st.button("Process Images"): for img_file in uploaded_images: img = Image.open(img_file) st.image(img, caption=img_file.name) result = process_image_with_prompt(img, prompt) st.text_area(f"Result for {img_file.name}", result) with tab_audio: st.header("Audio Process 🎡") audio_bytes = audio_recorder() if audio_bytes: filename = generate_filename("recording", "wav") with open(filename, "wb") as f: f.write(audio_bytes) st.audio(filename) transcript, summary = process_audio(filename, "Summarize this audio in markdown") st.text_area("Transcript", transcript) st.markdown(summary) with tab_video: st.header("Video Process πŸŽ₯") video_input = st.file_uploader("Upload Video", type=["mp4"]) if video_input: video_path = generate_filename(video_input.name, "mp4") with open(video_path, "wb") as f: f.write(video_input.read()) st.video(video_path) result = process_video(video_path, "Summarize this video in markdown") st.markdown(result) with tab_code: st.header("Code Executor πŸ§‘β€πŸ’»") code_input = st.text_area("Python Code", height=400) if st.button("Run Code"): output, error = execute_code(code_input) if error: st.error(f"Error: {error}") else: st.success(f"Output: {output or 'No output'}") with tab_gallery: st.header("Gallery πŸ“š") all_files = get_gallery_files() for file in all_files: if file.endswith('.png'): st.image(Image.open(file), caption=file) elif file.endswith('.pdf'): doc = fitz.open(file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)) st.image(Image.frombytes("RGB", [pix.width, pix.height], pix.samples), caption=file) doc.close() elif file.endswith('.md'): with open(file, "r") as f: st.markdown(f.read()) elif file.endswith('.wav'): st.audio(file) elif file.endswith('.mp4'): st.video(file) # Update gallery in sidebar def update_gallery(): container = st.session_state['asset_gallery_container'] container.empty() all_files = get_gallery_files() if all_files: container.markdown("### Asset Gallery πŸ“ΈπŸ“–") cols = container.columns(2) for idx, file in enumerate(all_files[:st.session_state['gallery_size']]): with cols[idx % 2]: if file.endswith('.png'): st.image(Image.open(file), caption=os.path.basename(file)) elif file.endswith('.pdf'): doc = fitz.open(file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)) st.image(Image.frombytes("RGB", [pix.width, pix.height], pix.samples), caption=os.path.basename(file)) doc.close() st.checkbox("Select", key=f"asset_{file}", value=st.session_state['asset_checkboxes'].get(file, False)) st.markdown(get_download_link(file, "application/octet-stream", "Download"), unsafe_allow_html=True) if st.button("Delete", key=f"delete_{file}"): os.remove(file) st.session_state['asset_checkboxes'].pop(file, None) st.experimental_rerun() update_gallery() # Sidebar logs and history st.sidebar.subheader("Action Logs πŸ“œ") for record in log_records: st.sidebar.write(f"{record.asctime} - {record.levelname} - {record.message}") st.sidebar.subheader("History πŸ“œ") for entry in st.session_state.get("history", []): if entry: st.sidebar.write(entry)