#!/usr/bin/env python3
"""
Just search - A Smart Search Agent using Menlo/Lucy-128k
Part of the Just, AKA Simple series
Built with Gradio, DuckDuckGo Search, and Hugging Face Transformers
"""

import json
import re
import time
from typing import List, Dict, Tuple

import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from duckduckgo_search import DDGS
import spaces

# Initialize the model and tokenizer globally for efficiency
MODEL_NAME = "Menlo/Lucy-128k"
tokenizer = None
model = None
search_pipeline = None

# Marker that separates the prompt from the assistant's turn in our prompts.
# NOTE(review): the prompts below use Llama-3 style header tokens; confirm
# this matches the chat template Menlo/Lucy-128k was trained with.
ASSISTANT_HEADER = "<|start_header_id|>assistant<|end_header_id|>"

# Limits used throughout the workflow.
MAX_QUERIES = 5
RESULTS_PER_QUERY = 5
MAX_UNIQUE_RESULTS = 15
MAX_RESULTS_FOR_RANKING = 12  # results shown to the model when ranking
MAX_SELECTED_RESULTS = 5
SNIPPET_PREVIEW_CHARS = 200

# Patterns are compiled once at import time because they are used on every
# model reply.
_THINK_BLOCK_RE = re.compile(r'<think>(.*?)</think>', re.DOTALL)
_ROLE_PREFIX_RE = re.compile(r'^(Assistant:|AI:|Response:|Answer:)\s*')
_INST_BLOCK_RE = re.compile(r'\[INST\].*?\[\/INST\]')
_SPECIAL_TOKEN_RE = re.compile(r'<\|.*?\|>')
_LIST_MARKER_RE = re.compile(r'^\s*(?:[-*•]\s+|\d+[.)]\s+)')


def initialize_model():
    """Load the Menlo/Lucy-128k tokenizer, model and generation pipeline.

    Populates the module-level ``tokenizer``, ``model`` and
    ``search_pipeline`` globals.

    Returns:
        True when everything loaded, False when any step raised (the error
        is printed).
    """
    global tokenizer, model, search_pipeline

    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        search_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            torch_dtype=torch.float16,
            device_map="auto",
            max_new_tokens=2048,
            temperature=0.7,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )
        return True
    except Exception as e:
        print(f"Error initializing model: {e}")
        return False


def extract_thinking_and_response(text: str) -> Tuple[str, str]:
    """Split raw model output into its reasoning and its visible answer.

    Reasoning is whatever appears inside ``<think>...</think>``. Two
    malformed shapes are also handled: a closing tag with no opening tag
    (everything before it is reasoning) and an opening tag that is never
    closed, e.g. when generation stopped at the token limit (everything
    after it is reasoning).

    Args:
        text: Raw assistant output.

    Returns:
        ``(thinking, response)``, both stripped. ``response`` additionally
        has a leading role prefix, ``[INST]...[/INST]`` blocks and
        ``<|...|>`` special tokens removed.
    """
    thoughts = [block.strip() for block in _THINK_BLOCK_RE.findall(text)]
    response = _THINK_BLOCK_RE.sub('', text)

    # Closing tag without a matching opening tag.
    before, closer, after = response.partition('</think>')
    if closer:
        thoughts.append(before.strip())
        response = after

    # Opening tag that was never closed (truncated generation).
    before, opener, after = response.partition('<think>')
    if opener:
        thoughts.append(after.strip())
        response = before

    # Clean up the response
    response = _ROLE_PREFIX_RE.sub('', response.strip())
    response = _INST_BLOCK_RE.sub('', response)
    response = _SPECIAL_TOKEN_RE.sub('', response)

    thinking = "\n\n".join(thought for thought in thoughts if thought)
    return thinking.strip(), response.strip()


def clean_response(text: str) -> str:
    """Return only the cleaned answer part of raw model output."""
    _, response = extract_thinking_and_response(text)
    return response


