File size: 14,214 Bytes
4c99672
1853d1e
 
 
 
a60e9fe
 
 
2b7e98c
1853d1e
a60e9fe
1853d1e
 
 
 
 
 
 
 
 
 
8ae2625
 
1853d1e
6b3901f
8ae2625
a60e9fe
 
 
 
1853d1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
681be1d
1853d1e
 
 
 
 
 
 
681be1d
1853d1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ae2625
6b3901f
a60e9fe
 
 
 
 
 
8ae2625
 
a60e9fe
 
 
8ae2625
a60e9fe
 
 
 
 
 
 
 
 
6b3901f
a60e9fe
6b3901f
a60e9fe
 
 
 
 
 
 
 
 
 
 
6b3901f
a60e9fe
6b3901f
a60e9fe
6b3901f
 
a60e9fe
 
 
 
 
 
 
 
 
 
 
 
 
8ae2625
a60e9fe
8ae2625
a60e9fe
5cfc619
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1853d1e
 
 
 
 
 
a60e9fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1853d1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a60e9fe
1853d1e
 
a60e9fe
 
1853d1e
 
a60e9fe
 
 
1853d1e
a60e9fe
1853d1e
a60e9fe
 
1853d1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ae2625
 
 
 
 
a60e9fe
 
8ae2625
 
 
 
 
 
a60e9fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ae2625
a60e9fe
 
 
 
 
8ae2625
 
 
 
 
 
a60e9fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ae2625
 
 
 
a60e9fe
 
 
 
 
 
 
8ae2625
 
a60e9fe
 
8ae2625
 
a60e9fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
import os
import base64
import json
import inspect
import time
import trafilatura

from typing import Callable, Union
from pathlib import PurePath
from datetime import datetime, timezone
from markitdown import MarkItDown

from langchain.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import HumanMessage
from langchain_google_genai.chat_models import ChatGoogleGenerativeAIError

from langchain_tavily import TavilySearch, TavilyExtract
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.utilities.wikipedia import WikipediaAPIWrapper
from langchain_community.tools.wikipedia.tool import WikipediaQueryRun
from langchain_google_community import SpeechToTextLoader
from langchain_community.tools import YouTubeSearchTool
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_community.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader
from langchain_community.tools.file_management.read import ReadFileTool
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI

from basic_agent import print_conversation

from dotenv import load_dotenv
from langchain.globals import set_debug
from urllib.parse import urlparse, parse_qs


set_debug(False)
CUSTOM_DEBUG = True

load_dotenv()


