vargha commited on
Commit
8dcb829
Β·
1 Parent(s): ebf7d39

auxiliray scripts for dataset managements

Browse files
scripts/calculate_annotator_audio_minutes.py ADDED
@@ -0,0 +1,249 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Script to calculate total minutes of audio data assigned to each annotator.
4
+
5
+ This script queries the database to find all audio files assigned to each annotator
6
+ through AnnotationInterval ranges, loads the actual audio files to calculate their
7
+ durations, and reports the total minutes per annotator.
8
+ """
9
+
10
+ import argparse
11
+ import sys
12
+ import os
13
+ import time
14
+ from typing import Dict, List, Tuple
15
+ from sqlalchemy import and_
16
+ from sqlalchemy.exc import OperationalError
17
+
18
+ # Add project root to Python path
19
+ project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
20
+ if project_root not in sys.path:
21
+ sys.path.insert(0, project_root)
22
+
23
+ from utils.database import get_db, get_db_readonly
24
+ from utils.cloud_server_audio_loader import CloudServerAudioLoader
25
+ from data.models import Annotator, AnnotationInterval, TTSData
26
+ from utils.logger import Logger
27
+ from utils.sentry_integration import capture_custom_event
28
+ import sentry_sdk
29
+ from config import conf
30
+
31
+ log = Logger()
32
+
33
+ def get_assigned_tts_data_for_annotator(db, annotator_id: int) -> List[TTSData]:
34
+ """
35
+ Get all TTSData items assigned to a specific annotator through AnnotationInterval ranges.
36
+
37
+ Args:
38
+ db: Database session
39
+ annotator_id: ID of the annotator
40
+
41
+ Returns:
42
+ List of TTSData objects assigned to the annotator
43
+ """
44
+ max_retries = 3
45
+ retry_delay = 5 # seconds
46
+
47
+ for attempt in range(max_retries):
48
+ try:
49
+ # Get all annotation intervals for this annotator
50
+ intervals = db.query(AnnotationInterval).filter(
51
+ AnnotationInterval.annotator_id == annotator_id
52
+ ).all()
53
+
54
+ if not intervals:
55
+ return []
56
+
57
+ # Collect all TTSData IDs within the assigned ranges
58
+ assigned_tts_data = []
59
+ for interval in intervals:
60
+ if interval.start_index is not None and interval.end_index is not None:
61
+ tts_data_in_range = db.query(TTSData).filter(
62
+ and_(
63
+ TTSData.id >= interval.start_index,
64
+ TTSData.id <= interval.end_index
65
+ )
66
+ ).all()
67
+ assigned_tts_data.extend(tts_data_in_range)
68
+
69
+ return assigned_tts_data
70
+
71
+ except OperationalError as e:
72
+ if "Lost connection to MySQL server" in str(e) and attempt < max_retries - 1:
73
+ log.warning(f"Database connection lost, retrying in {retry_delay} seconds... (attempt {attempt + 1}/{max_retries})")
74
+ time.sleep(retry_delay)
75
+ # Refresh the database session
76
+ db.rollback()
77
+ continue
78
+ else:
79
+ raise
80
+
81
+ def calculate_audio_duration_seconds(filename: str, loader: CloudServerAudioLoader) -> float:
82
+ """
83
+ Calculate the duration of an audio file in seconds.
84
+
85
+ Args:
86
+ filename: Name of the audio file
87
+ loader: CloudServerAudioLoader instance
88
+
89
+ Returns:
90
+ Duration in seconds, or 0.0 if file cannot be loaded
91
+ """
92
+ try:
93
+ sample_rate, samples = loader.load_audio(filename)
94
+ # Calculate duration in seconds
95
+ if samples.ndim == 1:
96
+ # Mono audio
97
+ duration_seconds = len(samples) / sample_rate
98
+ else:
99
+ # Multi-channel audio - use length of first channel
100
+ duration_seconds = samples.shape[0] / sample_rate
101
+
102
+ return duration_seconds
103
+ except Exception as e:
104
+ log.warning(f"Failed to load audio file '{filename}': {e}")
105
+ sentry_sdk.capture_exception(e, extra={
106
+ 'operation': 'calculate_audio_duration',
107
+ 'filename': filename
108
+ })
109
+ return 0.0
110
+
111
+ def calculate_annotator_audio_minutes(annotator_name: str = None):
112
+ """
113
+ Calculate and report the total minutes of audio assigned to each annotator.
114
+
115
+ Args:
116
+ annotator_name: Optional name of specific annotator to calculate for
117
+ """
118
+ try:
119
+ # Initialize audio loader
120
+ loader = CloudServerAudioLoader(conf.FTP_URL)
121
+
122
+ # First, get the annotators list with a fresh connection
123
+ annotator_data = []
124
+ with get_db_readonly() as db:
125
+ # Get annotators based on filter
126
+ if annotator_name:
127
+ annotators = db.query(Annotator).filter(
128
+ Annotator.is_active == True,
129
+ Annotator.name == annotator_name
130
+ ).all()
131
+ if not annotators:
132
+ log.error(f"No active annotator found with name: {annotator_name}")
133
+ return
134
+ else:
135
+ annotators = db.query(Annotator).filter(Annotator.is_active == True).all()
136
+
137
+ # Extract the data we need before the session closes
138
+ annotator_data = [(ann.id, ann.name) for ann in annotators]
139
+
140
+ if not annotator_data:
141
+ log.info("No active annotators found.")
142
+ return
143
+
144
+ log.info("--- Annotator Audio Duration Report ---")
145
+ log.info("Calculating total minutes of assigned audio per annotator...")
146
+ log.info("")
147
+
148
+ total_annotators = len(annotator_data)
149
+ annotator_results = []
150
+
151
+ for idx, (annotator_id, annotator_name) in enumerate(annotator_data, 1):
152
+ log.info(f"Processing annotator {idx}/{total_annotators}: {annotator_name} (ID: {annotator_id})")
153
+
154
+ # Get assigned TTSData for this annotator with a fresh connection
155
+ assigned_tts_data = []
156
+ with get_db_readonly() as db:
157
+ assigned_tts_data = get_assigned_tts_data_for_annotator(db, annotator_id)
158
+
159
+ if not assigned_tts_data:
160
+ log.info(f" No audio files assigned to {annotator_name}")
161
+ annotator_results.append((annotator_name, 0, 0.0))
162
+ continue
163
+
164
+ total_duration_seconds = 0.0
165
+ successful_files = 0
166
+ failed_files = 0
167
+
168
+ log.info(f" Calculating duration for {len(assigned_tts_data)} assigned audio files...")
169
+
170
+ # Calculate duration for each assigned audio file
171
+ for tts_data in assigned_tts_data:
172
+ duration = calculate_audio_duration_seconds(tts_data.filename, loader)
173
+ if duration > 0:
174
+ total_duration_seconds += duration
175
+ successful_files += 1
176
+ else:
177
+ failed_files += 1
178
+
179
+ total_minutes = total_duration_seconds / 60.0
180
+
181
+ log.info(f" Successfully processed: {successful_files} files")
182
+ if failed_files > 0:
183
+ log.warning(f" Failed to process: {failed_files} files")
184
+ log.info(f" Total duration: {total_duration_seconds:.2f} seconds ({total_minutes:.2f} minutes)")
185
+
186
+ annotator_results.append((annotator_name, len(assigned_tts_data), total_minutes))
187
+ log.info("")
188
+
189
+ # Print summary report
190
+ log.info("=" * 60)
191
+ log.info("SUMMARY REPORT")
192
+ log.info("=" * 60)
193
+ log.info(f"{'Annotator':<20} {'Files':<8} {'Minutes':<12} {'Hours':<8}")
194
+ log.info("-" * 60)
195
+
196
+ total_files = 0
197
+ total_minutes = 0.0
198
+
199
+ for annotator_name, file_count, minutes in annotator_results:
200
+ hours = minutes / 60.0
201
+ log.info(f"{annotator_name:<20} {file_count:<8} {minutes:<12.2f} {hours:<8.2f}")
202
+ total_files += file_count
203
+ total_minutes += minutes
204
+
205
+ log.info("-" * 60)
206
+ total_hours = total_minutes / 60.0
207
+ log.info(f"{'TOTAL':<20} {total_files:<8} {total_minutes:<12.2f} {total_hours:<8.2f}")
208
+ log.info("=" * 60)
209
+
210
+ # Capture analytics event
211
+ capture_custom_event(
212
+ 'annotator_audio_calculation_completed',
213
+ {
214
+ 'total_annotators': total_annotators,
215
+ 'total_files_processed': total_files,
216
+ 'total_minutes': total_minutes,
217
+ 'total_hours': total_hours
218
+ }
219
+ )
220
+
221
+ except Exception as e:
222
+ log.error(f"Failed to calculate annotator audio minutes: {e}")
223
+ sentry_sdk.capture_exception(e, extra={
224
+ 'operation': 'calculate_annotator_audio_minutes'
225
+ })
226
+ raise
227
+
228
+ def main():
229
+ """Main entry point for the script."""
230
+ parser = argparse.ArgumentParser(
231
+ description="Calculate total minutes of audio data assigned to each annotator"
232
+ )
233
+ parser.add_argument(
234
+ '--annotator',
235
+ type=str,
236
+ help="Calculate for a specific annotator by name (optional, calculates for all if not specified)"
237
+ )
238
+
239
+ args = parser.parse_args()
240
+
241
+ if args.annotator:
242
+ log.info(f"Calculating audio minutes for annotator: {args.annotator}")
243
+ calculate_annotator_audio_minutes(args.annotator)
244
+ else:
245
+ log.info("Calculating audio minutes for all annotators")
246
+ calculate_annotator_audio_minutes()
247
+
248
+ if __name__ == "__main__":
249
+ main()
scripts/export_approved_datasets.py ADDED
@@ -0,0 +1,659 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Optimized TTS Data Export to Hugging Face
4
+ This script exports approved TTS annotations directly from the database to Hugging Face.
5
+ Features:
6
+ - Local caching for audio files to avoid re-downloading
7
+ - Batch processing to handle large datasets without memory issues
8
+ - Resume capability for interrupted uploads
9
+ - Better error handling and retry mechanisms
10
+ - HuggingFace best practices for large dataset uploads
11
+ """
12
+
13
+ import os
14
+ import sys
15
+ import json
16
+ import hashlib
17
+ import time
18
+ import shutil
19
+ from pathlib import Path
20
+ from concurrent.futures import ThreadPoolExecutor, as_completed
21
+ from typing import List, Dict, Optional, Tuple
22
+ import pymysql
23
+ import requests
24
+ import pandas as pd
25
+ from huggingface_hub import HfApi, login
26
+ from datasets import Dataset, Audio, Features, Value
27
+ import librosa
28
+ import numpy as np
29
+ from tqdm import tqdm
30
+
31
+ # Configuration
32
+ TARGET_REPO = "navidved/approved-tts-dataset"
33
+ SPEAKER_NAME = "ali_bandari"
34
+ BATCH_SIZE = 100 # Process annotations in batches
35
+ CACHE_DIR = "./audio_cache" # Local cache directory
36
+ TEMP_DIR = "./temp_dataset" # Temporary directory for dataset preparation
37
+ MAX_WORKERS = 4 # Concurrent downloads
38
+ MAX_RETRIES = 3 # Max retries for failed downloads
39
+
40
+ # Memory optimization settings
41
+ OPTIMIZE_MEMORY = True # Enable memory optimizations
42
+ TARGET_SAMPLE_RATE = 22050 # Reduce sample rate to save memory (None to keep original)
43
+ AUDIO_DTYPE = 'int16' # Use int16 instead of float32 to halve memory usage
44
+ USE_GENERATOR = True # Use generator-based dataset creation (recommended for large datasets)
45
+
46
+ # Database configuration (edit these if needed)
47
+ DB_CONFIG = {
48
+ 'host': 'annotation-db.apps.teh2.abrhapaas.com',
49
+ 'port': 32107,
50
+ 'user': os.getenv('DB_USER', 'navid'),
51
+ 'password': os.getenv('DB_PASSWORD', 'ZUJSK!1V!PF4ZEnIaylX'),
52
+ 'database': os.getenv('DB_NAME', 'tts'),
53
+ 'charset': 'utf8mb4'
54
+ }
55
+
56
+ # Audio server base URL
57
+ AUDIO_BASE_URL = "http://hubbit.ir/hf_dataset/tts"
58
+
59
+ class CacheManager:
60
+ """Handles local caching of audio files"""
61
+
62
+ def __init__(self, cache_dir: str):
63
+ self.cache_dir = Path(cache_dir)
64
+ self.cache_dir.mkdir(exist_ok=True)
65
+ self.index_file = self.cache_dir / "cache_index.json"
66
+ self.index = self._load_index()
67
+
68
+ def _load_index(self) -> Dict:
69
+ """Load cache index from disk"""
70
+ if self.index_file.exists():
71
+ try:
72
+ with open(self.index_file, 'r') as f:
73
+ return json.load(f)
74
+ except:
75
+ return {}
76
+ return {}
77
+
78
+ def _save_index(self):
79
+ """Save cache index to disk"""
80
+ with open(self.index_file, 'w') as f:
81
+ json.dump(self.index, f)
82
+
83
+ def _get_cache_key(self, filename: str) -> str:
84
+ """Generate cache key for filename"""
85
+ return hashlib.md5(filename.encode()).hexdigest()
86
+
87
+ def get_cached_file(self, filename: str) -> Optional[Path]:
88
+ """Get cached file path if exists and valid"""
89
+ cache_key = self._get_cache_key(filename)
90
+ if cache_key in self.index:
91
+ cached_path = Path(self.index[cache_key])
92
+ if cached_path.exists():
93
+ return cached_path
94
+ else:
95
+ # Remove invalid entry
96
+ del self.index[cache_key]
97
+ self._save_index()
98
+ return None
99
+
100
+ def cache_file(self, filename: str, file_data: bytes) -> Path:
101
+ """Cache file data and return path"""
102
+ cache_key = self._get_cache_key(filename)
103
+ # Use original extension if available
104
+ ext = Path(filename).suffix or '.mp3'
105
+ cached_path = self.cache_dir / f"{cache_key}{ext}"
106
+
107
+ with open(cached_path, 'wb') as f:
108
+ f.write(file_data)
109
+
110
+ self.index[cache_key] = str(cached_path)
111
+ self._save_index()
112
+ return cached_path
113
+
114
+
115
+ class AudioDownloader:
116
+ """Handles audio downloading with retry logic"""
117
+
118
+ def __init__(self, base_url: str, cache_manager: CacheManager, max_retries: int = 3):
119
+ self.base_url = base_url
120
+ self.cache_manager = cache_manager
121
+ self.max_retries = max_retries
122
+
123
+ def download_audio(self, filename: str) -> Optional[Tuple[Path, Dict]]:
124
+ """Download and process audio file, return (path, audio_info)"""
125
+ # Check cache first
126
+ cached_path = self.cache_manager.get_cached_file(filename)
127
+ if cached_path:
128
+ return self._load_audio_info(cached_path, filename)
129
+
130
+ # Download file
131
+ url = f"{self.base_url}/{filename}"
132
+
133
+ for attempt in range(self.max_retries):
134
+ try:
135
+ response = requests.get(url, timeout=30)
136
+ response.raise_for_status()
137
+
138
+ # Cache the file
139
+ cached_path = self.cache_manager.cache_file(filename, response.content)
140
+ return self._load_audio_info(cached_path, filename)
141
+
142
+ except Exception as e:
143
+ if attempt < self.max_retries - 1:
144
+ time.sleep(2 ** attempt) # Exponential backoff
145
+ continue
146
+ else:
147
+ print(f" ❌ Failed to download {filename} after {self.max_retries} attempts: {e}")
148
+ return None
149
+
150
+ def _load_audio_info(self, file_path: Path, filename: str) -> Tuple[Path, Dict]:
151
+ """Load audio information and audio data with memory optimization"""
152
+ try:
153
+ # Load audio data with librosa
154
+ sr = TARGET_SAMPLE_RATE if OPTIMIZE_MEMORY else None
155
+ audio_data, sample_rate = librosa.load(str(file_path), sr=sr, mono=True)
156
+
157
+ # Optimize audio data type for memory efficiency
158
+ if OPTIMIZE_MEMORY and AUDIO_DTYPE == 'int16':
159
+ # Convert float32 to int16 to halve memory usage
160
+ audio_data = (audio_data * 32767).astype(np.int16)
161
+
162
+ return file_path, {
163
+ 'filename': filename,
164
+ 'path': str(file_path),
165
+ 'audio_array': audio_data, # Optimized audio array
166
+ 'duration': len(audio_data) / sample_rate,
167
+ 'sample_rate': sample_rate,
168
+ 'channels': 1,
169
+ 'dtype': str(audio_data.dtype)
170
+ }
171
+ except Exception as e:
172
+ # Try with soundfile as fallback
173
+ try:
174
+ import soundfile as sf
175
+ audio_data, sample_rate = sf.read(str(file_path))
176
+ if len(audio_data.shape) > 1:
177
+ audio_data = np.mean(audio_data, axis=1) # Convert to mono
178
+
179
+ # Apply sample rate optimization
180
+ if OPTIMIZE_MEMORY and TARGET_SAMPLE_RATE and sample_rate != TARGET_SAMPLE_RATE:
181
+ import scipy.signal
182
+ num_samples = int(len(audio_data) * TARGET_SAMPLE_RATE / sample_rate)
183
+ audio_data = scipy.signal.resample(audio_data, num_samples)
184
+ sample_rate = TARGET_SAMPLE_RATE
185
+
186
+ # Optimize data type
187
+ if OPTIMIZE_MEMORY and AUDIO_DTYPE == 'int16':
188
+ audio_data = (audio_data * 32767).astype(np.int16)
189
+
190
+ return file_path, {
191
+ 'filename': filename,
192
+ 'path': str(file_path),
193
+ 'audio_array': audio_data,
194
+ 'duration': len(audio_data) / sample_rate,
195
+ 'sample_rate': sample_rate,
196
+ 'channels': 1,
197
+ 'dtype': str(audio_data.dtype)
198
+ }
199
+ except ImportError:
200
+ print(f" ❌ Error loading audio {filename}: {e}")
201
+ return None
202
+
203
+
204
+ class BatchProcessor:
205
+ """Processes annotations in batches to avoid memory issues"""
206
+
207
+ def __init__(self, downloader: AudioDownloader, temp_dir: str, batch_size: int = 100):
208
+ self.downloader = downloader
209
+ self.temp_dir = Path(temp_dir)
210
+ self.temp_dir.mkdir(exist_ok=True)
211
+ self.batch_size = batch_size
212
+
213
+ def process_batch(self, annotations: List[Dict], batch_id: int) -> Optional[Path]:
214
+ """Process a batch of annotations and save to parquet"""
215
+ print(f"\nπŸ“¦ Processing batch {batch_id} with {len(annotations)} annotations...")
216
+
217
+ batch_data = []
218
+
219
+ # Use ThreadPoolExecutor for concurrent downloads
220
+ with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
221
+ # Submit all download tasks
222
+ future_to_annotation = {
223
+ executor.submit(self.downloader.download_audio, ann['audio_file_name']): ann
224
+ for ann in annotations
225
+ }
226
+
227
+ # Process completed downloads
228
+ for future in tqdm(as_completed(future_to_annotation),
229
+ total=len(annotations),
230
+ desc=f"Batch {batch_id}"):
231
+ annotation = future_to_annotation[future]
232
+ try:
233
+ result = future.result()
234
+ if result:
235
+ file_path, audio_info = result
236
+ # Structure audio data for HuggingFace compatibility
237
+ audio_array = audio_info['audio_array']
238
+
239
+ # Convert to list for serialization, handling different dtypes
240
+ if audio_info.get('dtype') == 'int16':
241
+ # For int16, convert to float32 for better compatibility with HF Audio
242
+ array_list = (audio_array.astype(np.float32) / 32767.0).tolist()
243
+ else:
244
+ array_list = audio_array.astype(np.float32).tolist()
245
+
246
+ audio_data = {
247
+ 'array': array_list,
248
+ 'sampling_rate': int(audio_info['sample_rate']),
249
+ 'path': f"audio/{annotation['audio_file_name']}"
250
+ }
251
+
252
+ batch_data.append({
253
+ 'audio': audio_data, # HuggingFace standard audio column
254
+ 'file_name': f"audio/{annotation['audio_file_name']}", # Keep for compatibility
255
+ 'sentence': annotation['sentence'],
256
+ 'speaker': SPEAKER_NAME,
257
+ 'duration': audio_info['duration'],
258
+ 'sample_rate': audio_info['sample_rate']
259
+ })
260
+ except Exception as e:
261
+ print(f" ⚠️ Error processing {annotation['audio_file_name']}: {e}")
262
+
263
+ if not batch_data:
264
+ print(f" ❌ No valid audio files in batch {batch_id}")
265
+ return None
266
+
267
+ # Save batch to parquet
268
+ batch_file = self.temp_dir / f"batch_{batch_id:04d}.parquet"
269
+ df = pd.DataFrame(batch_data)
270
+ df.to_parquet(batch_file, index=False)
271
+
272
+ print(f" βœ… Saved {len(batch_data)} files to {batch_file}")
273
+ return batch_file
274
+
275
+
276
+ class DatasetUploader:
277
+ """Handles HuggingFace dataset upload using best practices"""
278
+
279
+ def __init__(self, temp_dir: str, target_repo: str):
280
+ self.temp_dir = Path(temp_dir)
281
+ self.target_repo = target_repo
282
+ self.api = HfApi()
283
+
284
+ def prepare_dataset_structure(self) -> Path:
285
+ """Prepare dataset structure for upload"""
286
+ dataset_dir = self.temp_dir / "dataset"
287
+ dataset_dir.mkdir(exist_ok=True)
288
+
289
+ # Create audio directory
290
+ audio_dir = dataset_dir / "audio"
291
+ audio_dir.mkdir(exist_ok=True)
292
+
293
+ batch_files = list(self.temp_dir.glob("batch_*.parquet"))
294
+ print(f"\nπŸ“ Preparing dataset structure from {len(batch_files)} batch files...")
295
+
296
+ if USE_GENERATOR:
297
+ # Memory-efficient generator-based approach
298
+ print("🧠 Using memory-efficient generator approach...")
299
+
300
+ def audio_sample_generator():
301
+ """Generator that yields one sample at a time to minimize memory usage"""
302
+ sample_count = 0
303
+ for batch_file in tqdm(batch_files, desc="Processing batch files"):
304
+ try:
305
+ df = pd.read_parquet(batch_file)
306
+ for _, row in df.iterrows():
307
+ sample_count += 1
308
+ yield {
309
+ 'audio': row['audio'],
310
+ 'file_name': row['file_name'],
311
+ 'sentence': row['sentence'],
312
+ 'speaker': row['speaker'],
313
+ 'duration': row['duration'],
314
+ 'sample_rate': row['sample_rate']
315
+ }
316
+ # Clean up processed batch file to save disk space
317
+ batch_file.unlink()
318
+ print(f" 🧹 Cleaned up {batch_file.name}")
319
+ except Exception as e:
320
+ print(f" ⚠️ Error processing {batch_file}: {e}")
321
+ continue
322
+
323
+ print(f" βœ… Generated {sample_count} samples")
324
+
325
+ # Create dataset using generator (memory efficient)
326
+ print(f"\nπŸ”„ Creating HuggingFace dataset using generator...")
327
+
328
+ features = Features({
329
+ 'audio': Audio(sampling_rate=None),
330
+ 'file_name': Value('string'),
331
+ 'sentence': Value('string'),
332
+ 'speaker': Value('string'),
333
+ 'duration': Value('float32'),
334
+ 'sample_rate': Value('int32')
335
+ })
336
+
337
+ dataset = Dataset.from_generator(
338
+ audio_sample_generator,
339
+ features=features,
340
+ cache_dir=str(self.temp_dir / "hf_cache") # Use local cache
341
+ )
342
+
343
+ num_samples = len(dataset)
344
+
345
+ else:
346
+ # Original approach (memory intensive)
347
+ print("⚠️ Using original approach - may consume significant memory...")
348
+ all_data = []
349
+
350
+ for batch_file in tqdm(batch_files, desc="Processing batches"):
351
+ df = pd.read_parquet(batch_file)
352
+ for _, row in df.iterrows():
353
+ all_data.append({
354
+ 'audio': row['audio'],
355
+ 'file_name': row['file_name'],
356
+ 'sentence': row['sentence'],
357
+ 'speaker': row['speaker'],
358
+ 'duration': row['duration'],
359
+ 'sample_rate': row['sample_rate']
360
+ })
361
+
362
+ print(f"\nπŸ”„ Creating HuggingFace dataset with {len(all_data)} samples...")
363
+ df = pd.DataFrame(all_data)
364
+
365
+ features = Features({
366
+ 'audio': Audio(sampling_rate=None),
367
+ 'file_name': Value('string'),
368
+ 'sentence': Value('string'),
369
+ 'speaker': Value('string'),
370
+ 'duration': Value('float32'),
371
+ 'sample_rate': Value('int32')
372
+ })
373
+
374
+ dataset = Dataset.from_pandas(df, features=features)
375
+ num_samples = len(all_data)
376
+
377
+ # Save the dataset in HuggingFace format
378
+ print(f"πŸ’Ύ Saving dataset to disk...")
379
+ dataset.save_to_disk(str(dataset_dir / "dataset"))
380
+
381
+ # Save metadata for compatibility (using a small sample to avoid memory issues)
382
+ print(f"πŸ“‹ Creating metadata files...")
383
+ sample_data = []
384
+ for i, sample in enumerate(dataset.select(range(min(1000, len(dataset))))):
385
+ sample_data.append({
386
+ 'file_name': sample['file_name'],
387
+ 'sentence': sample['sentence'],
388
+ 'speaker': sample['speaker'],
389
+ 'duration': sample['duration'],
390
+ 'sample_rate': sample['sample_rate']
391
+ })
392
+
393
+ metadata_df = pd.DataFrame(sample_data)
394
+ metadata_df.to_parquet(dataset_dir / "train.parquet", index=False)
395
+ metadata_df.to_parquet(dataset_dir / "metadata.parquet", index=False)
396
+
397
+ # Create dataset card
398
+ self._create_dataset_card(dataset_dir, num_samples)
399
+
400
+ print(f" βœ… Dataset prepared with {num_samples} samples in {dataset_dir}")
401
+ return dataset_dir
402
+
403
+ def _create_dataset_card(self, dataset_dir: Path, num_samples: int):
404
+ """Create a basic dataset card"""
405
+ card_content = f"""---
406
+ license: mit
407
+ task_categories:
408
+ - text-to-speech
409
+ language:
410
+ - fa
411
+ tags:
412
+ - tts
413
+ - persian
414
+ - farsi
415
+ - speech-synthesis
416
+ size_categories:
417
+ - {self._get_size_category(num_samples)}
418
+ ---
419
+
420
+ # {TARGET_REPO.split('/')[-1]}
421
+
422
+ This dataset contains {num_samples} Persian TTS samples with the speaker "{SPEAKER_NAME}".
423
+
424
+ ## Dataset Structure
425
+
426
+ - `dataset/`: HuggingFace dataset format with audio arrays
427
+ - `train.parquet`: Training split metadata
428
+ - `metadata.parquet`: General metadata file (same content as train.parquet)
429
+
430
+ **Metadata columns:**
431
+ - `audio`: Audio data with array, sampling_rate, and path
432
+ - `array`: Audio data as float array
433
+ - `sampling_rate`: Sample rate in Hz
434
+ - `path`: Relative path to audio file
435
+ - `file_name`: Relative path to audio files (e.g., "audio/filename.mp3")
436
+ - `sentence`: Transcription text in Persian
437
+ - `speaker`: Speaker identifier ("{SPEAKER_NAME}")
438
+ - `duration`: Audio duration in seconds
439
+ - `sample_rate`: Audio sample rate in Hz
440
+
441
+ ## Usage
442
+
443
+ ```python
444
+ from datasets import load_dataset
445
+
446
+ # Load the dataset
447
+ dataset = load_dataset("{self.target_repo}")
448
+
449
+ # Access audio and transcription
450
+ for item in dataset['train']:
451
+ audio_data = item['audio'] # Dict with 'array', 'sampling_rate', 'path'
452
+ audio_array = audio_data['array'] # Actual audio as numpy array
453
+ sample_rate = audio_data['sampling_rate'] # Sample rate
454
+ text = item['sentence'] # Transcription
455
+ speaker = item['speaker'] # Speaker ID
456
+
457
+ # You can also load with streaming for large datasets
458
+ dataset = load_dataset("{self.target_repo}", streaming=True)
459
+ for item in dataset['train']:
460
+ audio = item['audio']['array'] # Audio array directly
461
+ text = item['sentence'] # Transcription
462
+ ```
463
+
464
+ ## Speaker
465
+
466
+ - **Speaker ID**: {SPEAKER_NAME}
467
+ - **Language**: Persian (Farsi)
468
+ - **Total Samples**: {num_samples}
469
+
470
+ Generated using the TTS annotation system.
471
+ """
472
+
473
+ with open(dataset_dir / "README.md", 'w', encoding='utf-8') as f:
474
+ f.write(card_content)
475
+
476
+ def _get_size_category(self, num_samples: int) -> str:
477
+ """Get size category for dataset card"""
478
+ if num_samples < 1000:
479
+ return "n<1K"
480
+ elif num_samples < 10000:
481
+ return "1K<n<10K"
482
+ elif num_samples < 100000:
483
+ return "10K<n<100K"
484
+ else:
485
+ return "100K<n<1M"
486
+
487
+ def upload_dataset(self, dataset_dir: Path):
488
+ """Upload dataset using HuggingFace best practices"""
489
+ print(f"\nπŸš€ Uploading dataset to {self.target_repo}...")
490
+
491
+ try:
492
+ # Check if dataset directory exists in HF format
493
+ hf_dataset_dir = dataset_dir / "dataset"
494
+ if hf_dataset_dir.exists():
495
+ print("πŸ“¦ Uploading HuggingFace dataset format...")
496
+ # Load and push the dataset
497
+ dataset = Dataset.load_from_disk(str(hf_dataset_dir))
498
+ dataset.push_to_hub(
499
+ self.target_repo,
500
+ commit_message="Add TTS dataset with audio arrays"
501
+ )
502
+ print(f"βœ… Dataset upload completed successfully!")
503
+ else:
504
+ # Fallback to folder upload
505
+ print("πŸ“ Uploading as folder...")
506
+ self.api.upload_large_folder(
507
+ repo_id=self.target_repo,
508
+ repo_type="dataset",
509
+ folder_path=str(dataset_dir)
510
+ )
511
+ print(f"βœ… Folder upload completed successfully!")
512
+
513
+ print(f"Dataset available at: https://huggingface.co/datasets/{self.target_repo}")
514
+
515
+ except Exception as e:
516
+ print(f"❌ Upload failed: {e}")
517
+ print("You can retry the upload or use the prepared dataset directory manually.")
518
+ print(f"Dataset directory: {dataset_dir}")
519
+
520
+ # Fallback to regular upload_folder with commit message
521
+ print("\nπŸ”„ Trying fallback upload method...")
522
+ try:
523
+ self.api.upload_folder(
524
+ repo_id=self.target_repo,
525
+ repo_type="dataset",
526
+ folder_path=str(dataset_dir),
527
+ commit_message="Add TTS dataset with audio arrays"
528
+ )
529
+ print(f"βœ… Fallback upload completed successfully!")
530
+ print(f"Dataset available at: https://huggingface.co/datasets/{self.target_repo}")
531
+ except Exception as fallback_error:
532
+ print(f"❌ Fallback upload also failed: {fallback_error}")
533
+ print(f"Manual upload required. Dataset directory: {dataset_dir}")
534
+ raise
535
+
536
+ def get_approved_annotations():
537
+ """Get all approved annotations from the database"""
538
+ connection = pymysql.connect(**DB_CONFIG)
539
+ try:
540
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
541
+ # Query for approved annotations
542
+ query = """
543
+ SELECT
544
+ a.annotated_sentence as sentence,
545
+ td.filename as audio_file_name
546
+ FROM annotations a
547
+ JOIN validations v ON a.id = v.annotation_id
548
+ JOIN tts_data td ON a.tts_data_id = td.id
549
+ WHERE v.validated = 1
550
+ """
551
+ cursor.execute(query)
552
+ results = cursor.fetchall()
553
+ print(f"Found {len(results)} approved annotations")
554
+ return results
555
+ finally:
556
+ connection.close()
557
+
558
+
559
+ def cleanup_temp_files(temp_dir: Path, keep_dataset: bool = True):
560
+ """Clean up temporary files"""
561
+ if not keep_dataset and temp_dir.exists():
562
+ shutil.rmtree(temp_dir)
563
+ print(f"🧹 Cleaned up temporary directory: {temp_dir}")
564
+ else:
565
+ # Only clean up batch files, keep the dataset
566
+ batch_files = list(temp_dir.glob("batch_*.parquet"))
567
+ for batch_file in batch_files:
568
+ batch_file.unlink()
569
+ print(f"🧹 Cleaned up {len(batch_files)} batch files")
570
+
571
+
572
+ def main():
573
+ """Main export function with improved error handling and performance"""
574
+ print("πŸš€ Starting optimized TTS data export to Hugging Face...")
575
+ print(f"πŸ“Š Configuration:")
576
+ print(f" - Target repository: {TARGET_REPO}")
577
+ print(f" - Speaker: {SPEAKER_NAME}")
578
+ print(f" - Batch size: {BATCH_SIZE}")
579
+ print(f" - Cache directory: {CACHE_DIR}")
580
+ print(f" - Max concurrent downloads: {MAX_WORKERS}")
581
+
582
+ if OPTIMIZE_MEMORY:
583
+ print(f"🧠 Memory Optimizations Enabled:")
584
+ print(f" - Target sample rate: {TARGET_SAMPLE_RATE or 'Original'}")
585
+ print(f" - Audio data type: {AUDIO_DTYPE}")
586
+ print(f" - Generator-based processing: {USE_GENERATOR}")
587
+ else:
588
+ print("⚠️ Memory optimizations disabled - may consume significant RAM")
589
+
590
+ try:
591
+ # Initialize components
592
+ cache_manager = CacheManager(CACHE_DIR)
593
+ downloader = AudioDownloader(AUDIO_BASE_URL, cache_manager, MAX_RETRIES)
594
+ processor = BatchProcessor(downloader, TEMP_DIR, BATCH_SIZE)
595
+ uploader = DatasetUploader(TEMP_DIR, TARGET_REPO)
596
+
597
+ # Get approved annotations
598
+ print("\nπŸ“‹ Fetching approved annotations from database...")
599
+ annotations = get_approved_annotations()
600
+
601
+ if not annotations:
602
+ print("❌ No approved annotations found!")
603
+ return
604
+
605
+ total_batches = (len(annotations) + BATCH_SIZE - 1) // BATCH_SIZE
606
+ print(f"πŸ“¦ Will process {len(annotations)} annotations in {total_batches} batches")
607
+
608
+ # Process annotations in batches
609
+ batch_files = []
610
+ for i in range(0, len(annotations), BATCH_SIZE):
611
+ batch_id = i // BATCH_SIZE + 1
612
+ batch_annotations = annotations[i:i + BATCH_SIZE]
613
+
614
+ batch_file = processor.process_batch(batch_annotations, batch_id)
615
+ if batch_file:
616
+ batch_files.append(batch_file)
617
+
618
+ if not batch_files:
619
+ print("❌ No batches were processed successfully!")
620
+ return
621
+
622
+ print(f"\nβœ… Successfully processed {len(batch_files)} batches")
623
+
624
+ # Prepare dataset structure
625
+ dataset_dir = uploader.prepare_dataset_structure()
626
+
627
+ # Login to HF
628
+ print("\nπŸ”‘ Logging in to Hugging Face...")
629
+ try:
630
+ login() # Will use HF_TOKEN env var or prompt for token
631
+ except Exception as e:
632
+ print(f"❌ HF login failed: {e}")
633
+ print("Make sure you have HF_TOKEN environment variable set or login manually")
634
+ return
635
+
636
+ # Upload dataset
637
+ uploader.upload_dataset(dataset_dir)
638
+
639
+ # Cleanup
640
+ cleanup_temp_files(Path(TEMP_DIR), keep_dataset=True)
641
+
642
+ print("\nπŸŽ‰ Export completed successfully!")
643
+ print(f"πŸ“Š Final stats:")
644
+ print(f" - Total annotations processed: {len(annotations)}")
645
+ print(f" - Successful batches: {len(batch_files)}")
646
+ print(f" - Dataset URL: https://huggingface.co/datasets/{TARGET_REPO}")
647
+ print(f" - Local dataset copy: {dataset_dir}")
648
+
649
+ except KeyboardInterrupt:
650
+ print("\n⚠️ Process interrupted by user")
651
+ print("πŸ’‘ You can resume by running the script again - cached files will be reused")
652
+ except Exception as e:
653
+ print(f"\n❌ Error during export: {e}")
654
+ print("πŸ’‘ Check the error above and try again - cached files will be reused")
655
+ raise
656
+
657
+
658
+ if __name__ == "__main__":
659
+ main()
utils/ftp_audio_loader.py ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ftp_audio_loader.py
2
+
3
+ import io
4
+ import ftplib
5
+ from urllib.parse import urlparse
6
+ import numpy as np
7
+ from pydub import AudioSegment
8
+
9
+ class FtpAudioLoader:
10
+ def __init__(self, ftp_url: str) -> None:
11
+ """
12
+ Initialize FTP loader with URL format: ftp://username:password@host/path
13
+ """
14
+ self.parsed_url = urlparse(ftp_url)
15
+ self.host = self.parsed_url.hostname
16
+ self.username = self.parsed_url.username
17
+ self.password = self.parsed_url.password
18
+ self.base_path = self.parsed_url.path
19
+
20
+ if not self.base_path.endswith("/"):
21
+ self.base_path += "/"
22
+
23
+ def _download_to_buf(self, filename: str) -> io.BytesIO:
24
+ """Download file from FTP server to buffer"""
25
+ try:
26
+ # Connect to FTP server
27
+ ftp = ftplib.FTP()
28
+ ftp.connect(self.host)
29
+ ftp.login(self.username, self.password)
30
+
31
+ # Navigate to the directory
32
+ if self.base_path and self.base_path != "/":
33
+ ftp.cwd(self.base_path.strip("/"))
34
+
35
+ # Download file to buffer
36
+ buf = io.BytesIO()
37
+ ftp.retrbinary(f"RETR {filename}", buf.write)
38
+ ftp.quit()
39
+
40
+ buf.seek(0)
41
+ return buf
42
+
43
+ except ftplib.error_perm as e:
44
+ if "550" in str(e): # File not found
45
+ raise FileNotFoundError(f"'{filename}' not found on FTP server")
46
+ else:
47
+ raise Exception(f"FTP error: {e}")
48
+ except Exception as e:
49
+ raise Exception(f"Failed to download '{filename}' from FTP: {e}")
50
+
51
+ def load_audio(self, filename: str) -> tuple[int, np.ndarray]:
52
+ """Load audio file and return sample rate and samples"""
53
+ buf = self._download_to_buf(filename)
54
+ seg = AudioSegment.from_file(buf)
55
+ samples = np.array(seg.get_array_of_samples())
56
+
57
+ if seg.channels > 1:
58
+ samples = samples.reshape(-1, seg.channels)
59
+
60
+ if np.issubdtype(samples.dtype, np.integer):
61
+ max_int = np.iinfo(samples.dtype).max
62
+ samples = samples.astype(np.float32)
63
+ samples /= max_int
64
+ else:
65
+ max_val = np.abs(samples).max()
66
+ if max_val > 1:
67
+ samples = samples / max_val
68
+ samples = samples.astype(np.float32)
69
+
70
+ return seg.frame_rate, samples
71
+
72
+ def get_audio_duration(self, filename: str) -> float:
73
+ """Get duration of audio file in seconds"""
74
+ buf = self._download_to_buf(filename)
75
+ seg = AudioSegment.from_file(buf)
76
+ return len(seg) / 1000.0 # Convert milliseconds to seconds