eustlb's picture
eustlb HF Staff
update
5923f2b
raw
history blame
8.46 kB
import pandas as pd
import gradio as gr
import requests
import yaml
import os
import re
import asyncio
import aiohttp
import pandas as pd
from tqdm import tqdm
def get_audio_models():
    """Return the audio model names listed in the transformers docs table of contents.

    Downloads ``docs/source/en/_toctree.yml`` from the ``main`` branch of
    huggingface/transformers and walks the sections titled ``API`` ->
    ``Models`` -> ``Audio models``.

    Returns:
        A list with one name per entry of the first ``Audio models`` section
        found that has a ``local`` key: the last ``/``-separated component of
        ``local``, lower-cased, with ``-`` replaced by ``_``. An empty list is
        returned when the download does not answer with HTTP 200 or when no
        such section exists.
    """
    url = "https://raw.githubusercontent.com/huggingface/transformers/main/docs/source/en/_toctree.yml"
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection, which would block the caller forever.
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print("Failed to fetch the YAML file")
        return []

    # safe_load returns None for an empty document; treat that as "no sections".
    toctree_content = yaml.safe_load(response.text) or []

    for section in toctree_content:
        if section.get('title') != 'API':
            continue
        for subsection in section.get('sections', []):
            if subsection.get('title') != 'Models':
                continue
            for model_section in subsection.get('sections', []):
                if model_section.get('title') != 'Audio models':
                    continue
                return [
                    audio_model['local'].split('/')[-1].lower().replace('-', '_')
                    for audio_model in model_section.get('sections', [])
                    if 'local' in audio_model
                ]
    return []