def encode_image_to_base64(path):
    with open(path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def print_tool_call(tool: Callable, tool_name: str, args: dict):
    """Prints the tool call for debugging purposes."""
    sig = inspect.signature(tool)
    print_conversation(
        messages=[
            {
                'role': 'Tool-Call',
                'content': f"Calling `{tool_name}`{sig}"
            },
            {
                'role': 'Tool-Args',
                'content': args
            }
        ],
    )


def print_tool_response(response: str):
    """Prints the tool response for debugging purposes."""
    print_conversation(
        messages=[
            {
                'role': 'Tool-Response',
                'content': response
            }
        ],
    )


search_tool = TavilySearch(max_results=3)
extract_tool = TavilyExtract()


@tool
def search_and_extract(query: str) -> list[dict]:
    """Performs a web search and returns structured content extracted from top results."""
    time.sleep(3)  # To avoid hitting the API rate limit in the llm-apis when calling the tool multiple times in a row.
    MAX_NUMBER_OF_CHARS = 15_000

    if CUSTOM_DEBUG:
        print_tool_call(
            search_and_extract,
            tool_name='search_and_extract',
            args={'query': query, 'max_number_of_chars': MAX_NUMBER_OF_CHARS},
        )

    results = search_tool.invoke({"query": query})
    raw_results = results.get("results", [])
    urls = [r["url"] for r in raw_results if r.get("url")]

    if not urls:
        return [{"error": "No URLs found to extract from."}]

    extracted = extract_tool.invoke({"urls": urls})
    results = extracted.get("results", [])

    structured_results = []
    raw_contents = [doc.get("raw_content", "") for doc in results]

    for result, doc_content in zip(raw_results, raw_contents):
        doc_content_trunc = doc_content[0:MAX_NUMBER_OF_CHARS] if len(doc_content) > MAX_NUMBER_OF_CHARS else doc_content
        structured_results.append({
            "title": result.get("title"),
            "url": result.get("url"),
            "snippet": result.get("content"),
            "raw_content": doc_content_trunc
        })

    if CUSTOM_DEBUG:
        console_structured_results = [{k: v for k, v in result_dicti.items() if k != "raw_content"} for result_dicti in
                                      structured_results]
        print_tool_response(json.dumps(console_structured_results))
    return structured_results


@tool
def aggregate_information(query: str, results: list[str]) -> str:
    """
    Processes a list of unstructured text chunks (e.g., search results) and produces a concise, query-specific summary.

    Each input text is filtered and summarized individually in the context of the provided query. Irrelevant results are discarded.
    Relevant content is aggregated and synthesized into a final, coherent answer that directly addresses the query.
    """
    if CUSTOM_DEBUG:
        print_tool_call(
            aggregate_information,
            tool_name='aggregate_information',
            args={'results': results, 'query': query},
        )
    if not results:
        response = "No search results provided."
        if CUSTOM_DEBUG:
            print_tool_response(response)
        return response

    # Convert to LangChain Document objects
    docs = [Document(page_content=chunk) for chunk in results]

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    # 🔍 Map Prompt — Summarize each document in light of the query
    map_prompt = PromptTemplate.from_template(
        "You are analyzing a search result in the context of the question: '{query}'.\n\n"
        "Search result:\n{text}\n\n"
        "Instructions:\n"
        "- If the result contains information relevant to answering the query, summarize the relevant parts clearly.\n"
        "- If the result is not helpful or unrelated, return 'IGNORE'.\n"
        "- Do not include generic information or filler.\n"
        "- Focus on extracting facts, key statements, or numbers that directly support the query.\n\n"
        "Relevant Summary:"
    )

    # 🧠 Combine Prompt — Aggregate the summaries to one final answer
    combine_prompt = PromptTemplate.from_template(
        "You are aggregating information to provide context to answer the following question: '{query}'.\n\n"
        "Here are the summaries from filtered search results:\n{text}\n\n"
        "Use the provided summaries to construct a context that directly supports the query without answering it.\n"
        "Context:"
    )

    chain = load_summarize_chain(
        llm,
        chain_type="map_reduce",
        map_prompt=map_prompt.partial(query=query),
        combine_prompt=combine_prompt.partial(query=query),
    )

    summary = chain.invoke({'input_documents': docs})
    output_text = summary.get('output_text', str(summary))
    output_text = json.dumps({'summary': output_text})

    if CUSTOM_DEBUG:
        print_tool_response(output_text)

    return output_text


gemini = ChatGoogleGenerativeAI(model="gemini-1.5-flash")


@tool
def image_query_tool(image_path: str, question: str) -> str:
    """
    Uses Gemini Vision to answer a question about an image.
    - image_path: file path to the image to analyze (.png)
    - question: the query to ask about the image
    """
    try:
        base64_img = encode_image_to_base64(image_path)
    except OSError:
        response = f"OSError: Invalid argument (invalid image path or file format): {image_path}. Please provide a valid PNG image."
        print_tool_response(response)
        return response

    base64_img_str = f"data:image/png;base64,{base64_img}"
    if CUSTOM_DEBUG:
        print_tool_call(
            image_query_tool,
            tool_name='image_query_tool',
            args={'base64_image': base64_img_str[:100], 'question': question},
        )
    msg = HumanMessage(content=[
        {"type": "text", "text": question},
        {"type": "image_url", "image_url": base64_img_str},
    ])
    try:
        response = gemini.invoke([msg])
    except ChatGoogleGenerativeAIError:
        response = "ChatGoogleGenerativeAIError: Invalid argument provided to Gemini: 400 Provided image is not valid"
        print_tool_response(response)
        return response
    if CUSTOM_DEBUG:
        print_tool_response(response.content)
    return response.content


def extract_video_id(url: str) -> str:
    parsed = urlparse(url)
    return parse_qs(parsed.query).get("v", [""])[0]


@tool
def get_audio_from_youtube(urls: list[str], save_dir:str="./tmp/") -> list[str | PurePath | None]:
    """Extracts audio from a YouTube video URL."""

    if CUSTOM_DEBUG:
        print_tool_call(
            get_audio_from_youtube,
            tool_name='get_audio_from_youtube',
            args={'urls': urls, 'save_dir': save_dir},
        )
    loader = YoutubeAudioLoader(urls, save_dir)
    audio_blobs = list(loader.yield_blobs())
    paths = [str(blob.path) for blob in audio_blobs]

    if CUSTOM_DEBUG:
        print_tool_response(json.dumps({'paths': paths}))

    return paths


@tool
def load_youtube_transcript(url: str) -> str:
    """Load a YouTube transcript using youtube_transcript_api."""

    video_id = extract_video_id(url)

    if CUSTOM_DEBUG:
        print_tool_call(
            load_youtube_transcript,
            tool_name='load_youtube_transcript',
            args={'url': url},
        )
    try:
        youtube_api_client = YouTubeTranscriptApi()
        fetched_transcript = youtube_api_client.fetch(video_id=video_id)
        transcript = " ".join(entry.text for entry in fetched_transcript if entry.text.strip())

        if transcript and CUSTOM_DEBUG:
            print_tool_response(transcript)

        return transcript

    except Exception as e:
        error_str = f"Error loading transcript: {e}. Assuming no transcript for this video."
        print_tool_response(error_str)
        return error_str


youtube_search_api = YouTubeSearchTool()

@tool
def youtube_search_tool(query: str, number_of_results:int=3) -> list:
    """Search YouTube for a query and return the top number_of_results."""
    if CUSTOM_DEBUG:
        print_tool_call(
            youtube_search_tool,
            tool_name='youtube_search_tool',
            args={'query': query, number_of_results: number_of_results},
        )
    response = youtube_search_api.run(f"{query},{number_of_results}")
    if CUSTOM_DEBUG:
        print_tool_response(response)
    return response


@tool
def search_and_extract_from_wikipedia(query: str) -> list:
    """Search Wikipedia for a query and extract useful information."""
    wiki_api_wrapper = WikipediaAPIWrapper()
    wiki_tool = WikipediaQueryRun(api_wrapper=wiki_api_wrapper)
    if CUSTOM_DEBUG:
        print_tool_call(
            search_and_extract_from_wikipedia,
            tool_name='search_and_extract_from_wikipedia',
            args={'query': query},
        )
    response = wiki_tool.invoke(query)
    if CUSTOM_DEBUG:
        print_tool_response(response)
    return response


@tool
def transcribe_audio(file_path: str) -> list:
    """Transcribe audio from an audio file in file_path using Google Speech-to-Text."""
    docs, docs_content = [], []
    if CUSTOM_DEBUG:
        print_tool_call(
            transcribe_audio,
            tool_name='transcribe_audio',
            args={'file_path': file_path},
        )
    try:
        loader = SpeechToTextLoader(
            project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
            file_path=file_path,
            is_long = False,  # Set to True for long audio files
        )

        docs = loader.load()
    except Exception as e:
        print(f"Error loading audio file: {e}")
        try:
            loader = SpeechToTextLoader(
                project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
                file_path=file_path,
                is_long=True,  # Set to True for long audio files
            )

            docs = loader.load()
        except Exception as e:
            docs_content = [f"Error loading audio file: {e}"]

    docs_content = [doc.page_content for doc in docs] if docs else docs_content

    if CUSTOM_DEBUG:
        print_tool_response(docs_content)
    return docs_content


@tool
def extract_clean_text_from_url(url: str) -> str:
    """Extract the main readable content from a webpage using trafilatura."""
    if CUSTOM_DEBUG:
        print_tool_call(
            extract_clean_text_from_url,
            tool_name='extract_clean_text_from_url',
            args={'url': url},
        )
    downloaded = trafilatura.fetch_url(url)
    response = ""
    if not downloaded:
        response = "Failed to download the page. Please check the URL."

    if not "Failed" in response:
        response = trafilatura.extract(downloaded)

    response = response or "No meaningful content found."
    if CUSTOM_DEBUG:
        print_tool_response(response)
    return response


read_tool = ReadFileTool()


@tool
def smart_read_file(file_path: str) -> str:
    """
    Smart tool to read a file based on its type.

    - Use `read_file_tool` for simple text, CSV, code files.
    - Use MarkItDown for PDFs, Word, Excel, HTML, and other complex formats.
    """
    if CUSTOM_DEBUG:
        print_tool_call(
            smart_read_file,
            tool_name='smart_read_file',
            args={'file_path': file_path},
        )
    _, ext = os.path.splitext(file_path.lower())

    if ext in [".mp3", ".wav", ".m4a", ".flac"]:
        # If the file is an audio file, transcribe it
        return transcribe_audio.invoke({"file_path": file_path})

    if ext in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]:
        # If the file is an image, use image_query_tool to analyze it
        q = "What can you tell me about this image?"
        return image_query_tool.invoke({"image_path": file_path, "question": q})

    if any(ext in url_pattern for url_pattern in ["http://", "https://", "www."]):
        if "youtube.com/watch?v=" in file_path:
            transcript = load_youtube_transcript.invoke({"url": file_path})
            if "Error loading" in transcript:
                return get_audio_from_youtube.invoke({'urls': [file_path], 'save_dir': './tmp/'})
        else:
            return extract_clean_text_from_url.invoke(file_path)

    md = MarkItDown()
    try:
        result = md.convert(file_path)
        result = result.text_content
    except Exception as e:
        # print("Error reading file with MarkItDown:", e)
        result = read_tool.invoke({"file_path": file_path})

    if CUSTOM_DEBUG:
        print_tool_response(result)
    return result