def _generate_reply(prompt: str, max_new_tokens: int, temperature: float) -> Tuple[str, str]:
    """Run the generation pipeline and return ``(thinking, response)``.

    Raises whatever the pipeline raises (including when the pipeline has not
    been initialized); callers handle the exception and fall back.
    """
    outputs = search_pipeline(
        prompt,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        # Only the completion is returned, so header-like text inside the
        # prompt (user query, web snippets) cannot confuse the split below.
        return_full_text=False,
    )
    generated_text = outputs[0]['generated_text']
    # Guard in case the model echoes the assistant header itself.
    assistant_text = generated_text.split(ASSISTANT_HEADER)[-1]
    return extract_thinking_and_response(assistant_text)


def _fallback_queries(user_query: str) -> List[str]:
    """Simple query variations used when the model gives nothing usable."""
    return [user_query, f"{user_query} 2024", f"{user_query} latest"]


def _parse_queries(text: str) -> List[str]:
    """Turn the model's line-per-query reply into a clean, unique list."""
    queries: List[str] = []
    seen = set()
    for line in text.split('\n'):
        # Models often number or bullet the list despite the instructions.
        query = _LIST_MARKER_RE.sub('', line).strip().strip('"\'').strip()
        # Filter out any non-query text
        if len(query) <= 5 or query.startswith(('Note:', 'Example:')):
            continue
        key = query.casefold()
        if key not in seen:
            seen.add(key)
            queries.append(query)
    return queries


@spaces.GPU
def generate_search_queries(user_query: str) -> Tuple[List[str], str]:
    """Generate multiple search queries based on user input using AI.

    Args:
        user_query: The user's question.

    Returns:
        ``(queries, thinking)`` with at most five queries. When the model
        fails or yields no usable query, simple variations of ``user_query``
        are returned instead so the search step always has input.
    """
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>

You are a search query generator. Given a user's question, generate 3-5 different search queries that would help find comprehensive information to answer their question.

Return only the search queries, one per line, without numbering or bullet points.

Example:
User: "What are the latest developments in AI?"
latest AI developments 2024
artificial intelligence breakthroughs recent
AI technology advances news
machine learning innovations 2024

<|eot_id|><|start_header_id|>user<|end_header_id|>

{user_query}

<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""

    try:
        thinking, cleaned_response = _generate_reply(prompt, max_new_tokens=200, temperature=0.3)
        queries = _parse_queries(cleaned_response)
        if not queries:
            # e.g. the token budget was spent entirely on reasoning
            return _fallback_queries(user_query), thinking
        return queries[:MAX_QUERIES], thinking  # Return max 5 queries and thinking
    except Exception as e:
        print(f"Error generating queries: {e}")
        # Fallback to simple query variations
        return _fallback_queries(user_query), ""


def search_web(queries: List[str]) -> List[Dict]:
    """Search the web using DuckDuckGo with multiple queries.

    Each result dict is tagged with the ``search_query`` that produced it.
    A failing query is logged and skipped. Results are de-duplicated by URL;
    results without a URL are dropped.

    Args:
        queries: Search strings to run, in order.

    Returns:
        At most 15 unique result dicts, in the order they were found.
    """
    all_results = []
    ddgs = DDGS()

    for query in queries:
        try:
            results = ddgs.text(query, max_results=RESULTS_PER_QUERY, region='wt-wt', safesearch='moderate')
            for result in results or []:
                result['search_query'] = query
                all_results.append(result)
            time.sleep(0.5)  # Rate limiting
        except Exception as e:
            print(f"Error searching for '{query}': {e}")
            continue

    # Remove duplicates based on URL
    seen_urls = set()
    unique_results = []
    for result in all_results:
        url = result.get('href')
        if not url or url in seen_urls:
            continue
        seen_urls.add(url)
        unique_results.append(result)

    return unique_results[:MAX_UNIQUE_RESULTS]  # Return max 15 results


