import asyncio
import os
import re

import aiohttp
import gradio as gr
import pandas as pd
import requests
import yaml
from tqdm import tqdm

from get_last_ci_run import get_last_ci_run_id


def get_audio_models():
    url = "https://raw.githubusercontent.com/huggingface/transformers/main/docs/source/en/_toctree.yml"
    response = requests.get(url)
    if response.status_code != 200:
        print("Failed to fetch the YAML file")
        return []
    toctree_content = yaml.safe_load(response.text)

    # Walk the toctree: API -> Models -> Audio models, normalizing each page
    # name to its module-style identifier (e.g. "speech-to-text" -> "speech_to_text").
    for section in toctree_content:
        if section.get('title') == 'API':
            for subsection in section.get('sections', []):
                if subsection.get('title') == 'Models':
                    for model_section in subsection.get('sections', []):
                        if model_section.get('title') == 'Audio models':
                            return [
                                audio_model.get('local').split('/')[-1].lower().replace('-', '_')
                                for audio_model in model_section.get('sections', [])
                                if 'local' in audio_model
                            ]
    return []


def fetch_and_process_ci_results(job_id):
    github_token = os.environ.get('GITHUB_TOKEN')
    if not github_token:
        raise ValueError("GitHub token not found in environment variables")

    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github+json"
    }

    audio_models = get_audio_models()
    # Models documented under "Audio models" that have no dedicated CI test job.
    non_tested_models = [
        "xls_r",
        "speech_to_text_2",
        "mctct",
        "xlsr_wav2vec2",
        "mms"
    ]

    url = f"https://api.github.com/repos/huggingface/transformers/actions/runs/{job_id}/jobs"
    audio_model_jobs = {audio_model: [] for audio_model in audio_models}

    def process_jobs(jobs_data):
        # Collect the job ids of per-model "Model CI" jobs for the audio models.
        for job in jobs_data['jobs']:
            if "Model CI" in job['name'] and "models" in job['name']:
                match = re.search(r'models/([^/)]+)', job['name'])
                if match:
                    model_name = match.group(1).lower()
                    if model_name in audio_model_jobs:
                        audio_model_jobs[model_name].append(job['id'])

    async def fetch_and_process_jobs(session, url):
        async with session.get(url, headers=headers) as response:
            jobs_data = await response.json()
            process_jobs(jobs_data)
            # Follow GitHub's Link-header pagination to the next page, if any.
            return response.links.get('next', {}).get('url')

    async def fetch_all_jobs():
        async with aiohttp.ClientSession() as session:
            next_url = url
            with tqdm(desc="Fetching jobs", unit="page") as pbar:
                while next_url:
                    next_url = await fetch_and_process_jobs(session, next_url)
                    pbar.update(1)

    def parse_test_results(text):
        # Match the pytest summary trailer, e.g. "=== 2 failed, 31 passed, 4 skipped in 42.13s ===".
        pattern = r'=+ (?:(\d+) failed,?\s*)?(?:(\d+) passed,?\s*)?(?:(\d+) skipped,?\s*)?(?:\d+ warnings?\s*)?in \d+\.\d+s'
        match = re.search(pattern, text)
        if match:
            failed = int(match.group(1)) if match.group(1) else 0
            passed = int(match.group(2)) if match.group(2) else 0
            skipped = int(match.group(3)) if match.group(3) else 0
            return {'failed': failed, 'passed': passed, 'skipped': skipped}
        raise ValueError("Could not find test summary in logs")

    def retrieve_job_logs(job_id, job_name):
        url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}"
        response = requests.get(url, headers=headers)
        logs_url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}/logs"
        logs_response = requests.get(logs_url, headers=headers)
        logs = logs_response.text
        test_summary = parse_test_results(logs)
        test_summary["model"] = job_name
        test_summary["conclusion"] = response.json()['conclusion']
        return test_summary

    # Page through every job of the run, starting from the first page.
    asyncio.run(fetch_all_jobs())

    # Retrieve the logs of each matched job and parse its test summary.
    results = []
    for job_name, job_ids in tqdm(audio_model_jobs.items()):
        for job_id in job_ids:
            result = retrieve_job_logs(job_id, job_name)
            results.append(result)

    # Process results into a DataFrame and save to CSV.
    df = (pd.DataFrame(results)
          .melt(id_vars=['model', 'conclusion'],
                value_vars=['failed', 'passed', 'skipped'],
                var_name='test_type',
                value_name='number_of_tests')
          .groupby(['model', 'conclusion', 'test_type'])
          .agg({'number_of_tests': 'sum'})
          .reset_index())
    df.to_csv('test_results_by_type.csv', index=False)
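
# For reference, the CSV written above holds one row per (model, conclusion,
# test_type) with the summed counts; load_and_process_data() below assumes
# exactly these four columns. The values here are illustrative, not real CI output:
#
#   model,conclusion,test_type,number_of_tests
#   hubert,success,failed,0
#   hubert,success,passed,120
#   hubert,success,skipped,14
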
def load_and_process_data():
    # Load the CSV file produced by fetch_and_process_ci_results
    model_test_results = pd.read_csv('test_results_by_type.csv')

    # Get the models with failed tests and their failure counts
    failed_models_counts = model_test_results[
        (model_test_results['test_type'] == 'failed') &
        (model_test_results['number_of_tests'] > 0)
    ].groupby('model')['number_of_tests'].first().to_dict()

    # Add ❌ and the failure count to model names that have failures, ✅ for passing models
    model_test_results['model'] = model_test_results.apply(
        lambda row: f"{row['model']} ❌ ({failed_models_counts[row['model']]})"
        if row['model'] in failed_models_counts
        else f"{row['model']} ✅",
        axis=1
    )

    # Separate failed tests from the other test types
    failed_tests = model_test_results[model_test_results['test_type'] == 'failed'].sort_values(
        'number_of_tests', ascending=False
    )
    other_tests = model_test_results[model_test_results['test_type'] != 'failed']

    # Concatenate the dataframes so failed rows come first
    model_test_results = pd.concat([failed_tests, other_tests])

    # Order models by success/failure and number of failed tests
    model_order = model_test_results.sort_values(
        by=['conclusion', 'test_type', 'number_of_tests'],
        ascending=[True, False, False]
    )['model'].unique().tolist()

    return model_test_results, model_order, failed_models_counts


def create_bar_plot(model_test_results, model_order, failed_models_counts):
    return gr.BarPlot(
        model_test_results,
        x="model",
        y="number_of_tests",
        color="test_type",  # Color bars by test outcome
        color_map={"passed": "#008550", "skipped": "#F0B702", "failed": "#8B1710"},
        title="Test Results by Model",
        x_title=f"Models ({len(failed_models_counts)} failing / {len(model_order)} total)",
        y_title="Number of Tests",
        height=600,
        width=1000,
        x_label_angle=45,  # Rotate x-axis labels by 45 degrees
        x_order=model_order  # Set a custom order for the x-axis
    )


# Create the Gradio interface
with gr.Blocks() as results_viz:
    gr.Markdown("# Test Results by Model")

    model_test_results, model_order, failed_models_counts = load_and_process_data()
    test_results_plot = create_bar_plot(model_test_results, model_order, failed_models_counts)

    with gr.Row():
        refresh_btn = gr.Button(
            value="Refresh CI Results (~2 min)",
            variant="primary"
        )
        refresh_status = gr.Textbox()
        ci_link = gr.HTML()

    def check_and_refresh():
        latest_ci_id = str(get_last_ci_run_id())
        try:
            with open("ci_id.txt", "r") as f:
                current_ci_id = f.read().strip()
        except FileNotFoundError:
            current_ci_id = ""

        run_link = (
            f'<a href="https://github.com/huggingface/transformers/actions/runs/{latest_ci_id}" '
            f'target="_blank">Latest CI Run</a>'
        )

        if latest_ci_id == current_ci_id:
            return "No new CI results available yet.", test_results_plot, run_link

        fetch_and_process_ci_results(latest_ci_id)
        with open("ci_id.txt", "w") as f:
            f.write(latest_ci_id)

        # Reload and reprocess the data
        new_model_test_results, new_model_order, new_failed_models_counts = load_and_process_data()
        # Create a new BarPlot with the updated data
        new_test_results_plot = create_bar_plot(new_model_test_results, new_model_order, new_failed_models_counts)

        return "CI results refreshed successfully!", new_test_results_plot, run_link

    refresh_btn.click(fn=check_and_refresh, outputs=[refresh_status, test_results_plot, ci_link])


if __name__ == "__main__":
    results_viz.launch()
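
# For reference, a minimal sketch of what the imported get_last_ci_run_id helper
# might look like; this is an assumption, as the real get_last_ci_run module is
# not shown here. It would return the id of the most recent workflow run via the
# GitHub Actions REST API (the real helper presumably also filters to the
# specific scheduled CI workflow):
#
#     import os
#     import requests
#
#     def get_last_ci_run_id():
#         headers = {
#             "Authorization": f"token {os.environ['GITHUB_TOKEN']}",
#             "Accept": "application/vnd.github+json",
#         }
#         url = "https://api.github.com/repos/huggingface/transformers/actions/runs"
#         # Runs are returned newest-first, so per_page=1 yields the latest run.
#         runs = requests.get(url, headers=headers, params={"per_page": 1}).json()
#         return runs["workflow_runs"][0]["id"]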