abdull4h's picture
Update app.py
d04e4d9 verified
raw
history blame
12.7 kB
# Import necessary libraries
import gradio as gr
import time
import logging
import os
import re
from datetime import datetime
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer, util
import faiss
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import PyPDF2
import io
import spaces # Added for @spaces.GPU decorator
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('Vision2030Assistant')
# Check for GPU availability
has_gpu = torch.cuda.is_available()
logger.info(f"GPU available: {has_gpu}")
# Define the Vision2030Assistant class
class Vision2030Assistant:
def __init__(self):
"""Initialize the Vision 2030 Assistant with models, knowledge base, and indices."""
logger.info("Initializing Vision 2030 Assistant...")
self.load_embedding_models()
self.load_language_model()
self._create_knowledge_base()
self._create_indices()
self._create_sample_eval_data()
self.metrics = {"response_times": [], "user_ratings": [], "factual_accuracy": []}
self.session_history = {} # Dictionary to store session history
self.has_pdf_content = False # Flag to indicate if PDF content is available
logger.info("Assistant initialized successfully")
def load_embedding_models(self):
"""Load Arabic and English embedding models on CPU."""
try:
self.arabic_embedder = SentenceTransformer('CAMeL-Lab/bert-base-arabic-camelbert-ca')
self.english_embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
# Models remain on CPU; GPU usage handled in decorated functions
logger.info("Embedding models loaded successfully")
except Exception as e:
logger.error(f"Failed to load embedding models: {e}")
self._fallback_embedding()
def _fallback_embedding(self):
"""Fallback method for embedding models using a simple random vector approach."""
logger.warning("Using fallback embedding method")
class SimpleEmbedder:
def encode(self, text, device=None): # Added device parameter for compatibility
import hashlib
hash_obj = hashlib.md5(text.encode())
np.random.seed(int(hash_obj.hexdigest(), 16) % 2**32)
return np.random.randn(384).astype(np.float32)
self.arabic_embedder = SimpleEmbedder()
self.english_embedder = SimpleEmbedder()
def load_language_model(self):
"""Load the DistilGPT-2 language model on CPU."""
try:
self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
self.model = AutoModelForCausalLM.from_pretrained("distilgpt2")
self.generator = pipeline(
'text-generation',
model=self.model,
tokenizer=self.tokenizer,
device=-1 # CPU
)
logger.info("Language model loaded successfully")
except Exception as e:
logger.error(f"Failed to load language model: {e}")
self.generator = None
def _create_knowledge_base(self):
"""Initialize the knowledge base with basic Vision 2030 information."""
self.english_texts = [
"Vision 2030 is Saudi Arabia's strategic framework to reduce dependence on oil, diversify the economy, and develop public sectors.",
"The key pillars of Vision 2030 are a vibrant society, a thriving economy, and an ambitious nation.",
"NEOM is a planned smart city in Tabuk Province, a key Vision 2030 project."
]
self.arabic_texts = [
"رؤية 2030 هي إطار استراتيجي لتقليل الاعتماد على النفط وتنويع الاقتصاد.",
"الركائز الرئيسية لرؤية 2030 هي مجتمع حيوي، واقتصاد مزدهر، ووطن طموح.",
"نيوم مدينة ذكية مخططة في تبوك، مشروع رئيسي لرؤية 2030."
]
self.pdf_english_texts = []
self.pdf_arabic_texts = []
def _create_indices(self):
"""Create FAISS indices for the initial knowledge base on CPU."""
try:
# English index
english_vectors = [self.english_embedder.encode(text) for text in self.english_texts]
dim = len(english_vectors[0])
nlist = max(1, len(english_vectors) // 10)
quantizer = faiss.IndexFlatL2(dim)
self.english_index = faiss.IndexIVFFlat(quantizer, dim, nlist)
self.english_index.train(np.array(english_vectors))
self.english_index.add(np.array(english_vectors))
# Arabic index
arabic_vectors = [self.arabic_embedder.encode(text) for text in self.arabic_texts]
self.arabic_index = faiss.IndexIVFFlat(quantizer, dim, nlist)
self.arabic_index.train(np.array(arabic_vectors))
self.arabic_index.add(np.array(arabic_vectors))
logger.info("FAISS indices created successfully")
except Exception as e:
logger.error(f"Error creating indices: {e}")
def _create_sample_eval_data(self):
"""Create sample evaluation data for testing factual accuracy."""
self.eval_data = [
{"question": "What are the key pillars of Vision 2030?",
"lang": "en",
"reference": "The key pillars of Vision 2030 are a vibrant society, a thriving economy, and an ambitious nation."},
{"question": "ما هي الركائز الرئيسية لرؤية 2030؟",
"lang": "ar",
"reference": "الركائز الرئيسية لرؤية 2030 هي مجتمع حيوي، واقتصاد مزدهر، ووطن طموح."}
]
def retrieve_context(self, query, lang, session_id, device='cpu'):
"""Retrieve relevant context using the specified device for encoding."""
try:
history = self.session_history.get(session_id, [])
history_context = " ".join([f"Q: {q} A: {a}" for q, a in history[-2:]])
embedder = self.arabic_embedder if lang == "ar" else self.english_embedder
query_vec = embedder.encode(query, device=device)
if lang == "ar":
if self.has_pdf_content and self.pdf_arabic_texts:
index = self.pdf_arabic_index
texts = self.pdf_arabic_texts
else:
index = self.arabic_index
texts = self.arabic_texts
else:
if self.has_pdf_content and self.pdf_english_texts:
index = self.pdf_english_index
texts = self.pdf_english_texts
else:
index = self.english_index
texts = self.english_texts
D, I = index.search(np.array([query_vec]), k=2)
context = "\n".join([texts[i] for i in I[0] if i >= 0]) + f"\nHistory: {history_context}"
return context if context.strip() else "No relevant information found."
except Exception as e:
logger.error(f"Retrieval error: {e}")
return "Error retrieving context."
@spaces.GPU
def generate_response(self, query, session_id):
"""Generate a response using GPU resources when available."""
if not query.strip():
return "Please enter a valid question."
start_time = time.time()
try:
lang = "ar" if any('\u0600' <= c <= '\u06FF' for c in query) else "en"
context = self.retrieve_context(query, lang, session_id, device='cuda')
if "Error" in context or "No relevant" in context:
reply = context
elif self.generator:
# Move the language model to GPU
self.generator.model.to('cuda')
prompt = f"Context: {context}\nQuestion: {query}\nAnswer:"
response = self.generator(prompt, max_length=150, num_return_sequences=1, do_sample=True, temperature=0.7)
reply = response[0]['generated_text'].split("Answer:")[-1].strip()
# Move the language model back to CPU
self.generator.model.to('cpu')
else:
reply = context
self.session_history.setdefault(session_id, []).append((query, reply))
self.metrics["response_times"].append(time.time() - start_time)
return reply
except Exception as e:
logger.error(f"Response generation error: {e}")
return "Sorry, an error occurred. Please try again."
def evaluate_factual_accuracy(self, response, reference):
"""Evaluate the factual accuracy of a response using semantic similarity."""
try:
embedder = self.english_embedder # Assuming reference is in English for simplicity
response_vec = embedder.encode(response)
reference_vec = embedder.encode(reference)
similarity = util.cos_sim(response_vec, reference_vec).item()
return similarity
except Exception as e:
logger.error(f"Evaluation error: {e}")
return 0.0
@spaces.GPU
def process_pdf(self, file):
"""Process a PDF file and update the knowledge base using GPU for encoding."""
if not file:
return "Please upload a PDF file."
try:
pdf_reader = PyPDF2.PdfReader(io.BytesIO(file))
text = "".join([page.extract_text() or "" for page in pdf_reader.pages])
if not text.strip():
return "No extractable text found in PDF."
# Split text into chunks
chunks = [text[i:i+300] for i in range(0, len(text), 300)]
self.pdf_english_texts = [c for c in chunks if not any('\u0600' <= char <= '\u06FF' for char in c)]
self.pdf_arabic_texts = [c for c in chunks if any('\u0600' <= char <= '\u06FF' for char in c)]
# Create indices for PDF content using GPU
if self.pdf_english_texts:
english_vectors = [self.english_embedder.encode(text, device='cuda') for text in self.pdf_english_texts]
dim = len(english_vectors[0])
nlist = max(1, len(english_vectors) // 10)
quantizer = faiss.IndexFlatL2(dim)
self.pdf_english_index = faiss.IndexIVFFlat(quantizer, dim, nlist)
self.pdf_english_index.train(np.array(english_vectors))
self.pdf_english_index.add(np.array(english_vectors))
if self.pdf_arabic_texts:
arabic_vectors = [self.arabic_embedder.encode(text, device='cuda') for text in self.pdf_arabic_texts]
dim = len(arabic_vectors[0])
nlist = max(1, len(arabic_vectors) // 10)
quantizer = faiss.IndexFlatL2(dim)
self.pdf_arabic_index = faiss.IndexIVFFlat(quantizer, dim, nlist)
self.pdf_arabic_index.train(np.array(arabic_vectors))
self.pdf_arabic_index.add(np.array(arabic_vectors))
self.has_pdf_content = True
return f"PDF processed: {len(self.pdf_english_texts)} English, {len(self.pdf_arabic_texts)} Arabic chunks."
except Exception as e:
logger.error(f"PDF processing error: {e}")
return f"Error processing PDF: {e}"
# Create the Gradio interface
def create_interface():
"""Set up the Gradio interface for chatting and PDF uploading."""
assistant = Vision2030Assistant()
def chat(query, history, session_id):
reply = assistant.generate_response(query, session_id)
history.append((query, reply))
return history, ""
with gr.Blocks() as demo:
gr.Markdown("# Vision 2030 Virtual Assistant")
session_id = gr.State(value="user1") # Fixed session ID for simplicity
chatbot = gr.Chatbot()
msg = gr.Textbox(label="Ask a question")
submit = gr.Button("Submit")
pdf_upload = gr.File(label="Upload PDF", type="binary")
upload_status = gr.Textbox(label="Upload Status")
submit.click(chat, [msg, chatbot, session_id], [chatbot, msg])
pdf_upload.upload(assistant.process_pdf, pdf_upload, upload_status)
return demo
# Launch the interface
if __name__ == "__main__":
demo = create_interface()
demo.launch()