G.E.N.I.EmarkII / webapp.py
nihalaninihal's picture
Update webapp.py
816f1c6 verified
# basic_webapp.py
import asyncio
import base64
import json
import os
import sys
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
import uvicorn
# Import the simplified AudioLoop
from handler import AudioLoop
# Directory that holds this module; index.html and static assets are read from here.
current_dir = os.path.dirname(os.path.realpath(__file__))

app = FastAPI()

# Registry of live WebSocket sessions, keyed by client id. Each value is a dict
# with "websocket", "audio_loop" and "repo_context" entries.
active_clients = {}

# Expose the module directory under /static.
# NOTE(review): this publishes *every* file next to this module (including the
# Python sources) — consider mounting a dedicated static/ subdirectory instead.
_static_files = StaticFiles(directory=current_dir)
app.mount("/static", _static_files, name="static")
@app.get("/")
async def get_index():
    """Return the ``index.html`` page stored next to this module as HTML."""
    page_path = os.path.join(current_dir, "index.html")
    with open(page_path, encoding="utf-8") as page_file:
        return HTMLResponse(content=page_file.read())
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one client over the ``/ws`` WebSocket.

    The connection is accepted, registered in ``active_clients`` and paired
    with its own ``AudioLoop``, whose ``run()`` coroutine is started as a
    background task. Two coroutines then run concurrently:

    * ``process_client_messages`` reads JSON frames from the client and turns
      ``audio`` / ``text`` frames into requests on ``audio_loop.out_queue``;
    * ``forward_gemini_responses`` reads PCM chunks from
      ``audio_loop.audio_in_queue`` and sends them to the client as
      base64-encoded ``audio`` frames.

    The session ends as soon as either coroutine returns (client disconnect,
    failed send, or any other error). The surviving coroutine is then
    cancelled and the client's resources are released.
    """
    await websocket.accept()
    print("[websocket_endpoint] Client connected.")

    # id() is unique among live objects, so this id is unique while this
    # websocket object exists.
    client_id = f"client_{id(websocket)}"

    # Each client gets its own AudioLoop; per-client state lives in the
    # module-level registry.
    audio_loop = AudioLoop()
    active_clients[client_id] = {
        "websocket": websocket,
        "audio_loop": audio_loop,
        "repo_context": None,
    }

    loop_task = asyncio.create_task(audio_loop.run())
    print(f"[websocket_endpoint] Started AudioLoop for client {client_id}")

    async def process_client_messages():
        """Read client frames and forward requests to ``audio_loop.out_queue``.

        Frame types, selected by the ``"type"`` key:

        * ``init`` - store repository context for later ``text`` frames and
          reply with an ``initialized`` status frame;
        * ``audio`` - base64 ``payload`` forwarded as a ``realtime_input``
          media chunk with mime type ``audio/pcm``;
        * ``text`` - ``content`` (prefixed with the stored repository context,
          if any) forwarded as a completed user turn;
        * ``interrupt`` - only acknowledged with an ``interrupted`` status frame;
        * anything else is logged and ignored.

        Returns after a disconnect or any error (malformed JSON, missing
        ``payload``, ...), cleaning up the client first.
        """
        try:
            while True:
                data = await websocket.receive_text()
                msg = json.loads(data)
                msg_type = msg.get("type", "")

                if msg_type == "init":
                    # Store repository context info. NOTE(review): github_token
                    # is stored but not read anywhere in this handler.
                    active_clients[client_id]["repo_context"] = {
                        "repo_url": msg.get("repo_url", ""),
                        "github_token": msg.get("github_token", ""),
                        "user_type": msg.get("user_type", "coder"),
                        "response_detail": msg.get("response_detail", "normal"),
                    }
                    print(f"[process_client_messages] Stored context for {client_id}: {msg.get('repo_url', '')}")
                    # Send confirmation
                    await websocket.send_text(json.dumps({
                        "type": "status",
                        "status": "initialized",
                        "message": "Ready to assist with this repository.",
                    }))

                elif msg_type == "audio":
                    # Decode and re-encode the payload. Malformed base64
                    # (e.g. bad padding) raises here and ends the session.
                    raw_pcm = base64.b64decode(msg["payload"])
                    forward_msg = {
                        "realtime_input": {
                            "media_chunks": [
                                {
                                    "data": base64.b64encode(raw_pcm).decode(),
                                    "mime_type": "audio/pcm",
                                }
                            ]
                        }
                    }
                    await audio_loop.out_queue.put(forward_msg)

                elif msg_type == "text":
                    # Process text query from client
                    user_text = msg.get("content", "")
                    # Prefix the repository context captured by "init", if any.
                    context = active_clients[client_id]["repo_context"]
                    if context and context["repo_url"]:
                        context_text = (
                            f"The GitHub repository being discussed is: {context['repo_url']}\n"
                            f"User role: {context['user_type']}\n"
                            f"Preferred detail level: {context['response_detail']}\n\n"
                            f"Please consider this context when answering the following question:\n"
                        )
                        user_text = context_text + user_text
                    print(f"[process_client_messages] Sending text to Gemini: {user_text[:100]}...")
                    # A single, complete user turn.
                    forward_msg = {
                        "client_content": {
                            "turn_complete": True,
                            "turns": [
                                {
                                    "role": "user",
                                    "parts": [
                                        {"text": user_text}
                                    ],
                                }
                            ],
                        }
                    }
                    await audio_loop.out_queue.put(forward_msg)

                elif msg_type == "interrupt":
                    # Only acknowledged for now: nothing is actually stopped,
                    # since true interruption may need additional API support.
                    print(f"[process_client_messages] Interrupt requested by {client_id}")
                    await websocket.send_text(json.dumps({
                        "type": "status",
                        "status": "interrupted",
                        "message": "Processing interrupted by user.",
                    }))

                else:
                    print(f"[process_client_messages] Unknown message type: {msg_type}")
        except WebSocketDisconnect:
            print(f"[process_client_messages] Client {client_id} disconnected")
            cleanup_client(client_id, loop_task)
        except Exception as e:
            print(f"[process_client_messages] Error: {e}")
            cleanup_client(client_id, loop_task)

    async def forward_gemini_responses():
        """Send PCM chunks from ``audio_loop.audio_in_queue`` to the client.

        Loops until a send fails or the coroutine is cancelled by
        ``websocket_endpoint``. On failure, the client is cleaned up first.
        """
        try:
            while True:
                try:
                    # Wait up to 0.5 s for the next audio chunk.
                    pcm_data = await asyncio.wait_for(audio_loop.audio_in_queue.get(), 0.5)
                except asyncio.TimeoutError:
                    # No audio available yet; keep waiting.
                    continue
                b64_pcm = base64.b64encode(pcm_data).decode()
                out_msg = {
                    "type": "audio",
                    "payload": b64_pcm,
                }
                print(f"[forward_gemini_responses] Sending audio chunk to client {client_id}")
                await websocket.send_text(json.dumps(out_msg))
                # Text responses could be forwarded here as well if AudioLoop
                # exposed a separate queue for them.
        except WebSocketDisconnect:
            print(f"[forward_gemini_responses] Client {client_id} disconnected")
            cleanup_client(client_id, loop_task)
        except Exception as e:
            print(f"[forward_gemini_responses] Error: {e}")
            cleanup_client(client_id, loop_task)

    def cleanup_client(client_id, task):
        """Stop the client's AudioLoop, cancel ``task`` and unregister the client.

        Safe to call more than once: only the first call for a given
        ``client_id`` (while it is still registered) does anything.
        """
        if client_id in active_clients:
            client_data = active_clients[client_id]
            # Stop the AudioLoop
            if "audio_loop" in client_data:
                client_data["audio_loop"].stop()
            # Cancel the task if it's still running
            if task and not task.done():
                task.cancel()
            del active_clients[client_id]
            print(f"[cleanup_client] Cleaned up resources for {client_id}")

    receiver = asyncio.create_task(process_client_messages())
    sender = asyncio.create_task(forward_gemini_responses())
    try:
        # forward_gemini_responses never notices a client disconnect on its
        # own (it just keeps polling the queue), so gather() on both would
        # never return. End the session as soon as either side finishes.
        await asyncio.wait({receiver, sender}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for child in (receiver, sender):
            child.cancel()  # no-op for the one that already finished
        print(f"[websocket_endpoint] WebSocket handler finished for {client_id}")
        cleanup_client(client_id, loop_task)
        # Let the cancelled coroutine unwind; return_exceptions keeps its
        # CancelledError from propagating out of this handler.
        await asyncio.gather(receiver, sender, return_exceptions=True)
if __name__ == "__main__":
    # Refuse to start without the API key in the environment.
    if "GOOGLE_API_KEY" not in os.environ:
        print(
            "Error: GOOGLE_API_KEY environment variable not set\n"
            "Please set it with: export GOOGLE_API_KEY='your_api_key_here'"
        )
        sys.exit(1)

    # Serve on all interfaces, port 7860, with auto-reload on code changes.
    uvicorn.run("webapp:app", host="0.0.0.0", port=7860, reload=True)