|
import openai |
|
from langchain_community.document_loaders import PyPDFDirectoryLoader |
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
from langchain_openai import ChatOpenAI, OpenAIEmbeddings |
|
from langchain_chroma import Chroma |
|
import chromadb |
|
import uuid |
|
|
|
from docx import Document |
|
from datetime import datetime |
|
from docx.shared import Pt, RGBColor |
|
from docx import Document |
|
from docx.shared import Pt, RGBColor |
|
from docx.oxml import OxmlElement, ns |
|
from datetime import datetime |
|
import os |
|
import re |
|
|
|
import uuid |
|
import tempfile |
|
import shutil |
|
import time |
|
|
|
import gradio as gr |
|
|
|
|
|
|
|
HFS_vs_GoogleColab = 1 |
|
|
|
|
|
if HFS_vs_GoogleColab == 0: |
|
from google.colab import drive |
|
drive.mount('/content/drive') |
|
|
|
class CFG: |
|
BASE_PATH = r'/content/drive/MyDrive/Colab Notebooks/MyProjects/Asystent_Analityka/' if HFS_vs_GoogleColab == 0 else "./" |
|
nazwa_projektu_HF = "etfy" |
|
rola = "Jesteś asystentem doradcy finansowego" |
|
kolekcja_bd = "etfy" |
|
|
|
model_llm = "gpt-4o-mini" |
|
temperature = 0.6 |
|
model_embeddings = "text-embedding-3-small" |
|
dimensions_embeddings = 1536 |
|
chunk_size = 3200 |
|
chunk_overlap = 500 |
|
|
|
|
|
retriever_num_base_results = 5 |
|
reranked_num_results = 3 |
|
|
|
if HFS_vs_GoogleColab == 1: |
|
|
|
openai_api_key = os.getenv("OPENAI_API_KEY") |
|
openai.api_key = openai_api_key |
|
|
|
else: |
|
|
|
from google.colab import userdata |
|
os.environ["OPENAI_API_KEY"] = userdata.get('Elephant-key') |
|
|
|
|
|
|
|
|
|
|
|
DATA_PATH = os.path.join(CFG.BASE_PATH, f"data_{CFG.kolekcja_bd}") |
|
CHROMA_PATH = os.path.join(CFG.BASE_PATH, f"chroma_db_{CFG.kolekcja_bd}") |
|
TEMP_PATH = os.path.join(CFG.BASE_PATH, f"answers_{CFG.kolekcja_bd}") |
|
|
|
|
|
|
|
|
|
os.makedirs(DATA_PATH, exist_ok=True) |
|
|
|
|
|
os.makedirs(CHROMA_PATH, exist_ok=True) |
|
|
|
|
|
os.makedirs(TEMP_PATH, exist_ok=True) |
|
|
|
|
|
|
|
def initiate_embeding_model(model_embeddings=CFG.model_embeddings, model_dimensions=CFG.dimensions_embeddings): |
|
|
|
embeddings_model = OpenAIEmbeddings( |
|
model = model_embeddings, |
|
dimensions =model_dimensions |
|
) |
|
|
|
return embeddings_model |
|
|
|
embeddings_model = initiate_embeding_model(CFG.model_embeddings, CFG.dimensions_embeddings) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_vector_store(embeddings_model, CHROMA_PATH): |
|
|
|
client = chromadb.PersistentClient(path=CHROMA_PATH) |
|
|
|
|
|
vector_store = Chroma( |
|
embedding_function = embeddings_model, |
|
persist_directory = CHROMA_PATH, |
|
) |
|
|
|
|
|
print("Dostępne kolekcje:", client.list_collections()) |
|
|
|
return vector_store, client |
|
|
|
vector_store, client = create_vector_store(embeddings_model, CHROMA_PATH) |
|
|
|
|
|
|
|
def read_data_from_chroma(collection_name): |
|
|
|
|
|
|
|
vector_store = Chroma( |
|
collection_name=collection_name, |
|
embedding_function=embeddings_model, |
|
persist_directory=CHROMA_PATH, |
|
) |
|
|
|
|
|
base_num_results = CFG.retriever_num_base_results |
|
|
|
|
|
|
|
|
|
|
|
retriever = vector_store.as_retriever(search_kwargs={"k": base_num_results}) |
|
|
|
return retriever |
|
|
|
|
|
collection_name = CFG.kolekcja_bd |
|
retriever = read_data_from_chroma(collection_name) |
|
|
|
|
|
|
|
|
|
llm = ChatOpenAI(temperature=CFG.temperature, model=CFG.model_llm) |
|
|
|
|
|
|
|
def response(query, historia=None): |
|
|
|
relevant_chunks = retriever.invoke(query) |
|
|
|
|
|
|
|
knowledge = "" |
|
sources_markdown = "" |
|
zrodla = "" |
|
cytaty = "" |
|
nazwa_projektu = CFG.nazwa_projektu_HF |
|
|
|
|
|
if HFS_vs_GoogleColab == 1: |
|
|
|
|
|
HF_SPACE_ID = os.getenv("SPACE_ID", "smolinski/test") |
|
|
|
|
|
BASE_URL = f"https://huggingface.co/spaces/{HF_SPACE_ID}/resolve/main/data_{CFG.kolekcja_bd}/" |
|
else: |
|
BASE_URL = DATA_PATH + "/" |
|
|
|
for relevant_chunk in relevant_chunks: |
|
|
|
full_source = relevant_chunk.metadata.get("source", "Nieznane źródło") |
|
file_name = os.path.basename(full_source) |
|
file_link = f"{BASE_URL}{file_name}" |
|
file_link = file_link.replace(' ', '%20') |
|
|
|
|
|
page_number_raw = relevant_chunk.metadata.get("page_number", None) |
|
try: |
|
page_number = int(page_number_raw) + 1 |
|
page_number = str(page_number) |
|
except (ValueError, TypeError): |
|
page_number = "nieznana strona" |
|
|
|
|
|
sources_markdown += f"\n- {file_name}, str. {page_number}: [otwórz]({file_link})" |
|
|
|
|
|
cytaty += f"Cytat z {file_name}, strona {page_number}:\n\n{relevant_chunk.page_content}\n\n---\n\n" |
|
|
|
knowledge += relevant_chunk.page_content + "\n\n---\n\n" |
|
|
|
|
|
|
|
|
|
historia_text = "" |
|
if historia: |
|
for i, (q, a) in enumerate(historia[-5:], 1): |
|
historia_text += f"\nPoprzednia rozmowa {i}:\nPytanie: {q}\nOdpowiedź: {a}\n" |
|
|
|
|
|
if query is not None: |
|
rag_prompt = f""" |
|
{CFG.rola}, który szczegółowo i dokładnie odpowiada na pytania w oparciu o przekazaną wiedzę. |
|
Dziel się wszystkimi posiadanymi informacjami na dany temat. |
|
Na pytanie o ETF-y na GPW wymieniaj wszystkie dostępne, chyba że to pytanie szczegółowe o etf-y long, short, lewarowane itp. |
|
Na pytania o aktualne notowania odpowiadaj: Aktualne notowania dostępne są na stronie GPW. |
|
Do podkreślenia lub wypunktowania najważniejszych rzeczy używaj pogrubionej czcionki, a do ciekawostek i dodatkowych rzeczy kursywy. |
|
Podczas udzielania odpowiedzi korzystaj wyłącznie z poniższych informacji zawartych w sekcji „Wiedza”. |
|
Bądź miły i uprzejmy, ale rzeczowy. Przykładaj większą wagę do nowszych informacji. |
|
Jeśli pytanie jest zbyt ogólne nie odpowiadaj na nie, lecz poproś o doprecyzowanie. |
|
Jeśli nie znasz odpowiedzi, napisz: Niestety nie posiadam informacji na ten temat. NIE WYMYŚLAJ NICZEGO.\n\n |
|
|
|
{historia_text} |
|
|
|
Pytanie: {query} |
|
|
|
|
|
|
|
Wiedza:\n {knowledge} |
|
|
|
""" |
|
|
|
|
|
response = llm(rag_prompt) |
|
|
|
return response.content if response and response.content else "Brak odpowiedzi.", sources_markdown, cytaty |
|
|
|
|
|
|
|
|
|
def zaktualizuj_historie(pytanie, odpowiedz, historia): |
|
historia.append((pytanie, odpowiedz)) |
|
return historia[-5:] |
|
|
|
def wyczysc_formularz(): |
|
return "", "", "", [] |
|
|
|
|
|
def dodaj_cytaty(odpowiedz, cytaty): |
|
return f"{odpowiedz}\n\n---\n**Cytaty ze śródeł**\n---\n\n{cytaty}" if cytaty else odpowiedz |
|
|
|
|
|
|
|
def init_user_session(): |
|
cleanup_old_sessions(base_path=tempfile.gettempdir(), max_age_days=1) |
|
session_id = str(uuid.uuid4()) |
|
user_temp_dir = os.path.join(tempfile.gettempdir(), f"asystent_{session_id}") |
|
os.makedirs(user_temp_dir, exist_ok=True) |
|
return user_temp_dir |
|
|
|
|
|
|
|
def cleanup_old_sessions(base_path, max_age_days=1): |
|
now = time.time() |
|
for folder in os.listdir(base_path): |
|
if folder.startswith("asystent_"): |
|
folder_path = os.path.join(base_path, folder) |
|
if os.path.isdir(folder_path): |
|
folder_age = now - os.path.getctime(folder_path) |
|
if folder_age > max_age_days * 86400: |
|
shutil.rmtree(folder_path) |
|
print(f"Usunięto stary folder sesji: {folder_path}") |
|
|
|
|
|
|
|
def zapisz_odpowiedz(odpowiedz, pytanie, sources, user_path): |
|
if not odpowiedz or odpowiedz.strip() == "" or not pytanie.strip(): |
|
print("Błąd: Odpowiedź lub pytanie są puste!") |
|
return None |
|
|
|
date_str = datetime.now().strftime("%Y-%m-%d") |
|
file_name = "".join(c if c.isalnum() or c in (" ", "_", "-") else "_" for c in pytanie)[:50] |
|
file_path = os.path.join(user_path, f"{file_name}_{date_str}.docx") |
|
|
|
try: |
|
doc = Document() |
|
|
|
def formatuj_naglowek(paragraph, text, font_size=14, color=(0, 0, 0), bold=True): |
|
run = paragraph.add_run(text) |
|
run.bold = bold |
|
run.font.size = Pt(font_size) |
|
run.font.color.rgb = RGBColor(*color) |
|
run.font.name = "Calibri" |
|
paragraph.paragraph_format.line_spacing = 1.25 |
|
paragraph.paragraph_format.space_before = Pt(5) |
|
paragraph.paragraph_format.space_after = Pt(0) |
|
|
|
def formatuj_paragraf(paragraph): |
|
for run in paragraph.runs: |
|
run.font.name = "Calibri" |
|
run.font.size = Pt(12) |
|
paragraph.paragraph_format.line_spacing = 1.25 |
|
paragraph.paragraph_format.space_before = Pt(0) |
|
paragraph.paragraph_format.space_after = Pt(5) |
|
|
|
def dodaj_markdown_tekst(doc, text): |
|
lines = text.splitlines() |
|
for line in lines: |
|
|
|
if line.strip().startswith(("- ", "* ")): |
|
para = doc.add_paragraph(style='List Bullet') |
|
content = line.strip()[2:] |
|
|
|
elif line.strip().startswith("> "): |
|
para = doc.add_paragraph() |
|
para.paragraph_format.left_indent = Pt(20) |
|
content = line.strip()[2:] |
|
else: |
|
para = doc.add_paragraph() |
|
content = line |
|
|
|
|
|
pattern = r"(\*\*.*?\*\*|\*.*?\*|~~.*?~~|[^*~]+)" |
|
parts = re.findall(pattern, content) |
|
|
|
for part in parts: |
|
clean = part.replace("**", "").replace("*", "").replace("~~", "") |
|
run = para.add_run(clean) |
|
|
|
if part.startswith("**") and part.endswith("**"): |
|
run.bold = True |
|
elif part.startswith("*") and part.endswith("*"): |
|
run.italic = True |
|
elif part.startswith("~~") and part.endswith("~~"): |
|
run.font.strike = True |
|
|
|
run.font.name = "Calibri" |
|
run.font.size = Pt(12) |
|
|
|
p1 = doc.add_paragraph() |
|
formatuj_naglowek(p1, "Pytanie:") |
|
p1 = doc.add_paragraph(pytanie) |
|
formatuj_paragraf(p1) |
|
|
|
doc.add_paragraph(" ") |
|
|
|
p2 = doc.add_paragraph() |
|
formatuj_naglowek(p2, "Odpowiedź:") |
|
dodaj_markdown_tekst(doc, odpowiedz) |
|
|
|
doc.add_paragraph(" ") |
|
|
|
if sources and sources.strip(): |
|
p3 = doc.add_paragraph() |
|
formatuj_naglowek(p3, "Źródła:") |
|
p3 = doc.add_paragraph(re.sub(r":.*", "", sources)) |
|
formatuj_paragraf(p3) |
|
|
|
doc.save(file_path) |
|
print(f"Plik zapisany: {file_path}") |
|
return file_path if os.path.exists(file_path) else None |
|
|
|
except Exception as e: |
|
print(f"Błąd podczas zapisu pliku: {e}") |
|
return None |
|
|
|
|
|
|
|
def lista_plikow(user_path): |
|
pliki = [os.path.join(user_path, f) for f in os.listdir(user_path) if f.endswith(".docx")] |
|
pliki.sort(key=os.path.getctime, reverse=True) |
|
return pliki if pliki else None |
|
|
|
|
|
|
|
def wyczysc_folder(user_path): |
|
if os.path.exists(user_path): |
|
shutil.rmtree(user_path) |
|
|
|
|
|
|
|
|
|
def stream_response(query, history): |
|
"""Obsługuje strumieniowanie i poprawnie czyści pole tekstowe.""" |
|
|
|
history = history or [] |
|
|
|
|
|
relevant_chunks = retriever.invoke(query) |
|
knowledge = "\n\n---\n\n".join([relevant_chunk.page_content for relevant_chunk in relevant_chunks]) |
|
|
|
|
|
rag_prompt = f""" |
|
{CFG.rola}, który szczegółowo i dokładnie odpowiada na pytania w oparciu o przekazaną wiedzę. |
|
Dziel się wszystkimi posiadanymi informacjami na dany temat, tak by Twoje odpowiedzi były wyczerpujące. |
|
Na pytanie o ETF-y na GPW wymieniaj wszystkie dostępne, chyba że to pytanie szczegółowe o etf-y long, short, lewarowane itp. |
|
Na pytania o aktualne notowania odpowiadaj: Aktualne notowania dostępne są na stronie GPW. |
|
Do podkreślenia lub wypunktowania najważniejszych rzeczy używaj pogrubionej czcionki, a do ciekawostek i dodatkowych rzeczy kursywy. |
|
Podczas udzielania odpowiedzi korzystaj wyłącznie z poniższych informacji zawartych w sekcji „Wiedza”. |
|
Bądź miły i uprzejmy, ale rzeczowy. Przykładaj większą wagę do nowszych informacji. |
|
Jeśli pytanie jest zbyt ogólne nie odpowiadaj na nie, lecz poproś o doprecyzowanie. |
|
Jeśli nie znasz odpowiedzi, napisz: Niestety nie posiadam informacji na ten temat. NIE WYMYŚLAJ NICZEGO.\n\n |
|
|
|
Pytanie: {query}\n\n |
|
|
|
Historia rozmowy:\n {history} |
|
|
|
Wiedza:\n {knowledge} |
|
""" |
|
|
|
print("Prompt:\n", rag_prompt) |
|
print("Odpowiedź:") |
|
|
|
|
|
partial_message = "" |
|
|
|
for response in llm.stream(rag_prompt): |
|
partial_message += response.content |
|
yield history + [(query, partial_message)], query |
|
|
|
|
|
history.append((query, partial_message)) |
|
yield history, "" |
|
|
|
|
|
|
|
with gr.Blocks(css=""" |
|
.button_wyczysc-color { |
|
background-color: #A9A9A9 !important; |
|
color: white !important; |
|
} |
|
|
|
#markdown_odpowiedz { |
|
border: 1px solid #ccc; |
|
border-radius: 6px; |
|
padding: 12px; |
|
background-color: #f9f9f9; |
|
margin-top: 6px; |
|
min-height: 8em; |
|
} |
|
""") as gui: |
|
|
|
session_dir = gr.State(value=init_user_session) |
|
historia_formularza = gr.State([]) |
|
|
|
gr.Markdown("# Asystent Finansowy") |
|
gr.Markdown("### Odpowiadam na pytania z zakresu ETF-ów notowanych na GPW.") |
|
gr.Markdown("###### Pamiętaj: jestem tylko chatbotem i czasami się mylę, a moje odpowiedzi nie mogą być traktowane jako rekomendacje inwestycyjne!") |
|
|
|
with gr.Tabs(): |
|
|
|
|
|
with gr.TabItem("💬 Chat"): |
|
chatbot = gr.Chatbot() |
|
input_text_chat = gr.Textbox(placeholder="Napisz tutaj pytanie...", container=False, autoscroll=True, scale=7) |
|
input_text_chat.submit(fn=stream_response, inputs=[input_text_chat, chatbot], outputs=[chatbot, input_text_chat]) |
|
|
|
|
|
with gr.TabItem("📝 Formularz"): |
|
input_text_form = gr.Textbox(label="Zadaj pytanie:", placeholder="Napisz tutaj pytanie...", lines=2, interactive=True) |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
submit_button = gr.Button("Wyślij pytanie") |
|
with gr.Column(scale=1): |
|
clear_answer_button = gr.Button("Wyczyść formularz", elem_classes="button_wyczysc-color") |
|
with gr.Column(scale=7): |
|
gr.Markdown("") |
|
|
|
gr.Markdown("### Odpowiedź:") |
|
output_answer = gr.Markdown( |
|
value="", |
|
elem_id="markdown_odpowiedz" |
|
) |
|
|
|
output_cytaty = gr.State("") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
zacytuj_button = gr.Button("Przytocz źródła") |
|
with gr.Column(scale=8): |
|
gr.Markdown("") |
|
|
|
gr.Markdown("### Źródła:") |
|
output_sources = gr.Markdown() |
|
|
|
gr.Markdown("### Pobierz odpowiedzi:") |
|
download_files = gr.File(label="Pliki do pobrania", interactive=False, file_types=[".docx"]) |
|
|
|
|
|
submit_button.click( |
|
response, |
|
inputs=[input_text_form, historia_formularza], |
|
outputs=[output_answer, output_sources, output_cytaty] |
|
).then( |
|
zaktualizuj_historie, |
|
inputs=[input_text_form, output_answer, historia_formularza], |
|
outputs=historia_formularza |
|
).then( |
|
zapisz_odpowiedz, |
|
inputs=[output_answer, input_text_form, output_sources, session_dir], |
|
outputs=None |
|
).then( |
|
lista_plikow, |
|
inputs=session_dir, |
|
outputs=download_files |
|
) |
|
|
|
clear_answer_button.click( |
|
wyczysc_formularz, |
|
inputs=[], |
|
outputs=[output_answer, input_text_form, output_sources, historia_formularza] |
|
) |
|
|
|
zacytuj_button.click( |
|
dodaj_cytaty, |
|
inputs=[output_answer, output_cytaty], |
|
outputs=output_answer |
|
).then( |
|
zapisz_odpowiedz, |
|
inputs=[output_answer, input_text_form, output_sources, session_dir], |
|
outputs=None |
|
).then( |
|
lista_plikow, |
|
inputs=session_dir, |
|
outputs=download_files |
|
) |
|
|
|
gui.launch() |