Iamvincent commited on
Commit
f8df6da
·
verified ·
1 Parent(s): d49e11a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +272 -17
app.py CHANGED
@@ -1,6 +1,14 @@
 
 
 
 
 
 
 
 
 
1
  import os
2
  import gradio as gr
3
- import requests
4
  import inspect
5
  import pandas as pd
6
  from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel
@@ -9,27 +17,274 @@ from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel
9
  # --- Constants ---
10
  DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
11
 
12
- # --- Basic Agent Definition ---
13
- # ----- THIS IS WERE YOU CAN BUILD WHAT YOU WANT ------
14
- class BasicAgent:
15
- def __init__(self):
16
- print("BasicAgent initialized.")
17
 
18
- hf_token = os.getenv("HF_TOKEN")
19
- if not hf_token:
20
- raise ValueError("HF_TOKEN environment variable is not set.")
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  self.agent = CodeAgent(
23
- tools=[DuckDuckGoSearchTool()],
24
- model=HfApiModel(token=hf_token, model="microsoft/phi-2")
 
 
 
 
 
 
 
25
  )
 
 
 
 
 
 
 
26
 
27
- def __call__(self, question: str) -> str:
28
- print(f"Agent received question (first 50 chars): {question[:50]}...")
29
- fixed_answer = self.agent.run("You are a helpful assistant answering short factual questions. Answer the question: " + question, max_steps=1)
30
- print(fixed_answer)
31
- print(f"Agent returning fixed answer: {fixed_answer}")
32
- return fixed_answer
33
 
34
  def run_and_submit_all( profile: gr.OAuthProfile | None):