@spaces.GPU
def filter_relevant_results(user_query: str, search_results: List[Dict]) -> Tuple[List[Dict], str]:
    """Use AI to filter and rank search results by relevance.

    Only the first 12 results are shown to the model, and only numbers that
    refer to one of those are accepted. Repeated numbers are ignored.

    Args:
        user_query: The user's question.
        search_results: Result dicts as returned by ``search_web``.

    Returns:
        ``(selected_results, thinking)`` with at most five results. Falls
        back to the first five results when the model fails or selects
        nothing valid.
    """
    if not search_results:
        return [], ""

    # Prepare results summary for AI (limited to avoid token overflow)
    shown_results = search_results[:MAX_RESULTS_FOR_RANKING]
    summary_parts = []
    for position, result in enumerate(shown_results, start=1):
        snippet = (result.get('body') or 'No description')[:SNIPPET_PREVIEW_CHARS]
        summary_parts.append(
            f"{position}. Title: {result.get('title', 'No title')}\n"
            f"   URL: {result.get('href', 'No URL')}\n"
            f"   Snippet: {snippet}...\n\n"
        )
    results_text = "".join(summary_parts)

    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>

You are a search result evaluator. Given a user's question and search results, identify which results are most relevant and helpful for answering the question.

Return only the numbers of the most relevant results (1-5 results maximum), separated by commas.

Consider:
- Direct relevance to the question
- Credibility of the source
- Recency of information
- Comprehensiveness of content

Example response: 1, 3, 7

<|eot_id|><|start_header_id|>user<|end_header_id|>

Question: {user_query}

Search Results:
{results_text}

<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""

    try:
        # NOTE(review): 100 new tokens may be exhausted by the model's
        # reasoning before it lists any numbers; the fallback below covers it.
        thinking, cleaned_response = _generate_reply(prompt, max_new_tokens=100, temperature=0.1)

        # Extract numbers, keeping first-mention order
        selected_indices = []
        for number in re.findall(r'\d+', cleaned_response):
            index = int(number) - 1
            if 0 <= index < len(shown_results) and index not in selected_indices:
                selected_indices.append(index)

        if not selected_indices:
            # Having results but selecting none would discard useful data.
            return search_results[:MAX_SELECTED_RESULTS], thinking

        return [shown_results[i] for i in selected_indices[:MAX_SELECTED_RESULTS]], thinking
    except Exception as e:
        print(f"Error filtering results: {e}")
        return search_results[:MAX_SELECTED_RESULTS], ""  # Fallback to first 5 results


@spaces.GPU
def generate_final_answer(user_query: str, selected_results: List[Dict]) -> Tuple[str, str]:
    """Generate final answer based on selected search results.

    Args:
        user_query: The user's question.
        selected_results: Result dicts chosen as relevant.

    Returns:
        ``(answer, thinking)``. ``answer`` is a user-facing message when
        there are no results, the model fails, or it produces no answer.
    """
    if not selected_results:
        return "I couldn't find relevant information to answer your question. Please try rephrasing your query.", ""

    # Prepare context from selected results
    context = "".join(
        f"Source {position}: {result.get('title', 'Unknown')}\n"
        f"Content: {result.get('body', 'No content available')}\n"
        f"URL: {result.get('href', 'No URL')}\n\n"
        for position, result in enumerate(selected_results, start=1)
    )

    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>

You are a helpful research assistant. Based on the provided search results, give a comprehensive answer to the user's question.

Guidelines:
- Synthesize information from multiple sources
- Be accurate and factual
- Cite sources when possible
- If information is conflicting, mention it
- Keep the answer well-structured and easy to read
- Include relevant URLs for further reading

<|eot_id|><|start_header_id|>user<|end_header_id|>

Question: {user_query}

Search Results:
{context}

Please provide a comprehensive answer based on these sources.

<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""

    try:
        thinking, answer = _generate_reply(prompt, max_new_tokens=1024, temperature=0.2)
        if not answer:
            # An empty Markdown panel gives the user no clue what happened.
            return "The model did not produce an answer. Please try again.", thinking
        return answer, thinking
    except Exception as e:
        print(f"Error generating final answer: {e}")
        return "I encountered an error while processing the search results. Please try again.", ""


