import os
import re
import asyncio

import aiohttp
import gradio as gr
import pandas as pd
import requests
import yaml
from tqdm import tqdm

from get_last_ci_run import get_last_ci_run_id
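

# Scrape the list of audio models from the transformers docs table of contents,
# normalizing each page name (e.g. "model_doc/wav2vec2-bert" -> "wav2vec2_bert").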
def get_audio_models():
    url = "https://raw.githubusercontent.com/huggingface/transformers/main/docs/source/en/_toctree.yml"
    response = requests.get(url)
    if response.status_code != 200:
        print("Failed to fetch the YAML file")
        return []
    toctree_content = yaml.safe_load(response.text)
    for section in toctree_content:
        if section.get('title') == 'API':
            for subsection in section.get('sections', []):
                if subsection.get('title') == 'Models':
                    for model_section in subsection.get('sections', []):
                        if model_section.get('title') == 'Audio models':
                            return [
                                audio_model.get('local').split('/')[-1].lower().replace('-', '_')
                                for audio_model in model_section.get('sections', [])
                                if 'local' in audio_model
                            ]
    return []
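

# Download the jobs of one CI run from the GitHub API, parse each job's pytest
# summary out of its logs, and aggregate the per-model results into
# test_results_by_type.csv.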
def fetch_and_process_ci_results(job_id):
    github_token = os.environ.get('GITHUB_TOKEN')
    if not github_token:
        raise ValueError("GitHub token not found in environment variables")
    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github+json"
    }
    audio_models = get_audio_models()
    non_tested_models = [
        "xls_r",
        "speech_to_text_2",
        "mctct",
        "xlsr_wav2vec2",
        "mms"
    ]
    url = f"https://api.github.com/repos/huggingface/transformers/actions/runs/{job_id}/jobs"
    # Exclude doc-only pages that have no dedicated test jobs
    audio_model_jobs = {
        audio_model: [] for audio_model in audio_models
        if audio_model not in non_tested_models
    }
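
    # Collect the job IDs of "Model CI ... (models/<name>)" jobs that belong to
    # one of the audio models.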
    def process_jobs(jobs_data):
        for job in jobs_data['jobs']:
            if "Model CI" in job['name'] and "models" in job['name']:
                match = re.search(r'models/([^/)]+)', job['name'])
                if match:
                    model_name = match.group(1).lower()
                    if model_name in audio_model_jobs:
                        audio_model_jobs[model_name].append(job['id'])
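
    # Fetch one page of jobs and return the URL of the next page (taken from
    # the Link response header), or None when there are no pages left.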
    async def fetch_and_process_jobs(session, url):
        async with session.get(url, headers=headers) as response:
            jobs_data = await response.json()
            process_jobs(jobs_data)
            return response.links.get('next', {}).get('url')
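
    # Walk every page of the run's job list, feeding each page to process_jobs.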
    async def fetch_all_jobs():
        async with aiohttp.ClientSession() as session:
            next_url = url
            with tqdm(desc="Fetching jobs", unit="page") as pbar:
                while next_url:
                    next_url = await fetch_and_process_jobs(session, next_url)
                    pbar.update(1)
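
    # Extract the test counts from the pytest summary line at the end of a job
    # log, e.g. "=== 2 failed, 40 passed, 3 skipped, 5 warnings in 12.34s ===".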
    def parse_test_results(text):
        pattern = r'=+ (?:(\d+) failed,?\s*)?(?:(\d+) passed,?\s*)?(?:(\d+) skipped,?\s*)?(?:\d+ warnings?\s*)?in \d+\.\d+s'
        match = re.search(pattern, text)
        if match:
            failed = int(match.group(1)) if match.group(1) else 0
            passed = int(match.group(2)) if match.group(2) else 0
            skipped = int(match.group(3)) if match.group(3) else 0
            return {'failed': failed, 'passed': passed, 'skipped': skipped}
        raise Exception("Could not find test summary in logs")
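
    # Fetch a job's metadata (for its conclusion) and its raw logs, then parse
    # the test counts out of the logs.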
    def retrieve_job_logs(job_id, job_name):
        url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}"
        response = requests.get(url, headers=headers)
        logs_url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}/logs"
        logs_response = requests.get(logs_url, headers=headers)
        logs = logs_response.text
        test_summary = parse_test_results(logs)
        test_summary["model"] = job_name
        test_summary["conclusion"] = response.json()['conclusion']
        return test_summary

    # Fetch and process every page of jobs for this run; fetch_all_jobs starts
    # from the first page itself, so no separate initial request is needed
    # (a synchronous first fetch here would process page 1 twice)
    asyncio.run(fetch_all_jobs())

    # Retrieve job logs and process results
    results = []
    for job_name, job_ids in tqdm(audio_model_jobs.items()):
        for job_id in job_ids:
            result = retrieve_job_logs(job_id, job_name)
            results.append(result)

    # Process results into a DataFrame and save to CSV
    df = (pd.DataFrame(results)
          .melt(id_vars=['model', 'conclusion'],
                value_vars=['failed', 'passed', 'skipped'],
                var_name='test_type',
                value_name='number_of_tests')
          .groupby(['model', 'conclusion', 'test_type'])
          .agg({'number_of_tests': 'sum'})
          .reset_index())
    df.to_csv('test_results_by_type.csv', index=False)
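

# Reshape the saved CSV for plotting: tag model names with pass/fail markers
# and compute a display order that puts failing models first.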
def load_and_process_data():
    # Load the CSV file
    model_test_results = pd.read_csv('test_results_by_type.csv')

    # Get models with failed tests and their failure counts
    failed_models_counts = model_test_results[
        (model_test_results['test_type'] == 'failed') &
        (model_test_results['number_of_tests'] > 0)
    ].groupby('model')['number_of_tests'].first().to_dict()

    # Add ❌ and failure count to model names that have failures, ✅ for passing models
    model_test_results['model'] = model_test_results.apply(
        lambda row: f"{row['model']} ❌ ({failed_models_counts[row['model']]})"
        if row['model'] in failed_models_counts
        else f"{row['model']} ✅",
        axis=1
    )

    # Separate failed tests (sorted by failure count) from other tests
    failed_tests = model_test_results[model_test_results['test_type'] == 'failed'].sort_values('number_of_tests', ascending=False)
    other_tests = model_test_results[model_test_results['test_type'] != 'failed']

    # Concatenate the dataframes
    model_test_results = pd.concat([failed_tests, other_tests])

    # Sort models by success/failure and number of failed tests
    model_order = model_test_results.sort_values(
        by=['conclusion', 'test_type', 'number_of_tests'],
        ascending=[True, False, False]
    )['model'].unique().tolist()

    return model_test_results, model_order, failed_models_counts
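

# Render the stacked bar chart of per-model test counts, using the custom
# model order computed above.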
def create_bar_plot(model_test_results, model_order, failed_models_counts):
    return gr.BarPlot(
        model_test_results,
        x="model",
        y="number_of_tests",
        color="test_type",  # Stack bars by passed/skipped/failed
        color_map={"passed": "#008550", "skipped": "#F0B702", "failed": "#8B1710"},
        title="Test Results by Model",
        x_title=f"Models ({len(failed_models_counts)} failing / {len(model_order)} total)",
        y_title="Number of Tests",
        height=600,
        width=1000,
        x_label_angle=45,  # Rotate x-axis labels by 45 degrees
        x_order=model_order  # Set custom order of x-axis
    )


# Create the Gradio interface
with gr.Blocks() as results_viz:
    gr.Markdown("# Test Results by Model")
    model_test_results, model_order, failed_models_counts = load_and_process_data()
    test_results_plot = create_bar_plot(model_test_results, model_order, failed_models_counts)
    with gr.Row():
        refresh_btn = gr.Button(
            value="Refresh CI Results (~2 min)",
            variant="primary"
        )
        refresh_status = gr.Textbox()
        ci_link = gr.HTML()
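
    # Compare the latest CI run ID against the one processed last time; only
    # re-scrape the CI results when a newer run is available.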
    def check_and_refresh():
        latest_ci_id = str(get_last_ci_run_id())
        try:
            with open("ci_id.txt", "r") as f:
                current_ci_id = f.read().strip()
        except FileNotFoundError:
            current_ci_id = ""
        run_link = f'<a href="https://github.com/huggingface/transformers/actions/runs/{latest_ci_id}">Latest CI Run</a>'
        if latest_ci_id == current_ci_id:
            return "No new CI results available yet.", test_results_plot, run_link
        else:
            fetch_and_process_ci_results(latest_ci_id)
            with open("ci_id.txt", "w") as f:
                f.write(latest_ci_id)
            # Reload and reprocess the data
            new_model_test_results, new_model_order, new_failed_models_counts = load_and_process_data()
            # Create a new BarPlot with the updated data
            new_test_results_plot = create_bar_plot(new_model_test_results, new_model_order, new_failed_models_counts)
            return "CI results refreshed successfully!", new_test_results_plot, run_link

    refresh_btn.click(fn=check_and_refresh, outputs=[refresh_status, test_results_plot, ci_link])

if __name__ == "__main__":
    results_viz.launch()