35
  """
 
1
+ from __future__ import annotations
2
+
3
+ from functools import lru_cache
4
+ from pathlib import Path
5
+ from typing import Optional, Union, List
6
+ import re
7
+ import tempfile
8
+ import requests
9
+ import urllib.parse as _urlparse
10
  import os
11
  import gradio as gr
 
12
  import inspect
13
  import pandas as pd
14
  from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel
 
17
  # --- Constants ---
18
  DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
19
 
 
 
 
 
 
20
 
 
 
 
21
 
22
+ # ‑‑‑ smol‑agents base imports (provided by the framework) ‑‑‑
23
+ from smol_agents import (
24
+ Tool,
25
+ PipelineTool,
26
+ CodeAgent,
27
+ DuckDuckGoSearchTool,
28
+ WikipediaSearchTool,
29
+ OpenAIServerModel,
30
+ )
31
+
32
+ # ---------------------------------------------------------------------------
33
+ # Speech‑to‑Text (OpenAI Whisper)
34
+ # ---------------------------------------------------------------------------
35
+ class SpeechToTextTool(PipelineTool):
36
+ """Transcribe *local* audio files via OpenAI Whisper (cached)."""
37
+
38
+ name = "transcriber"
39
+ description = (
40
+ "Send a local audio file to OpenAI Whisper (model **whisper‑1**) and "
41
+ "return the plain‑text transcript."
42
+ )
43
+ inputs = {
44
+ "audio": {
45
+ "type": "string",
46
+ "description": "Absolute or relative path to a local audio file.",
47
+ }
48
+ }
49
+ output_type = "string"
50
+
51
+ def __call__(self, audio: str) -> str: # noqa: D401
52
+ return self._transcribe(audio)
53
+
54
+ @staticmethod
55
+ @lru_cache(maxsize=64)
56
+ def _transcribe(audio_path: str) -> str:
57
+ path = Path(audio_path).expanduser().resolve()
58
+ if not path.is_file():
59
+ raise FileNotFoundError(f"No such audio file: {path}")
60
+
61
+ from openai import audio as _audio # late import
62
+
63
+ with path.open("rb") as fp:
64
+ resp = _audio.transcriptions.create(
65
+ file=fp,
66
+ model="whisper-1",
67
+ response_format="text",
68
+ )
69
+ return resp
70
+
71
+
72
+ # ---------------------------------------------------------------------------
73
+ # Excel → Markdown helper
74
+ # ---------------------------------------------------------------------------
75
+ class ExcelToTextTool(Tool):
76
+ """Render an Excel worksheet as a Markdown table (GitHub flavour)."""
77
+
78
+ name = "excel_to_text"
79
+ description = (
80
+ "Convert an Excel sheet to Markdown. Accepts sheet name *or* index "
81
+ "(as string). Returns a GitHub‑style table without index column."
82
+ )
83
+
84
+ inputs = {
85
+ "excel_path": {
86
+ "type": "string",
87
+ "description": "Path to the Excel file (.xlsx / .xls).",
88
+ },
89
+ "sheet_name": {
90
+ "type": "string",
91
+ "nullable": True,
92
+ "description": (
93
+ "Worksheet name or 0‑based index *as string* (optional; "
94
+ "default=first sheet)."
95
+ ),
96
+ },
97
+ }
98
+ output_type = "string"
99
+
100
+ @lru_cache(maxsize=32)
101
+ def forward(self, excel_path: str, sheet_name: Optional[str] = None) -> str: # type: ignore[override]
102
+ path = Path(excel_path).expanduser().resolve()
103
+ if not path.is_file():
104
+ return f"Error: Excel file not found at {path}"
105
+
106
+ import importlib.util as _imp
107
+ if not _imp.find_spec("pandas"):
108
+ return "Error: pandas library not available in this environment."
109
+ import pandas as pd
110
+
111
+ try:
112
+ sheet: Union[int, str] = 0
113
+ if sheet_name and sheet_name.strip():
114
+ sheet = int(sheet_name) if sheet_name.isdigit() else sheet_name
115
+ df = pd.read_excel(path, sheet_name=sheet)
116
+ if hasattr(pd.DataFrame, "to_markdown"):
117
+ return df.to_markdown(index=False)
118
+ from tabulate import tabulate # pragma: no cover
119
+ return tabulate(df, headers="keys", tablefmt="github", showindex=False)
120
+ except Exception as exc: # pragma: no cover – user‑visible error
121
+ return f"Error reading Excel file: {exc}"
122
+
123
+
124
+ # ---------------------------------------------------------------------------
125
+ # NEW: YouTube Question‑Answer Tool
126
+ # ---------------------------------------------------------------------------
127
+ class YouTubeQATool(PipelineTool):
128
+ """Answer questions about the spoken content of a YouTube video.
129
+
130
+ • Downloads the auto‑generated or creator‑provided transcript using
131
+ **youtube‑transcript‑api** (no API key needed for most public videos).
132
+ • Feeds a compressed transcript + user question to GPT‑4o for an answer.
133
+ • Caches transcripts locally to avoid repeated network calls.
134
+ """
135
+
136
+ name = "youtube_qa"
137
+ description = (
138
+ "Given a YouTube URL and a natural‑language *question*, return an answer "
139
+ "based solely on the video transcript (no hallucinations)."
140
+ )
141
+
142
+ inputs = {
143
+ "url": {
144
+ "type": "string",
145
+ "description": "Full YouTube video URL or just the watch ID.",
146
+ },
147
+ "question": {
148
+ "type": "string",
149
+ "description": "Question about the video content (English / French).",
150
+ },
151
+ }
152
+ output_type = "string"
153
+
154
+ # ––––– internal helpers ––––– ------------------------------------------------
155
+ _TRANSCRIPT_CACHE: dict[str, str] = {} # simple in‑proc cache
156
+
157
+ @staticmethod
158
+ def _extract_video_id(url: str) -> str:
159
+ """Return the 11‑char YouTube ID from a watch/shorts URL or raw ID."""
160
+ if len(url) == 11 and "/" not in url:
161
+ return url
162
+ parsed = _urlparse.urlparse(url)
163
+ if parsed.hostname in ("youtu.be",):
164
+ return parsed.path.lstrip("/")
165
+ if parsed.hostname and "youtube" in parsed.hostname:
166
+ qs = _urlparse.parse_qs(parsed.query)
167
+ if "v" in qs:
168
+ return qs["v"][0]
169
+ # shorts/embedded
170
+ return parsed.path.split("/")[-1]
171
+ raise ValueError("Could not parse YouTube video ID from URL")
172
+
173
+ @classmethod
174
+ def _get_transcript(cls, video_id: str) -> str:
175
+ if video_id in cls._TRANSCRIPT_CACHE:
176
+ return cls._TRANSCRIPT_CACHE[video_id]
177
+ try:
178
+ from youtube_transcript_api import YouTubeTranscriptApi # type: ignore
179
+ except ModuleNotFoundError:
180
+ return "Error: youtube‑transcript‑api library not installed."
181
+ try:
182
+ segments: List[dict] = YouTubeTranscriptApi.get_transcript(video_id)
183
+ except Exception as exc: # private video, disabled captions, …
184
+ return f"Error fetching transcript: {exc}"
185
+ text = " ".join(seg["text"] for seg in segments)
186
+ cls._TRANSCRIPT_CACHE[video_id] = text
187
+ return text
188
+
189
+ # ––––– main entry point ––––– -------------------------------------------
190
+ def forward(self, url: str, question: str) -> str: # type: ignore[override]
191
+ try:
192
+ vid = self._extract_video_id(url)
193
+ except ValueError as e:
194
+ return str(e)
195
+
196
+ transcript = self._get_transcript(vid)
197
+ if transcript.startswith("Error"):
198
+ return transcript
199
+
200
+ # Keep prompt under ~15k chars – truncate transcript if necessary
201
+ max_chars = 15000
202
+ if len(transcript) > max_chars:
203
+ transcript = transcript[:max_chars] + " …(truncated)…"
204
+
205
+ from openai import chat # lazy import OpenAI client only here
206
+
207
+ system = (
208
+ "You are a meticulous assistant. Answer the user's question about "
209
+ "the provided YouTube transcript. If the transcript lacks the "
210
+ "information, reply 'I don't know based on the transcript.'"
211
+ )
212
+ messages = [
213
+ {"role": "system", "content": system},
214
+ {"role": "user", "content": f"Transcript:\n{transcript}"},
215
+ {"role": "user", "content": f"Question: {question}"},
216
+ ]
217
+ try:
218
+ resp = chat.completions.create(
219
+ model="gpt-4o", # uses the same hosted model as the agent
220
+ messages=messages,
221
+ temperature=0.2,
222
+ max_tokens=256,
223
+ )
224
+ return resp.choices[0].message.content.strip()
225
+ except Exception as exc: # pragma: no cover
226
+ return f"Error generating answer: {exc}"
227
+
228
+
229
+ # ---------------------------------------------------------------------------
230
+ # Helper: download attachment (if any)
231
+ # ---------------------------------------------------------------------------
232
+
233
+ def download_file_if_any(base_api_url: str, task_id: str) -> str | None:
234
+ url = f"{base_api_url}/files/{task_id}"
235
+ try:
236
+ resp = requests.get(url, timeout=30)
237
+ if resp.status_code == 404:
238
+ return None
239
+ resp.raise_for_status()
240
+ except requests.HTTPError:
241
+ raise
242
+
243
+ filename = task_id
244
+ if cd := resp.headers.get("content-disposition"):
245
+ if match := re.search(r'filename="([^"]+)"', cd):
246
+ filename = match.group(1)
247
+
248
+ tmp_dir = Path(tempfile.gettempdir(), "gaia_files")
249
+ tmp_dir.mkdir(exist_ok=True)
250
+ file_path = tmp_dir / filename
251
+ file_path.write_bytes(resp.content)
252
+ return str(file_path)
253
+
254
+
255
+ # ---------------------------------------------------------------------------
256
+ # Minimal agent wired with our custom tools
257
+ # ---------------------------------------------------------------------------
258
+ class BasicAgent:
259
+ _model = OpenAIServerModel(model_id="gpt-4o")
260
+ _tools = [
261
+ DuckDuckGoSearchTool(),
262
+ WikipediaSearchTool(),
263
+ SpeechToTextTool(),
264
+ ExcelToTextTool(),
265
+ YouTubeQATool(), # <-- NEW
266
+ ]
267
+
268
+ def __init__(self) -> None:
269
  self.agent = CodeAgent(
270
+ model=self._model,
271
+ tools=self._tools,
272
+ add_base_tools=True,
273
+ additional_authorized_imports=[
274
+ "numpy",
275
+ "pandas",
276
+ "csv",
277
+ "subprocess",
278
+ ],
279
  )
280
+ print("BasicAgent initialized with YouTubeQATool.")
281
+
282
+ def __call__(self, question: str) -> str: # noqa: D401
283
+ print(f"Agent received question (first 80 chars): {question[:80]}…")
284
+ answer = self.agent.run(question)
285
+ print(f"Agent returning answer: {answer}")
286
+ return answer
287
 
 
 
 
 
 
 
288
 
289
  def run_and_submit_all( profile: gr.OAuthProfile | None):
290
  """