Spaces:
Sleeping
Sleeping
File size: 14,214 Bytes
4c99672 1853d1e a60e9fe 2b7e98c 1853d1e a60e9fe 1853d1e 8ae2625 1853d1e 6b3901f 8ae2625 a60e9fe 1853d1e 681be1d 1853d1e 681be1d 1853d1e 8ae2625 6b3901f a60e9fe 8ae2625 a60e9fe 8ae2625 a60e9fe 6b3901f a60e9fe 6b3901f a60e9fe 6b3901f a60e9fe 6b3901f a60e9fe 6b3901f a60e9fe 8ae2625 a60e9fe 8ae2625 a60e9fe 5cfc619 1853d1e a60e9fe 1853d1e a60e9fe 1853d1e a60e9fe 1853d1e a60e9fe 1853d1e a60e9fe 1853d1e a60e9fe 1853d1e 8ae2625 a60e9fe 8ae2625 a60e9fe 8ae2625 a60e9fe 8ae2625 a60e9fe 8ae2625 a60e9fe 8ae2625 a60e9fe 8ae2625 a60e9fe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 |
import os
import base64
import json
import inspect
import time
import trafilatura
from typing import Callable, Union
from pathlib import PurePath
from datetime import datetime, timezone
from markitdown import MarkItDown
from langchain.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import HumanMessage
from langchain_google_genai.chat_models import ChatGoogleGenerativeAIError
from langchain_tavily import TavilySearch, TavilyExtract
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.utilities.wikipedia import WikipediaAPIWrapper
from langchain_community.tools.wikipedia.tool import WikipediaQueryRun
from langchain_google_community import SpeechToTextLoader
from langchain_community.tools import YouTubeSearchTool
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_community.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader
from langchain_community.tools.file_management.read import ReadFileTool
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from basic_agent import print_conversation
from dotenv import load_dotenv
from langchain.globals import set_debug
from urllib.parse import urlparse, parse_qs
# Keep LangChain's own global debug output switched off.
set_debug(False)
# Module-wide switch: when True, the tools in this file echo their calls and
# responses through print_tool_call / print_tool_response.
CUSTOM_DEBUG = True
# Load environment variables from a .env file (python-dotenv); presumably the
# API keys for the services used below -- confirm against deployment config.
load_dotenv()
def encode_image_to_base64(path):
    """Return the bytes of the file at ``path`` as a base64 text string.

    The file is read in binary mode and its base64 encoding is decoded to
    ``str`` using UTF-8. Errors raised by ``open`` propagate to the caller.
    """
    with open(path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def print_tool_call(tool: Callable, tool_name: str, args: dict):
    """Prints the tool call for debugging purposes.

    Args:
        tool: Object whose call signature is shown next to the tool name.
        tool_name: Name displayed for the tool.
        args: Arguments the tool was called with; printed as-is.
    """
    try:
        sig = str(inspect.signature(tool))
    except (TypeError, ValueError):
        # inspect.signature raises TypeError for unsupported objects and
        # ValueError when no signature can be determined. This is debug output
        # only, so fall back to a placeholder instead of aborting the tool call.
        sig = "(...)"
    print_conversation(
        messages=[
            {
                'role': 'Tool-Call',
                'content': f"Calling `{tool_name}`{sig}"
            },
            {
                'role': 'Tool-Args',
                'content': args
            },
        ],
    )
def print_tool_response(response: str):
    """Prints the tool response for debugging purposes.

    The response is wrapped in a single 'Tool-Response' message and handed
    to ``print_conversation``.
    """
    tool_message = {'role': 'Tool-Response', 'content': response}
    print_conversation(messages=[tool_message])
# Module-level Tavily clients shared by `search_and_extract`.
# The search client is configured with max_results=3.
search_tool = TavilySearch(max_results=3)
# The extract client is invoked with {"urls": [...]} to fetch page content.
extract_tool = TavilyExtract()
# search_and_extract: run a Tavily search, then a Tavily extract on the hit
# URLs, and return one dict per hit with keys "title", "url", "snippet" and
# "raw_content" (truncated to MAX_NUMBER_OF_CHARS characters). When the search
# yields no URLs a single {"error": ...} dict is returned instead.
# The docstring below is left untouched because `@tool` uses it as the tool
# description.
@tool
def search_and_extract(query: str) -> list[dict]:
    """Performs a web search and returns structured content extracted from top results."""
    time.sleep(3)  # To avoid hitting the API rate limit in the llm-apis when calling the tool multiple times in a row.
    MAX_NUMBER_OF_CHARS = 15_000
    if CUSTOM_DEBUG:
        print_tool_call(
            search_and_extract,
            tool_name='search_and_extract',
            args={'query': query, 'max_number_of_chars': MAX_NUMBER_OF_CHARS},
        )

    search_response = search_tool.invoke({"query": query})
    # Only hits that carry a URL can be extracted, so only those are reported.
    hits = [hit for hit in search_response.get("results", []) if hit.get("url")]
    if not hits:
        structured_results = [{"error": "No URLs found to extract from."}]
        if CUSTOM_DEBUG:
            print_tool_response(json.dumps(structured_results))
        return structured_results

    extracted = extract_tool.invoke({"urls": [hit["url"] for hit in hits]})
    docs = extracted.get("results", [])

    # Pair content with hits by URL rather than by position: when an extraction
    # fails or is skipped, positional pairing attaches content to the wrong hit.
    # `or ""` covers documents whose raw_content is present but None.
    content_by_url = {doc.get("url"): doc.get("raw_content") or "" for doc in docs}
    if len(docs) == len(hits) and not any(hit["url"] in content_by_url for hit in hits):
        # NOTE(review): assumes extract results echo the requested URL; if none
        # match but the counts agree, fall back to the original positional pairing.
        content_by_url = {
            hit["url"]: doc.get("raw_content") or ""
            for hit, doc in zip(hits, docs)
        }

    structured_results = [
        {
            "title": hit.get("title"),
            "url": hit.get("url"),
            "snippet": hit.get("content"),
            # Slicing already handles strings shorter than the limit.
            "raw_content": content_by_url.get(hit["url"], "")[:MAX_NUMBER_OF_CHARS],
        }
        for hit in hits
    ]

    if CUSTOM_DEBUG:
        # Leave the bulky page text out of the console output.
        console_structured_results = [
            {key: value for key, value in entry.items() if key != "raw_content"}
            for entry in structured_results
        ]
        print_tool_response(json.dumps(console_structured_results))
    return structured_results
# aggregate_information: map-reduce summarization of text chunks with respect
# to a query. Returns the plain string "No search results provided." for empty
# input, otherwise a JSON string of the form {"summary": ...}.
@tool
def aggregate_information(query: str, results: list[str]) -> str:
    """
    Processes a list of unstructured text chunks (e.g., search results) and produces a concise, query-specific summary.
    Each input text is filtered and summarized individually in the context of the provided query. Irrelevant results are discarded.
    Relevant content is aggregated and synthesized into a final, coherent answer that directly addresses the query.
    """
    if CUSTOM_DEBUG:
        print_tool_call(
            aggregate_information,
            tool_name='aggregate_information',
            args={'results': results, 'query': query},
        )

    # Guard clause: nothing to summarize.
    if not results:
        response = "No search results provided."
        if CUSTOM_DEBUG:
            print_tool_response(response)
        return response

    # Each chunk becomes one LangChain Document for the map step.
    docs = [Document(page_content=chunk) for chunk in results]
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    # Map step: summarize one document with respect to the query.
    per_document_template = (
        "You are analyzing a search result in the context of the question: '{query}'.\n\n"
        "Search result:\n{text}\n\n"
        "Instructions:\n"
        "- If the result contains information relevant to answering the query, summarize the relevant parts clearly.\n"
        "- If the result is not helpful or unrelated, return 'IGNORE'.\n"
        "- Do not include generic information or filler.\n"
        "- Focus on extracting facts, key statements, or numbers that directly support the query.\n\n"
        "Relevant Summary:"
    )
    # Reduce step: merge the per-document summaries into one context.
    merged_template = (
        "You are aggregating information to provide context to answer the following question: '{query}'.\n\n"
        "Here are the summaries from filtered search results:\n{text}\n\n"
        "Use the provided summaries to construct a context that directly supports the query without answering it.\n"
        "Context:"
    )
    map_prompt = PromptTemplate.from_template(per_document_template)
    combine_prompt = PromptTemplate.from_template(merged_template)

    # The query is bound up front, leaving only {text} for the chain to fill.
    chain = load_summarize_chain(
        llm,
        chain_type="map_reduce",
        map_prompt=map_prompt.partial(query=query),
        combine_prompt=combine_prompt.partial(query=query),
    )
    summary = chain.invoke({'input_documents': docs})

    # Fall back to the stringified result if 'output_text' is missing.
    final_text = summary.get('output_text', str(summary))
    output_text = json.dumps({'summary': final_text})
    if CUSTOM_DEBUG:
        print_tool_response(output_text)
    return output_text
# Shared Gemini chat model used by image_query_tool; created once at import time.
gemini = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
# image_query_tool: send an image file plus a question to the shared `gemini`
# model and return the model's answer. Failures are returned as error strings
# rather than raised, so the calling agent can react to them.
# The docstring below is left untouched because `@tool` uses it as the tool
# description.
@tool
def image_query_tool(image_path: str, question: str) -> str:
    """
    Uses Gemini Vision to answer a question about an image.
    - image_path: file path to the image to analyze (.png)
    - question: the query to ask about the image
    """
    import mimetypes  # local import so this block stays self-contained

    try:
        base64_img = encode_image_to_base64(image_path)
    except OSError:
        response = f"OSError: Invalid argument (invalid image path or file format): {image_path}. Please provide a valid PNG image."
        print_tool_response(response)
        return response

    # smart_read_file also routes .jpg/.jpeg/.gif/.bmp files here, so derive the
    # MIME type from the file name instead of always declaring PNG. Unknown or
    # non-image types keep the previous "image/png" default.
    mime_type, _ = mimetypes.guess_type(image_path)
    if not (mime_type and mime_type.startswith("image/")):
        mime_type = "image/png"
    base64_img_str = f"data:{mime_type};base64,{base64_img}"

    if CUSTOM_DEBUG:
        print_tool_call(
            image_query_tool,
            tool_name='image_query_tool',
            # Only the head of the data URL is shown to keep the console readable.
            args={'base64_image': base64_img_str[:100], 'question': question},
        )

    msg = HumanMessage(content=[
        {"type": "text", "text": question},
        {"type": "image_url", "image_url": base64_img_str},
    ])
    try:
        response = gemini.invoke([msg])
    except ChatGoogleGenerativeAIError as err:
        # Report the actual failure; a fixed "image is not valid" text would
        # mislabel unrelated errors raised by the model call.
        response = f"ChatGoogleGenerativeAIError: {err}"
        print_tool_response(response)
        return response

    if CUSTOM_DEBUG:
        print_tool_response(response.content)
    return response.content
def extract_video_id(url: str) -> str:
    """Return the YouTube video id contained in ``url``, or "" if none is found.

    Supported shapes:
    - any URL with a ``v`` query parameter (e.g. ``.../watch?v=<id>``)
    - ``https://youtu.be/<id>``
    - ``https://<youtube host>/embed/<id>``, ``/shorts/<id>``, ``/live/<id>``, ``/v/<id>``
    """
    parsed = urlparse(url)

    # Original behaviour: the `v` query parameter wins whenever it is present.
    video_id = parse_qs(parsed.query).get("v", [""])[0]
    if video_id:
        return video_id

    host = (parsed.hostname or "").lower()
    segments = [segment for segment in parsed.path.split("/") if segment]

    # Short links carry the id as the first path segment.
    if host == "youtu.be":
        return segments[0] if segments else ""

    # Path-based forms are only trusted on YouTube hosts.
    on_youtube = host in ("youtube.com", "youtube-nocookie.com") or host.endswith(
        (".youtube.com", ".youtube-nocookie.com")
    )
    if on_youtube and len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
        return segments[1]
    return ""
# get_audio_from_youtube: run YoutubeAudioLoader over the given URLs with
# `save_dir` as target directory and return each yielded blob's path as a string.
@tool
def get_audio_from_youtube(urls: list[str], save_dir:str="./tmp/") -> list[str | PurePath | None]:
    """Extracts audio from a YouTube video URL."""
    if CUSTOM_DEBUG:
        call_args = {'urls': urls, 'save_dir': save_dir}
        print_tool_call(
            get_audio_from_youtube,
            tool_name='get_audio_from_youtube',
            args=call_args,
        )

    blob_loader = YoutubeAudioLoader(urls, save_dir)
    paths = []
    for audio_blob in blob_loader.yield_blobs():
        paths.append(str(audio_blob.path))

    if CUSTOM_DEBUG:
        print_tool_response(json.dumps({'paths': paths}))
    return paths
# load_youtube_transcript: fetch the transcript for the video id found in `url`
# and return it as one space-joined string. Any failure is returned (and
# printed) as a string starting with "Error loading transcript:".
@tool
def load_youtube_transcript(url: str) -> str:
    """Load a YouTube transcript using youtube_transcript_api."""
    video_id = extract_video_id(url)
    if CUSTOM_DEBUG:
        print_tool_call(
            load_youtube_transcript,
            tool_name='load_youtube_transcript',
            args={'url': url},
        )
    try:
        snippets = YouTubeTranscriptApi().fetch(video_id=video_id)
        # Skip entries whose text is only whitespace.
        spoken_parts = [snippet.text for snippet in snippets if snippet.text.strip()]
        transcript = " ".join(spoken_parts)
        if CUSTOM_DEBUG and transcript:
            print_tool_response(transcript)
        return transcript
    except Exception as exc:
        failure = f"Error loading transcript: {exc}. Assuming no transcript for this video."
        print_tool_response(failure)
        return failure
# Shared YouTube search client used by youtube_search_tool.
youtube_search_api = YouTubeSearchTool()
# youtube_search_tool: query the shared `youtube_search_api` client and return
# its response unchanged.
# NOTE(review): the return annotation says `list`, but the value is whatever
# `youtube_search_api.run` returns -- confirm the actual type.
@tool
def youtube_search_tool(query: str, number_of_results:int=3) -> list:
    """Search YouTube for a query and return the top number_of_results."""
    if CUSTOM_DEBUG:
        print_tool_call(
            youtube_search_tool,
            tool_name='youtube_search_tool',
            # The key must be the literal string, not the variable's value.
            args={'query': query, 'number_of_results': number_of_results},
        )

    # The request is encoded as "<query>,<count>". A comma inside the query
    # would shift the count field, so commas are replaced with spaces.
    # NOTE(review): assumes the search tool splits its input on commas -- confirm.
    sanitized_query = query.replace(",", " ")
    response = youtube_search_api.run(f"{sanitized_query},{number_of_results}")

    if CUSTOM_DEBUG:
        print_tool_response(response)
    return response
# search_and_extract_from_wikipedia: build a WikipediaQueryRun tool on each
# call, invoke it with the raw query and return its response unchanged.
@tool
def search_and_extract_from_wikipedia(query: str) -> list:
    """Search Wikipedia for a query and extract useful information."""
    wikipedia_runner = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())

    if CUSTOM_DEBUG:
        call_args = {'query': query}
        print_tool_call(
            search_and_extract_from_wikipedia,
            tool_name='search_and_extract_from_wikipedia',
            args=call_args,
        )

    article_text = wikipedia_runner.invoke(query)

    if CUSTOM_DEBUG:
        print_tool_response(article_text)
    return article_text
