tts_labeling / utils /ftp_audio_loader.py
vargha's picture
auxiliary scripts for dataset management
8dcb829
# ftp_audio_loader.py
import io
import ftplib
from urllib.parse import urlparse
import numpy as np
from pydub import AudioSegment
class FtpAudioLoader:
def __init__(self, ftp_url: str) -> None:
"""
Initialize FTP loader with URL format: ftp://username:password@host/path
"""
self.parsed_url = urlparse(ftp_url)
self.host = self.parsed_url.hostname
self.username = self.parsed_url.username
self.password = self.parsed_url.password
self.base_path = self.parsed_url.path
if not self.base_path.endswith("/"):
self.base_path += "/"
def _download_to_buf(self, filename: str) -> io.BytesIO:
"""Download file from FTP server to buffer"""
try:
# Connect to FTP server
ftp = ftplib.FTP()
ftp.connect(self.host)
ftp.login(self.username, self.password)
# Navigate to the directory
if self.base_path and self.base_path != "/":
ftp.cwd(self.base_path.strip("/"))
# Download file to buffer
buf = io.BytesIO()
ftp.retrbinary(f"RETR {filename}", buf.write)
ftp.quit()
buf.seek(0)
return buf
except ftplib.error_perm as e:
if "550" in str(e): # File not found
raise FileNotFoundError(f"'{filename}' not found on FTP server")
else:
raise Exception(f"FTP error: {e}")
except Exception as e:
raise Exception(f"Failed to download '{filename}' from FTP: {e}")
def load_audio(self, filename: str) -> tuple[int, np.ndarray]:
"""Load audio file and return sample rate and samples"""
buf = self._download_to_buf(filename)
seg = AudioSegment.from_file(buf)
samples = np.array(seg.get_array_of_samples())
if seg.channels > 1:
samples = samples.reshape(-1, seg.channels)
if np.issubdtype(samples.dtype, np.integer):
max_int = np.iinfo(samples.dtype).max
samples = samples.astype(np.float32)
samples /= max_int
else:
max_val = np.abs(samples).max()
if max_val > 1:
samples = samples / max_val
samples = samples.astype(np.float32)
return seg.frame_rate, samples
def get_audio_duration(self, filename: str) -> float:
"""Get duration of audio file in seconds"""
buf = self._download_to_buf(filename)
seg = AudioSegment.from_file(buf)
return len(seg) / 1000.0 # Convert milliseconds to seconds