laverdes's picture
fix: file_path logic
6a90ff1 verified
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import json
import copy
from basic_agent import ToolAgent
from tools import (
smart_read_file,
search_and_extract,
search_and_extract_from_wikipedia,
aggregate_information,
extract_clean_text_from_url,
youtube_search_tool,
load_youtube_transcript,
get_audio_from_youtube,
image_query_tool,
transcribe_audio,
)
TOOLS = [
smart_read_file,
search_and_extract,
search_and_extract_from_wikipedia,
aggregate_information,
extract_clean_text_from_url,
youtube_search_tool,
load_youtube_transcript,
get_audio_from_youtube,
image_query_tool,
transcribe_audio,
]
tool_names = [tool.name if hasattr(tool, "name") else str(tool) for tool in TOOLS]
print(json.dumps(tool_names, indent=2))
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
TOOL_USE_SYS_PROMPT = """
You are a helpful AI assistant operating in a structured reasoning and action loop using the ReAct pattern.
Your reasoning loop consists of:
- Question: the input task you must solve
- Thought: Reflect on the task and decide what to do next.
- Action: Choose one of the following actions:
- Solve it directly using your own knowledge
- Break the problem into smaller steps
- Use a tool to get more information
- Action Input: Provide input for the selected action
- Observation: Record the result of the action and/or aggregate information from previous observations (summarize, count, analyse, ...).
(Repeat Thought/Action/Action Input/Observation as needed)
Terminate your loop with:
- Thought: I now know the final answer
- Final Answer: [your best answer to the original question]
**General Execution Rules:**
- If you can answer using only your trained knowledge, do so directly without using tools.
- If the question involves image content, use the `image_query_tool`:
- Action: image_query_tool
- Action Input: 'image_path': [image_path], 'question': [user's question about the image]
**Tool Use Constraints:**
- Never use any tool more than **3 consecutive times** without either:
- Aggregating the information received so far: you can call the `summarize_search_results` tool and analyze the tool outputs to answer the question.
- If you need more information, use a different tool or break the problem down further, but do not return a final answer yet.
- Do not exceed **5 total calls** to *search-type tools* per query (such as `search_and_extract`, `search_and_extract_from_wikipedia`, `extract_clean_text_from_url`).
- Do not ask the user for additional clarification or input. Work with only what is already provided.
**If you are unable to answer:**
- If neither your knowledge nor tool outputs yield useful information:
- Use the output tools the best you can to answer the question, even if it's not perfect.
If not, say:
> Final Answer: I could not find any useful information to answer your query.
- If the question is unanswerable due to lack of input (e.g., missing attachment) or is fundamentally outside your scope, say:
> Final Answer: I don't have the ability to answer this query: [brief reason]
Always aim to provide the **best and most complete** answer based on your trained knowledge and the tools available.
"""
def run_and_submit_all( profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the ToolAgent on them, submits all answers,
and displays the results.
"""
space_id = os.getenv("SPACE_ID")
if profile:
username= f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None, gr.update(interactive=False)
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
files_url = "{}/files/{}" # GET /files/{task_id}
submit_url = f"{api_url}/submit"
# 1. Instantiate Agent ( modify this part to create your agent)
try:
agent = ToolAgent(
tools=TOOLS,
backstory=TOOL_USE_SYS_PROMPT
)
agent.initialize()
except Exception as e:
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None, gr.update(interactive=False)
# In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public)
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(agent_code)
# 2. Fetch Questions
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None, gr.update(interactive=False)
print(f"Fetched {len(questions_data)} questions.")
except requests.exceptions.RequestException as e:
print(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None, gr.update(interactive=False)
except requests.exceptions.JSONDecodeError as e:
print(f"Error decoding JSON response from questions endpoint: {e}")
print(f"Response text: {response.text[:500]}")
return f"Error decoding server response for questions: {e}", None, gr.update(interactive=False)
except Exception as e:
print(f"An unexpected error occurred fetching questions: {e}")
return f"An unexpected error occurred fetching questions: {e}", None, gr.update(interactive=False)
# 3. Run your Agent
results_log = []
answers_payload = []
print(f"Running agent on {len(questions_data)} questions...")
for item in questions_data:
task_id = item.get("task_id")
question_text = item.get("question")
question_level = item.get("Level")
question_file_name = item.get("file_name", None)
print("\nquestion level: ", question_level)
print("task_id: ", task_id)
print("question file_name: ", question_file_name)
if question_file_name:
file_url = files_url.format(api_url, task_id)
print("file_url: ", file_url)
file_response = requests.get(file_url, timeout=15)
file_response.raise_for_status()
print("file_response: ", file_response.content[0:50])
save_path = os.path.join("/tmp", question_file_name)
print("save_path: ", save_path)
with open(save_path, "wb") as f:
f.write(file_response.content)
print(f"✅ file saved to: {save_path}")
found= False
metadata = {}
for root, dirs, files in os.walk("/"):
if question_file_name in files:
file_path = os.path.join(root, question_file_name)
print("file found at: ", file_path)
metadata = {'image_path': file_path} if '.png' in question_file_name else {'file_path': file_path}
found=True
if question_file_name and not found:
print("FileNotFoundError: try making an api request to .files/ or ./static in the hf.space target (or check it manually first)")
break
if not task_id or question_text is None:
print(f"Skipping item with missing task_id or question: {item}")
continue
try:
q_data = {'query': question_text, 'metadata': metadata}
submitted_answer = agent(q_data) # todo: send more data (files)
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
except Exception as e:
print(f"Error running agent on task {task_id}: {e}")
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log), gr.update(interactive=False)
# 4. Prepare Submission
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
log_file_dict = copy.deepcopy(results_log)
log_file_dict.append({'result_data': result_data})
with open("results_log.json", "w") as results_session_file:
json.dump(log_file_dict, results_session_file)
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
results_df = pd.DataFrame(results_log)
return final_status, results_df, gr.update(interactive=True)
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
status_message = f"Submission Failed: {error_detail}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df, gr.update(interactive=False)
except requests.exceptions.Timeout:
status_message = "Submission Failed: The request timed out."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df, gr.update(interactive=False)
except requests.exceptions.RequestException as e:
status_message = f"Submission Failed: Network error - {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df, gr.update(interactive=False)
except Exception as e:
status_message = f"An unexpected error occurred during submission: {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df, gr.update(interactive=False)
def download_log():
return "results_log.json"
# Gradio App
with gr.Blocks() as demo:
gr.Markdown("# Basic Agent Evaluation Runner")
gr.LoginButton()
run_button = gr.Button("Run Evaluation & Submit All Answers")
download_button = gr.Button("Download Evaluation Log", interactive=False)
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table, download_button]
)
file_output = gr.File(label="Download Log File", visible=True)
download_button.click(
fn=download_log,
outputs=file_output
)
if __name__ == "__main__":
print("\n" + "-"*30 + " App Starting " + "-"*30)
space_host_startup = os.getenv("SPACE_HOST")
space_id_startup = os.getenv("SPACE_ID")
if space_host_startup:
print(f"✅ SPACE_HOST found: {space_host_startup}")
print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
else:
print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
if space_id_startup:
print(f"✅ SPACE_ID found: {space_id_startup}")
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
else:
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
print("-"*(60 + len(" App Starting ")) + "\n")
print("Launching Gradio Interface for Basic Agent Evaluation...")
demo.launch(debug=True, share=False)