import os from queue import Queue import gradio as gr import argilla as rg from argilla.webhooks import webhook_listener # Initialize Argilla client client = rg.Argilla( api_url=os.getenv("ARGILLA_API_URL"), api_key=os.getenv("ARGILLA_API_KEY"), ) # Get the webhook server server = rg.get_webhook_server() # Queue to store events for display incoming_events = Queue() # Set up the webhook listener for response creation @webhook_listener(events=["response.created"]) async def update_validation_space_on_answer(response, type, timestamp): """ Webhook listener that triggers when a new response is added to an answering space. It will automatically update the corresponding validation space with the new response. """ try: # Store the event for display in the UI incoming_events.put({"event": type, "timestamp": str(timestamp)}) # Get the record from the response record = response.record # Check if this is from an answering space dataset_name = record.dataset.name if not dataset_name.endswith("_responder_preguntas"): print(f"Ignoring event from non-answering dataset: {dataset_name}") return # Not an answering space, ignore # Extract the country from the dataset name country = dataset_name.replace("_responder_preguntas", "") print(f"Processing response for country: {country}") # Connect to the validation space validation_dataset_name = f"{country}_validar_respuestas" try: validation_dataset = client.datasets(validation_dataset_name) print(f"Found validation dataset: {validation_dataset_name}") except Exception as e: print(f"Error connecting to validation dataset: {e}") # You would need to import the create_validation_space function from build_space import create_validation_space validation_dataset = create_validation_space(country) print(response) response_dict = response.to_dict() print("response dict", response_dict) answer = response_dict["values"]['text']['value'] # if hasattr(response, 'values') and 'text' in response.values: # if isinstance(response.values['text'], dict) and 'value' in response.values['text']: # response_value = response.values['text']['value'] # else: # response_value = str(response.values['text']) # Create a validation record with the correct attribute # Instead of response.value, we need to find the correct attribute # Based on the error, let's try to get the value differently validation_record = { "question": record.fields["question"], "answer": answer, } # Add the record to the validation space validation_dataset.records.log(records=[validation_record]) print(f"Added new response to validation space for {country}") except Exception as e: print(f"Error in webhook handler: {e}") # Store the error in the queue for display incoming_events.put({"event": "error", "error": str(e)}) # Function to read the next event from the queue def read_next_event(): if not incoming_events.empty(): return incoming_events.get() return {} # Create Gradio interface with gr.Blocks() as demo: argilla_server = client.http_client.base_url gr.Markdown("## Argilla Webhooks - Validation Space Updater") gr.Markdown(f""" This application listens for new responses in Argilla answering spaces and automatically adds them to the corresponding validation spaces. Connected to Argilla server: {argilla_server} The webhook listens for: - `response.created` events from datasets ending with `_responder_preguntas` You can view the incoming events and any errors in the JSON component below. """) json_component = gr.JSON(label="Incoming events and errors:", value={}) gr.Timer(1, active=True).tick(read_next_event, outputs=json_component) # Mount the Gradio app to the FastAPI server gr.mount_gradio_app(server, demo, path="/") # Start the FastAPI server if __name__ == "__main__": import uvicorn uvicorn.run(server, host="0.0.0.0", port=7860)