def search_agent_workflow(user_query: str, progress=gr.Progress()) -> Tuple[str, str, str]:
    """Main workflow that orchestrates the search agent.

    Steps: generate queries, search the web, filter results, write the
    answer.

    Args:
        user_query: The user's question.
        progress: Gradio progress tracker.

    Returns:
        ``(final_answer, debug_info, thinking_display)`` for the three
        output components.
    """
    if not user_query or not user_query.strip():
        return "Please enter a search query.", "", ""

    progress(0.1, desc="Initializing...")
    all_thinking = []

    # Step 1: Generate search queries
    progress(0.2, desc="Generating search queries...")
    queries, thinking1 = generate_search_queries(user_query)
    if thinking1:
        all_thinking.append(f"**Query Generation:**\n{thinking1}")
    queries_text = "Generated queries:\n" + "\n".join(f"• {q}" for q in queries)

    # Step 2: Search the web
    progress(0.4, desc="Searching the web...")
    search_results = search_web(queries)

    if not search_results:
        return "No search results found. Please try a different query.", queries_text, "\n\n".join(all_thinking)

    # Step 3: Filter relevant results
    progress(0.6, desc="Filtering relevant results...")
    relevant_results, thinking2 = filter_relevant_results(user_query, search_results)
    if thinking2:
        all_thinking.append(f"**Result Filtering:**\n{thinking2}")

    # Step 4: Generate final answer
    progress(0.8, desc="Generating comprehensive answer...")
    final_answer, thinking3 = generate_final_answer(user_query, relevant_results)
    if thinking3:
        all_thinking.append(f"**Answer Generation:**\n{thinking3}")

    progress(1.0, desc="Complete!")

    # Prepare debug info
    source_lines = "".join(
        f"{position}. {result.get('title', 'No title')} - {result.get('href', 'No URL')}\n"
        for position, result in enumerate(relevant_results, start=1)
    )
    debug_info = f"{queries_text}\n\nSelected {len(relevant_results)} relevant sources:\n{source_lines}"

    thinking_display = "\n\n".join(all_thinking) if all_thinking else "No thinking process recorded."

    return final_answer, debug_info, thinking_display


# Custom CSS for dark blue theme and mobile responsiveness
custom_css = """
/* Dark blue theme */
:root {
    --primary-bg: #0a1628;
    --secondary-bg: #1e3a5f;
    --accent-bg: #2563eb;
    --text-primary: #f8fafc;
    --text-secondary: #cbd5e1;
    --border-color: #334155;
    --input-bg: #1e293b;
    --button-bg: #3b82f6;
    --button-hover: #2563eb;
}

/* Global styles */
.gradio-container {
    background: linear-gradient(135deg, var(--primary-bg) 0%, var(--secondary-bg) 100%) !important;
    color: var(--text-primary) !important;
    font-family: 'Inter', 'Segoe UI', system-ui, sans-serif !important;
}

/* Mobile responsiveness */
@media (max-width: 768px) {
    .gradio-container {
        padding: 10px !important;
    }

    .gr-form {
        gap: 15px !important;
    }

    .gr-button {
        font-size: 16px !important;
        padding: 12px 20px !important;
    }
}

/* Input styling */
.gr-textbox textarea, .gr-textbox input {
    background: var(--input-bg) !important;
    border: 1px solid var(--border-color) !important;
    color: var(--text-primary) !important;
    border-radius: 8px !important;
}

/* Button styling */
.gr-button {
    background: linear-gradient(135deg, var(--button-bg) 0%, var(--accent-bg) 100%) !important;
    color: white !important;
    border: none !important;
    border-radius: 8px !important;
    font-weight: 600 !important;
    transition: all 0.3s ease !important;
}

.gr-button:hover {
    background: linear-gradient(135deg, var(--button-hover) 0%, var(--button-bg) 100%) !important;
    transform: translateY(-1px) !important;
    box-shadow: 0 4px 12px rgba(59, 130, 246, 0.3) !important;
}

/* Output styling */
.gr-markdown, .gr-textbox {
    background: var(--input-bg) !important;
    border: 1px solid var(--border-color) !important;
    border-radius: 8px !important;
    color: var(--text-primary) !important;
}

/* Header styling */
.gr-markdown h1 {
    color: var(--accent-bg) !important;
    text-align: center !important;
    margin-bottom: 20px !important;
    font-size: 2.5rem !important;
    font-weight: 700 !important;
}

/* Thinking section styling */
#thinking-output {
    background: var(--secondary-bg) !important;
    border: 1px solid var(--border-color) !important;
    border-radius: 8px !important;
    padding: 15px !important;
    font-family: 'Fira Code', 'Monaco', monospace !important;
    font-size: 0.9rem !important;
    line-height: 1.4 !important;
}

/* Loading animation */
.gr-loading {
    background: var(--secondary-bg) !important;
    border-radius: 8px !important;
}

/* Scrollbar styling */
::-webkit-scrollbar {
    width: 8px;
}

::-webkit-scrollbar-track {
    background: var(--primary-bg);
}

::-webkit-scrollbar-thumb {
    background: var(--accent-bg);
    border-radius: 4px;
}

::-webkit-scrollbar-thumb:hover {
    background: var(--button-hover);
}
"""


