import os import gradio as gr import aiohttp import asyncio import json from datasets import Dataset, DatasetDict, load_dataset from huggingface_hub import HfFolder import subprocess def upgrade_pip(): try: subprocess.check_call([os.sys.executable, "-m", "pip", "install", "--upgrade", "pip"]) print("pip 升級成功") except subprocess.CalledProcessError: print("pip 升級失敗") # 呼叫升級函數 upgrade_pip() # 從環境變量中獲取 Hugging Face API 令牌和其他配置 HF_API_TOKEN = os.environ.get("Feedback_API_TOKEN") LLM_API = os.environ.get("LLM_API") LLM_URL = os.environ.get("LLM_URL") USER_ID = "HuggingFace Space" DATASET_NAME = os.environ.get("DATASET_NAME") # 確保令牌不為空 if HF_API_TOKEN is None: raise ValueError("HF_API_TOKEN 環境變量未設置。請在 Hugging Face Space 的設置中添加該環境變量。") # 設置 Hugging Face API 令牌 HfFolder.save_token(HF_API_TOKEN) # 定義數據集特徵 features = { "user_input": "string", "response": "string", "feedback_type": "string", "improvement": "string" } # 加載或創建數據集 try: dataset = load_dataset(DATASET_NAME) except: dataset = DatasetDict({ "feedback": Dataset.from_dict({ "user_input": [], "response": [], "feedback_type": [], "improvement": [] }) }) async def send_chat_message(LLM_URL, LLM_API, user_input): payload = { "inputs": {}, "query": user_input, "response_mode": "streaming", "conversation_id": "", "user": USER_ID, } print("Sending chat message payload:", payload) full_response = [] buffer = b"" # 使用字節緩衝區來處理不完整的行 async with aiohttp.ClientSession() as session: try: async with session.post( url=f"{LLM_URL}/chat-messages", headers={"Authorization": f"Bearer {LLM_API}"}, json=payload, timeout=aiohttp.ClientTimeout(total=60) ) as response: if response.status != 200: error_text = await response.text() print(f"Error: {response.status}, Body: {error_text}") return f"Error: {response.status}" # 手動迭代數據塊 (chunk) async for chunk in response.content.iter_any(): buffer += chunk # 嘗試按行分割緩衝區中的數據 while b'\n' in buffer: line, buffer = buffer.split(b'\n', 1) line_str = line.decode('utf-8').strip() if not line_str or "data: " not in line_str: continue try: # 檢查是否是我們要的答案,而不是工作流日誌 # 這些日誌訊息 (e.g., event: "node_started") 就是造成問題的元兇 # 我們只關心包含 "answer" 的 message_chunk if '"event": "message_chunk"' in line_str or '"answer"' in line_str: print("Received line:", line_str) # 只打印有用的行 data_part = line_str.split("data: ", 1)[1] data = json.loads(data_part) if "answer" in data: full_response.append(data["answer"]) # 如果是 Dify API,答案可能在 data['data']['answer'] elif "data" in data and "answer" in data["data"]: full_response.append(data["data"]["answer"]) except (IndexError, json.JSONDecodeError) as e: # 忽略無法解析的行,它們很可能是日誌 # print(f"Skipping unparsable line: {line_str}, error: {e}") pass if full_response: return ''.join(full_response).strip() else: # 如果循環結束都沒有收到答案,返回提示 return "抱歉,我無法從 API 取得有效的回覆。" except Exception as e: print(f"Exception: {e}") return f"Exception: {e}" async def handle_input_async(user_input): print(f"Handling input: {user_input}") chat_response = await send_chat_message(LLM_URL, LLM_API, user_input) print("Chat response:", chat_response) # Debug information return chat_response def handle_input(user_input): print(f"Handling input synchronously: {user_input}") return asyncio.run(handle_input_async(user_input)) def save_feedback(user_input, response, feedback_type, improvement): feedback = { "user_input": user_input, "response": response, "feedback_type": feedback_type, "improvement": improvement } print(f"Saving feedback: {feedback}") # Append to the dataset new_data = { "user_input": [user_input], "response": [response], "feedback_type": [feedback_type], "improvement": [improvement] } global dataset dataset["feedback"] = Dataset.from_dict({ "user_input": dataset["feedback"]["user_input"] + [user_input], "response": dataset["feedback"]["response"] + [response], "feedback_type": dataset["feedback"]["feedback_type"] + [feedback_type], "improvement": dataset["feedback"]["improvement"] + [improvement] }) dataset.push_to_hub(DATASET_NAME) def handle_feedback(response, feedback_type, improvement): global last_user_input save_feedback(last_user_input, response, feedback_type, improvement) return "感謝您的反饋!" # 讀取並顯示反饋內容的函數 def show_feedback(): try: feedbacks = dataset["feedback"].to_pandas().to_dict(orient="records") print(f"Feedbacks: {feedbacks}") # Debug information return feedbacks except Exception as e: print(f"Error: {e}") # Debug information return {"error": str(e)} TITLE = """