# ftp_audio_loader.py
import ftplib
import io
from urllib.parse import unquote, urlparse

import numpy as np
from pydub import AudioSegment
class FtpAudioLoader:
    """Download audio files from an FTP server and decode them in memory.

    Every download opens a fresh FTP connection, changes into the base
    directory taken from the URL, retrieves the file into a ``BytesIO``
    buffer and closes the connection again.
    """

    def __init__(self, ftp_url: str) -> None:
        """Parse the connection settings out of *ftp_url*.

        Expected format: ``ftp://username:password@host[:port]/path``.
        Username, password and path are percent-decoded, so reserved
        characters (e.g. ``@`` or ``:`` in a password) must be given in
        their ``%XX`` form in the URL.

        Args:
            ftp_url: FTP URL naming the server and the base directory.
        """
        self.parsed_url = urlparse(ftp_url)
        self.host = self.parsed_url.hostname
        # Honour an explicit port in the URL; fall back to the FTP default.
        self.port = self.parsed_url.port or 21

        # urlparse leaves percent-encoding untouched; decode it so the
        # server receives the real credentials and directory name.
        raw_user = self.parsed_url.username
        raw_password = self.parsed_url.password
        self.username = unquote(raw_user) if raw_user is not None else None
        self.password = (
            unquote(raw_password) if raw_password is not None else None
        )

        self.base_path = unquote(self.parsed_url.path)
        if not self.base_path.endswith("/"):
            self.base_path += "/"

    def _download_to_buf(self, filename: str) -> io.BytesIO:
        """Download *filename* from the base directory into a buffer.

        Args:
            filename: Name of the file, relative to the base directory.

        Returns:
            A ``BytesIO`` positioned at offset 0 holding the file content.

        Raises:
            FileNotFoundError: The server answered with a 550 reply.
            Exception: Any other FTP or network failure (the original
                error is attached as ``__cause__``).
        """
        buf = io.BytesIO()
        try:
            # The context manager sends QUIT / closes the socket even when
            # login, cwd or the transfer raises, so no connection leaks.
            with ftplib.FTP() as ftp:
                ftp.connect(self.host, self.port)
                ftp.login(self.username, self.password)

                # The directory is used without its surrounding slashes,
                # i.e. relative to the directory the login lands in.
                directory = self.base_path.strip("/")
                if directory:
                    ftp.cwd(directory)

                ftp.retrbinary(f"RETR {filename}", buf.write)
        except ftplib.error_perm as e:
            # Check the leading reply code only, so "550" appearing
            # elsewhere in a server message is not misclassified.
            if str(e)[:3] == "550":  # File not found
                raise FileNotFoundError(
                    f"'{filename}' not found on FTP server"
                ) from e
            raise Exception(f"FTP error: {e}") from e
        except Exception as e:
            raise Exception(
                f"Failed to download '{filename}' from FTP: {e}"
            ) from e

        buf.seek(0)
        return buf

    def _load_segment(self, filename: str) -> "AudioSegment":
        """Download *filename* and decode it into a pydub ``AudioSegment``."""
        return AudioSegment.from_file(self._download_to_buf(filename))

    @staticmethod
    def _normalize_samples(samples: np.ndarray) -> np.ndarray:
        """Return *samples* as ``float32``, scaled towards [-1, 1].

        Integer input is divided by the maximum value of its dtype.
        Other input is divided by its peak absolute value only when that
        peak exceeds 1; an empty array is returned unchanged (as float32).
        """
        if np.issubdtype(samples.dtype, np.integer):
            max_int = np.iinfo(samples.dtype).max
            scaled = samples.astype(np.float32)
            scaled /= max_int
            return scaled

        # Guard the reduction: max() of a zero-size array raises.
        peak = np.abs(samples).max() if samples.size else 0
        if peak > 1:
            samples = samples / peak
        return samples.astype(np.float32)

    def load_audio(self, filename: str) -> tuple[int, np.ndarray]:
        """Load an audio file and return its sample rate and samples.

        Args:
            filename: Name of the file, relative to the base directory.

        Returns:
            ``(frame_rate, samples)`` where ``samples`` is a ``float32``
            array; for multi-channel audio it is reshaped to
            ``(-1, channels)``.

        Raises:
            FileNotFoundError: The file does not exist on the server.
            Exception: The download failed for another reason.
        """
        seg = self._load_segment(filename)
        samples = np.array(seg.get_array_of_samples())
        if seg.channels > 1:
            samples = samples.reshape(-1, seg.channels)
        return seg.frame_rate, self._normalize_samples(samples)

    def get_audio_duration(self, filename: str) -> float:
        """Return the duration of an audio file in seconds.

        The file is downloaded and decoded in full to measure its length.

        Raises:
            FileNotFoundError: The file does not exist on the server.
            Exception: The download failed for another reason.
        """
        seg = self._load_segment(filename)
        return len(seg) / 1000.0  # Convert milliseconds to seconds