File size: 2,689 Bytes
8dcb829
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# ftp_audio_loader.py

import ftplib
import io
from urllib.parse import unquote, urlparse

import numpy as np
from pydub import AudioSegment

class FtpAudioLoader:
    """Download audio files from an FTP directory and decode them with pydub.

    The server location and credentials come from a single FTP URL. Every
    download opens a fresh connection and closes it again, so instances hold
    no open sockets between calls.
    """

    def __init__(self, ftp_url: str) -> None:
        """
        Initialize FTP loader with URL format:
        ftp://username:password@host[:port]/path

        Args:
            ftp_url: FTP URL. Credentials and path may be percent-encoded
                (e.g. ``%40`` for ``@`` inside a password); they are decoded
                before being sent to the server. Credentials and port are
                optional.

        Raises:
            ValueError: If the URL contains a malformed port.
        """
        self.parsed_url = urlparse(ftp_url)
        self.host = self.parsed_url.hostname
        # urlparse leaves userinfo percent-encoded; reserved characters such
        # as "@", ":" or "/" in credentials must be escaped in the URL, so the
        # raw values would not match what the server expects.
        self.username = self._decode(self.parsed_url.username)
        self.password = self._decode(self.parsed_url.password)
        # Honour an explicit port instead of always using the default one.
        self.port = self.parsed_url.port or ftplib.FTP_PORT
        self.base_path = unquote(self.parsed_url.path)

        if not self.base_path.endswith("/"):
            self.base_path += "/"

    @staticmethod
    def _decode(value):
        """Percent-decode an optional URL component, passing ``None`` through."""
        return None if value is None else unquote(value)

    def _download_to_buf(self, filename: str) -> io.BytesIO:
        """Download file from FTP server to buffer.

        Args:
            filename: Name of the file inside the directory given by the URL.

        Returns:
            An in-memory buffer with the file contents, positioned at offset 0.

        Raises:
            FileNotFoundError: If the server answers with a 550 reply.
            Exception: For any other FTP or connection failure; the original
                error is attached as ``__cause__``.
        """
        buf = io.BytesIO()
        try:
            # The context manager sends QUIT and closes the socket even when
            # login, cwd or the transfer raises, so no connection is leaked.
            with ftplib.FTP() as ftp:
                ftp.connect(self.host, self.port)
                ftp.login(self.username or "", self.password or "")

                # The URL path is applied relative to the login directory,
                # hence the leading slash is stripped.
                if self.base_path and self.base_path != "/":
                    ftp.cwd(self.base_path.strip("/"))

                ftp.retrbinary(f"RETR {filename}", buf.write)
        except ftplib.error_perm as e:
            # The reply code is the prefix of the server response; matching
            # only the prefix avoids false hits on "550" elsewhere in the text.
            if str(e).startswith("550"):
                raise FileNotFoundError(f"'{filename}' not found on FTP server") from e
            raise Exception(f"FTP error: {e}") from e
        except Exception as e:
            raise Exception(f"Failed to download '{filename}' from FTP: {e}") from e

        buf.seek(0)
        return buf

    def _load_segment(self, filename: str):
        """Download ``filename`` and decode it into a pydub ``AudioSegment``."""
        return AudioSegment.from_file(self._download_to_buf(filename))

    @staticmethod
    def _normalize_samples(samples: np.ndarray) -> np.ndarray:
        """Return ``samples`` as float32 scaled to roughly [-1, 1].

        Integer input is divided by the maximum positive value of its dtype,
        so the most negative integer maps to slightly below -1.0. Float input
        is only rescaled when its peak magnitude exceeds 1.
        """
        if np.issubdtype(samples.dtype, np.integer):
            max_int = np.iinfo(samples.dtype).max
            samples = samples.astype(np.float32)
            samples /= max_int
            return samples

        # Empty arrays have no peak; skip the reduction, which would raise.
        max_val = np.abs(samples).max() if samples.size else 0
        if max_val > 1:
            samples = samples / max_val
        return samples.astype(np.float32)

    def load_audio(self, filename: str) -> tuple[int, np.ndarray]:
        """Load audio file and return sample rate and samples.

        Args:
            filename: Name of the audio file in the FTP directory.

        Returns:
            ``(frame_rate, samples)`` where ``samples`` is a float32 array;
            1-D for single-channel audio, otherwise reshaped to
            ``(-1, channels)``.

        Raises:
            FileNotFoundError: If the file does not exist on the server.
            Exception: If the download fails for another reason.
        """
        seg = self._load_segment(filename)
        samples = np.array(seg.get_array_of_samples())

        if seg.channels > 1:
            # One row per frame, one column per channel.
            samples = samples.reshape(-1, seg.channels)

        return seg.frame_rate, self._normalize_samples(samples)

    def get_audio_duration(self, filename: str) -> float:
        """Get duration of audio file in seconds.

        The whole file is downloaded and decoded to measure it.

        Raises:
            FileNotFoundError: If the file does not exist on the server.
            Exception: If the download fails for another reason.
        """
        seg = self._load_segment(filename)
        return len(seg) / 1000.0  # Convert milliseconds to seconds