# Hugging Face Space status residue removed ("Spaces: Sleeping"); this script
# aggregates transformers audio-model CI test results into a CSV.
import asyncio
import os
import re

import aiohttp
import pandas as pd
import requests
import yaml
from tqdm import tqdm
def get_audio_models():
    """Return audio-model names listed in the transformers docs toctree.

    Downloads the English docs ``_toctree.yml`` from the ``main`` branch of
    ``huggingface/transformers`` and walks the API -> Models -> "Audio models"
    section, normalizing each entry's ``local`` doc path into a module-style
    name (last path segment, lowercased, dashes replaced with underscores).

    Returns:
        list[str]: Normalized audio model names, or an empty list when the
        file cannot be fetched or the expected section is missing.
    """
    url = "https://raw.githubusercontent.com/huggingface/transformers/main/docs/source/en/_toctree.yml"
    # Timeout so a stalled connection cannot hang the caller indefinitely.
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print("Failed to fetch the YAML file")
        return []
    toctree_content = yaml.safe_load(response.text)
    # Guard clauses keep the walk flat: API -> Models -> Audio models.
    for section in toctree_content:
        if section.get('title') != 'API':
            continue
        for subsection in section.get('sections', []):
            if subsection.get('title') != 'Models':
                continue
            for model_section in subsection.get('sections', []):
                if model_section.get('title') == 'Audio models':
                    # e.g. "model_doc/wav2vec2-bert" -> "wav2vec2_bert"
                    return [
                        audio_model['local'].split('/')[-1].lower().replace('-', '_')
                        for audio_model in model_section.get('sections', [])
                        if 'local' in audio_model
                    ]
    return []
def fetch_and_process_ci_results(job_id):
    """Aggregate audio-model CI test results for one GitHub Actions run.

    Pages through every job of workflow run ``job_id``, keeps jobs that
    belong to audio models (per :func:`get_audio_models`), fetches each
    job's logs, parses the pytest summary line, and writes per-model /
    per-conclusion / per-test-type counts to ``test_results_by_type.csv``.

    Args:
        job_id: The GitHub Actions workflow-run id (the ``runs/<id>`` path
            segment of the API URL), despite the parameter's name.

    Raises:
        ValueError: If the ``GITHUB_TOKEN`` environment variable is unset.
        Exception: If a job's logs contain no recognizable pytest summary.
    """
    github_token = os.environ.get('GITHUB_TOKEN')
    if not github_token:
        raise ValueError("GitHub token not found in environment variables")
    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github+json"
    }

    audio_models = get_audio_models()
    # NOTE(review): this list is currently unused — presumably models that
    # have no test suite and were meant to be filtered out of
    # `audio_models`; confirm intent before wiring it up or deleting it.
    non_tested_models = [
        "xls_r",
        "speech_to_text_2",
        "mctct",
        "xlsr_wav2vec2",
        "mms"
    ]

    url = f"https://api.github.com/repos/huggingface/transformers/actions/runs/{job_id}/jobs"
    # model name -> list of CI job ids that tested that model
    audio_model_jobs = {audio_model: [] for audio_model in audio_models}

    # pytest summary, e.g. "==== 1 failed, 2 passed, 3 skipped in 4.56s ===="
    # Compiled once here instead of re-parsed for every job's logs.
    summary_pattern = re.compile(
        r'=+ (?:(\d+) failed,?\s*)?(?:(\d+) passed,?\s*)?'
        r'(?:(\d+) skipped,?\s*)?(?:\d+ warnings?\s*)?in \d+\.\d+s'
    )

    def process_jobs(jobs_data):
        # Record job ids whose name looks like "Model CI ... (models/<name> ...)".
        for job in jobs_data['jobs']:
            if "Model CI" in job['name'] and "models" in job['name']:
                match = re.search(r'models/([^/)]+)', job['name'])
                if match:
                    model_name = match.group(1).lower()
                    if model_name in audio_model_jobs:
                        audio_model_jobs[model_name].append(job['id'])

    async def fetch_and_process_jobs(session, page_url):
        # Fetch one page of jobs; return the 'next' pagination link or None.
        async with session.get(page_url, headers=headers) as response:
            jobs_data = await response.json()
            process_jobs(jobs_data)
            return response.links.get('next', {}).get('url')

    async def fetch_all_jobs():
        # Walk the paginated jobs endpoint until there is no 'next' link.
        async with aiohttp.ClientSession() as session:
            next_url = url
            with tqdm(desc="Fetching jobs", unit="page") as pbar:
                while next_url:
                    next_url = await fetch_and_process_jobs(session, next_url)
                    pbar.update(1)

    def parse_test_results(text):
        # Extract failed/passed/skipped counts from the pytest summary line;
        # each group is optional, so a missing count defaults to 0.
        match = summary_pattern.search(text)
        if match:
            failed = int(match.group(1)) if match.group(1) else 0
            passed = int(match.group(2)) if match.group(2) else 0
            skipped = int(match.group(3)) if match.group(3) else 0
            return {'failed': failed, 'passed': passed, 'skipped': skipped}
        raise Exception("Could not find test summary in logs")

    def retrieve_job_logs(job_id, job_name):
        # Fetch one job's metadata and logs; return its parsed test summary
        # tagged with the model name and the job's conclusion.
        url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}"
        response = requests.get(url, headers=headers, timeout=30)
        logs_url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}/logs"
        logs_response = requests.get(logs_url, headers=headers, timeout=30)
        logs = logs_response.text
        test_summary = parse_test_results(logs)
        test_summary["model"] = job_name
        test_summary["conclusion"] = response.json()['conclusion']
        return test_summary

    # Page through all jobs asynchronously.  BUGFIX: the original code also
    # fetched and processed page 1 synchronously before the async loop
    # re-fetched the same base URL, so every page-1 job id was recorded
    # twice and its counts were doubled in the output.
    asyncio.run(fetch_all_jobs())

    # Retrieve job logs and parse each job's test summary.
    results = []
    for job_name, job_ids in tqdm(audio_model_jobs.items()):
        for matched_job_id in job_ids:
            results.append(retrieve_job_logs(matched_job_id, job_name))

    # Guard the empty case: DataFrame.melt on an empty frame would raise an
    # opaque KeyError on the missing 'model' column.
    if not results:
        print("No matching audio-model jobs were found for this run")
        return

    # Aggregate counts per model / conclusion / test type and save to CSV.
    df = (pd.DataFrame(results)
          .melt(id_vars=['model', 'conclusion'],
                value_vars=['failed', 'passed', 'skipped'],
                var_name='test_type',
                value_name='number_of_tests')
          .groupby(['model', 'conclusion', 'test_type'])
          .agg({'number_of_tests': 'sum'})
          .reset_index())
    df.to_csv('test_results_by_type.csv', index=False)