#!/usr/bin/env python3
"""
Just search - A Smart Search Agent using Menlo/Lucy-128k
Part of the Just, AKA Simple series
Built with Gradio, DuckDuckGo Search, and Hugging Face Transformers
"""
import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from duckduckgo_search import DDGS
import re
import time
from typing import List, Dict, Tuple
import spaces
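# `spaces` is the Hugging Face Spaces helper package; on ZeroGPU ("Running on
# Zero") hardware it provides the @spaces.GPU decorator applied to the
# workflow function further below.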
# Initialize the model and tokenizer globally for efficiency
MODEL_NAME = "Menlo/Lucy-128k"
tokenizer = None
model = None
search_pipeline = None

def initialize_model():
    """Initialize the Menlo/Lucy-128k model and tokenizer"""
    global tokenizer, model, search_pipeline
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        # The model is already dtype-cast and device-placed above, so the
        # pipeline must not be given torch_dtype/device_map a second time.
        search_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=2048,
            temperature=0.7,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )
        return True
    except Exception as e:
        print(f"Error initializing model: {e}")
        return False

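# Note: float16 weights with device_map="auto" let Accelerate place the model
# on whatever GPU (or CPU fallback) the Space provides; the pipeline then
# reuses that already-placed model rather than loading its own copy.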
def extract_thinking_and_response(text: str) -> Tuple[str, str]:
    """Extract thinking process and clean response from AI output"""
    thinking = ""
    response = text
    # Extract thinking content
    thinking_match = re.search(r'<think>(.*?)</think>', text, re.DOTALL)
    if thinking_match:
        thinking = thinking_match.group(1).strip()
        response = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    # Clean up the response
    response = re.sub(r'^(Assistant:|AI:|Response:|Answer:)\s*', '', response.strip())
    response = re.sub(r'\[INST\].*?\[\/INST\]', '', response)
    response = re.sub(r'<\|.*?\|>', '', response)
    return thinking.strip(), response.strip()

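# Example (hypothetical model output):
#   extract_thinking_and_response("<think>compare sources</think>Answer: 42")
#   -> ("compare sources", "42")
# The <think>...</think> block is what Lucy-style reasoning models emit before
# the final answer; Assistant:/Answer: prefixes and leftover special tokens
# are stripped separately.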
def clean_response(text: str) -> str:
    """Clean up the AI response to extract just the relevant content"""
    _, response = extract_thinking_and_response(text)
    return response

def generate_search_queries(user_query: str) -> Tuple[List[str], str]:
    """Generate multiple search queries based on user input using AI"""
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a search query generator. Given a user's question, generate 3-5 different search queries that would help find comprehensive information to answer their question. Return only the search queries, one per line, without numbering or bullet points.
Example:
User: "What are the latest developments in AI?"
latest AI developments 2024
artificial intelligence breakthroughs recent
AI technology advances news
machine learning innovations 2024
<|eot_id|><|start_header_id|>user<|end_header_id|>
{user_query}
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
    try:
        response = search_pipeline(prompt, max_new_tokens=200, temperature=0.3)
        generated_text = response[0]['generated_text']
        # Extract just the assistant's response
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, cleaned_response = extract_thinking_and_response(assistant_response)
        # Split into individual queries and clean them
        queries = [q.strip() for q in cleaned_response.split('\n') if q.strip()]
        # Filter out any non-query text
        queries = [q for q in queries if len(q) > 5 and not q.startswith('Note:') and not q.startswith('Example:')]
        return queries[:5], thinking  # Return max 5 queries and thinking
    except Exception as e:
        print(f"Error generating queries: {e}")
        # Fallback to simple query variations
        return [user_query, f"{user_query} 2024", f"{user_query} latest"], ""

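# Illustrative return value:
#   (["latest AI developments 2024", "AI technology advances news", ...],
#    "<thinking text>")
# The prompt hand-rolls Llama-3-style header tokens, so the completion is
# recovered by splitting on the assistant header.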
def search_web(queries: List[str]) -> List[Dict]:
    """Search the web using DuckDuckGo with multiple queries"""
    all_results = []
    ddgs = DDGS()
    for query in queries:
        try:
            results = ddgs.text(query, max_results=5, region='wt-wt', safesearch='moderate')
            for result in results:
                result['search_query'] = query
                all_results.append(result)
            time.sleep(0.5)  # Rate limiting
        except Exception as e:
            print(f"Error searching for '{query}': {e}")
            continue
    # Remove duplicates based on URL
    seen_urls = set()
    unique_results = []
    for result in all_results:
        if result['href'] not in seen_urls:
            seen_urls.add(result['href'])
            unique_results.append(result)
    return unique_results[:15]  # Return max 15 results

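# DDGS().text() yields dicts with 'title', 'href', and 'body' keys, which the
# filtering and answer steps below read via .get(); 'search_query' is added
# here so each hit can be traced back to the query that produced it.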
def filter_relevant_results(user_query: str, search_results: List[Dict]) -> Tuple[List[Dict], str]:
    """Use AI to filter and rank search results by relevance"""
    if not search_results:
        return [], ""
    # Prepare results summary for AI
    results_text = ""
    for i, result in enumerate(search_results[:12]):  # Limit to avoid token overflow
        results_text += f"{i+1}. Title: {result.get('title', 'No title')}\n"
        results_text += f"   URL: {result.get('href', 'No URL')}\n"
        results_text += f"   Snippet: {result.get('body', 'No description')[:200]}...\n\n"
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a search result evaluator. Given a user's question and search results, identify which results are most relevant and helpful for answering the question.
Return only the numbers of the most relevant results (1-5 results maximum), separated by commas. Consider:
- Direct relevance to the question
- Credibility of the source
- Recency of information
- Comprehensiveness of content
Example response: 1, 3, 7
<|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {user_query}
Search Results:
{results_text}
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
    try:
        response = search_pipeline(prompt, max_new_tokens=100, temperature=0.1)
        generated_text = response[0]['generated_text']
        # Extract assistant's response
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, cleaned_response = extract_thinking_and_response(assistant_response)
        # Extract numbers
        numbers = re.findall(r'\d+', cleaned_response)
        selected_indices = [int(n) - 1 for n in numbers if int(n) <= len(search_results)]
        return [search_results[i] for i in selected_indices if 0 <= i < len(search_results)][:5], thinking
    except Exception as e:
        print(f"Error filtering results: {e}")
        return search_results[:5], ""  # Fallback to first 5 results

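# The evaluator prompt asks for a bare comma-separated list (e.g. "1, 3, 7"),
# so re.findall(r'\d+', ...) is enough to parse it; out-of-range or zero
# values are dropped by the bounds checks above.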
def generate_final_answer(user_query: str, selected_results: List[Dict]) -> Tuple[str, str]:
    """Generate final answer based on selected search results"""
    if not selected_results:
        return "I couldn't find relevant information to answer your question. Please try rephrasing your query.", ""
    # Prepare context from selected results
    context = ""
    for i, result in enumerate(selected_results):
        context += f"Source {i+1}: {result.get('title', 'Unknown')}\n"
        context += f"Content: {result.get('body', 'No content available')}\n"
        context += f"URL: {result.get('href', 'No URL')}\n\n"
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful research assistant. Based on the provided search results, give a comprehensive answer to the user's question.
Guidelines:
- Synthesize information from multiple sources
- Be accurate and factual
- Cite sources when possible
- If information is conflicting, mention it
- Keep the answer well-structured and easy to read
- Include relevant URLs for further reading
<|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {user_query}
Search Results:
{context}
Please provide a comprehensive answer based on these sources.
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
    try:
        response = search_pipeline(prompt, max_new_tokens=1024, temperature=0.2)
        generated_text = response[0]['generated_text']
        # Extract assistant's response
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, answer = extract_thinking_and_response(assistant_response)
        return answer, thinking
    except Exception as e:
        print(f"Error generating final answer: {e}")
        return "I encountered an error while processing the search results. Please try again.", ""

# ZeroGPU note: on "Running on Zero" hardware, the function that performs model
# inference must be decorated with @spaces.GPU; importing `spaces` alone is not
# enough to get a GPU allocated.
@spaces.GPU
def search_agent_workflow(user_query: str, progress=gr.Progress()) -> Tuple[str, str, str]:
    """Main workflow that orchestrates the search agent"""
    if not user_query.strip():
        return "Please enter a search query.", "", ""
    progress(0.1, desc="Initializing...")
    all_thinking = []
    # Step 1: Generate search queries
    progress(0.2, desc="Generating search queries...")
    queries, thinking1 = generate_search_queries(user_query)
    if thinking1:
        all_thinking.append(f"**Query Generation:**\n{thinking1}")
    queries_text = "Generated queries:\n" + "\n".join(f"• {q}" for q in queries)
    # Step 2: Search the web
    progress(0.4, desc="Searching the web...")
    search_results = search_web(queries)
    if not search_results:
        return "No search results found. Please try a different query.", queries_text, "\n\n".join(all_thinking)
    # Step 3: Filter relevant results
    progress(0.6, desc="Filtering relevant results...")
    relevant_results, thinking2 = filter_relevant_results(user_query, search_results)
    if thinking2:
        all_thinking.append(f"**Result Filtering:**\n{thinking2}")
    # Step 4: Generate final answer
    progress(0.8, desc="Generating comprehensive answer...")
    final_answer, thinking3 = generate_final_answer(user_query, relevant_results)
    if thinking3:
        all_thinking.append(f"**Answer Generation:**\n{thinking3}")
    progress(1.0, desc="Complete!")
    # Prepare debug info
    debug_info = f"{queries_text}\n\nSelected {len(relevant_results)} relevant sources:\n"
    for i, result in enumerate(relevant_results):
        debug_info += f"{i+1}. {result.get('title', 'No title')} - {result.get('href', 'No URL')}\n"
    thinking_display = "\n\n".join(all_thinking) if all_thinking else "No thinking process recorded."
    return final_answer, debug_info, thinking_display

# Custom CSS for dark blue theme and mobile responsiveness
custom_css = """
/* Dark blue theme */
:root {
    --primary-bg: #0a1628;
    --secondary-bg: #1e3a5f;
    --accent-bg: #2563eb;
    --text-primary: #f8fafc;
    --text-secondary: #cbd5e1;
    --border-color: #334155;
    --input-bg: #1e293b;
    --button-bg: #3b82f6;
    --button-hover: #2563eb;
}

/* Global styles */
.gradio-container {
    background: linear-gradient(135deg, var(--primary-bg) 0%, var(--secondary-bg) 100%) !important;
    color: var(--text-primary) !important;
    font-family: 'Inter', 'Segoe UI', system-ui, sans-serif !important;
}

/* Mobile responsiveness */
@media (max-width: 768px) {
    .gradio-container {
        padding: 10px !important;
    }
    .gr-form {
        gap: 15px !important;
    }
    .gr-button {
        font-size: 16px !important;
        padding: 12px 20px !important;
    }
}

/* Input styling */
.gr-textbox textarea, .gr-textbox input {
    background: var(--input-bg) !important;
    border: 1px solid var(--border-color) !important;
    color: var(--text-primary) !important;
    border-radius: 8px !important;
}

/* Button styling */
.gr-button {
    background: linear-gradient(135deg, var(--button-bg) 0%, var(--accent-bg) 100%) !important;
    color: white !important;
    border: none !important;
    border-radius: 8px !important;
    font-weight: 600 !important;
    transition: all 0.3s ease !important;
}

.gr-button:hover {
    background: linear-gradient(135deg, var(--button-hover) 0%, var(--button-bg) 100%) !important;
    transform: translateY(-1px) !important;
    box-shadow: 0 4px 12px rgba(59, 130, 246, 0.3) !important;
}

/* Output styling */
.gr-markdown, .gr-textbox {
    background: var(--input-bg) !important;
    border: 1px solid var(--border-color) !important;
    border-radius: 8px !important;
    color: var(--text-primary) !important;
}

/* Header styling */
.gr-markdown h1 {
    color: var(--accent-bg) !important;
    text-align: center !important;
    margin-bottom: 20px !important;
    font-size: 2.5rem !important;
    font-weight: 700 !important;
}

/* Thinking section styling */
#thinking-output {
    background: var(--secondary-bg) !important;
    border: 1px solid var(--border-color) !important;
    border-radius: 8px !important;
    padding: 15px !important;
    font-family: 'Fira Code', 'Monaco', monospace !important;
    font-size: 0.9rem !important;
    line-height: 1.4 !important;
}

/* Loading animation */
.gr-loading {
    background: var(--secondary-bg) !important;
    border-radius: 8px !important;
}

/* Scrollbar styling */
::-webkit-scrollbar {
    width: 8px;
}

::-webkit-scrollbar-track {
    background: var(--primary-bg);
}

::-webkit-scrollbar-thumb {
    background: var(--accent-bg);
    border-radius: 4px;
}

::-webkit-scrollbar-thumb:hover {
    background: var(--button-hover);
}
"""

def create_interface():
    """Create the Gradio interface"""
    with gr.Blocks(
        theme=gr.themes.Base(
            primary_hue="blue",
            secondary_hue="slate",
            neutral_hue="slate",
            text_size="lg",
            spacing_size="lg",
            radius_size="md"
        ),
        css=custom_css,
        title="Just search - AI Search Agent",
        head="<meta name='viewport' content='width=device-width, initial-scale=1.0'>"
    ) as interface:
        gr.Markdown("# 🔍 Just search", elem_id="header")
        gr.Markdown(
            "*Part of the Just, AKA Simple series*\n\n"
            "**Intelligent search agent powered by Menlo/Lucy-128k**\n\n"
            "Ask any question and get comprehensive answers from the web.",
            elem_id="description"
        )
        with gr.Row():
            with gr.Column(scale=4):
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask me anything... (e.g., 'What are the latest developments in AI?')",
                    lines=2,
                    elem_id="query-input"
                )
            with gr.Column(scale=1):
                search_btn = gr.Button(
                    "🔍 Search",
                    variant="primary",
                    size="lg",
                    elem_id="search-button"
                )
        with gr.Row():
            answer_output = gr.Markdown(
                label="Answer",
                elem_id="answer-output",
                height=400
            )
        with gr.Accordion("🤔 AI Thinking Process", open=False):
            thinking_output = gr.Markdown(
                label="Model's Chain of Thought",
                elem_id="thinking-output",
                height=300
            )
        with gr.Accordion("🔧 Debug Info", open=False):
            debug_output = gr.Textbox(
                label="Search Process Details",
                lines=8,
                elem_id="debug-output"
            )
        # Event handlers
        search_btn.click(
            fn=search_agent_workflow,
            inputs=[query_input],
            outputs=[answer_output, debug_output, thinking_output],
            show_progress=True
        )
        query_input.submit(
            fn=search_agent_workflow,
            inputs=[query_input],
            outputs=[answer_output, debug_output, thinking_output],
            show_progress=True
        )
        # Example queries
        gr.Examples(
            examples=[
                ["What are the latest breakthroughs in quantum computing?"],
                ["How does climate change affect ocean currents?"],
                ["What are the best practices for sustainable agriculture?"],
                ["Explain the recent developments in renewable energy technology"],
                ["What are the health benefits of the Mediterranean diet?"]
            ],
            inputs=query_input,
            outputs=[answer_output, debug_output, thinking_output],
            fn=search_agent_workflow,
            cache_examples=False
        )
        gr.Markdown(
            "---\n**Note:** This search agent generates multiple queries, searches the web, "
            "filters results for relevance, and provides comprehensive answers. "
            "Results are sourced from DuckDuckGo search."
        )
    return interface

def main():
    """Main function to initialize and launch the app"""
    print("🚀 Initializing Just search...")
    # Initialize the model
    if not initialize_model():
        print("❌ Failed to initialize model. Please check your setup.")
        return
    print("✅ Model initialized successfully!")
    print("🎨 Creating interface...")
    # Create and launch the interface
    interface = create_interface()
    print("🚀 Just search is ready!")
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
        debug=True
    )

if __name__ == "__main__":
    main()
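# Local usage (a sketch; exact pins would come from the Space's requirements.txt):
#   pip install gradio torch transformers accelerate duckduckgo-search spaces
#   python app.py
# then open http://localhost:7860. `accelerate` is needed for device_map="auto";
# on a Space, the Spaces runtime launches the app the same way.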