# transcribe_audio: load the audio file through SpeechToTextLoader, first with
# is_long=False and, if that raises, once more with is_long=True. Returns the
# page_content of every loaded document, or a one-element list holding an
# error message when both attempts fail.
@tool
def transcribe_audio(file_path: str) -> list:
    """Transcribe audio from an audio file in file_path using Google Speech-to-Text."""
    docs, docs_content = [], []
    if CUSTOM_DEBUG:
        print_tool_call(
            transcribe_audio,
            tool_name='transcribe_audio',
            args={'file_path': file_path},
        )

    # Attempt 0 uses is_long=False, attempt 1 retries with is_long=True.
    for attempt, long_mode in enumerate((False, True)):
        try:
            speech_loader = SpeechToTextLoader(
                project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
                file_path=file_path,
                is_long=long_mode,
            )
            docs = speech_loader.load()
            break
        except Exception as err:
            if attempt == 0:
                # First failure is only reported; the second attempt follows.
                print(f"Error loading audio file: {err}")
            else:
                docs_content = [f"Error loading audio file: {err}"]

    if docs:
        docs_content = [doc.page_content for doc in docs]
    if CUSTOM_DEBUG:
        print_tool_response(docs_content)
    return docs_content
# extract_clean_text_from_url: download `url` with trafilatura and return the
# extracted text, or a fixed message when the download fails or nothing is
# extracted.
@tool
def extract_clean_text_from_url(url: str) -> str:
    """Extract the main readable content from a webpage using trafilatura."""
    if CUSTOM_DEBUG:
        print_tool_call(
            extract_clean_text_from_url,
            tool_name='extract_clean_text_from_url',
            args={'url': url},
        )

    page_html = trafilatura.fetch_url(url)
    if page_html:
        # An empty extraction result is replaced by the fallback message.
        response = trafilatura.extract(page_html) or "No meaningful content found."
    else:
        response = "Failed to download the page. Please check the URL."

    if CUSTOM_DEBUG:
        print_tool_response(response)
    return response