def fetch_and_process_ci_results(job_id):
    """Collect test counts for the audio-model CI jobs of one workflow run and save them as CSV.

    The function lists the jobs of the given GitHub Actions run of
    huggingface/transformers, keeps the jobs whose name contains both
    ``"Model CI"`` and ``"models"`` and whose ``models/<name>`` component
    matches a name returned by ``get_audio_models()``, downloads each job's
    log, extracts the failed/passed/skipped counts from the pytest summary
    line, and writes the per ``(model, conclusion, test_type)`` totals to
    ``test_results_by_type.csv``.

    Args:
        job_id: Identifier interpolated into the
            ``actions/runs/{job_id}/jobs`` endpoint, i.e. the workflow run
            whose jobs are inspected.

    Raises:
        ValueError: If the ``GITHUB_TOKEN`` environment variable is not set.
        Exception: If a job log contains no pytest summary line matching the
            expected pattern.
    """
    github_token = os.environ.get('GITHUB_TOKEN')
    if not github_token:
        raise ValueError("GitHub token not found in environment variables")

    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github+json"
    }
    # Applied to the blocking requests calls so a stalled connection cannot
    # hang the refresh forever.
    request_timeout = 60

    audio_models = get_audio_models()
    url = f"https://api.github.com/repos/huggingface/transformers/actions/runs/{job_id}/jobs"
    # model name -> ids of the CI jobs that tested that model
    audio_model_jobs = {audio_model: [] for audio_model in audio_models}

    # Compiled once here instead of on every log that gets parsed.
    summary_pattern = re.compile(
        r'=+ (?:(\d+) failed,?\s*)?(?:(\d+) passed,?\s*)?(?:(\d+) skipped,?\s*)?(?:\d+ warnings?\s*)?in \d+\.\d+s'
    )

    def process_jobs(jobs_data):
        """Record the id of every audio-model job found in one page of the jobs listing."""
        for job in jobs_data['jobs']:
            if "Model CI" in job['name'] and "models" in job['name']:
                match = re.search(r'models/([^/)]+)', job['name'])
                if match:
                    model_name = match.group(1).lower()
                    if model_name in audio_model_jobs:
                        audio_model_jobs[model_name].append(job['id'])

    async def fetch_and_process_jobs(session, url):
        """Fetch one page of jobs, process it, and return the next page URL (or None)."""
        async with session.get(url, headers=headers) as response:
            jobs_data = await response.json()
            process_jobs(jobs_data)
            return response.links.get('next', {}).get('url')

    async def fetch_all_jobs():
        """Follow the ``next`` links from the first page until the listing is exhausted."""
        async with aiohttp.ClientSession() as session:
            next_url = url
            with tqdm(desc="Fetching jobs", unit="page") as pbar:
                while next_url:
                    next_url = await fetch_and_process_jobs(session, next_url)
                    pbar.update(1)

    def parse_test_results(text):
        """Extract the failed/passed/skipped counts from the first pytest summary line in ``text``."""
        match = summary_pattern.search(text)
        if match:
            failed = int(match.group(1)) if match.group(1) else 0
            passed = int(match.group(2)) if match.group(2) else 0
            skipped = int(match.group(3)) if match.group(3) else 0
            return {'failed': failed, 'passed': passed, 'skipped': skipped}
        raise Exception("Could not find test summary in logs")

    def retrieve_job_logs(job_id, job_name):
        """Return the test counts, model name and conclusion for a single CI job."""
        url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}"
        response = requests.get(url, headers=headers, timeout=request_timeout)
        logs_url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}/logs"
        logs_response = requests.get(logs_url, headers=headers, timeout=request_timeout)
        logs = logs_response.text
        test_summary = parse_test_results(logs)
        test_summary["model"] = job_name
        test_summary["conclusion"] = response.json()['conclusion']
        return test_summary

    # The paginated fetch already starts at the first page, so no separate
    # up-front request is made: processing page one twice would register its
    # job ids twice and double those models' counts in the summed CSV.
    asyncio.run(fetch_all_jobs())

    # Retrieve job logs and process results
    results = []
    for model_name, ci_job_ids in tqdm(audio_model_jobs.items()):
        for ci_job_id in ci_job_ids:
            results.append(retrieve_job_logs(ci_job_id, model_name))

    # Reshape to one row per (model, conclusion, test_type), summing the
    # counts of all jobs that share the same key, and save to CSV.
    df = (pd.DataFrame(results)
          .melt(id_vars=['model', 'conclusion'],
                value_vars=['failed', 'passed', 'skipped'],
                var_name='test_type',
                value_name='number_of_tests')
          .groupby(['model', 'conclusion', 'test_type'])
          .agg({'number_of_tests': 'sum'})
          .reset_index())
    df.to_csv('test_results_by_type.csv', index=False)
def load_and_process_data():
    """Load the saved test results and prepare them for plotting.

    Reads ``test_results_by_type.csv`` (columns ``model``, ``conclusion``,
    ``test_type``, ``number_of_tests``) from the current working directory.

    Returns:
        A tuple ``(model_test_results, model_order, failed_models_counts)``:

        * ``model_test_results``: the data with each model name suffixed by
          ``❌ (<failures>)`` when it has failing tests and ``✅`` otherwise,
          with the ``failed`` rows placed first (largest count first).
        * ``model_order``: the unique decorated model names, ordered by
          ``conclusion`` ascending, then ``test_type`` and
          ``number_of_tests`` descending.
        * ``failed_models_counts``: mapping of undecorated model name to its
          total number of failed tests; only models with at least one
          failure are present.
    """
    # Load the CSV file
    model_test_results = pd.read_csv('test_results_by_type.csv')

    # Get models with failed tests and their failure counts. A model can have
    # failed rows under more than one conclusion, so the rows are summed to
    # report the total rather than whichever row happens to come first.
    failed_rows = model_test_results[
        (model_test_results['test_type'] == 'failed')
        & (model_test_results['number_of_tests'] > 0)
    ]
    failed_models_counts = failed_rows.groupby('model')['number_of_tests'].sum().to_dict()

    def label_model(model):
        """Append ❌ and the failure count to failing models, ✅ to passing ones."""
        if model in failed_models_counts:
            return f"{model} ❌ ({failed_models_counts[model]})"
        return f"{model} ✅"

    # Only the model name is needed for the label, so map the column directly
    # instead of applying a function row by row (this also copes with an
    # empty table).
    model_test_results['model'] = model_test_results['model'].map(label_model)

    # Separate failed tests and other tests
    is_failed = model_test_results['test_type'] == 'failed'
    failed_tests = model_test_results[is_failed].sort_values('number_of_tests', ascending=False)
    other_tests = model_test_results[~is_failed]

    # Concatenate the dataframes
    model_test_results = pd.concat([failed_tests, other_tests])

    # Sort models by success/failure and number of failed tests
    model_order = model_test_results.sort_values(
        by=['conclusion', 'test_type', 'number_of_tests'],
        ascending=[True, False, False]
    )['model'].unique().tolist()

    return model_test_results, model_order, failed_models_counts
