import os from queue import Queue import gradio as gr import argilla as rg from argilla.webhooks import webhook_listener client = rg.Argilla( api_url=os.getenv("ARGILLA_API_URL"), api_key=os.getenv("ARGILLA_API_KEY"), ) server = rg.get_webhook_server() incoming_events = Queue() countries = { "Argentina": { "iso": "ARG", "emoji": "๐Ÿ‡ฆ๐Ÿ‡ท" }, "Bolivia": { "iso": "BOL", "emoji": "๐Ÿ‡ง๐Ÿ‡ด" }, "Chile": { "iso": "CHL", "emoji": "๐Ÿ‡จ๐Ÿ‡ฑ" }, "Colombia": { "iso": "COL", "emoji": "๐Ÿ‡จ๐Ÿ‡ด" }, "Costa Rica": { "iso": "CRI", "emoji": "๐Ÿ‡จ๐Ÿ‡ท" }, "Cuba": { "iso": "CUB", "emoji": "๐Ÿ‡จ๐Ÿ‡บ" }, "Ecuador": { "iso": "ECU", "emoji": "๐Ÿ‡ช๐Ÿ‡จ" }, "El Salvador": { "iso": "SLV", "emoji": "๐Ÿ‡ธ๐Ÿ‡ป" }, "Espaรฑa": { "iso": "ESP", "emoji": "๐Ÿ‡ช๐Ÿ‡ธ" }, "Guatemala": { "iso": "GTM", "emoji": "๐Ÿ‡ฌ๐Ÿ‡น" }, "Honduras": { "iso": "HND", "emoji": "๐Ÿ‡ญ๐Ÿ‡ณ" }, "Mรฉxico": { "iso": "MEX", "emoji": "๐Ÿ‡ฒ๐Ÿ‡ฝ" }, "Nicaragua": { "iso": "NIC", "emoji": "๐Ÿ‡ณ๐Ÿ‡ฎ" }, "Panamรก": { "iso": "PAN", "emoji": "๐Ÿ‡ต๐Ÿ‡ฆ" }, "Paraguay": { "iso": "PRY", "emoji": "๐Ÿ‡ต๐Ÿ‡พ" }, "Perรบ": { "iso": "PER", "emoji": "๐Ÿ‡ต๐Ÿ‡ช" }, "Puerto Rico": { "iso": "PRI", "emoji": "๐Ÿ‡ต๐Ÿ‡ท" }, "Repรบblica Dominicana": { "iso": "DOM", "emoji": "๐Ÿ‡ฉ๐Ÿ‡ด" }, "Uruguay": { "iso": "URY", "emoji": "๐Ÿ‡บ๐Ÿ‡พ" }, "Venezuela": { "iso": "VEN", "emoji": "๐Ÿ‡ป๐Ÿ‡ช" } } @webhook_listener(events=["response.created"]) async def update_validation_space_on_answer(response, type, timestamp): """ Webhook listener that triggers when a new response is added to an answering space. It will automatically update the corresponding validation space with the new response. """ try: incoming_events.put({"event": type, "timestamp": str(timestamp)}) record = response.record dataset_name = record.dataset.name if not dataset_name.endswith("Responder"): print(f"Ignoring event from non-answering dataset: {dataset_name}") return # Not an answering space, ignore country = " ".join(dataset_name.split("-").split(" ")[1:]) print(f"Processing response for country: {country}") iso = countries[country]["iso"] emoji = countries[country]["emoji"] validation_dataset_name = f"{emoji} {country} - {iso} - Responder" try: validation_dataset = client.datasets(validation_dataset_name) print(f"Found validation dataset: {validation_dataset_name}") except Exception as e: print(f"Error connecting to validation dataset: {e}") response_dict = response.to_dict() answer_1 = response_dict["values"]['answer_1']['value'] answer_2 = response_dict["values"]['answer_2']['value'] answer_3 = response_dict["values"]['answer_3']['value'] original_user_id = str(response.user_id) new_records = [] for answer in [answer_1, answer_2, answer_3]: if answer: validation_record = { "question": record.fields["question"], "answer": answer, "metadata": { "original_responder_id": original_user_id, "original_dataset": dataset_name } } new_records.append(answer) # Add the record to the validation space validation_dataset.records.log(records=new_records) print(f"Added new response to validation space for {country}") except Exception as e: print(f"Error in webhook handler: {e}") # Store the error in the queue for display incoming_events.put({"event": "error", "error": str(e)}) # Function to read the next event from the queue def read_next_event(): if not incoming_events.empty(): return incoming_events.get() return {} # Create Gradio interface with gr.Blocks() as demo: argilla_server = client.http_client.base_url gr.Markdown("## Argilla Webhooks - Validation Space Updater") gr.Markdown(f""" This application listens for new responses in Argilla answering spaces and automatically adds them to the corresponding validation spaces. Connected to Argilla server: {argilla_server} The webhook listens for: - `response.created` events from datasets ending with `_responder_preguntas` You can view the incoming events and any errors in the JSON component below. """) json_component = gr.JSON(label="Incoming events and errors:", value={}) gr.Timer(1, active=True).tick(read_next_event, outputs=json_component) # Mount the Gradio app to the FastAPI server gr.mount_gradio_app(server, demo, path="/") # Start the FastAPI server if __name__ == "__main__": import uvicorn uvicorn.run(server, host="0.0.0.0", port=7860)