# Shared LangChain file reader; smart_read_file falls back to it when
# MarkItDown conversion raises.
read_tool = ReadFileTool()
# smart_read_file: dispatch on what `file_path` looks like.
#   - audio extension  -> transcribe_audio
#   - image extension  -> image_query_tool with a generic question
#   - URL              -> YouTube transcript (audio download when the transcript
#                         fails) or extract_clean_text_from_url
#   - anything else    -> MarkItDown, falling back to read_tool on error
# The docstring below is left untouched because `@tool` uses it as the tool
# description.
@tool
def smart_read_file(file_path: str) -> str:
    """
    Smart tool to read a file based on its type.
    - Use `read_file_tool` for simple text, CSV, code files.
    - Use MarkItDown for PDFs, Word, Excel, HTML, and other complex formats.
    """
    if CUSTOM_DEBUG:
        print_tool_call(
            smart_read_file,
            tool_name='smart_read_file',
            args={'file_path': file_path},
        )

    lowered = file_path.lower()
    _, ext = os.path.splitext(lowered)

    if ext in [".mp3", ".wav", ".m4a", ".flac"]:
        # If the file is an audio file, transcribe it
        return transcribe_audio.invoke({"file_path": file_path})

    if ext in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]:
        # If the file is an image, use image_query_tool to analyze it
        q = "What can you tell me about this image?"
        return image_query_tool.invoke({"image_path": file_path, "question": q})

    # Decide "is this a URL?" from the start of the path itself. Testing the
    # extension against the prefixes treated every extension-less path as a URL
    # (the empty string is a substring of everything) and missed real URLs.
    if lowered.startswith(("http://", "https://", "www.")):
        if "youtube.com/watch?v=" in lowered:
            transcript = load_youtube_transcript.invoke({"url": file_path})
            if "Error loading" in transcript:
                # No transcript available: fall back to downloading the audio.
                return get_audio_from_youtube.invoke({'urls': [file_path], 'save_dir': './tmp/'})
            # Return the transcript that was just fetched instead of dropping it.
            return transcript
        return extract_clean_text_from_url.invoke({"url": file_path})

    md = MarkItDown()
    try:
        result = md.convert(file_path).text_content
    except Exception:
        # Deliberate best-effort: anything MarkItDown cannot convert is read as
        # a plain file instead.
        result = read_tool.invoke({"file_path": file_path})

    if CUSTOM_DEBUG:
        print_tool_response(result)
    return result
|