def create_bar_plot(model_test_results, model_order, failed_models_counts):
    """Build the Gradio bar plot of test counts per model.

    Args:
        model_test_results: Table with ``model``, ``number_of_tests`` and
            ``test_type`` columns; it is passed to ``gr.BarPlot`` as the data.
        model_order: Model names in the order they should appear on the
            x-axis; its length is shown as the total in the x-axis title.
        failed_models_counts: Mapping whose size is shown as the number of
            failing models in the x-axis title.

    Returns:
        The configured ``gr.BarPlot``.
    """
    failing_total = len(failed_models_counts)
    models_total = len(model_order)
    axis_caption = f"Models ({failing_total} failing / {models_total} total)"

    # One fixed colour per test outcome.
    palette = {"passed": "#008550", "skipped": "#F0B702", "failed": "#8B1710"}

    plot = gr.BarPlot(
        model_test_results,
        x="model",
        y="number_of_tests",
        color="test_type",
        color_map=palette,
        title="Test Results by Model",
        x_title=axis_caption,
        y_title="Number of Tests",
        height=600,
        width=1000,
        # Tilt the model names on the x-axis by 45 degrees.
        x_label_angle=45,
        # Use the caller-supplied ordering for the x-axis.
        x_order=model_order,
    )
    return plot
# Create the Gradio interface: a bar plot of the saved results plus a button
# that re-fetches the CI results on demand.
with gr.Blocks() as results_viz:
    gr.Markdown("# Test Results by Model")

    model_test_results, model_order, failed_models_counts = load_and_process_data()
    test_results_plot = create_bar_plot(model_test_results, model_order, failed_models_counts)

    with gr.Row():
        refresh_btn = gr.Button(
            value="Refresh CI Results (~2mn)",
            variant="primary"
        )
        refresh_status = gr.Textbox()

    def check_and_refresh():
        """Re-fetch the CI results when a new run id is available and return the current plot.

        The id of the last processed run is kept in ``ci_id.txt``. When it
        differs from the latest id, the results are fetched again and the
        file is updated.

        Returns:
            A ``(status message, bar plot)`` pair for the status textbox and
            the plot component.
        """
        # For now just return hardcoded ID
        latest_ci_id = "15549432276"

        try:
            with open("ci_id.txt", "r") as f:
                current_ci_id = f.read().strip()
        except FileNotFoundError:
            current_ci_id = ""

        if latest_ci_id == current_ci_id:
            status = "No new CI results available yet."
        else:
            fetch_and_process_ci_results(latest_ci_id)
            with open("ci_id.txt", "w") as f:
                f.write(latest_ci_id)
            status = "CI results refreshed successfully!"

        # Always rebuild the plot from the CSV on disk. Returning the
        # component created at start-up would revert the display to the
        # start-up data once an earlier click had already refreshed it.
        current_plot = create_bar_plot(*load_and_process_data())
        return status, current_plot

    refresh_btn.click(fn=check_and_refresh, outputs=[refresh_status, test_results_plot])

if __name__ == "__main__":
    results_viz.launch()