import base64 import json import os import time import zipfile from pathlib import Path import re import uuid import pymupdf ############################### # 환경 설정 ############################### # 필요 없다면 주석 처리 가능 os.system('pip uninstall -y magic-pdf') os.system('pip install git+https://github.com/opendatalab/MinerU.git@dev') os.system('wget https://github.com/opendatalab/MinerU/raw/dev/scripts/download_models_hf.py -O download_models_hf.py') # 모델 다운로드 (네트워크가 없는 환경이라면 try/except로 묶거나 주석 처리) try: os.system('python download_models_hf.py') except Exception as e: print("모델 다운로드 중 에러가 발생했습니다. 네트워크 연결을 확인하거나, 수동으로 모델을 배치하세요.") print("에러 메시지:", e) ############################### # magic-pdf.json 처리 ############################### json_path = "/home/user/magic-pdf.json" if os.path.exists(json_path): # 기존에 파일이 있으면 로드 with open(json_path, 'r', encoding='utf-8') as file: data = json.load(file) else: # 없으면 기본값 생성 data = { "device-mode": "cuda", # CPU만 쓰려면 "cpu" "llm-aided-config": { "title_aided": { "api_key": os.getenv('apikey', ""), "enable": bool(os.getenv('apikey')) } } } # 파일 생성 (필요 없으면 생략) with open(json_path, 'w', encoding='utf-8') as file: json.dump(data, file, indent=4) # 이후 device-mode나 llm-aided-config 필요한 경우 수정 data['device-mode'] = "cuda" # 원하는 디바이스로 세팅 if os.getenv('apikey'): data['llm-aided-config']['title_aided']['api_key'] = os.getenv('apikey') data['llm-aided-config']['title_aided']['enable'] = True # 변경사항 다시 저장 with open(json_path, 'w', encoding='utf-8') as file: json.dump(data, file, indent=4) # paddleocr 복사 os.system('cp -r paddleocr /home/user/.paddleocr') ############################### # 그 외 라이브러리 ############################### import gradio as gr from loguru import logger from gradio_pdf import PDF ############################### # magic_pdf 관련 모듈 ############################### from magic_pdf.data.data_reader_writer import FileBasedDataReader from magic_pdf.libs.hash_utils import compute_sha256 from magic_pdf.tools.common import do_parse, prepare_env ############################### # 공통 함수들 ############################### def create_css(): """ 기본 CSS 스타일. """ return """ .gradio-container { width: 100vw !important; min-height: 100vh !important; margin: 0 !important; padding: 0 !important; background: linear-gradient(135deg, #EFF6FF 0%, #F5F3FF 100%); display: flex; flex-direction: column; overflow-y: auto !important; } .title-area { text-align: center; margin: 1rem auto; padding: 1rem; background: white; border-radius: 1rem; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); max-width: 800px; } .title-area h1 { background: linear-gradient(90deg, #2563EB 0%, #7C3AED 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 2.5rem; font-weight: bold; margin-bottom: 0.5rem; } .title-area p { color: #6B7280; font-size: 1.1rem; } .gr-block, .gr-box { padding: 0.5rem !important; } """ def read_fn(path): disk_rw = FileBasedDataReader(os.path.dirname(path)) return disk_rw.read(os.path.basename(path)) def parse_pdf(doc_path, output_dir, end_page_id, is_ocr, layout_mode, formula_enable, table_enable, language): os.makedirs(output_dir, exist_ok=True) try: file_name = f"{str(Path(doc_path).stem)}_{time.time()}" pdf_data = read_fn(doc_path) parse_method = "ocr" if is_ocr else "auto" local_image_dir, local_md_dir = prepare_env(output_dir, file_name, parse_method) do_parse( output_dir, file_name, pdf_data, [], parse_method, False, end_page_id=end_page_id, layout_model=layout_mode, formula_enable=formula_enable, table_enable=table_enable, lang=language, f_dump_orig_pdf=False ) return local_md_dir, file_name except Exception as e: logger.exception(e) def compress_directory_to_zip(directory_path, output_zip_path): try: with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(directory_path): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, directory_path) zipf.write(file_path, arcname) return 0 except Exception as e: logger.exception(e) return -1 def image_to_base64(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def replace_image_with_base64(markdown_text, image_dir_path): pattern = r'\!\[(?:[^\]]*)\]\(([^)]+)\)' def replace(match): relative_path = match.group(1) full_path = os.path.join(image_dir_path, relative_path) base64_image = image_to_base64(full_path) return f"" return re.sub(pattern, replace, markdown_text) def to_pdf(file_path): """ 이미지(JPG/PNG 등)를 PDF로 컨버팅. TXT, CSV 파일인 경우 변환 없이 원본 경로를 그대로 반환. """ ext = Path(file_path).suffix.lower() if ext in ['.txt', '.csv']: return file_path with pymupdf.open(file_path) as f: if f.is_pdf: return file_path else: pdf_bytes = f.convert_to_pdf() unique_filename = f"{uuid.uuid4()}.pdf" tmp_file_path = os.path.join(os.path.dirname(file_path), unique_filename) with open(tmp_file_path, 'wb') as tmp_pdf_file: tmp_pdf_file.write(pdf_bytes) return tmp_file_path def to_markdown(file_path, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language, progress=gr.Progress(track_tqdm=False)): """ 업로드된 PDF/이미지 또는 TXT/CSV -> 마크다운 변환 (프로그레스 바 표시용) """ ext = Path(file_path).suffix.lower() if ext in ['.txt', '.csv']: progress(0, "파일 읽는 중...") with open(file_path, 'r', encoding='utf-8') as f: txt_content = f.read() time.sleep(0.5) progress(50, "파일 내용 처리 중...") progress(100, "변환 완료!") return f"```{txt_content}```\n\n**변환 완료 (텍스트/CSV 파일)**" else: progress(0, "PDF로 변환 중...") file_path = to_pdf(file_path) time.sleep(0.5) if end_pages > 20: end_pages = 20 progress(20, "문서 파싱 중...") local_md_dir, file_name = parse_pdf(file_path, './output', end_pages - 1, is_ocr, layout_mode, formula_enable, table_enable, language) time.sleep(0.5) progress(50, "압축(zip) 생성 중...") archive_zip_path = os.path.join("./output", compute_sha256(local_md_dir) + ".zip") zip_archive_success = compress_directory_to_zip(local_md_dir, archive_zip_path) if zip_archive_success == 0: logger.info("압축 성공") status_message = "\n\n**변환 완료 (압축 성공)**" else: logger.error("압축 실패") status_message = "\n\n**변환 완료 (압축 실패)**" time.sleep(0.5) progress(70, "마크다운 읽는 중...") md_path = os.path.join(local_md_dir, file_name + ".md") with open(md_path, 'r', encoding='utf-8') as f: txt_content = f.read() time.sleep(0.5) progress(90, "이미지 base64 변환 중...") md_content = replace_image_with_base64(txt_content, local_md_dir) time.sleep(0.5) progress(100, "변환 완료!") return md_content + status_message def to_markdown_comparison(file_a, file_b, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language, progress=gr.Progress(track_tqdm=False)): """ 두 개의 파일을 변환하여 A/B 비교용 마크다운 생성. 각 파일은 "문서 A", "문서 B" 헤더로 구분되며, 두 파일 모두 업로드된 경우 추가로 비교 분석 지시사항을 포함한다. """ combined_md = "" if file_a is not None: combined_md += "### 문서 A\n" md_a = to_markdown(file_a, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language, progress=progress) combined_md += md_a + "\n" if file_b is not None: combined_md += "### 문서 B\n" md_b = to_markdown(file_b, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language, progress=progress) combined_md += md_b + "\n" if file_a is not None and file_b is not None: combined_md += "### 비교 분석:\n두 문서의 차이점, 장단점 및 주요 내용을 비교 분석하십시오.\n" return combined_md def init_model(): """ magic-pdf 모델 초기화 """ from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton try: model_manager = ModelSingleton() txt_model = model_manager.get_model(False, False) logger.info("txt_model init final") ocr_model = model_manager.get_model(True, False) logger.info("ocr_model init final") return 0 except Exception as e: logger.exception(e) return -1 model_init = init_model() logger.info(f"model_init: {model_init}") ############################### # 언어 목록 ############################### latin_lang = [ 'af','az','bs','cs','cy','da','de','es','et','fr','ga','hr','hu','id','is','it','ku', 'la','lt','lv','mi','ms','mt','nl','no','oc','pi','pl','pt','ro','rs_latin','sk','sl', 'sq','sv','sw','tl','tr','uz','vi','french','german' ] arabic_lang = ['ar','fa','ug','ur'] cyrillic_lang = ['ru','rs_cyrillic','be','bg','uk','mn','abq','ady','kbd','ava','dar','inh','che','lbe','lez','tab'] devanagari_lang = ['hi','mr','ne','bh','mai','ang','bho','mah','sck','new','gom','sa','bgc'] other_lang = ['ch','en','korean','japan','chinese_cht','ta','te','ka'] all_lang = ['', 'auto'] all_lang.extend([*other_lang, *latin_lang, *arabic_lang, *cyrillic_lang, *devanagari_lang]) ############################### # (1) PDF Chat 용 LLM 관련 ############################### import google.generativeai as genai from gradio import ChatMessage from typing import Iterator GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") genai.configure(api_key=GEMINI_API_KEY) model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-1219") def format_chat_history(messages: list) -> list: """ Gemini가 이해할 수 있는 (role, parts[]) 형식으로 변환 """ formatted_history = [] for message in messages: if not (message.role == "assistant" and hasattr(message, "metadata")): formatted_history.append({ "role": "user" if message.role == "user" else "assistant", "parts": [message.content] }) return formatted_history def convert_chat_messages_to_gradio_format(messages): """ ChatMessage list -> [(유저발화, 봇응답), ...] 형태로 변환 """ gradio_chat = [] user_text, assistant_text = None, None for msg in messages: if msg.role == "user": if user_text is not None or assistant_text is not None: gradio_chat.append((user_text or "", assistant_text or "")) user_text = msg.content assistant_text = None else: if user_text is None: user_text = "" if assistant_text is None: assistant_text = msg.content else: assistant_text += msg.content if user_text is not None or assistant_text is not None: gradio_chat.append((user_text or "", assistant_text or "")) return gradio_chat def stream_gemini_response(user_message: str, messages: list) -> Iterator[list]: """ Gemini 응답을 스트리밍 형태로 출력 (user_message 공백 시 임시 문구 사용) """ if not user_message.strip(): user_message = "...(No content from user)..." try: print(f"\n=== [Gemini] New Request ===\nUser message: '{user_message}'") chat_history = format_chat_history(messages) chat = model.start_chat(history=chat_history) response = chat.send_message(user_message, stream=True) thought_buffer = "" response_buffer = "" thinking_complete = False # "Thinking" 역할 추가 messages.append( ChatMessage( role="assistant", content="", metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) ) yield convert_chat_messages_to_gradio_format(messages) for chunk in response: parts = chunk.candidates[0].content.parts current_chunk = parts[0].text if len(parts) == 2 and not thinking_complete: thought_buffer += current_chunk messages[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) yield convert_chat_messages_to_gradio_format(messages) response_buffer = parts[1].text messages.append(ChatMessage(role="assistant", content=response_buffer)) thinking_complete = True elif thinking_complete: response_buffer += current_chunk messages[-1] = ChatMessage(role="assistant", content=response_buffer) else: thought_buffer += current_chunk messages[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) yield convert_chat_messages_to_gradio_format(messages) print(f"\n=== [Gemini] Final Response ===\n{response_buffer}") except Exception as e: print(f"\n=== [Gemini] Error ===\n{str(e)}") messages.append(ChatMessage(role="assistant", content=f"오류가 발생했습니다: {str(e)}")) yield convert_chat_messages_to_gradio_format(messages) def user_message(msg: str, history: list, doc_text: str) -> tuple[str, list]: """ 문서 변환 결과(문자열)와 함께 질의를 결합하여 history에 추가 """ if doc_text.strip(): user_query = f"다음 문서를 참고하여 답변:\n\n{doc_text}\n\n질문: {msg}" else: user_query = msg history.append(ChatMessage(role="user", content=user_query)) return "", history def reset_states(file_a, file_b): """ 새 파일 업로드 시 chat_history, md_state, chatbot을 초기화 """ return [], "", "" ############################### # UI 통합 ############################### if __name__ == "__main__": with gr.Blocks(title="Compare RAG CHAT", css=create_css()) as demo: with gr.Tab("PDF Chat with LLM"): gr.HTML("""
두 개의 PDF/이미지/텍스트/CSV 파일을 업로드하여 A/B 비교 후, 추론 LLM과 대화합니다.
한 파일만 업로드하면 해당 파일로 분석합니다.