# Vision 2030 Assistant -- Hugging Face Space (runs on ZeroGPU hardware)
import os
import re
import json
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path

import spaces

# PDF processing
import PyPDF2

# LLM and embeddings
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer

# RAG components
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
# HuggingFaceEmbeddings moved to langchain_community in recent LangChain releases
from langchain_community.embeddings import HuggingFaceEmbeddings

# Arabic text processing
import arabic_reshaper
from bidi.algorithm import get_display

# Evaluation
from rouge_score import rouge_scorer
import sacrebleu
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict

# Gradio for the interface
import gradio as gr
# Helper functions
def safe_tokenize(text):
    """Pure regex tokenizer with no NLTK dependency."""
    if not text:
        return []
    # Pad punctuation with spaces so each mark becomes its own token
    text = re.sub(r'([.,!?;:()\[\]{}"\'/\\])', r' \1 ', text)
    # Split on whitespace and drop empty strings
    return [token for token in re.split(r'\s+', text.lower()) if token]
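
# Illustrative smoke test for safe_tokenize. The _demo_* helpers in this file
# are editorial sketches and are never called by the app itself.
def _demo_safe_tokenize():
    # Punctuation becomes its own token and everything is lowercased.
    assert safe_tokenize("Hello, world!") == ["hello", ",", "world", "!"]
    assert safe_tokenize("") == []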
def detect_language(text):
    """Detect whether text is primarily Arabic or English."""
    # Simple heuristic: Arabic if more than half of all characters fall in
    # the Arabic Unicode block
    arabic_chars = re.findall(r'[\u0600-\u06FF]', text)
    is_arabic = len(arabic_chars) > len(text) * 0.5
    return "arabic" if is_arabic else "english"
# Evaluation metrics
def calculate_bleu(prediction, reference):
    """Simplified BLEU (n-gram precisions only, no brevity penalty or count
    clipping), computed without any NLTK dependency."""
    # Tokenize both texts with our own tokenizer
    pred_tokens = safe_tokenize(prediction.lower())
    ref_tokens = [safe_tokenize(reference.lower())]
    # If either side is empty, return 0
    if not pred_tokens or not ref_tokens[0]:
        return {"bleu_1": 0, "bleu_2": 0, "bleu_4": 0}

    def get_ngrams(tokens, n):
        return [tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]

    # Precision at each n-gram level, 1-gram to 4-gram
    precisions = []
    for n in range(1, 5):
        if len(pred_tokens) < n:
            precisions.append(0)
            continue
        pred_ngrams = get_ngrams(pred_tokens, n)
        ref_ngrams = get_ngrams(ref_tokens[0], n)
        # Count predicted n-grams that also occur in the reference
        matches = sum(1 for ng in pred_ngrams if ng in ref_ngrams)
        precisions.append(matches / len(pred_ngrams) if pred_ngrams else 0)

    # Geometric means of the n-gram precisions
    return {
        "bleu_1": precisions[0],
        "bleu_2": (precisions[0] * precisions[1]) ** 0.5,
        "bleu_4": (precisions[0] * precisions[1] * precisions[2] * precisions[3]) ** 0.25
    }
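
# Sanity check for the simplified BLEU (illustrative sketch):
def _demo_calculate_bleu():
    # A verbatim match scores 1.0 at every n-gram level; disjoint text scores 0.
    scores = calculate_bleu("the cat sat on the mat", "the cat sat on the mat")
    assert scores["bleu_1"] == 1.0 and scores["bleu_4"] == 1.0
    assert calculate_bleu("abc", "xyz")["bleu_1"] == 0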
def calculate_meteor(prediction, reference):
    """Jaccard word-overlap similarity, used as a lightweight METEOR stand-in."""
    pred_tokens = set(safe_tokenize(prediction.lower()))
    ref_tokens = set(safe_tokenize(reference.lower()))
    if not pred_tokens or not ref_tokens:
        return 0
    # Jaccard similarity: shared unique words over all unique words
    intersection = len(pred_tokens.intersection(ref_tokens))
    union = len(pred_tokens.union(ref_tokens))
    return intersection / union if union > 0 else 0
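
# Worked example for the Jaccard overlap (illustrative sketch):
def _demo_calculate_meteor():
    # {"the", "cat"} vs {"the", "dog"} share 1 of 3 unique words -> 1/3.
    assert abs(calculate_meteor("the cat", "the dog") - 1 / 3) < 1e-9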
def calculate_f1_precision_recall(prediction, reference):
    """Word-level F1, precision, and recall over unique tokens."""
    pred_tokens = set(safe_tokenize(prediction.lower()))
    ref_tokens = set(safe_tokenize(reference.lower()))
    # Overlap between prediction and reference vocabularies
    common = pred_tokens.intersection(ref_tokens)
    precision = len(common) / len(pred_tokens) if pred_tokens else 0
    recall = len(common) / len(ref_tokens) if ref_tokens else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0
    return {'precision': precision, 'recall': recall, 'f1': f1}
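
# Worked example for the word-level metrics (illustrative sketch):
def _demo_f1():
    # pred {"a","b"} vs ref {"b","c"}: precision = recall = 0.5, so F1 = 0.5.
    m = calculate_f1_precision_recall("a b", "b c")
    assert m == {'precision': 0.5, 'recall': 0.5, 'f1': 0.5}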
def evaluate_retrieval_quality(contexts, query, language):
    """Evaluate the quality of retrieved contexts. Placeholder: only source
    diversity is actually computed; the other fields are fixed values."""
    return {
        'language_match_ratio': 1.0,
        'source_diversity': len(set(ctx.get('source', '') for ctx in contexts)) / max(1, len(contexts)),
        'mrr': 1.0
    }
# PDF processing and vector store
def simple_process_pdfs(pdf_paths):
    """Extract text from PDF files and return LangChain Document objects."""
    documents = []
    print(f"Processing PDFs: {pdf_paths}")
    for pdf_path in pdf_paths:
        try:
            if not os.path.exists(pdf_path):
                print(f"Warning: {pdf_path} does not exist")
                continue
            print(f"Processing {pdf_path}...")
            text = ""
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:  # Only keep pages that yielded text
                        text += page_text + "\n\n"
            if text.strip():  # Only keep files that yielded text
                doc = Document(
                    page_content=text,
                    metadata={"source": pdf_path, "filename": os.path.basename(pdf_path)}
                )
                documents.append(doc)
                print(f"Successfully processed: {pdf_path}")
            else:
                print(f"Warning: No text extracted from {pdf_path}")
        except Exception as e:
            print(f"Error processing {pdf_path}: {e}")
            import traceback
            traceback.print_exc()
    print(f"Processed {len(documents)} PDF documents")
    return documents
def create_vector_store(documents):
    """Split documents into chunks and build a FAISS vector store."""
    # Split each document into overlapping chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
    )
    chunks = []
    for doc in documents:
        doc_chunks = text_splitter.split_text(doc.page_content)
        # Preserve the source metadata on every chunk
        chunks.extend(
            Document(page_content=chunk, metadata=doc.metadata)
            for chunk in doc_chunks
        )
    print(f"Created {len(chunks)} chunks from {len(documents)} documents")
    # Multilingual embeddings so Arabic and English queries share one index
    embedding_function = HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
    )
    # Build the FAISS index over all chunks
    vector_store = FAISS.from_documents(chunks, embedding_function)
    return vector_store
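
# Illustrative round trip through the index (sketch only; not called by the
# app, and it downloads the embedding model on first use):
def _demo_vector_store():
    docs = [Document(page_content="Vision 2030 aims to diversify the economy.",
                     metadata={"source": "demo.txt"})]
    store = create_vector_store(docs)
    print(store.similarity_search("economic diversification", k=1)[0].page_content)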
# Model loading - with extra handling for missing SentencePiece
def load_model_and_tokenizer():
    """Load the ALLaM-7B model and tokenizer with error handling."""
    model_name = "ALLaM-AI/ALLaM-7B-Instruct-preview"
    print(f"Loading model: {model_name}")
    try:
        # Check up front whether SentencePiece is available
        try:
            import sentencepiece
            print("SentencePiece is installed")
        except ImportError:
            print("Warning: SentencePiece is not installed. Attempting to proceed with AutoTokenizer only.")
        # The slow tokenizer (use_fast=False) requires SentencePiece
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True,
            use_fast=False
        )
        # bfloat16 + device_map="auto" spreads the model across available devices
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
            device_map="auto",
        )
        print("Model loaded successfully with AutoTokenizer!")
        return model, tokenizer
    except Exception as e:
        print(f"Model loading failed: {e}")
        # Give a targeted hint when SentencePiece is the culprit
        if "SentencePiece" in str(e):
            raise ImportError(
                "The model requires the SentencePiece library, which is missing. "
                "Add 'sentencepiece>=0.1.95' to your requirements.txt file."
            )
        raise Exception(f"Failed to load model: {e}")
def retrieve_context(query, vector_store, top_k=5):
    """Retrieve the most relevant document chunks for a given query."""
    results = vector_store.similarity_search_with_score(query, k=top_k)
    # Format the retrieved contexts
    contexts = []
    for doc, score in results:
        contexts.append({
            "content": doc.page_content,
            "source": doc.metadata.get("source", "Unknown"),
            # Note: FAISS returns a distance here, so lower means more relevant
            "relevance_score": score
        })
    return contexts
# On ZeroGPU Spaces the GPU is attached per call; the @spaces.GPU decorator
# (the reason for the `spaces` import above) requests it for this function.
@spaces.GPU
def generate_response(query, contexts, model, tokenizer, language="auto"):
    """Generate a response from retrieved contexts using ALLaM's instruction format."""
    # Auto-detect language if not specified
    if language == "auto":
        language = detect_language(query)
    # Format the instruction based on language
    if language == "arabic":
        # Arabic for: "You are a virtual assistant for Saudi Vision 2030. Use
        # the following information to answer the question. If you don't know
        # the answer, honestly say you don't know."
        instruction = (
            "أنت مساعد افتراضي يهتم برؤية السعودية 2030. استخدم المعلومات التالية للإجابة على السؤال. "
            "إذا لم تعرف الإجابة، فقل بأمانة إنك لا تعرف."
        )
    else:  # english
        instruction = (
            "You are a virtual assistant for Saudi Vision 2030. Use the following information to answer the question. "
            "If you don't know the answer, honestly say you don't know."
        )
    # Combine the retrieved contexts
    context_text = "\n\n".join(f"Document: {ctx['content']}" for ctx in contexts)
    # Llama-style instruction prompt. Note: no trailing </s> -- appending the
    # end-of-sequence token to the prompt would signal the model to stop.
    prompt = f"""<s>[INST] {instruction}

Context:
{context_text}

Question: {query} [/INST]"""
    try:
        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
        # Sampled decoding with a mild repetition penalty
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=512,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.1
        )
        # Decode, then keep only the answer after the [/INST] marker
        full_output = tokenizer.decode(outputs[0], skip_special_tokens=True)
        response = full_output.split("[/INST]")[-1].strip()
        # Fall back to the full output if the split produced nothing
        if not response:
            response = full_output
        return response
    except Exception as e:
        print(f"Error during generation: {e}")
        return "I apologize, but I encountered an error while generating a response."
# Assistant class
class Vision2030Assistant:
    def __init__(self, model, tokenizer, vector_store):
        self.model = model
        self.tokenizer = tokenizer
        self.vector_store = vector_store
        self.conversation_history = []

    def answer(self, user_query):
        """Process a user query and return a response with sources."""
        language = detect_language(user_query)
        self.conversation_history.append({"role": "user", "content": user_query})
        # Build a transcript of the last 3 turns (6 messages)
        conversation_context = "\n".join(
            f"{'User' if msg['role'] == 'user' else 'Assistant'}: {msg['content']}"
            for msg in self.conversation_history[-6:]
        )
        # Prepend the transcript so retrieval sees the conversational context
        enhanced_query = f"{conversation_context}\n{user_query}"
        contexts = retrieve_context(enhanced_query, self.vector_store, top_k=5)
        response = generate_response(user_query, contexts, self.model, self.tokenizer, language)
        self.conversation_history.append({"role": "assistant", "content": response})
        # Also return sources for transparency
        sources = [ctx.get("source", "Unknown") for ctx in contexts]
        unique_sources = list(set(sources))
        return response, unique_sources, contexts

    def reset_conversation(self):
        """Reset the conversation history."""
        self.conversation_history = []
        return "Conversation has been reset."
# Comprehensive evaluation dataset (English translations of the Arabic
# entries are given in comments)
comprehensive_evaluation_data = [
    # === Overview ===
    {
        # "What is Saudi Vision 2030?"
        "query": "ما هي رؤية السعودية 2030؟",
        # "Saudi Vision 2030 is a strategic plan to diversify the Saudi economy
        # and reduce dependence on oil while developing sectors such as health,
        # education, and tourism."
        "reference": "رؤية السعودية 2030 هي خطة استراتيجية تهدف إلى تنويع الاقتصاد السعودي وتقليل الاعتماد على النفط مع تطوير قطاعات مختلفة مثل الصحة والتعليم والسياحة.",
        "category": "overview",
        "language": "arabic"
    },
    {
        "query": "What is Saudi Vision 2030?",
        "reference": "Saudi Vision 2030 is a strategic framework aiming to diversify Saudi Arabia's economy and reduce dependence on oil, while developing sectors like health, education, and tourism.",
        "category": "overview",
        "language": "english"
    },
    # === Economic Goals ===
    {
        # "What are the economic goals of Vision 2030?"
        "query": "ما هي الأهداف الاقتصادية لرؤية 2030؟",
        # "The economic goals include raising the private sector's contribution
        # to 65%, raising non-oil exports to 50% of non-oil GDP, and cutting
        # unemployment to 7%."
        "reference": "تشمل الأهداف الاقتصادية زيادة مساهمة القطاع الخاص إلى 65%، وزيادة الصادرات غير النفطية إلى 50% من الناتج المحلي غير النفطي، وخفض البطالة إلى 7%.",
        "category": "economic",
        "language": "arabic"
    },
    {
        "query": "What are the economic goals of Vision 2030?",
        "reference": "The economic goals of Vision 2030 include increasing private sector contribution from 40% to 65% of GDP, raising non-oil exports from 16% to 50%, reducing unemployment from 11.6% to 7%.",
        "category": "economic",
        "language": "english"
    },
    # === Social Goals ===
    {
        # "How does Vision 2030 promote Saudi cultural heritage?"
        "query": "كيف تعزز رؤية 2030 الإرث الثقافي السعودي؟",
        # "Vision 2030 includes preserving national identity, registering
        # archaeological sites with UNESCO, and promoting cultural events."
        "reference": "تتضمن رؤية 2030 الحفاظ على الهوية الوطنية، تسجيل مواقع أثرية في اليونسكو، وتعزيز الفعاليات الثقافية.",
        "category": "social",
        "language": "arabic"
    },
    {
        "query": "How does Vision 2030 aim to improve quality of life?",
        "reference": "Vision 2030 plans to enhance quality of life by expanding sports facilities, promoting cultural activities, and boosting tourism and entertainment sectors.",
        "category": "social",
        "language": "english"
    }
]
# Gradio interface
def initialize_system():
    """Initialize the Vision 2030 Assistant system."""
    # PDF files expected in the Space's root directory
    pdf_files = ["saudi_vision203.pdf", "saudi_vision2030_ar.pdf"]
    # Print available files for debugging
    print("Files in current directory:", os.listdir("."))
    # Reuse a saved vector store when one exists; otherwise build it from the PDFs
    vector_store_dir = "vector_stores"
    os.makedirs(vector_store_dir, exist_ok=True)
    if os.path.exists(os.path.join(vector_store_dir, "index.faiss")):
        print("Loading existing vector store...")
        embedding_function = HuggingFaceEmbeddings(
            model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
        )
        # allow_dangerous_deserialization is required by newer LangChain
        # releases to load a locally pickled index we created ourselves
        vector_store = FAISS.load_local(
            vector_store_dir, embedding_function,
            allow_dangerous_deserialization=True
        )
    else:
        print("Creating new vector store...")
        documents = simple_process_pdfs(pdf_files)
        if not documents:
            raise ValueError("No documents were processed successfully. Cannot continue.")
        vector_store = create_vector_store(documents)
        vector_store.save_local(vector_store_dir)
    # Load model and tokenizer, then wire everything into the assistant
    model, tokenizer = load_model_and_tokenizer()
    assistant = Vision2030Assistant(model, tokenizer, vector_store)
    return assistant
def evaluate_response(query, response, reference):
    """Evaluate a single response against a reference answer."""
    # rouge_scorer expects the reference (target) first, then the prediction
    rouge = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    rouge_scores = rouge.score(reference, response)
    bleu_scores = calculate_bleu(response, reference)
    meteor = calculate_meteor(response, reference)
    word_metrics = calculate_f1_precision_recall(response, reference)
    # Format results
    evaluation_results = {
        "ROUGE-1": f"{rouge_scores['rouge1'].fmeasure:.4f}",
        "ROUGE-2": f"{rouge_scores['rouge2'].fmeasure:.4f}",
        "ROUGE-L": f"{rouge_scores['rougeL'].fmeasure:.4f}",
        "BLEU-1": f"{bleu_scores['bleu_1']:.4f}",
        "BLEU-4": f"{bleu_scores['bleu_4']:.4f}",
        "METEOR": f"{meteor:.4f}",
        "Word Precision": f"{word_metrics['precision']:.4f}",
        "Word Recall": f"{word_metrics['recall']:.4f}",
        "Word F1": f"{word_metrics['f1']:.4f}"
    }
    return evaluation_results
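
# Sanity check for the metric bundle (illustrative sketch): a verbatim answer
# should score 1.0000 everywhere.
def _demo_evaluate_response():
    text = "Vision 2030 diversifies the economy."
    metrics = evaluate_response("q", text, text)
    assert metrics["ROUGE-1"] == "1.0000" and metrics["Word F1"] == "1.0000"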
def run_conversation(assistant, query):
    """Run a query through the assistant and return the response."""
    response, sources, contexts = assistant.answer(query)
    return response, sources, contexts
def run_evaluation_on_sample(assistant, sample_index=0):
    """Run evaluation on one sample from the evaluation dataset."""
    if sample_index < 0 or sample_index >= len(comprehensive_evaluation_data):
        # Keep the same arity as the success path so callers can unpack safely
        return "Invalid sample index", "", "", {}, [], "", ""
    # Get the sample
    sample = comprehensive_evaluation_data[sample_index]
    query = sample["query"]
    reference = sample["reference"]
    category = sample["category"]
    language = sample["language"]
    # Reset the conversation and get a fresh response
    assistant.reset_conversation()
    response, sources, contexts = assistant.answer(query)
    # Evaluate the response against the reference
    evaluation_results = evaluate_response(query, response, reference)
    return query, response, reference, evaluation_results, sources, category, language
def qualitative_evaluation_interface(assistant=None):
    """Create a Gradio interface for qualitative evaluation."""
    # Without an assistant, fall back to a minimal error interface
    if assistant is None:
        with gr.Blocks(title="Vision 2030 Assistant - Initialization Error") as interface:
            gr.Markdown("# Vision 2030 Assistant - Initialization Error")
            gr.Markdown("There was an error initializing the assistant. Please check the logs for details.")
            gr.Textbox(label="Status", value="System initialization failed")
        return interface

    sample_options = [f"{i+1}. {item['query'][:50]}..." for i, item in enumerate(comprehensive_evaluation_data)]

    with gr.Blocks(title="Vision 2030 Assistant - Qualitative Evaluation") as interface:
        gr.Markdown("# Vision 2030 Assistant - Qualitative Evaluation")
        gr.Markdown("This interface allows you to evaluate the Vision 2030 Assistant on predefined samples or your own queries.")
with gr.Tab("Sample Evaluation"): | |
gr.Markdown("### Evaluate the assistant on predefined samples") | |
sample_dropdown = gr.Dropdown( | |
choices=sample_options, | |
label="Select a sample query", | |
value=sample_options[0] if sample_options else None | |
) | |
eval_button = gr.Button("Evaluate Sample") | |
with gr.Row(): | |
with gr.Column(): | |
sample_query = gr.Textbox(label="Query") | |
sample_category = gr.Textbox(label="Category") | |
sample_language = gr.Textbox(label="Language") | |
with gr.Column(): | |
sample_response = gr.Textbox(label="Assistant Response") | |
sample_reference = gr.Textbox(label="Reference Answer") | |
sample_sources = gr.Textbox(label="Sources Used") | |
with gr.Row(): | |
metrics_display = gr.JSON(label="Evaluation Metrics") | |
with gr.Tab("Custom Evaluation"): | |
gr.Markdown("### Evaluate the assistant on your own query") | |
custom_query = gr.Textbox( | |
lines=3, | |
placeholder="Enter your question about Saudi Vision 2030...", | |
label="Your Query" | |
) | |
custom_reference = gr.Textbox( | |
lines=3, | |
placeholder="Enter a reference answer (optional)...", | |
label="Reference Answer (Optional)" | |
) | |
custom_eval_button = gr.Button("Get Response and Evaluate") | |
custom_response = gr.Textbox(label="Assistant Response") | |
custom_sources = gr.Textbox(label="Sources Used") | |
custom_metrics = gr.JSON( | |
label="Evaluation Metrics (if reference provided)", | |
visible=True | |
) | |
with gr.Tab("Conversation Mode"): | |
gr.Markdown("### Have a conversation with the Vision 2030 Assistant") | |
chatbot = gr.Chatbot(label="Conversation") | |
conv_input = gr.Textbox( | |
placeholder="Ask about Saudi Vision 2030...", | |
label="Your message" | |
) | |
with gr.Row(): | |
conv_button = gr.Button("Send") | |
reset_button = gr.Button("Reset Conversation") | |
conv_sources = gr.Textbox(label="Sources Used") | |
        # Sample evaluation event handlers
        def handle_sample_selection(selection):
            if not selection:
                return "", "", "", {}, "", "", ""
            # Recover the sample index from the "N. query..." selection string
            try:
                index = int(selection.split(".")[0]) - 1
                query, response, reference, metrics, sources, category, language = run_evaluation_on_sample(assistant, index)
                sources_str = ", ".join(sources)
                return query, response, reference, metrics, sources_str, category, language
            except Exception as e:
                print(f"Error in handle_sample_selection: {e}")
                import traceback
                traceback.print_exc()
                return f"Error processing selection: {e}", "", "", {}, "", "", ""

        eval_button.click(
            handle_sample_selection,
            inputs=[sample_dropdown],
            outputs=[sample_query, sample_response, sample_reference, metrics_display,
                     sample_sources, sample_category, sample_language]
        )
        sample_dropdown.change(
            handle_sample_selection,
            inputs=[sample_dropdown],
            outputs=[sample_query, sample_response, sample_reference, metrics_display,
                     sample_sources, sample_category, sample_language]
        )
        # Custom evaluation event handlers
        def handle_custom_evaluation(query, reference):
            if not query:
                return "Please enter a query", "", {}
            # Reset the conversation so each evaluation starts from a clean state
            assistant.reset_conversation()
            response, sources, _ = assistant.answer(query)
            sources_str = ", ".join(sources)
            # Only compute metrics when a reference answer was provided
            metrics = {}
            if reference:
                metrics = evaluate_response(query, response, reference)
            return response, sources_str, metrics

        custom_eval_button.click(
            handle_custom_evaluation,
            inputs=[custom_query, custom_reference],
            outputs=[custom_response, custom_sources, custom_metrics]
        )
        # Conversation mode event handlers
        def handle_conversation(message, history):
            if not message:
                return history, "", ""
            response, sources, _ = assistant.answer(message)
            sources_str = ", ".join(sources)
            # Append the new turn and clear the input box
            history = history + [[message, response]]
            return history, "", sources_str

        def reset_conv():
            # The reset status message is displayed in the input box
            result = assistant.reset_conversation()
            return [], result, ""

        conv_button.click(
            handle_conversation,
            inputs=[conv_input, chatbot],
            outputs=[chatbot, conv_input, conv_sources]
        )
        reset_button.click(
            reset_conv,
            inputs=[],
            outputs=[chatbot, conv_input, conv_sources]
        )
    return interface
# Main function to run in a Hugging Face Space
def main():
    # Start with a debugging report
    print("=" * 50)
    print("SYSTEM INITIALIZATION")
    print("=" * 50)
    print("Current directory:", os.getcwd())
    print("Files in directory:", os.listdir("."))
    print("=" * 50)
    # Check for SentencePiece
    try:
        import sentencepiece
        print("SentencePiece is installed: ✓")
    except ImportError:
        print("WARNING: SentencePiece is NOT installed! This will cause errors with the tokenizer.")
    # Initialize the system with simplified error handling
    try:
        # Placeholder interfaces built while loading; note they are never
        # launched -- only the interface returned from main() is displayed
        with gr.Blocks(title="Vision 2030 Assistant - Starting") as loading_interface:
            gr.Markdown("# Vision 2030 Assistant")
            gr.Markdown("System is initializing. This may take a few minutes...")
            status = gr.Textbox(value="Loading resources...", label="Status")
        with gr.Blocks(title="Vision 2030 Assistant - Model Loading") as model_interface:
            gr.Markdown("# Vision 2030 Assistant - Loading Model")
            gr.Markdown("The system is now loading the ALLaM-7B model. This may take several minutes.")
            status = gr.Textbox(value="Loading model...", label="Status")
        # Now try the actual initialization
        try:
            print("Starting system initialization...")
            assistant = initialize_system()
            print("Creating interface...")
            interface = qualitative_evaluation_interface(assistant)
            print("Launching interface...")
            return interface
        except ImportError as e:
            print(f"Import error during initialization: {e}")
            # Dedicated error interface for missing SentencePiece
            if "SentencePiece" in str(e):
                with gr.Blocks(title="Vision 2030 Assistant - SentencePiece Error") as sp_error:
                    gr.Markdown("# Vision 2030 Assistant - SentencePiece Error")
                    gr.Markdown("The model requires the SentencePiece library, which is missing.")
                    gr.Markdown("""
                    ## How to Fix:
                    Add these lines to your `requirements.txt` file:
                    ```
                    sentencepiece>=0.1.95
                    protobuf>=3.20.0
                    ```
                    Then rebuild your Hugging Face Space.
                    """)
                return sp_error
            else:
                # Generic interface for other import errors
                with gr.Blocks(title="Vision 2030 Assistant - Import Error") as import_error:
                    gr.Markdown("# Vision 2030 Assistant - Import Error")
                    gr.Markdown(f"An import error occurred: {str(e)}")
                    gr.Markdown("""
                    ## Possible solutions:
                    Check your `requirements.txt` file for missing dependencies.
                    """)
                return import_error
        except Exception as e:
            print(f"Error during initialization: {e}")
            import traceback
            traceback.print_exc()
            # General-purpose error interface with debugging aids
            with gr.Blocks(title="Vision 2030 Assistant - Error") as debug_interface:
                gr.Markdown("# Vision 2030 Assistant - Initialization Error")
                gr.Markdown("There was an error initializing the assistant.")
                # Display error details
                gr.Textbox(
                    value=f"Error: {str(e)}",
                    label="Error Details",
                    lines=5
                )
                # Show the file system status
                files_list = "\n".join(os.listdir("."))
                gr.Textbox(
                    value=files_list,
                    label="Files in Directory",
                    lines=10
                )

                # Button to check whether the expected PDFs are present
                def check_pdfs():
                    result = []
                    for pdf_file in ["saudi_vision203.pdf", "saudi_vision2030_ar.pdf"]:
                        if os.path.exists(pdf_file):
                            size = os.path.getsize(pdf_file) / (1024 * 1024)  # Size in MB
                            result.append(f"{pdf_file}: Found ({size:.2f} MB)")
                        else:
                            result.append(f"{pdf_file}: Not found")
                    return "\n".join(result)

                check_btn = gr.Button("Check PDF Files")
                pdf_status = gr.Textbox(label="PDF Status", lines=3)
                check_btn.click(check_pdfs, inputs=[], outputs=[pdf_status])
            return debug_interface
    except Exception as e:
        print(f"Critical error: {e}")
        with gr.Blocks(title="Vision 2030 Assistant - Critical Error") as critical_error:
            gr.Markdown("# Vision 2030 Assistant - Critical Error")
            gr.Markdown(f"A critical error occurred: {str(e)}")
            # Display the stack trace for debugging
            import traceback
            trace = traceback.format_exc()
            gr.Textbox(
                value=trace,
                label="Error Traceback",
                lines=15
            )
        return critical_error
if __name__ == "__main__": | |
demo = main() | |
demo.launch() |