import os
import re
import json
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import spaces  # Import spaces for GPU allocation

# PDF processing
import PyPDF2

# LLM and embeddings
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer

# RAG components
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from langchain_community.embeddings import HuggingFaceEmbeddings

# Arabic text processing
import arabic_reshaper
from bidi.algorithm import get_display

# Evaluation
from rouge_score import rouge_scorer
import sacrebleu
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict

# Gradio for the interface
import gradio as gr


# Helper functions
def safe_tokenize(text):
    """Pure regex tokenizer with no NLTK dependency"""
    if not text:
        return []
    # Surround punctuation with spaces so it splits off cleanly
    text = re.sub(r'([.,!?;:()\[\]{}"\'/\\])', r' \1 ', text)
    # Split on whitespace and filter out empty strings
    return [token for token in re.split(r'\s+', text.lower()) if token]


def detect_language(text):
    """Detect whether text is primarily Arabic or English"""
    # Simple heuristic: count Arabic characters
    arabic_chars = re.findall(r'[\u0600-\u06FF]', text)
    is_arabic = len(arabic_chars) > len(text) * 0.5
    return "arabic" if is_arabic else "english"


# Evaluation metrics
def calculate_bleu(prediction, reference):
    """Calculate simplified BLEU scores (n-gram precision only, no brevity
    penalty) without any NLTK dependency."""
    # Tokenize texts using our own tokenizer
    pred_tokens = safe_tokenize(prediction.lower())
    ref_tokens = [safe_tokenize(reference.lower())]

    # If either side is empty, return 0
    if not pred_tokens or not ref_tokens[0]:
        return {"bleu_1": 0, "bleu_2": 0, "bleu_4": 0}

    def get_ngrams(tokens, n):
        return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

    # Calculate precision for each n-gram level (1-gram to 4-gram)
    precisions = []
    for n in range(1, 5):
        if len(pred_tokens) < n:
            precisions.append(0)
            continue

        pred_ngrams = get_ngrams(pred_tokens, n)
        ref_ngrams = get_ngrams(ref_tokens[0], n)

        # Count predicted n-grams that also appear in the reference
        matches = sum(1 for ng in pred_ngrams if ng in ref_ngrams)

        precisions.append(matches / len(pred_ngrams) if pred_ngrams else 0)

    # BLEU-2 and BLEU-4 are geometric means of the n-gram precisions
    return {
        "bleu_1": precisions[0],
        "bleu_2": (precisions[0] * precisions[1]) ** 0.5 if len(precisions) > 1 else 0,
        "bleu_4": (precisions[0] * precisions[1] * precisions[2] * precisions[3]) ** 0.25 if len(precisions) > 3 else 0
    }


def calculate_meteor(prediction, reference):
    """Jaccard word-overlap score, used as a lightweight METEOR alternative"""
    # Tokenize with our custom tokenizer
    pred_tokens = set(safe_tokenize(prediction.lower()))
    ref_tokens = set(safe_tokenize(reference.lower()))

    if not pred_tokens or not ref_tokens:
        return 0

    intersection = len(pred_tokens.intersection(ref_tokens))
    union = len(pred_tokens.union(ref_tokens))
    return intersection / union if union > 0 else 0


def calculate_f1_precision_recall(prediction, reference):
    """Calculate word-level F1, precision, and recall with the custom tokenizer"""
    # Tokenize with our custom tokenizer
    pred_tokens = set(safe_tokenize(prediction.lower()))
    ref_tokens = set(safe_tokenize(reference.lower()))

    # Overlap between prediction and reference vocabularies
    common = pred_tokens.intersection(ref_tokens)

    # Calculate precision, recall, F1
    precision = len(common) / len(pred_tokens) if pred_tokens else 0
    recall = len(common) / len(ref_tokens) if ref_tokens else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0

    return {'precision': precision, 'recall': recall, 'f1': f1}
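
# --- Illustrative sanity check (not part of the original pipeline) ---
# A minimal sketch showing how the lexical metrics above behave: an identical
# prediction/reference pair should score 1.0 everywhere, and fully disjoint
# texts should score 0. The strings are made-up examples.
def _demo_lexical_metrics():
    pred = "Vision 2030 aims to diversify the Saudi economy"
    ref = "Vision 2030 aims to diversify the Saudi economy"
    assert calculate_bleu(pred, ref)["bleu_1"] == 1.0
    assert calculate_meteor(pred, ref) == 1.0
    assert calculate_f1_precision_recall(pred, ref)["f1"] == 1.0
    assert calculate_meteor("apples oranges", "trains planes") == 0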

def evaluate_retrieval_quality(contexts, query, language):
    """Evaluate the quality of retrieved contexts"""
    # This is a placeholder function that should be implemented based on
    # how you want to evaluate retrieval quality
    return {
        'language_match_ratio': 1.0,  # Placeholder
        'source_diversity': len(set(ctx.get('source', '') for ctx in contexts)) / max(1, len(contexts)),
        'mrr': 1.0  # Placeholder for Mean Reciprocal Rank
    }


# PDF Processing and Vector Store
def simple_process_pdfs(pdf_paths):
    """Process PDF documents and return Document objects"""
    documents = []

    print(f"Attempting to process PDFs: {pdf_paths}")
    print(f"Current directory contents: {os.listdir('.')}")

    for pdf_path in pdf_paths:
        try:
            if not os.path.exists(pdf_path):
                print(f"Warning: {pdf_path} does not exist")
                continue

            print(f"Processing {pdf_path}...")
            text = ""
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:  # If we got text from this page
                        text += page_text + "\n\n"

            if text.strip():  # If we got some text
                doc = Document(
                    page_content=text,
                    metadata={"source": pdf_path, "filename": os.path.basename(pdf_path)}
                )
                documents.append(doc)
                print(f"Successfully processed: {pdf_path}")
            else:
                print(f"Warning: No text extracted from {pdf_path}")
        except Exception as e:
            print(f"Error processing {pdf_path}: {e}")
            import traceback
            traceback.print_exc()

    print(f"Processed {len(documents)} PDF documents")
    return documents


def create_vector_store(documents):
    """Split documents into chunks and create a FAISS vector store"""
    # Text splitter for breaking documents into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
    )

    # Split documents into chunks, preserving metadata for each chunk
    chunks = []
    for doc in documents:
        doc_chunks = text_splitter.split_text(doc.page_content)
        chunks.extend([
            Document(page_content=chunk, metadata=doc.metadata)
            for chunk in doc_chunks
        ])

    print(f"Created {len(chunks)} chunks from {len(documents)} documents")

    # Multilingual embedding model, so Arabic and English share one index
    embedding_function = HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
    )

    # Create the FAISS index
    vector_store = FAISS.from_documents(chunks, embedding_function)

    return vector_store
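
# --- Illustrative usage (commented out; assumes the PDFs are present) ---
# How the two functions above compose. similarity_search_with_score returns
# (Document, score) pairs; for the default FAISS index the score is an L2
# distance, so lower means a closer match.
#
#   docs = simple_process_pdfs(["saudi_vision2030_ar.pdf"])
#   store = create_vector_store(docs)
#   for doc, score in store.similarity_search_with_score("economic goals", k=3):
#       print(f"{score:.3f}", doc.metadata["filename"])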

# Model Loading and RAG System
@spaces.GPU  # Use GPU for model loading
def load_model_and_tokenizer():
    """Load the ALLaM-7B model and tokenizer with error handling"""
    model_name = "ALLaM-AI/ALLaM-7B-Instruct-preview"
    print(f"Loading model: {model_name}")

    try:
        # First attempt with AutoTokenizer
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True,
            use_fast=False
        )

        # Load model with appropriate settings for ALLaM
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,  # Use bfloat16 for better compatibility
            trust_remote_code=True,
            device_map="auto",
        )

        print("Model loaded successfully with AutoTokenizer!")
    except Exception as e:
        print(f"First loading attempt failed: {e}")
        print("Trying alternative loading approach...")

        # Fall back to the specific tokenizer class if the first attempt fails
        from transformers import LlamaTokenizer
        tokenizer = LlamaTokenizer.from_pretrained(model_name)

        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16,
            trust_remote_code=True,
            device_map="auto",
        )

        print("Model loaded successfully with LlamaTokenizer!")

    return model, tokenizer


def retrieve_context(query, vector_store, top_k=5):
    """Retrieve the most relevant document chunks for a given query"""
    # Search the vector store using similarity search
    results = vector_store.similarity_search_with_score(query, k=top_k)

    # Format the retrieved contexts
    contexts = []
    for doc, score in results:
        contexts.append({
            "content": doc.page_content,
            "source": doc.metadata.get("source", "Unknown"),
            "relevance_score": score
        })

    return contexts


@spaces.GPU  # Use GPU for text generation
def generate_response(query, contexts, model, tokenizer, language="auto"):
    """Generate a response using retrieved contexts with ALLaM-specific formatting"""
    # Auto-detect language if not specified
    if language == "auto":
        language = detect_language(query)

    # Format the instruction based on language
    if language == "arabic":
        instruction = (
            "أنت مساعد افتراضي يهتم برؤية السعودية 2030. استخدم المعلومات التالية للإجابة على السؤال. "
            "إذا لم تعرف الإجابة، فقل بأمانة إنك لا تعرف."
        )
    else:  # english
        instruction = (
            "You are a virtual assistant for Saudi Vision 2030. Use the following information to answer the question. "
            "If you don't know the answer, honestly say you don't know."
        )

    # Combine retrieved contexts
    context_text = "\n\n".join([f"Document: {ctx['content']}" for ctx in contexts])

    # Format the prompt for the ALLaM instruction format
    prompt = f"""[INST] {instruction}

Context:
{context_text}

Question: {query} [/INST]"""

    try:
        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

        # Generate with appropriate sampling parameters for ALLaM
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=512,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.1
        )

        # Decode and keep only the answer part (after the instruction)
        full_output = tokenizer.decode(outputs[0], skip_special_tokens=True)
        response = full_output.split("[/INST]")[-1].strip()

        # If the response is empty for some reason, return the full output
        if not response:
            response = full_output

        return response

    except Exception as e:
        print(f"Error during generation: {e}")
        # Fallback response
        return "I apologize, but I encountered an error while generating a response."
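
# --- Illustrative end-to-end call (commented out; needs a GPU and a built index) ---
# The three functions above compose into the core RAG loop:
#
#   model, tokenizer = load_model_and_tokenizer()
#   contexts = retrieve_context("What is Saudi Vision 2030?", vector_store, top_k=5)
#   answer = generate_response("What is Saudi Vision 2030?", contexts, model, tokenizer)
#   print(answer)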

# Assistant Class
class Vision2030Assistant:
    def __init__(self, model, tokenizer, vector_store):
        self.model = model
        self.tokenizer = tokenizer
        self.vector_store = vector_store
        self.conversation_history = []

    def answer(self, user_query):
        """Process a user query and return a response with sources"""
        # Detect language
        language = detect_language(user_query)

        # Add user query to conversation history
        self.conversation_history.append({"role": "user", "content": user_query})

        # Get the recent conversation context (keep last 3 turns = 6 messages)
        conversation_context = "\n".join([
            f"{'User' if msg['role'] == 'user' else 'Assistant'}: {msg['content']}"
            for msg in self.conversation_history[-6:]
        ])

        # Enhance the query with conversation context for better retrieval
        enhanced_query = f"{conversation_context}\n{user_query}"

        # Retrieve relevant contexts
        contexts = retrieve_context(enhanced_query, self.vector_store, top_k=5)

        # Generate response
        response = generate_response(user_query, contexts, self.model, self.tokenizer, language)

        # Add response to conversation history
        self.conversation_history.append({"role": "assistant", "content": response})

        # Also return sources for transparency
        sources = [ctx.get("source", "Unknown") for ctx in contexts]
        unique_sources = list(set(sources))

        return response, unique_sources, contexts

    def reset_conversation(self):
        """Reset the conversation history"""
        self.conversation_history = []
        return "Conversation has been reset."


# Comprehensive evaluation dataset
comprehensive_evaluation_data = [
    # === Overview ===
    {
        "query": "ما هي رؤية السعودية 2030؟",
        "reference": "رؤية السعودية 2030 هي خطة استراتيجية تهدف إلى تنويع الاقتصاد السعودي وتقليل الاعتماد على النفط مع تطوير قطاعات مختلفة مثل الصحة والتعليم والسياحة.",
        "category": "overview",
        "language": "arabic"
    },
    {
        "query": "What is Saudi Vision 2030?",
        "reference": "Saudi Vision 2030 is a strategic framework aiming to diversify Saudi Arabia's economy and reduce dependence on oil, while developing sectors like health, education, and tourism.",
        "category": "overview",
        "language": "english"
    },

    # === Economic Goals ===
    {
        "query": "ما هي الأهداف الاقتصادية لرؤية 2030؟",
        "reference": "تشمل الأهداف الاقتصادية زيادة مساهمة القطاع الخاص إلى 65%، وزيادة الصادرات غير النفطية إلى 50% من الناتج المحلي غير النفطي، وخفض البطالة إلى 7%.",
        "category": "economic",
        "language": "arabic"
    },
    {
        "query": "What are the economic goals of Vision 2030?",
        "reference": "The economic goals of Vision 2030 include increasing private sector contribution from 40% to 65% of GDP, raising non-oil exports from 16% to 50%, reducing unemployment from 11.6% to 7%.",
        "category": "economic",
        "language": "english"
    },

    # === Social Goals ===
    {
        "query": "كيف تعزز رؤية 2030 الإرث الثقافي السعودي؟",
        "reference": "تتضمن رؤية 2030 الحفاظ على الهوية الوطنية، تسجيل مواقع أثرية في اليونسكو، وتعزيز الفعاليات الثقافية.",
        "category": "social",
        "language": "arabic"
    },
    {
        "query": "How does Vision 2030 aim to improve quality of life?",
        "reference": "Vision 2030 plans to enhance quality of life by expanding sports facilities, promoting cultural activities, and boosting tourism and entertainment sectors.",
        "category": "social",
        "language": "english"
    }
]
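
# --- Illustrative sketch: batch evaluation over the dataset above ---
# evaluate_all_samples is a hypothetical helper (not wired into the UI)
# showing how the whole dataset could be scored in one pass. It relies on
# evaluate_response, which is defined further below.
def evaluate_all_samples(assistant):
    results = []
    for sample in comprehensive_evaluation_data:
        assistant.reset_conversation()  # each sample starts from a clean state
        response, _, _ = assistant.answer(sample["query"])
        metrics = evaluate_response(sample["query"], response, sample["reference"])
        results.append({"category": sample["category"], "language": sample["language"], **metrics})
    return results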

# Gradio Interface
def initialize_system():
    """Initialize the Vision 2030 Assistant system"""
    # Paths for the PDF files in the root directory
    pdf_files = ["saudi_vision203.pdf", "saudi_vision2030_ar.pdf"]

    # Print available files for debugging
    print("Files in current directory:", os.listdir("."))

    # Check if the PDFs exist
    for pdf_file in pdf_files:
        if not os.path.exists(pdf_file):
            print(f"Warning: {pdf_file} not found")

    # Process PDFs and create or load the vector store
    vector_store_dir = "vector_stores"
    os.makedirs(vector_store_dir, exist_ok=True)

    if os.path.exists(os.path.join(vector_store_dir, "index.faiss")):
        print("Loading existing vector store...")
        embedding_function = HuggingFaceEmbeddings(
            model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
        )
        # Newer LangChain versions require opting in to pickle deserialization
        vector_store = FAISS.load_local(
            vector_store_dir, embedding_function,
            allow_dangerous_deserialization=True
        )
    else:
        print("Creating new vector store...")
        documents = simple_process_pdfs(pdf_files)
        if not documents:
            raise ValueError("No documents were processed successfully. Cannot continue.")
        vector_store = create_vector_store(documents)
        vector_store.save_local(vector_store_dir)

    # Load model and tokenizer
    model, tokenizer = load_model_and_tokenizer()

    # Initialize the assistant
    assistant = Vision2030Assistant(model, tokenizer, vector_store)

    return assistant


def evaluate_response(query, response, reference):
    """Evaluate a single response against a reference"""
    # Calculate metrics (rouge_score expects the target first, then the prediction)
    rouge = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    rouge_scores = rouge.score(reference, response)

    bleu_scores = calculate_bleu(response, reference)
    meteor = calculate_meteor(response, reference)
    word_metrics = calculate_f1_precision_recall(response, reference)

    # Format results
    evaluation_results = {
        "ROUGE-1": f"{rouge_scores['rouge1'].fmeasure:.4f}",
        "ROUGE-2": f"{rouge_scores['rouge2'].fmeasure:.4f}",
        "ROUGE-L": f"{rouge_scores['rougeL'].fmeasure:.4f}",
        "BLEU-1": f"{bleu_scores['bleu_1']:.4f}",
        "BLEU-4": f"{bleu_scores['bleu_4']:.4f}",
        "METEOR": f"{meteor:.4f}",
        "Word Precision": f"{word_metrics['precision']:.4f}",
        "Word Recall": f"{word_metrics['recall']:.4f}",
        "Word F1": f"{word_metrics['f1']:.4f}"
    }

    return evaluation_results


@spaces.GPU  # Use GPU for conversation handling
def run_conversation(assistant, query):
    """Run a query through the assistant and return the response"""
    response, sources, contexts = assistant.answer(query)
    return response, sources, contexts


@spaces.GPU  # Use GPU for evaluation
def run_evaluation_on_sample(assistant, sample_index=0):
    """Run evaluation on a selected sample from the evaluation dataset"""
    if sample_index < 0 or sample_index >= len(comprehensive_evaluation_data):
        # Return the same 7-tuple shape as the success path
        return "Invalid sample index", "", "", {}, [], "", ""

    # Get the sample
    sample = comprehensive_evaluation_data[sample_index]
    query = sample["query"]
    reference = sample["reference"]
    category = sample["category"]
    language = sample["language"]

    # Reset the conversation and get a response
    assistant.reset_conversation()
    response, sources, contexts = assistant.answer(query)

    # Evaluate the response
    evaluation_results = evaluate_response(query, response, reference)

    return query, response, reference, evaluation_results, sources, category, language
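
# --- Illustrative usage (commented out) ---
# run_evaluation_on_sample returns, in order: query, response, reference,
# metrics dict, sources, category, language.
#
#   q, resp, ref, metrics, sources, cat, lang = run_evaluation_on_sample(assistant, 0)
#   print(metrics["ROUGE-L"], metrics["Word F1"])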

def qualitative_evaluation_interface(assistant=None):
    """Create a Gradio interface for qualitative evaluation"""
    # If the assistant is None, create a simplified error interface
    if assistant is None:
        with gr.Blocks(title="Vision 2030 Assistant - Initialization Error") as interface:
            gr.Markdown("# Vision 2030 Assistant - Initialization Error")
            gr.Markdown("There was an error initializing the assistant. Please check the logs for details.")
            gr.Textbox(label="Status", value="System initialization failed")
        return interface

    sample_options = [f"{i+1}. {item['query'][:50]}..."
                      for i, item in enumerate(comprehensive_evaluation_data)]

    with gr.Blocks(title="Vision 2030 Assistant - Qualitative Evaluation") as interface:
        gr.Markdown("# Vision 2030 Assistant - Qualitative Evaluation")
        gr.Markdown("This interface allows you to evaluate the Vision 2030 Assistant on predefined samples or your own queries.")

        with gr.Tab("Sample Evaluation"):
            gr.Markdown("### Evaluate the assistant on predefined samples")

            sample_dropdown = gr.Dropdown(
                choices=sample_options,
                label="Select a sample query",
                value=sample_options[0] if sample_options else None
            )

            eval_button = gr.Button("Evaluate Sample")

            with gr.Row():
                with gr.Column():
                    sample_query = gr.Textbox(label="Query")
                    sample_category = gr.Textbox(label="Category")
                    sample_language = gr.Textbox(label="Language")
                with gr.Column():
                    sample_response = gr.Textbox(label="Assistant Response")
                    sample_reference = gr.Textbox(label="Reference Answer")
                    sample_sources = gr.Textbox(label="Sources Used")

            with gr.Row():
                metrics_display = gr.JSON(label="Evaluation Metrics")

        with gr.Tab("Custom Evaluation"):
            gr.Markdown("### Evaluate the assistant on your own query")

            custom_query = gr.Textbox(
                lines=3,
                placeholder="Enter your question about Saudi Vision 2030...",
                label="Your Query"
            )
            custom_reference = gr.Textbox(
                lines=3,
                placeholder="Enter a reference answer (optional)...",
                label="Reference Answer (Optional)"
            )

            custom_eval_button = gr.Button("Get Response and Evaluate")

            custom_response = gr.Textbox(label="Assistant Response")
            custom_sources = gr.Textbox(label="Sources Used")
            custom_metrics = gr.JSON(
                label="Evaluation Metrics (if reference provided)",
                visible=True
            )

        with gr.Tab("Conversation Mode"):
            gr.Markdown("### Have a conversation with the Vision 2030 Assistant")

            chatbot = gr.Chatbot(label="Conversation")

            conv_input = gr.Textbox(
                placeholder="Ask about Saudi Vision 2030...",
                label="Your message"
            )

            with gr.Row():
                conv_button = gr.Button("Send")
                reset_button = gr.Button("Reset Conversation")

            conv_sources = gr.Textbox(label="Sources Used")

        # Sample evaluation event handlers
        def handle_sample_selection(selection):
            if not selection:
                return "", "", "", {}, "", "", ""

            # Extract the index from the selection string ("1. ..." -> 0)
            try:
                index = int(selection.split(".")[0]) - 1
                query, response, reference, metrics, sources, category, language = run_evaluation_on_sample(assistant, index)
                sources_str = ", ".join(sources)
                return query, response, reference, metrics, sources_str, category, language
            except Exception as e:
                print(f"Error in handle_sample_selection: {e}")
                import traceback
                traceback.print_exc()
                return f"Error processing selection: {e}", "", "", {}, "", "", ""

        eval_button.click(
            handle_sample_selection,
            inputs=[sample_dropdown],
            outputs=[sample_query, sample_response, sample_reference,
                     metrics_display, sample_sources, sample_category, sample_language]
        )

        sample_dropdown.change(
            handle_sample_selection,
            inputs=[sample_dropdown],
            outputs=[sample_query, sample_response, sample_reference,
                     metrics_display, sample_sources, sample_category, sample_language]
        )

        # Custom evaluation event handlers
        @spaces.GPU  # Use GPU for custom evaluation
        def handle_custom_evaluation(query, reference):
            if not query:
                return "Please enter a query", "", {}

            # Reset the conversation to ensure a clean state
            assistant.reset_conversation()

            # Get the response
            response, sources, _ = assistant.answer(query)
            sources_str = ", ".join(sources)

            # Evaluate only if a reference was provided
            metrics = {}
            if reference:
                metrics = evaluate_response(query, response, reference)

            return response, sources_str, metrics

        custom_eval_button.click(
            handle_custom_evaluation,
            inputs=[custom_query, custom_reference],
            outputs=[custom_response, custom_sources, custom_metrics]
        )

        # Conversation mode event handlers
        @spaces.GPU  # Use GPU for conversation handling
        def handle_conversation(message, history):
            if not message:
                return history, "", ""

            # Get the response
            response, sources, _ = assistant.answer(message)
            sources_str = ", ".join(sources)

            # Update the chat history
            history = history + [[message, response]]

            return history, "", sources_str

        def reset_conv():
            result = assistant.reset_conversation()
            return [], result, ""

        conv_button.click(
            handle_conversation,
            inputs=[conv_input, chatbot],
            outputs=[chatbot, conv_input, conv_sources]
        )

        reset_button.click(
            reset_conv,
            inputs=[],
            outputs=[chatbot, conv_input, conv_sources]
        )

    return interface
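
# --- Note (illustrative) ---
# Inside a Hugging Face Space the bare interface.launch() used in main() below
# is all that is needed; for a local run, something like
#   interface.launch(server_name="0.0.0.0", server_port=7860)
# could be used to make the app reachable from other machines.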

# Main function to run in the Hugging Face Space
def main():
    # Start with a debugging report
    print("=" * 50)
    print("SYSTEM INITIALIZATION")
    print("=" * 50)
    print("Current directory:", os.getcwd())
    print("Files in directory:", os.listdir("."))
    print("=" * 50)

    # Initialize the system
    try:
        # First check whether the PDF files exist
        pdf_files = ["saudi_vision203.pdf", "saudi_vision2030_ar.pdf"]
        for pdf_file in pdf_files:
            if not os.path.exists(pdf_file):
                print(f"Warning: {pdf_file} not found!")

        # Proceed with initialization
        print("Starting system initialization...")
        assistant = initialize_system()

        print("Creating interface...")
        interface = qualitative_evaluation_interface(assistant)

        print("Launching interface...")
        interface.launch()
    except Exception as e:
        print(f"Error during initialization: {e}")
        import traceback
        traceback.print_exc()

        # Create a simple error interface
        with gr.Blocks(title="Vision 2030 Assistant - Error") as debug_interface:
            gr.Markdown("# Vision 2030 Assistant - Initialization Error")
            gr.Markdown("There was an error initializing the assistant.")

            # Display error details
            gr.Textbox(
                value=f"Error: {str(e)}",
                label="Error Details",
                lines=5
            )

            # Show the file system status
            files_list = "\n".join(os.listdir("."))
            gr.Textbox(
                value=files_list,
                label="Files in Directory",
                lines=10
            )

            # Add a button to check the PDFs
            def check_pdfs():
                result = []
                for pdf_file in ["saudi_vision203.pdf", "saudi_vision2030_ar.pdf"]:
                    if os.path.exists(pdf_file):
                        size = os.path.getsize(pdf_file) / (1024 * 1024)  # Size in MB
                        result.append(f"{pdf_file}: Found ({size:.2f} MB)")
                    else:
                        result.append(f"{pdf_file}: Not found")
                return "\n".join(result)

            check_btn = gr.Button("Check PDF Files")
            pdf_status = gr.Textbox(label="PDF Status", lines=3)
            check_btn.click(check_pdfs, inputs=[], outputs=[pdf_status])

        debug_interface.launch()


if __name__ == "__main__":
    main()