def create_interface():
    """Create the Gradio interface.

    Returns:
        The ``gr.Blocks`` app with the query box, search button, answer,
        thinking and debug panels, and example queries wired to
        ``search_agent_workflow``.
    """
    with gr.Blocks(
        theme=gr.themes.Base(
            primary_hue="blue",
            secondary_hue="slate",
            neutral_hue="slate",
            text_size="lg",
            spacing_size="lg",
            radius_size="md"
        ),
        css=custom_css,
        title="Just search - AI Search Agent",
        head=""
    ) as interface:

        gr.Markdown("# 🔍 Just search", elem_id="header")
        gr.Markdown(
            "*Part of the Just, AKA Simple series*\n\n"
            "**Intelligent search agent powered by Menlo/Lucy-128k**\n\n"
            "Ask any question and get comprehensive answers from the web.",
            elem_id="description"
        )

        with gr.Row():
            with gr.Column(scale=4):
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask me anything... (e.g., 'What are the latest developments in AI?')",
                    lines=2,
                    elem_id="query-input"
                )
            with gr.Column(scale=1):
                search_btn = gr.Button(
                    "🔎 Search",
                    variant="primary",
                    size="lg",
                    elem_id="search-button"
                )

        with gr.Row():
            answer_output = gr.Markdown(
                label="Answer",
                elem_id="answer-output",
                height=400
            )

        with gr.Accordion("🤔 AI Thinking Process", open=False):
            thinking_output = gr.Markdown(
                label="Model's Chain of Thought",
                elem_id="thinking-output",
                height=300
            )

        with gr.Accordion("🔧 Debug Info", open=False):
            debug_output = gr.Textbox(
                label="Search Process Details",
                lines=8,
                elem_id="debug-output"
            )

        # Event handlers: the button and pressing Enter run the same workflow
        workflow_outputs = [answer_output, debug_output, thinking_output]

        search_btn.click(
            fn=search_agent_workflow,
            inputs=[query_input],
            outputs=workflow_outputs,
            show_progress=True
        )

        query_input.submit(
            fn=search_agent_workflow,
            inputs=[query_input],
            outputs=workflow_outputs,
            show_progress=True
        )

        # Example queries
        gr.Examples(
            examples=[
                ["What are the latest breakthroughs in quantum computing?"],
                ["How does climate change affect ocean currents?"],
                ["What are the best practices for sustainable agriculture?"],
                ["Explain the recent developments in renewable energy technology"],
                ["What are the health benefits of the Mediterranean diet?"]
            ],
            inputs=query_input,
            outputs=workflow_outputs,
            fn=search_agent_workflow,
            cache_examples=False
        )

        gr.Markdown(
            "---\n**Note:** This search agent generates multiple queries, searches the web, "
            "filters results for relevance, and provides comprehensive answers. "
            "Results are sourced from DuckDuckGo search."
        )

    return interface


def main():
    """Main function to initialize and launch the app.

    Returns early, without launching, when the model fails to load.
    """
    print("🚀 Initializing Just search...")

    # Initialize the model
    if not initialize_model():
        print("❌ Failed to initialize model. Please check your setup.")
        return

    print("✅ Model initialized successfully!")
    print("🌐 Creating interface...")

    # Create and launch the interface
    interface = create_interface()

    print("🎉 Just search is ready!")
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
        debug=True
    )


if __name__ == "__main__":
    main()