Spaces:
Running
Running
File size: 14,564 Bytes
f36ab64 9700f95 640b1c8 b953016 f36ab64 640b1c8 e87abff 9700f95 0739c8b 9700f95 640b1c8 b953016 640b1c8 9700f95 640b1c8 f36ab64 b953016 9700f95 640b1c8 f36ab64 9700f95 0739c8b 9700f95 0739c8b be32fd8 0739c8b f36ab64 640b1c8 f36ab64 9700f95 f36ab64 be32fd8 9700f95 be32fd8 f36ab64 be32fd8 f36ab64 be32fd8 f36ab64 be32fd8 f36ab64 be32fd8 9700f95 f36ab64 be32fd8 f36ab64 9700f95 f36ab64 9700f95 f36ab64 9700f95 f36ab64 be32fd8 9700f95 f36ab64 9700f95 f36ab64 be32fd8 f36ab64 be32fd8 f36ab64 be32fd8 b953016 f36ab64 b953016 f36ab64 b953016 f36ab64 be32fd8 9700f95 f36ab64 9700f95 f36ab64 9700f95 f36ab64 be32fd8 f36ab64 b953016 be32fd8 f36ab64 be32fd8 b953016 be32fd8 b953016 f36ab64 9700f95 be32fd8 9700f95 f36ab64 9700f95 f36ab64 9700f95 f36ab64 be32fd8 f36ab64 9700f95 f36ab64 9700f95 f36ab64 be32fd8 f36ab64 9700f95 f36ab64 9700f95 f36ab64 9700f95 f36ab64 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
from typing import List, Dict, Optional, Tuple
import uuid
from .excel_aware_rag import ExcelAwareRAGAgent
from .enhanced_context_manager import EnhancedContextManager
from ..llms.base_llm import BaseLLM
from src.embeddings.base_embedding import BaseEmbedding
from src.vectorstores.base_vectorstore import BaseVectorStore
from src.utils.conversation_manager import ConversationManager
from src.db.mongodb_store import MongoDBStore
from src.models.rag import RAGResponse
from src.utils.logger import logger
class RAGAgent(ExcelAwareRAGAgent):
    """Retrieval-augmented generation agent with conversation-aware context.

    Combines vector-store retrieval, MongoDB-backed conversation history,
    query enhancement via EnhancedContextManager, and Excel-specific
    post-processing (inherited from ExcelAwareRAGAgent) to produce LLM
    answers wrapped in RAGResponse objects.
    """

    def __init__(
        self,
        llm: BaseLLM,
        embedding: BaseEmbedding,
        vector_store: BaseVectorStore,
        mongodb: MongoDBStore,
        max_history_tokens: int = 4000,
        max_history_messages: int = 10
    ) -> None:
        """Initialize RAG Agent with enhanced context management

        Args:
            llm: Backend LLM used to generate the final answer.
            embedding: Embedding model used to embed queries for retrieval.
            vector_store: Vector store queried for similar documents.
            mongodb: Store used to fetch recent conversation messages.
            max_history_tokens: Token budget forwarded to ConversationManager.
            max_history_messages: Message-count cap forwarded to
                ConversationManager (also used as the fetch limit when
                reading history from MongoDB).
        """
        super().__init__()  # Initialize ExcelAwareRAGAgent
        self.llm = llm
        self.embedding = embedding
        self.vector_store = vector_store
        self.mongodb = mongodb
        self.conversation_manager = ConversationManager(
            max_tokens=max_history_tokens,
            max_messages=max_history_messages
        )
        # Add enhanced context management while preserving existing functionality
        self.context_manager = EnhancedContextManager()
        logger.info("RAGAgent initialized with enhanced context management")

    async def generate_response(
        self,
        query: str,
        conversation_id: Optional[str],
        temperature: float,
        max_tokens: Optional[int] = None,
        context_docs: Optional[List[str]] = None,
        stream: bool = False,
        custom_roles: Optional[List[Dict[str, str]]] = None
    ) -> RAGResponse:
        """
        Generate a response with comprehensive context and role management

        Pipeline (order matters): apply custom roles -> short-circuit
        introduction messages -> load and condense conversation history ->
        enhance the query -> retrieve context (falling back to the previous
        answer when retrieval is empty) -> Excel-aware pre/post-processing ->
        LLM generation -> response cleaning -> metadata assembly.

        Args:
            query (str): User query
            conversation_id (Optional[str]): Conversation identifier
            temperature (float): LLM temperature for response generation
            max_tokens (Optional[int]): Maximum tokens for response
            context_docs (Optional[List[str]]): Pre-retrieved context documents
            stream (bool): Whether to stream the response
            custom_roles (Optional[List[Dict[str, str]]]): Custom role instructions

        Returns:
            RAGResponse: Generated response with context and metadata

        Raises:
            Exception: Any failure in the pipeline is logged and re-raised.
        """
        try:
            logger.info(f"Generating response for query: {query}")
            # Apply custom roles if provided.  Roles work by appending
            # steering text to the query itself, not via a system prompt.
            if custom_roles:
                for role in custom_roles:
                    # Modify query or context based on role
                    if role.get('name') == 'introduction_specialist':
                        query += " Provide a concise, welcoming response."
                    elif role.get('name') == 'knowledge_based_specialist':
                        query += " Ensure response is precise and directly from available knowledge."
            # Introduction Handling: the caller encodes introductions as
            # sentinel phrases embedded in the query text itself.
            is_introduction = (
                "wants support" in query and
                "This is Introduction" in query and
                ("A new user with name:" in query or "An old user with name:" in query)
            )
            if is_introduction:
                logger.info("Processing introduction message")
                welcome_message = self._handle_contact_query(query)
                # Introductions bypass retrieval entirely — no context/sources.
                return RAGResponse(
                    response=welcome_message,
                    context_docs=[],
                    sources=[],
                    scores=None
                )
            # Conversation History Processing
            history: List[Dict] = []
            last_context: Optional[str] = None
            if conversation_id:
                logger.info(f"Retrieving conversation history for ID: {conversation_id}")
                history = await self.mongodb.get_recent_messages(
                    conversation_id,
                    limit=self.conversation_manager.max_messages
                )
                # Process history for conversation manager (trims to the
                # token/message budget and relevance against the query)
                history = self.conversation_manager.get_relevant_history(
                    messages=history,
                    current_query=query
                )
                # Replay each prior turn into the enhanced context manager so
                # it can resolve references in the current query.
                for msg in history:
                    self.context_manager.process_turn(
                        msg.get('query', ''),
                        msg.get('response', '')
                    )
                # Get last context if available — used as a retrieval fallback
                # further down.
                if history and history[-1].get('response'):
                    last_context = history[-1]['response']
            # Query Enhancement
            enhanced_query = self.context_manager.enhance_query(query)
            # Manual Pronoun Handling Fallback: if enhancement was a no-op and
            # the query contains a possessive pronoun, re-seed the context
            # manager with the last answer and retry once.
            if enhanced_query == query:
                # NOTE(review): the map values ('he'/'she'/'they') are never
                # used — only the keys are tested for membership.
                pronoun_map = {
                    'his': 'he',
                    'her': 'she',
                    'their': 'they'
                }
                words = query.lower().split()
                for pronoun, replacement in pronoun_map.items():
                    if pronoun in words:
                        # Try to use last context
                        if last_context:
                            self.context_manager.record_last_context(last_context)
                            enhanced_query = self.context_manager.enhance_query(query)
                        break
            logger.info(f"Enhanced query: {enhanced_query}")
            # Context Retrieval (skipped when the caller pre-supplied docs; in
            # that case sources/scores are empty since provenance is unknown).
            if not context_docs:
                logger.info("Retrieving context for enhanced query")
                context_docs, sources, scores = await self.retrieve_context(
                    enhanced_query,
                    conversation_history=history
                )
            else:
                sources = []
                scores = None
            # Context Fallback Mechanism
            if not context_docs:
                # If no context and last context exists, use it
                if last_context:
                    context_docs = [last_context]
                    sources = [{"source": "previous_context"}]
                    scores = [1.0]
                else:
                    logger.info("No relevant context found")
                    return RAGResponse(
                        response="Information about this is not available, do you want to inquire about something else?",
                        context_docs=[],
                        sources=[],
                        scores=None
                    )
            # Excel-specific Content Handling — detected by the 'Sheet:'
            # marker the Excel ingester embeds in document text.
            has_excel_content = any('Sheet:' in doc for doc in context_docs)
            if has_excel_content:
                logger.info("Processing Excel-specific content")
                try:
                    context_docs = self._process_excel_context(context_docs, enhanced_query)
                except Exception as e:
                    # Best-effort: fall back to the unprocessed docs on failure.
                    logger.warning(f"Error processing Excel context: {str(e)}")
            # Prompt Generation with Conversation History
            prompt = self.conversation_manager.generate_prompt_with_history(
                current_query=enhanced_query,
                history=history,
                context_docs=context_docs
            )
            # Streaming Response Generation
            if stream:
                # TODO: Implement actual streaming logic
                # This is a placeholder and needs proper implementation —
                # NOTE(review): execution currently falls through to the
                # non-streaming path below, so stream=True is silently ignored.
                logger.warning("Streaming not fully implemented")
            # Standard Response Generation
            # NOTE(review): this is the only collaborator call not awaited —
            # confirm BaseLLM.generate is synchronous.
            response = self.llm.generate(
                prompt=prompt,
                temperature=temperature,
                max_tokens=max_tokens
            )
            # Response Cleaning (strips boilerplate lead-ins, re-capitalizes)
            cleaned_response = self._clean_response(response)
            # Excel Response Enhancement — best-effort; keeps the cleaned
            # response if enhancement fails or returns a falsy value.
            if has_excel_content:
                try:
                    enhanced_response = await self.enhance_excel_response(
                        query=enhanced_query,
                        response=cleaned_response,
                        context_docs=context_docs
                    )
                    if enhanced_response:
                        cleaned_response = enhanced_response
                except Exception as e:
                    logger.warning(f"Error enhancing Excel response: {str(e)}")
            # Context Tracking: record this turn so follow-up queries can be
            # enhanced against it.
            self.context_manager.process_turn(query, cleaned_response)
            # Metadata Generation
            metadata = {
                'llm_provider': getattr(self.llm, 'model_name', 'unknown'),
                'temperature': temperature,
                'conversation_id': conversation_id,
                'context_sources': sources,
                'has_excel_content': has_excel_content
            }
            logger.info("Successfully generated response")
            return RAGResponse(
                response=cleaned_response,
                context_docs=context_docs,
                sources=sources,
                scores=scores,
                metadata=metadata  # Added metadata
            )
        except Exception as e:
            logger.error(f"Error in generate_response: {str(e)}")
            raise

    async def retrieve_context(
        self,
        query: str,
        conversation_history: Optional[List[Dict]] = None,
        top_k: int = 3
    ) -> Tuple[List[str], List[Dict], Optional[List[float]]]:
        """Retrieve context with both original and enhanced handling

        Args:
            query: The (possibly already enhanced) user query.
            conversation_history: Optional prior turns; the last two are
                prepended to the query text before embedding.
            top_k: Number of nearest documents to fetch from the vector store.

        Returns:
            (documents, sources, scores) — scores is None when the store did
            not return a score for every document, or when nothing was found.

        Raises:
            Exception: Any embedding/search failure is logged and re-raised.
        """
        try:
            logger.info(f"Retrieving context for query: {query}")
            # Enhance query using both managers
            if conversation_history:
                # Get the last two messages for immediate context
                recent_messages = conversation_history[-2:]
                # Extract queries and responses for context
                context_parts = []
                for msg in recent_messages:
                    if msg.get('query'):
                        context_parts.append(msg['query'])
                    if msg.get('response'):
                        response = msg['response']
                        # Skip the canned "not available" answer so it doesn't
                        # pollute the embedding.
                        if "Information about this is not available" not in response:
                            context_parts.append(response)
                # Combine with current query
                enhanced_query = f"{' '.join(context_parts)} {query}".strip()
                logger.info(f"Enhanced query with history: {enhanced_query}")
            else:
                enhanced_query = query
            # Debug log the enhanced query
            logger.info(f"Final enhanced query: {enhanced_query}")
            # Embed the enhanced query
            query_embedding = self.embedding.embed_query(enhanced_query)
            # Debug log embedding shape
            logger.info(f"Query embedding shape: {len(query_embedding)}")
            # Retrieve similar documents
            results = self.vector_store.similarity_search(
                query_embedding,
                top_k=top_k
            )
            # Debug log search results
            logger.info(f"Number of search results: {len(results)}")
            for i, result in enumerate(results):
                logger.info(f"Result {i} score: {result.get('score', 'N/A')}")
                logger.info(f"Result {i} text preview: {result.get('text', '')[:100]}...")
            if not results:
                logger.info("No results found in similarity search")
                return [], [], None
            # Process results — each result is assumed to be a dict with
            # 'text', 'metadata', and optionally 'score' keys.
            documents = [doc['text'] for doc in results]
            sources = [self._convert_metadata_to_strings(doc['metadata'])
                       for doc in results]
            scores = [doc['score'] for doc in results
                      if doc.get('score') is not None]
            # Return scores only if available for all documents
            if len(scores) != len(documents):
                scores = None
            logger.info(f"Retrieved {len(documents)} relevant documents")
            return documents, sources, scores
        except Exception as e:
            logger.error(f"Error in retrieve_context: {str(e)}")
            raise

    def _clean_response(self, response: str) -> str:
        """Clean response text while preserving key information

        Strips common LLM lead-in phrases, collapses whitespace, and
        capitalizes the first character. Returns the original response
        unchanged when it is empty or cleaning would leave nothing.
        """
        if not response:
            return response
        # Keep only the most common phrases to remove
        phrases_to_remove = [
            "Based on the context,",
            "According to the documents,",
            "From the information available,",
            "Based on the provided information,",
            "I apologize,"
        ]
        cleaned_response = response
        for phrase in phrases_to_remove:
            cleaned_response = cleaned_response.replace(phrase, "").strip()
        # Collapse all runs of whitespace to single spaces.
        cleaned_response = " ".join(cleaned_response.split())
        if not cleaned_response:
            return response
        if cleaned_response[0].islower():
            cleaned_response = cleaned_response[0].upper() + cleaned_response[1:]
        return cleaned_response

    def _convert_metadata_to_strings(self, metadata: Dict) -> Dict:
        """Convert metadata values to strings

        Only int/float values are stringified; everything else passes through.
        NOTE(review): bool is a subclass of int, so booleans become "True"/"False".
        On any error the metadata is returned unmodified.
        """
        try:
            return {
                key: str(value) if isinstance(value, (int, float)) else value
                for key, value in metadata.items()
            }
        except Exception as e:
            logger.error(f"Error converting metadata: {str(e)}")
            return metadata

    def _handle_contact_query(self, query: str) -> str:
        """Handle contact/introduction queries

        Extracts the user name from the 'name: "<name>"' fragment of the
        introduction sentinel and returns a welcome message; falls back to
        a generic greeting when the name cannot be parsed or on any error.
        """
        try:
            # find() returns -1 when absent, so name_start == 6 means "not
            # found" — guarded by the name_start > 6 check below.
            name_start = query.find('name: "') + 7
            name_end = query.find('"', name_start)
            name = query[name_start:name_end] if name_start > 6 and name_end != -1 else "there"
            is_returning = (
                "An old user with name:" in query and
                "wants support again" in query
            )
            return f"Welcome back {name}, How can I help you?" if is_returning else f"Welcome {name}, How can I help you?"
        except Exception as e:
            logger.error(f"Error handling contact query: {str(e)}")
            return "Welcome, How can I help you?"