Alina Lozovskaya
Improve session management and require auth
50871c5
raw
history blame
11.1 kB
import asyncio
import os
import sys
import time
import gradio as gr
import uuid
from datasets import load_dataset
from huggingface_hub import whoami
from loguru import logger
from pathlib import Path
from yourbench_space.config import generate_and_save_config
from yourbench_space.utils import (
SubprocessManagerGroup,
save_files,
update_dataset,
STAGES,
is_running_locally
)
from yourbench_space.evaluation import create_eval_file, run_evaluations
from yourbench_space.leaderboard_space.env import HF_TOKEN
project_description = """
# YourBench 🚀
**Dynamic Benchmark Generation for Language Models**
Quickly create zero-shot benchmarks from your documents – keeping models accurate and adaptable
- 📖 [FAQ](#)
- 💻 [GitHub](https://github.com/huggingface/yourbench/tree/v0.2-alpha-space)
"""
logger.remove()
logger.add(sys.stderr, level="INFO")
# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()
USER_ID_SESSION_MAP: dict[str, str] = dict()
docs_path = Path(__file__).parent / "docs.md"
citation_content = (
docs_path.read_text().split("# Citation")[-1].strip()
if docs_path.exists()
else "# Citation\n\nDocumentation file not found."
)
def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
manager = MANAGERS.get(session_state)
if manager is None: # should not be possible
return (
"❌ Config generation failed.",
gr.update(visible=False, interactive=False),
)
session_uid = session_state.value
config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)
for _ in range(5):
time.sleep(0.5)
if config_path.exists():
return (
"✅ Config saved!",
gr.update(value=str(config_path), visible=True, interactive=True),
)
return (
"❌ Config generation failed.",
gr.update(visible=False, interactive=False),
)
final_dataset = None
def update_process_status(session_state: gr.State):
"""Update process status and include exit details if process has terminated"""
if session_state is None:
return gr.update(value=False, label="Not running")
manager = MANAGERS.get(session_state.value)
if manager is None:
return gr.update(value=False, label="Not running")
is_running = manager.is_running()
if not is_running:
exit_code, exit_reason = manager.get_exit_details()
status_text = f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}" if exit_reason else "Process Status: Stopped"
return gr.update(value=False, label=status_text)
return gr.update(value=True, label="Process Status: Running")
def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
if oauth_token is None and not is_running_locally():
gr.Warning('You need to log in to use this Space')
return
new_env = os.environ.copy()
if oauth_token:
new_env["HF_TOKEN"] = oauth_token.token
new_env["DATASET_PREFIX"] = hf_dataset_name
MANAGERS.start_process(session_uid, custom_env=new_env)
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
if oauth_token is None:
return gr.Dropdown([], label="Organization")
try:
user_info = whoami(oauth_token.token)
org_names = [org["name"] for org in user_info.get("orgs", [])]
user_name = user_info.get("name", "Unknown User")
org_names.insert(0, user_name)
return gr.Dropdown(org_names, value=user_name, label="Organization")
except Exception as e:
return gr.Dropdown([], label="Organization")
def switch_to_run_generation_tab():
return gr.Tabs(selected=1)
def enable_button(files):
return gr.update(interactive=bool(files))
def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name):
# Test dataset existence
eval_ds_name = f"{org_name}/{eval_name}"
# Test dataset existence
try:
load_dataset(eval_ds_name, streaming=True, token=oauth_token.token)
except Exception as e:
print(f"Error while loading the dataset: {e}")
return
# Run evaluations
create_eval_file(eval_ds_name)
status = asyncio.run(run_evaluations(eval_ds_name=eval_ds_name, org=org_name))
# Create space
from huggingface_hub import HfApi
repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
api = HfApi()
try:
api.create_repo(repo_id=repo_id, repo_type="space", space_sdk="gradio", token=oauth_token.token)
api.upload_folder(repo_id=repo_id, repo_type="space", folder_path="src/", token=oauth_token.token)
api.add_space_secret(repo_id=repo_id, key="HF_TOKEN", value=oauth_token.token, token=oauth_token.token)
api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=oauth_token.token)
api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=oauth_token.token)
except Exception as e:
status = "Evaluation" + status + "\nLeaderboard creation:" + e
return status
def init_session(profile: gr.OAuthProfile | None):
"""Update session on load"""
if profile is None: # if we running localy or user is not logged in
local_uuid = "local" if is_running_locally() else str(uuid.uuid4())
else:
local_uuid = USER_ID_SESSION_MAP.get(profile.username, str(uuid.uuid4()))
manager = MANAGERS.get(local_uuid)
if manager and manager.is_running():
logger.info(f"Found existing, running session for {local_uuid}")
return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
if profile:
USER_ID_SESSION_MAP[profile.username] = local_uuid
MANAGERS.create(local_uuid)
logger.info(f"Started session for {local_uuid}")
return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
with gr.Blocks(theme=gr.themes.Default()) as app:
# We initialize the session state with the user randomly generated uuid
# Using uuid4 makes collision cases extremely unlikely even for concurrent users
session_state = gr.State()
gr.Markdown(project_description)
with gr.Tabs() as tabs:
with gr.Tab("Setup", id=0):
with gr.Row():
with gr.Accordion("Hugging Face Settings"):
login_btn = gr.LoginButton()
hf_org_dropdown = gr.Dropdown(
choices=[], label="Organization", allow_custom_value=True
)
app.load(
update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown
)
hf_dataset_name = gr.Textbox(
label="Dataset name",
value="yourbench",
info="Name of your new evaluation dataset",
)
with gr.Accordion("Upload documents"):
file_input = gr.File(
label="Upload text files",
file_count="multiple",
file_types=[".txt", ".md", ".html", ".pdf"],
)
output = gr.Textbox(label="Log")
file_input.upload(
save_files,
inputs=[session_state, file_input],
outputs = output,
)
preview_button = gr.Button("Generate New Config", interactive=False)
log_message = gr.Textbox(label="Log Message", visible=True)
download_button = gr.File(
label="Download Config", visible=False, interactive=False
)
file_input.change(enable_button, inputs=file_input, outputs=preview_button)
preview_button.click(
generate_and_return,
inputs=[hf_org_dropdown, hf_dataset_name, session_state],
outputs=[log_message, download_button],
)
preview_button.click(
switch_to_run_generation_tab,
inputs=None,
outputs=tabs,
)
with gr.Tab("Run Generation", id=1):
with gr.Row():
start_button = gr.Button("Start Task")
start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name])
stop_button = gr.Button("Stop Task")
stop_button.click(MANAGERS.stop_process, inputs=session_state)
kill_button = gr.Button("Kill Task")
kill_button.click(MANAGERS.kill_process, inputs=session_state)
with gr.Row():
with gr.Column():
with gr.Accordion("Log Output", open=True):
log_output = gr.Code(language=None, lines=20, interactive=False)
process_status = gr.Checkbox(label="Process Status", interactive=False)
status_timer = gr.Timer(2.0, active=True)
status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)
with gr.Column():
with gr.Accordion("Stages", open=True):
stages_table = gr.CheckboxGroup(
choices=STAGES,
value=[],
label="Pipeline Stages Completed",
interactive=False,
)
with gr.Accordion("Ingestion"):
ingestion_df = gr.DataFrame()
with gr.Accordion("Summarization"):
summarization_df = gr.DataFrame()
with gr.Accordion("Single-Hop"):
single_hop = gr.DataFrame()
with gr.Accordion("Answer Generation"):
answers_df = gr.DataFrame()
stages_table.change(
update_dataset, inputs=[stages_table, hf_org_dropdown, hf_dataset_name], outputs=[ingestion_df, summarization_df, single_hop, answers_df]
)
# TODO: this timer should only be active when the second tab is passed to active for the first time
log_timer = gr.Timer(1.0, active=True)
log_timer.tick(
MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table]
)
with gr.Tab("Evaluate", id=2, visible=False):
with gr.Row():
btn_launch_evals = gr.Button("Launch evaluations")
status = gr.Textbox(label="Status")
btn_launch_evals.click(run_evaluation_pipeline, [hf_org_dropdown, hf_dataset_name], status)
app.load(init_session, outputs=session_state)
app.launch(allowed_paths=["/app"])