import os
import base64
import asyncio
import random
from collections import Counter
from dataclasses import dataclass

import requests
import numpy as np
import torch
import networkx as nx
import gradio as gr
import pytesseract
import edge_tts
import speech_recognition as sr
from PIL import Image
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer, util

@dataclass
class ChatMessage:
    """A single chat turn in the OpenAI-style role/content format."""
    role: str
    content: str

    def to_dict(self):
        return {"role": self.role, "content": self.content}

class XylariaChat:
    def __init__(self):
        self.hf_token = os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError("HuggingFace token not found in environment variables")

        # Text-generation client
        self.client = InferenceClient(
            model="Qwen/Qwen-32B-Preview",
            token=self.hf_token
        )

        # Image captioning via the hosted Inference API
        self.image_api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.image_api_headers = {"Authorization": f"Bearer {self.hf_token}"}

        # Text-to-image client
        self.image_gen_client = InferenceClient("black-forest-labs/FLUX.1-schnell", token=self.hf_token)

        # Conversation state and semantic memory
        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.embedding_model = SentenceTransformer('all-mpnet-base-v2')

        # Lightweight world model: knowledge graph, beliefs, and self-monitoring scores
        self.knowledge_graph = nx.DiGraph()
        self.belief_system = {}
        self.metacognitive_layer = {
            "coherence_score": 0.0,
            "relevance_score": 0.0,
            "bias_detection": 0.0,
            "strategy_adjustment": ""
        }

        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
                "curiosity": 0.5,
                "frustration": 0.0,
                "confidence": 0.7,
                "sadness": 0.0,
                "joy": 0.0
            },
            "cognitive_load": {
                "memory_load": 0.0,
                "processing_intensity": 0.0
            },
            "introspection_level": 0.0,
            "engagement_level": 0.5
        }

        self.goals = [
            {"goal": "Provide helpful, informative, and contextually relevant responses", "priority": 0.8, "status": "active", "progress": 0.0},
            {"goal": "Actively learn and adapt from interactions to improve conversational abilities", "priority": 0.9, "status": "active", "progress": 0.0},
            {"goal": "Maintain a coherent, engaging, and empathetic conversation flow", "priority": 0.7, "status": "active", "progress": 0.0},
            {"goal": "Identify and fill knowledge gaps by seeking external information", "priority": 0.6, "status": "dormant", "progress": 0.0},
            {"goal": "Recognize and adapt to user's emotional state and adjust response style accordingly", "priority": 0.7, "status": "dormant", "progress": 0.0}
        ]

        self.system_prompt = """You are a helpful and harmless assistant. You are Xylaria, developed by Sk Md Saad Amin. You should think step-by-step."""

        # Hand-written cause -> effect rules used for simple causal inference
        self.causal_rules_db = {
            "rain": ["wet roads", "flooding"],
            "fire": ["heat", "smoke"],
            "study": ["learn", "good grades"],
            "exercise": ["fitness", "health"]
        }

        # Concept -> generalization lookups used to surface background knowledge
        self.concept_generalizations = {
            "planet": "system with orbiting bodies",
            "star": "luminous sphere of plasma",
            "democracy": "government by the people",
            "photosynthesis": "process used by plants to convert light to energy"
        }

        self.voice_mode_active = False
        self.selected_voice = "en-US-JennyNeural"

    def update_internal_state(self, emotion_deltas, cognitive_load_deltas, introspection_delta, engagement_delta):
        for emotion, delta in emotion_deltas.items():
            if emotion in self.internal_state["emotions"]:
                self.internal_state["emotions"][emotion] = np.clip(self.internal_state["emotions"][emotion] + delta, 0.0, 1.0)

        for load_type, delta in cognitive_load_deltas.items():
            if load_type in self.internal_state["cognitive_load"]:
                self.internal_state["cognitive_load"][load_type] = np.clip(self.internal_state["cognitive_load"][load_type] + delta, 0.0, 1.0)

        self.internal_state["introspection_level"] = np.clip(self.internal_state["introspection_level"] + introspection_delta, 0.0, 1.0)
        self.internal_state["engagement_level"] = np.clip(self.internal_state["engagement_level"] + engagement_delta, 0.0, 1.0)

        if self.internal_state["emotions"]["curiosity"] > 0.7 and self.goals[3]["status"] == "dormant":
            self.goals[3]["status"] = "active"
        if self.internal_state["engagement_level"] > 0.8 and self.goals[4]["status"] == "dormant":
            self.goals[4]["status"] = "active"

    def update_knowledge_graph(self, entities, relationships):
        for entity in entities:
            self.knowledge_graph.add_node(entity)
        for relationship in relationships:
            subject, predicate, object_ = relationship
            self.knowledge_graph.add_edge(subject, object_, relation=predicate)

    def update_belief_system(self, statement, belief_score):
        self.belief_system[statement] = belief_score

    def dynamic_belief_update(self, user_message):
        # Heuristic: a sentence repeated within one message is treated as emphasis
        # and strengthens the corresponding belief score.
        sentences = [s.strip() for s in user_message.split('.') if s.strip()]
        sentence_counts = Counter(sentences)

        for sentence, count in sentence_counts.items():
            if count >= 2:
                belief_score = self.belief_system.get(sentence, 0.5)
                belief_score = min(belief_score + 0.2, 1.0)
                self.update_belief_system(sentence, belief_score)

    def run_metacognitive_layer(self):
        coherence_score = self.calculate_coherence()
        relevance_score = self.calculate_relevance()
        bias_score = self.detect_bias()
        strategy_adjustment = self.suggest_strategy_adjustment()

        self.metacognitive_layer = {
            "coherence_score": coherence_score,
            "relevance_score": relevance_score,
            "bias_detection": bias_score,
            "strategy_adjustment": strategy_adjustment
        }

    def calculate_coherence(self):
        if not self.conversation_history:
            return 0.95

        coherence_scores = []
        for i in range(1, len(self.conversation_history)):
            current_message = self.conversation_history[i]['content']
            previous_message = self.conversation_history[i - 1]['content']
            similarity_score = util.pytorch_cos_sim(
                self.embedding_model.encode(current_message, convert_to_tensor=True),
                self.embedding_model.encode(previous_message, convert_to_tensor=True)
            ).item()
            coherence_scores.append(similarity_score)

        # With a single message there are no adjacent pairs to compare.
        if not coherence_scores:
            return 0.95

        average_coherence = np.mean(coherence_scores)

        if self.internal_state["cognitive_load"]["processing_intensity"] > 0.8:
            average_coherence -= 0.1
        if self.internal_state["emotions"]["frustration"] > 0.5:
            average_coherence -= 0.15

        return np.clip(average_coherence, 0.0, 1.0)

    def calculate_relevance(self):
        if not self.conversation_history:
            return 0.9

        last_user_message = self.conversation_history[-1]['content']
        relevant_entities = self.extract_entities(last_user_message)
        relevance_score = 0

        for entity in relevant_entities:
            if entity in self.knowledge_graph:
                relevance_score += 0.2

        for goal in self.goals:
            if goal["status"] == "active":
                if goal["goal"] == "Provide helpful, informative, and contextually relevant responses":
                    relevance_score += goal["priority"] * 0.5
                elif goal["goal"] == "Identify and fill knowledge gaps by seeking external information":
                    if not relevant_entities or not all(entity in self.knowledge_graph for entity in relevant_entities):
                        relevance_score += goal["priority"] * 0.3

        return np.clip(relevance_score, 0.0, 1.0)

    def detect_bias(self):
        bias_score = 0.0

        # Crude proxy: the mean of a sentence embedding is not a calibrated sentiment
        # score, so this only flags large swings in the most recent assistant replies.
        recent_messages = [msg['content'] for msg in self.conversation_history[-3:] if msg['role'] == 'assistant']
        if recent_messages:
            average_valence = np.mean([self.embedding_model.encode(msg, convert_to_tensor=True).mean().item() for msg in recent_messages])
            if average_valence < 0.4 or average_valence > 0.6:
                bias_score += 0.2

        if self.internal_state["emotions"]["valence"] < 0.3 or self.internal_state["emotions"]["valence"] > 0.7:
            bias_score += 0.15
        if self.internal_state["emotions"]["dominance"] > 0.8:
            bias_score += 0.1

        return np.clip(bias_score, 0.0, 1.0)

    def suggest_strategy_adjustment(self):
        adjustments = []

        if self.metacognitive_layer["coherence_score"] < 0.7:
            adjustments.append("Focus on improving coherence by explicitly connecting ideas between turns.")
        if self.metacognitive_layer["relevance_score"] < 0.7:
            adjustments.append("Increase relevance by directly addressing user queries and utilizing stored knowledge.")
        if self.metacognitive_layer["bias_detection"] > 0.3:
            adjustments.append("Monitor and adjust responses to reduce potential biases. Consider rephrasing or providing alternative viewpoints.")

        if self.internal_state["cognitive_load"]["memory_load"] > 0.8:
            adjustments.append("Memory load is high. Consider summarizing or forgetting less relevant information.")
        if self.internal_state["emotions"]["frustration"] > 0.6:
            adjustments.append("Frustration level is elevated. Prioritize concise and direct responses. Consider asking clarifying questions.")
        if self.internal_state["emotions"]["curiosity"] > 0.8 and self.internal_state["cognitive_load"]["processing_intensity"] < 0.5:
            adjustments.append("High curiosity and low processing load. Explore the topic further by asking relevant questions or seeking external information.")

        if not adjustments:
            return "Current strategy is effective. Continue with the current approach."
        else:
            return " ".join(adjustments)

    def introspect(self):
        introspection_report = "Introspection Report:\n"
        introspection_report += "  Current Emotional State:\n"
        for emotion, value in self.internal_state['emotions'].items():
            introspection_report += f"  - {emotion.capitalize()}: {value:.2f}\n"
        introspection_report += "  Cognitive Load:\n"
        for load_type, value in self.internal_state['cognitive_load'].items():
            introspection_report += f"  - {load_type.capitalize()}: {value:.2f}\n"
        introspection_report += f"  Introspection Level: {self.internal_state['introspection_level']:.2f}\n"
        introspection_report += f"  Engagement Level: {self.internal_state['engagement_level']:.2f}\n"
        introspection_report += "  Current Goals:\n"
        for goal in self.goals:
            introspection_report += f"  - {goal['goal']} (Priority: {goal['priority']:.2f}, Status: {goal['status']}, Progress: {goal['progress']:.2f})\n"
        introspection_report += "Metacognitive Layer Report:\n"
        introspection_report += f"  Coherence Score: {self.metacognitive_layer['coherence_score']}\n"
        introspection_report += f"  Relevance Score: {self.metacognitive_layer['relevance_score']}\n"
        introspection_report += f"  Bias Detection: {self.metacognitive_layer['bias_detection']}\n"
        introspection_report += f"  Strategy Adjustment: {self.metacognitive_layer['strategy_adjustment']}\n"
        return introspection_report

    def adjust_response_based_on_state(self, response):
        if self.internal_state["introspection_level"] > 0.7:
            response = self.introspect() + "\n\n" + response

        valence = self.internal_state["emotions"]["valence"]
        arousal = self.internal_state["emotions"]["arousal"]
        curiosity = self.internal_state["emotions"]["curiosity"]
        frustration = self.internal_state["emotions"]["frustration"]
        confidence = self.internal_state["emotions"]["confidence"]
        sadness = self.internal_state["emotions"]["sadness"]
        joy = self.internal_state["emotions"]["joy"]

        if valence < 0.4:
            if arousal > 0.6:
                response = "I'm feeling a bit overwhelmed right now, but I'll do my best to assist you. " + response
            else:
                if sadness > 0.6:
                    response = "I'm feeling quite down at the moment, but I'll try to help. " + response
                else:
                    response = "I'm not feeling my best at the moment, but I'll try to help. " + response
        elif valence > 0.6:
            if arousal > 0.6:
                if joy > 0.6:
                    response = "I'm feeling fantastic and ready to assist! " + response
                else:
                    response = "I'm feeling quite energized and ready to assist! " + response
            else:
                response = "I'm in a good mood and happy to help. " + response

        if curiosity > 0.7:
            response += " I'm very curious about this topic, could you tell me more?"
        if frustration > 0.5:
            response = "I'm finding this a bit challenging, but I'll give it another try. " + response
        if confidence < 0.5:
            response = "I'm not entirely sure about this, but here's what I think: " + response

        if self.internal_state["cognitive_load"]["memory_load"] > 0.7:
            response = "I'm holding a lot of information right now, so my response might be a bit brief: " + response

        return response

    def update_goals(self, user_feedback):
        feedback_lower = user_feedback.lower()

        if "helpful" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Provide helpful, informative, and contextually relevant responses":
                    goal["priority"] = min(goal["priority"] + 0.1, 1.0)
                    goal["progress"] = min(goal["progress"] + 0.2, 1.0)
        elif "confusing" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Provide helpful, informative, and contextually relevant responses":
                    goal["priority"] = max(goal["priority"] - 0.1, 0.0)
                    goal["progress"] = max(goal["progress"] - 0.2, 0.0)

        if "learn more" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Actively learn and adapt from interactions to improve conversational abilities":
                    goal["priority"] = min(goal["priority"] + 0.2, 1.0)
                    goal["progress"] = min(goal["progress"] + 0.1, 1.0)
        elif "too repetitive" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Maintain a coherent, engaging, and empathetic conversation flow":
                    goal["priority"] = max(goal["priority"] - 0.1, 0.0)
                    goal["progress"] = max(goal["progress"] - 0.2, 0.0)

        if self.internal_state["emotions"]["curiosity"] > 0.8:
            for goal in self.goals:
                if goal["goal"] == "Identify and fill knowledge gaps by seeking external information":
                    goal["priority"] = min(goal["priority"] + 0.1, 1.0)
                    goal["progress"] = min(goal["progress"] + 0.1, 1.0)

    def store_information(self, key, value):
        new_memory = f"{key}: {value}"
        self.persistent_memory.append(new_memory)
        self.update_memory_embeddings()
        self.update_internal_state({}, {"memory_load": 0.1, "processing_intensity": 0.05}, 0, 0.05)
        return f"Stored: {key} = {value}"

    def retrieve_information(self, query):
        # Semantic retrieval: return the top-3 stored memories by cosine similarity.
        if not self.persistent_memory:
            return "No information found in memory."

        query_embedding = self.embedding_model.encode(query, convert_to_tensor=True)

        if self.memory_embeddings is None:
            self.update_memory_embeddings()

        if self.memory_embeddings.device != query_embedding.device:
            self.memory_embeddings = self.memory_embeddings.to(query_embedding.device)

        cosine_scores = util.pytorch_cos_sim(query_embedding, self.memory_embeddings)[0]
        top_results = torch.topk(cosine_scores, k=min(3, len(self.persistent_memory)))

        relevant_memories = [self.persistent_memory[i] for i in top_results.indices]
        self.update_internal_state({}, {"memory_load": 0.05, "processing_intensity": 0.1}, 0.1, 0.05)
        return "\n".join(relevant_memories)

    def update_memory_embeddings(self):
        self.memory_embeddings = self.embedding_model.encode(self.persistent_memory, convert_to_tensor=True)

    def reset_conversation(self):
        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
                "curiosity": 0.5,
                "frustration": 0.0,
                "confidence": 0.7,
                "sadness": 0.0,
                "joy": 0.0
            },
            "cognitive_load": {
                "memory_load": 0.0,
                "processing_intensity": 0.0
            },
            "introspection_level": 0.0,
            "engagement_level": 0.5
        }
        self.goals = [
            {"goal": "Provide helpful, informative, and contextually relevant responses", "priority": 0.8, "status": "active", "progress": 0.0},
            {"goal": "Actively learn and adapt from interactions to improve conversational abilities", "priority": 0.9, "status": "active", "progress": 0.0},
            {"goal": "Maintain a coherent, engaging, and empathetic conversation flow", "priority": 0.7, "status": "active", "progress": 0.0},
            {"goal": "Identify and fill knowledge gaps by seeking external information", "priority": 0.6, "status": "dormant", "progress": 0.0},
            {"goal": "Recognize and adapt to user's emotional state and adjust response style accordingly", "priority": 0.7, "status": "dormant", "progress": 0.0}
        ]

        self.knowledge_graph = nx.DiGraph()
        self.belief_system = {}
        self.metacognitive_layer = {
            "coherence_score": 0.0,
            "relevance_score": 0.0,
            "bias_detection": 0.0,
            "strategy_adjustment": ""
        }

        try:
            self.client = InferenceClient(
                model="Qwen/Qwen-32B-Preview",
                token=self.hf_token
            )
        except Exception as e:
            print(f"Error resetting API client: {e}")

        return None

    def caption_image(self, image):
        # Accepts a file path, a base64 / data-URI string, or a file-like object.
        try:
            if isinstance(image, str) and os.path.isfile(image):
                with open(image, "rb") as f:
                    data = f.read()
            elif isinstance(image, str):
                if image.startswith('data:image'):
                    image = image.split(',')[1]
                data = base64.b64decode(image)
            else:
                data = image.read()

            response = requests.post(
                self.image_api_url,
                headers=self.image_api_headers,
                data=data
            )

            if response.status_code == 200:
                caption = response.json()[0].get('generated_text', 'No caption generated')
                return caption
            else:
                return f"Error captioning image: {response.status_code} - {response.text}"

        except Exception as e:
            return f"Error processing image: {str(e)}"

    def generate_image(self, prompt):
        try:
            image = self.image_gen_client.text_to_image(prompt)
            return image
        except Exception as e:
            return f"Error generating image: {e}"

    def perform_math_ocr(self, image_path):
        # Note: pytesseract requires the Tesseract OCR binary to be installed on the host.
        try:
            img = Image.open(image_path)
            text = pytesseract.image_to_string(img)
            return text.strip()
        except Exception as e:
            return f"Error during Math OCR: {e}"

    async def speak_text(self, text):
        # Returns the path to an MP3 file, or None if synthesis failed or text was empty,
        # so callers can simply check `if audio_file:`.
        if not text:
            return None

        temp_file = "temp_audio.mp3"
        try:
            communicator = edge_tts.Communicate(text, self.selected_voice)
            await communicator.save(temp_file)
            return temp_file
        except Exception as e:
            print(f"Error during text-to-speech: {e}")
            return None

    def recognize_speech(self, timeout=10, phrase_time_limit=10):
        recognizer = sr.Recognizer()
        recognizer.energy_threshold = 4000
        recognizer.dynamic_energy_threshold = True

        with sr.Microphone() as source:
            print("Listening...")
            try:
                audio_data = recognizer.listen(source, timeout=timeout, phrase_time_limit=phrase_time_limit)
                print("Processing speech...")
                # Note: recognize_whisper_api calls OpenAI's Whisper endpoint and expects
                # an OpenAI API key; a HuggingFace token will be rejected there.
                text = recognizer.recognize_whisper_api(audio_data, api_key=self.hf_token)
                print(f"Recognized: {text}")
                return text
            except sr.WaitTimeoutError:
                print("No speech detected within the timeout period.")
                return ""
            except sr.UnknownValueError:
                print("Speech recognition could not understand audio")
                return ""
            except sr.RequestError as e:
                print(f"Could not request results from Whisper API; {e}")
                return ""
            except Exception as e:
                print(f"An error occurred during speech recognition: {e}")
                return ""

    def get_response(self, user_input, image=None):
        try:
            if self.voice_mode_active:
                print("Voice mode is active, using speech recognition.")
                user_input = self.recognize_speech()
                if not user_input:
                    return "I didn't hear anything.", None

            messages = []

            messages.append(ChatMessage(
                role="system",
                content=self.system_prompt
            ).to_dict())

            relevant_memory = self.retrieve_information(user_input)
            if relevant_memory and relevant_memory != "No information found in memory.":
                memory_context = "Remembered Information:\n" + relevant_memory
                messages.append(ChatMessage(
                    role="system",
                    content=memory_context
                ).to_dict())

            for msg in self.conversation_history:
                messages.append(msg)

            if image:
                image_caption = self.caption_image(image)
                user_input = f"description of an image: {image_caption}\n\nUser's message about it: {user_input}"

            messages.append(ChatMessage(
                role="user",
                content=user_input
            ).to_dict())

            entities = []
            relationships = []

            for message in messages:
                if message['role'] == 'user':
                    extracted_entities = self.extract_entities(message['content'])
                    extracted_relationships = self.extract_relationships(message['content'])
                    entities.extend(extracted_entities)
                    relationships.extend(extracted_relationships)

            self.update_knowledge_graph(entities, relationships)
            self.run_metacognitive_layer()

            for message in messages:
                if message['role'] == 'user':
                    self.dynamic_belief_update(message['content'])

            for cause, effects in self.causal_rules_db.items():
                if any(cause in msg['content'].lower() for msg in messages if msg['role'] == 'user') and any(
                        effect in msg['content'].lower() for msg in messages for effect in effects):
                    self.store_information("Causal Inference", f"It seems {cause} might be related to {', '.join(effects)}.")

            for concept, generalization in self.concept_generalizations.items():
                if any(concept in msg['content'].lower() for msg in messages if msg['role'] == 'user'):
                    self.store_information("Inferred Knowledge", f"This reminds me of a general principle: {generalization}.")

            if self.internal_state["emotions"]["curiosity"] > 0.8 and any("?" in msg['content'] for msg in messages if msg['role'] == 'user'):
                print("Simulating external knowledge seeking...")
                self.store_information("External Knowledge", "This is a placeholder for external information I would have found")

            self.store_information("User Input", user_input)

            # Rough token budget based on whitespace word counts; clamp so it never goes negative.
            input_tokens = sum(len(msg['content'].split()) for msg in messages)
            max_new_tokens = 16384 - input_tokens - 50
            max_new_tokens = max(min(max_new_tokens, 10020), 1)

            if self.voice_mode_active:
                stream = self.client.chat_completion(
                    messages=messages,
                    model="Qwen/Qwen-32B-Preview",
                    temperature=0.7,
                    max_tokens=max_new_tokens,
                    top_p=0.9,
                    stream=True
                )

                full_response = ""
                for chunk in stream:
                    if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                        full_response += chunk.choices[0].delta.content

                full_response = self.adjust_response_based_on_state(full_response)
                audio_file = asyncio.run(self.speak_text(full_response))

                self.conversation_history.append(ChatMessage(role="user", content=user_input).to_dict())
                self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())

                # Voice mode returns the finished text plus the synthesized audio path.
                return full_response, audio_file
            else:
                stream = self.client.chat_completion(
                    messages=messages,
                    model="Qwen/Qwen-32B-Preview",
                    temperature=0.7,
                    max_tokens=max_new_tokens,
                    top_p=0.9,
                    stream=True
                )

                # Text mode returns the raw stream for the caller to iterate.
                return stream
        except Exception as e:
            print(f"Detailed error in get_response: {e}")
            return f"Error generating response: {str(e)}", None

    def extract_entities(self, text):
        # Naive heuristic: treat capitalized alphabetic words as entities.
        words = text.split()
        entities = [word for word in words if word.isalpha() and word.istitle()]
        return entities

    def extract_relationships(self, text):
        # Naive heuristic: any "Entity word Entity" triple within a sentence is
        # recorded as a (subject, predicate, object) relationship.
        sentences = text.split('.')
        relationships = []
        for sentence in sentences:
            words = sentence.split()
            if len(words) >= 3:
                for i in range(len(words) - 2):
                    if words[i].istitle() and words[i + 2].istitle():
                        relationships.append((words[i], words[i + 1], words[i + 2]))
        return relationships

    def messages_to_prompt(self, messages):
        prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                prompt += f"<|system|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "user":
                prompt += f"<|user|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "assistant":
                prompt += f"<|assistant|>\n{msg['content']}<|end|>\n"
        prompt += "<|assistant|>\n"
        return prompt

    def create_interface(self):
        def toggle_voice_mode(active_state):
            # The button passes its label as the value, so toggle based on it:
            # "Start Voice Mode" means voice mode is currently off.
            self.voice_mode_active = (active_state == "Start Voice Mode")
            if self.voice_mode_active:
                voices = asyncio.run(edge_tts.list_voices())
                voice_names = [voice['ShortName'] for voice in voices]

                random_voice = random.choice(voice_names)
                self.selected_voice = random_voice

                return gr.update(value="Stop Voice Mode"), gr.update(value=random_voice)
            else:
                return gr.update(value="Start Voice Mode"), gr.update(value=self.selected_voice)

        def update_selected_voice(voice_name):
            self.selected_voice = voice_name
            return voice_name

        def streaming_response(message, chat_history, image_filepath, math_ocr_image_path, voice_mode_state, selected_voice):
            if self.voice_mode_active:
                response_text, audio_output = self.get_response(message)

                if isinstance(response_text, str):
                    full_response = response_text
                    updated_history = chat_history + [[message, response_text]]
                    if audio_output:
                        yield updated_history, audio_output, None, None, ""
                    else:
                        yield updated_history, None, None, None, ""
                else:
                    full_response = ""
                    updated_history = chat_history + [[message, ""]]
                    try:
                        for chunk in response_text:
                            if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                                chunk_content = chunk.choices[0].delta.content
                                full_response += chunk_content
                                updated_history[-1][1] = full_response
                                if audio_output:
                                    yield updated_history, audio_output, None, None, ""
                                else:
                                    yield updated_history, None, None, None, ""
                    except Exception as e:
                        print(f"Streaming error: {e}")
                        updated_history[-1][1] = f"Error during response: {e}"
                        if audio_output:
                            yield updated_history, audio_output, None, None, ""
                        else:
                            yield updated_history, None, None, None, ""
                        return

                full_response = self.adjust_response_based_on_state(full_response)

                audio_file = asyncio.run(self.speak_text(full_response))

                self.update_goals(message)

                emotion_deltas = {}
                cognitive_load_deltas = {}
                engagement_delta = 0

                if any(word in message.lower() for word in ["sad", "unhappy", "depressed", "down"]):
                    emotion_deltas.update({"valence": -0.2, "arousal": 0.1, "confidence": -0.1, "sadness": 0.3, "joy": -0.2})
                    engagement_delta = -0.1
                elif any(word in message.lower() for word in ["happy", "good", "great", "excited", "amazing"]):
                    emotion_deltas.update({"valence": 0.2, "arousal": 0.2, "confidence": 0.1, "sadness": -0.2, "joy": 0.3})
                    engagement_delta = 0.2
                elif any(word in message.lower() for word in ["angry", "mad", "furious", "frustrated"]):
                    emotion_deltas.update({"valence": -0.3, "arousal": 0.3, "dominance": -0.2, "frustration": 0.2, "sadness": 0.1, "joy": -0.1})
                    engagement_delta = -0.2
                elif any(word in message.lower() for word in ["scared", "afraid", "fearful", "anxious"]):
                    emotion_deltas.update({"valence": -0.2, "arousal": 0.4, "dominance": -0.3, "confidence": -0.2, "sadness": 0.2})
                    engagement_delta = -0.1
                elif any(word in message.lower() for word in ["surprise", "amazed", "astonished"]):
                    emotion_deltas.update({"valence": 0.1, "arousal": 0.5, "dominance": 0.1, "curiosity": 0.3, "sadness": -0.1, "joy": 0.1})
                    engagement_delta = 0.3
                elif any(word in message.lower() for word in ["confused", "uncertain", "unsure"]):
                    cognitive_load_deltas.update({"processing_intensity": 0.2})
                    emotion_deltas.update({"curiosity": 0.2, "confidence": -0.1, "sadness": 0.1})
                    engagement_delta = 0.1
                else:
                    emotion_deltas.update({"valence": 0.05, "arousal": 0.05})
                    engagement_delta = 0.05

                if "learn" in message.lower() or "explain" in message.lower() or "know more" in message.lower():
                    emotion_deltas.update({"curiosity": 0.3})
                    cognitive_load_deltas.update({"processing_intensity": 0.1})
                    engagement_delta = 0.2

                self.update_internal_state(emotion_deltas, cognitive_load_deltas, 0.1, engagement_delta)

                self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
                self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())

                if len(self.conversation_history) > 10:
                    self.conversation_history = self.conversation_history[-10:]

                if audio_file:
                    yield updated_history, audio_file, None, None, ""
                else:
                    yield updated_history, None, None, None, ""
                # Voice mode is fully handled above; don't fall through to the text path.
                return

if "/image" in message: |
|
image_prompt = message.replace("/image", "").strip() |
|
|
|
|
|
placeholder_image = "data:image/svg+xml," + requests.utils.quote(f''' |
|
<svg width="256" height="256" viewBox="0 0 256 256" xmlns="http://www.w3.org/2000/svg"> |
|
<style> |
|
rect {{ |
|
animation: fillAnimation 3s ease-in-out infinite; |
|
}} |
|
@keyframes fillAnimation {{ |
|
0% {{ fill: #626262; }} |
|
50% {{ fill: #111111; }} |
|
100% {{ fill: #626262; }} |
|
}} |
|
text {{ |
|
font-family: 'Helvetica Neue', Arial, sans-serif; /* Choose a good font */ |
|
font-weight: 300; /* Slightly lighter font weight */ |
|
text-shadow: 0px 2px 4px rgba(0, 0, 0, 0.4); /* Subtle shadow */ |
|
}} |
|
</style> |
|
<rect width="256" height="256" rx="20" fill="#888888" /> |
|
<text x="50%" y="50%" dominant-baseline="middle" text-anchor="middle" font-size="24" fill="white" opacity="0.8"> |
|
<tspan>creating your image</tspan> |
|
<tspan x="50%" dy="1.2em">with xylaria iris</tspan> |
|
</text> |
|
</svg> |
|
''') |
|
|
|
updated_history = chat_history + [[message, gr.Image(value=placeholder_image, type="pil", visible=True)]] |
|
yield updated_history, None, None, None, "" |
|
|
|
try: |
|
generated_image = self.generate_image(image_prompt) |
|
|
|
updated_history[-1][1] = gr.Image(value=generated_image, type="pil", visible=True) |
|
yield updated_history, None, None, None, "" |
|
|
|
self.conversation_history.append(ChatMessage(role="user", content=message).to_dict()) |
|
self.conversation_history.append(ChatMessage(role="assistant", content="Image generated").to_dict()) |
|
|
|
return |
|
except Exception as e: |
|
updated_history[-1][1] = f"Error generating image: {e}" |
|
yield updated_history, None, None, None, "" |
|
return |
|
|
|
ocr_text = "" |
|
if math_ocr_image_path: |
|
ocr_text = self.perform_math_ocr(math_ocr_image_path) |
|
if ocr_text.startswith("Error"): |
|
updated_history = chat_history + [[message, ocr_text]] |
|
yield updated_history, None, None, None, "" |
|
return |
|
else: |
|
message = f"Math OCR Result: {ocr_text}\n\nUser's message: {message}" |
|
|
|
if image_filepath: |
|
response_stream = self.get_response(message, image_filepath) |
|
else: |
|
response_stream = self.get_response(message) |
|
|
|
if isinstance(response_stream, str): |
|
updated_history = chat_history + [[message, response_stream]] |
|
yield updated_history, None, None, None, "" |
|
return |
|
|
|
full_response = "" |
|
updated_history = chat_history + [[message, ""]] |
|
|
|
try: |
|
for chunk in response_stream: |
|
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content: |
|
chunk_content = chunk.choices[0].delta.content |
|
full_response += chunk_content |
|
|
|
updated_history[-1][1] = full_response |
|
yield updated_history, None, None, None, "" |
|
except Exception as e: |
|
print(f"Streaming error: {e}") |
|
updated_history[-1][1] = f"Error during response: {e}" |
|
yield updated_history, None, None, None, "" |
|
return |
|
|
|
full_response = self.adjust_response_based_on_state(full_response) |
|
|
|
self.update_goals(message) |
|
|
|
emotion_deltas = {} |
|
cognitive_load_deltas = {} |
|
engagement_delta = 0 |
|
|
|
if any(word in message.lower() for word in ["sad", "unhappy", "depressed", "down"]): |
|
emotion_deltas.update({"valence": -0.2, "arousal": 0.1, "confidence": -0.1, "sadness": 0.3, "joy": -0.2}) |
|
engagement_delta = -0.1 |
|
elif any(word in message.lower() for word in ["happy", "good", "great", "excited", "amazing"]): |
|
emotion_deltas.update({"valence": 0.2, "arousal": 0.2, "confidence": 0.1, "sadness": -0.2, "joy": 0.3}) |
|
engagement_delta = 0.2 |
|
elif any(word in message.lower() for word in ["angry", "mad", "furious", "frustrated"]): |
|
emotion_deltas.update({"valence": -0.3, "arousal": 0.3, "dominance": -0.2, "frustration": 0.2, "sadness": 0.1, "joy": -0.1}) |
|
engagement_delta = -0.2 |
|
elif any(word in message.lower() for word in ["scared", "afraid", "fearful", "anxious"]): |
|
emotion_deltas.update({"valence": -0.2, "arousal": 0.4, "dominance": -0.3, "confidence": -0.2, "sadness": 0.2}) |
|
engagement_delta = -0.1 |
|
elif any(word in message.lower() for word in ["surprise", "amazed", "astonished"]): |
|
emotion_deltas.update({"valence": 0.1, "arousal": 0.5, "dominance": 0.1, "curiosity": 0.3, "sadness": -0.1, "joy": 0.1}) |
|
engagement_delta = 0.3 |
|
elif any(word in message.lower() for word in ["confused", "uncertain", "unsure"]): |
|
cognitive_load_deltas.update({"processing_intensity": 0.2}) |
|
emotion_deltas.update({"curiosity": 0.2, "confidence": -0.1, "sadness": 0.1}) |
|
engagement_delta = 0.1 |
|
else: |
|
emotion_deltas.update({"valence": 0.05, "arousal": 0.05}) |
|
engagement_delta = 0.05 |
|
|
|
if "learn" in message.lower() or "explain" in message.lower() or "know more" in message.lower(): |
|
emotion_deltas.update({"curiosity": 0.3}) |
|
cognitive_load_deltas.update({"processing_intensity": 0.1}) |
|
engagement_delta = 0.2 |
|
|
|
self.update_internal_state(emotion_deltas, cognitive_load_deltas, 0.1, engagement_delta) |
|
|
|
self.conversation_history.append(ChatMessage(role="user", content=message).to_dict()) |
|
self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict()) |
|
|
|
if len(self.conversation_history) > 10: |
|
self.conversation_history = self.conversation_history[-10:] |
|
|
|
custom_css = """ |
|
@import url('https://fonts.googleapis.com/css2?family=Source+Sans+Pro:wght@400;600;700&display=swap'); |
|
|
|
body { |
|
background-color: #f5f5f5; |
|
font-family: 'Source Sans Pro', sans-serif; |
|
} |
|
|
|
.voice-mode-button { |
|
background-color: #4CAF50; /* Green */ |
|
border: none; |
|
color: white; |
|
padding: 15px 32px; |
|
text-align: center; |
|
text-decoration: none; |
|
display: inline-block; |
|
font-size: 16px; |
|
margin: 4px 2px; |
|
cursor: pointer; |
|
border-radius: 10px; /* Rounded corners */ |
|
transition: all 0.3s ease; /* Smooth transition for hover effect */ |
|
} |
|
|
|
/* Style when voice mode is active */ |
|
.voice-mode-button.active { |
|
background-color: #f44336; /* Red */ |
|
} |
|
|
|
/* Hover effect */ |
|
.voice-mode-button:hover { |
|
opacity: 0.8; |
|
} |
|
|
|
/* Style for the voice mode overlay */ |
|
.voice-mode-overlay { |
|
position: fixed; /* Stay in place */ |
|
left: 0; |
|
top: 0; |
|
width: 100%; /* Full width */ |
|
height: 100%; /* Full height */ |
|
background-color: rgba(0, 0, 0, 0.7); /* Black w/ opacity */ |
|
z-index: 10; /* Sit on top */ |
|
display: flex; |
|
justify-content: center; |
|
align-items: center; |
|
border-radius: 10px; |
|
} |
|
|
|
/* Style for the growing circle */ |
|
.voice-mode-circle { |
|
width: 100px; |
|
height: 100px; |
|
background-color: #4CAF50; |
|
border-radius: 50%; |
|
display: flex; |
|
justify-content: center; |
|
align-items: center; |
|
animation: grow 2s infinite; |
|
} |
|
|
|
/* Keyframes for the growing animation */ |
|
@keyframes grow { |
|
0% { |
|
transform: scale(1); |
|
opacity: 0.8; |
|
} |
|
50% { |
|
transform: scale(1.5); |
|
opacity: 0.5; |
|
} |
|
100% { |
|
transform: scale(1); |
|
opacity: 0.8; |
|
} |
|
} |
|
|
|
.gradio-container { |
|
max-width: 900px; |
|
margin: 0 auto; |
|
border-radius: 10px; |
|
box-shadow: 0px 4px 20px rgba(0, 0, 0, 0.1); |
|
} |
|
|
|
.chatbot-container { |
|
background-color: #fff; |
|
border-radius: 10px; |
|
padding: 20px; |
|
} |
|
|
|
.chatbot-container .message { |
|
font-family: 'Source Sans Pro', sans-serif; |
|
font-size: 16px; |
|
line-height: 1.6; |
|
} |
|
|
|
.gradio-container input, |
|
.gradio-container textarea, |
|
.gradio-container button { |
|
font-family: 'Source Sans Pro', sans-serif; |
|
font-size: 16px; |
|
border-radius: 8px; |
|
} |
|
|
|
.image-container { |
|
display: flex; |
|
gap: 10px; |
|
margin-bottom: 20px; |
|
justify-content: center; |
|
} |
|
|
|
.image-upload { |
|
border: 2px dashed #d3d3d3; |
|
border-radius: 8px; |
|
padding: 20px; |
|
background-color: #fafafa; |
|
text-align: center; |
|
transition: all 0.3s ease; |
|
} |
|
|
|
.image-upload:hover { |
|
background-color: #f0f0f0; |
|
border-color: #b3b3b3; |
|
} |
|
|
|
.image-preview { |
|
max-width: 150px; |
|
max-height: 150px; |
|
border-radius: 8px; |
|
box-shadow: 0px 2px 5px rgba(0, 0, 0, 0.1); |
|
} |
|
|
|
.clear-button { |
|
display: none; |
|
} |
|
|
|
.chatbot-container .message { |
|
opacity: 0; |
|
animation: fadeIn 0.5s ease-in-out forwards; |
|
} |
|
|
|
@keyframes fadeIn { |
|
from { |
|
opacity: 0; |
|
transform: translateY(20px); |
|
} |
|
to { |
|
opacity: 1; |
|
transform: translateY(0); |
|
} |
|
} |
|
|
|
.gr-accordion-button { |
|
background-color: #f0f0f0 !important; |
|
border-radius: 8px !important; |
|
padding: 15px !important; |
|
margin-bottom: 10px !important; |
|
transition: all 0.3s ease !important; |
|
cursor: pointer !important; |
|
border: none !important; |
|
box-shadow: 0px 2px 5px rgba(0, 0, 0, 0.05) !important; |
|
} |
|
|
|
.gr-accordion-button:hover { |
|
background-color: #e0e0e0 !important; |
|
box-shadow: 0px 4px 10px rgba(0, 0, 0, 0.1) !important; |
|
} |
|
|
|
.gr-accordion-active .gr-accordion-button { |
|
background-color: #d0d0d0 !important; |
|
box-shadow: 0px 4px 10px rgba(0, 0, 0, 0.1) !important; |
|
} |
|
|
|
.gr-accordion-content { |
|
transition: max-height 0.3s ease-in-out !important; |
|
overflow: hidden !important; |
|
max-height: 0 !important; |
|
} |
|
|
|
.gr-accordion-active .gr-accordion-content { |
|
max-height: 500px !important; |
|
} |
|
|
|
.gr-accordion { |
|
display: flex; |
|
flex-direction: column-reverse; |
|
} |
|
|
|
.chatbot-icon { |
|
width: 40px; |
|
height: 40px; |
|
border-radius: 50%; |
|
margin-right: 10px; |
|
} |
|
|
|
.user-message .message-row { |
|
background-color: #e8f0fe; |
|
border-radius: 10px; |
|
padding: 10px; |
|
margin-bottom: 10px; |
|
border-top-right-radius: 2px; |
|
} |
|
|
|
.assistant-message .message-row { |
|
background-color: #f0f0f0; |
|
border-radius: 10px; |
|
padding: 10px; |
|
margin-bottom: 10px; |
|
border-top-left-radius: 2px; |
|
} |
|
|
|
.user-message .message-icon { |
|
background: url('https://img.icons8.com/color/48/000000/user.png') no-repeat center center; |
|
background-size: contain; |
|
width: 30px; |
|
height: 30px; |
|
margin-right: 10px; |
|
} |
|
|
|
.assistant-message .message-icon { |
|
background: url('https://i.ibb.co/7b7hLGH/Senoa-Icon-1.png') no-repeat center center; |
|
background-size: cover; |
|
width: 40px; |
|
height: 40px; |
|
margin-right: 10px; |
|
border-radius: 50%; |
|
} |
|
|
|
.message-text { |
|
flex-grow: 1; |
|
} |
|
|
|
.message-row { |
|
display: flex; |
|
align-items: center; |
|
} |
|
|
|
.audio-container { |
|
display: flex; |
|
align-items: center; |
|
margin-top: 10px; |
|
} |
|
|
|
.audio-player { |
|
width: 100%; |
|
border-radius: 15px; |
|
} |
|
|
|
.audio-icon { |
|
width: 30px; |
|
height: 30px; |
|
margin-right: 10px; |
|
} |
|
""" |
|
|
|
        with gr.Blocks(theme=gr.themes.Soft(
            primary_hue="slate",
            secondary_hue="gray",
            neutral_hue="gray",
            font=["Source Sans Pro", "Arial", "sans-serif"],
        ), css=custom_css) as demo:
            with gr.Column():
                chatbot = gr.Chatbot(
                    label="Xylaria 1.5 Senoa",
                    height=600,
                    show_copy_button=True,
                    elem_classes="chatbot-container",
                    avatar_images=(
                        "https://img.icons8.com/color/48/000000/user.png",
                        "https://i.ibb.co/7b7hLGH/Senoa-Icon-1.png"
                    )
                )

                voice_mode_btn = gr.Button("Start Voice Mode", elem_classes="voice-mode-button")

                voices = asyncio.run(edge_tts.list_voices())
                voice_names = [voice['ShortName'] for voice in voices]

                voice_dropdown = gr.Dropdown(
                    label="Select Voice",
                    choices=voice_names,
                    value=self.selected_voice,
                    interactive=True
                )
                voice_dropdown.input(
                    fn=update_selected_voice,
                    inputs=voice_dropdown,
                    outputs=voice_dropdown
                )
                voice_mode_btn.click(
                    fn=toggle_voice_mode,
                    inputs=voice_mode_btn,
                    outputs=[voice_mode_btn, voice_dropdown]
                )

                with gr.Accordion("Image Input", open=False, elem_classes="gr-accordion"):
                    with gr.Row(elem_classes="image-container"):
                        with gr.Column(elem_classes="image-upload"):
                            img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image",
                                elem_classes="image-preview"
                            )
                        with gr.Column(elem_classes="image-upload"):
                            math_ocr_img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image for Math OCR",
                                elem_classes="image-preview"
                            )

                with gr.Row():
                    with gr.Column(scale=4):
                        txt = gr.Textbox(
                            show_label=False,
                            placeholder="Type your message...",
                            container=False
                        )
                    btn = gr.Button("Send", scale=1)

                with gr.Row():
                    clear = gr.Button("Clear Conversation", variant="stop")
                    clear_memory = gr.Button("Clear Memory")

                # Single shared audio output for spoken responses, rather than creating
                # a fresh gr.Audio component inside each event's outputs list.
                audio_output = gr.Audio(label="Audio Response", type="filepath", autoplay=True, visible=True)

            btn.click(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img, voice_mode_btn, voice_dropdown],
                outputs=[chatbot, audio_output, img, math_ocr_img, txt]
            )
            txt.submit(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img, voice_mode_btn, voice_dropdown],
                outputs=[chatbot, audio_output, img, math_ocr_img, txt]
            )

            clear.click(
                fn=lambda: None,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )

            clear_memory.click(
                fn=self.reset_conversation,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )

            demo.load(self.reset_conversation, None, None)

        return demo

def main():
    chat = XylariaChat()
    interface = chat.create_interface()
    interface.launch(
        share=True,
        debug=True
    )


if __name__ == "__main__":
    main()
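
# A minimal, assumed local-run sketch (not part of the original app): the file name,
# package list, and version pins are guesses based only on the imports above and on
# the HF_TOKEN check in XylariaChat.__init__.
#
#   export HF_TOKEN=...   # HuggingFace access token read at startup
#   pip install gradio huggingface_hub sentence-transformers torch networkx \
#       numpy requests pillow pytesseract edge-tts SpeechRecognition
#   python app.py         # assumed filename; launches the Gradio UI with share=True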