Spaces:
Runtime error
Runtime error
import re | |
import os | |
import torch | |
import shutil | |
import subprocess | |
import folder_paths | |
from torch import Tensor | |
from collections.abc import Mapping | |
audio_extensions = ['mp3', 'mp4', 'wav', 'ogg'] | |
video_extensions = ['webm', 'mp4', 'mov'] | |
def ffmpeg_suitability(path): | |
try: | |
version = subprocess.run([path, "-version"], check=True, | |
capture_output=True).stdout.decode("utf-8") | |
except: | |
return 0 | |
score = 0 | |
#rough layout of the importance of various features | |
simple_criterion = [("libvpx", 20),("264",10), ("265",3), | |
("svtav1",5),("libopus", 1)] | |
for criterion in simple_criterion: | |
if version.find(criterion[0]) >= 0: | |
score += criterion[1] | |
#obtain rough compile year from copyright information | |
copyright_index = version.find('2000-2') | |
if copyright_index >= 0: | |
copyright_year = version[copyright_index+6:copyright_index+9] | |
if copyright_year.isnumeric(): | |
score += int(copyright_year) | |
return score | |
folder_paths.folder_names_and_paths["VHS_video_formats"] = ( | |
[ | |
os.path.join(os.path.dirname(os.path.abspath(__file__)), "video_formats"), | |
], | |
[".json"] | |
) | |
if "VHS_FORCE_FFMPEG_PATH" in os.environ: | |
ffmpeg_path = os.environ.get("VHS_FORCE_FFMPEG_PATH") | |
else: | |
ffmpeg_paths = [] | |
try: | |
from imageio_ffmpeg import get_ffmpeg_exe | |
imageio_ffmpeg_path = get_ffmpeg_exe() | |
ffmpeg_paths.append(imageio_ffmpeg_path) | |
except: | |
if "VHS_USE_IMAGEIO_FFMPEG" in os.environ: | |
raise | |
print("Failed to import imageio_ffmpeg") | |
if "VHS_USE_IMAGEIO_FFMPEG" in os.environ: | |
ffmpeg_path = imageio_ffmpeg_path | |
else: | |
system_ffmpeg = shutil.which("ffmpeg") | |
if system_ffmpeg is not None: | |
ffmpeg_paths.append(system_ffmpeg) | |
if os.path.isfile("ffmpeg"): | |
ffmpeg_paths.append(os.path.abspath("ffmpeg")) | |
if os.path.isfile("ffmpeg.exe"): | |
ffmpeg_paths.append(os.path.abspath("ffmpeg.exe")) | |
if len(ffmpeg_paths) == 0: | |
print("No valid ffmpeg found.") | |
ffmpeg_path = None | |
elif len(ffmpeg_paths) == 1: | |
#Evaluation of suitability isn't required, can take sole option | |
#to reduce startup time | |
ffmpeg_path = ffmpeg_paths[0] | |
else: | |
ffmpeg_path = max(ffmpeg_paths, key=ffmpeg_suitability) | |
gifski_path = os.environ.get("VHS_GIFSKI", None) | |
if gifski_path is None: | |
gifski_path = os.environ.get("JOV_GIFSKI", None) | |
if gifski_path is None: | |
gifski_path = shutil.which("gifski") | |
ytdl_path = os.environ.get("VHS_YTDL", None) or shutil.which('yt-dlp') \ | |
or shutil.which('youtube-dl') | |
def get_audio(file, start_time=0, duration=0): | |
args = [ffmpeg_path, "-i", file] | |
if start_time > 0: | |
args += ["-ss", str(start_time)] | |
if duration > 0: | |
args += ["-t", str(duration)] | |
try: | |
#TODO: scan for sample rate and maintain | |
res = subprocess.run(args + ["-f", "f32le", "-"], | |
capture_output=True, check=True) | |
audio = torch.frombuffer(bytearray(res.stdout), dtype=torch.float32) | |
match = re.search(', (\\d+) Hz, (\\w+), ',res.stderr.decode('utf-8')) | |
except subprocess.CalledProcessError as e: | |
raise Exception(f"VHS failed to extract audio from {file}:\n" \ | |
+ e.stderr.decode("utf-8")) | |
if match: | |
ar = int(match.group(1)) | |
#NOTE: Just throwing an error for other channel types right now | |
#Will deal with issues if they come | |
ac = {"mono": 1, "stereo": 2}[match.group(2)] | |
else: | |
ar = 44100 | |
ac = 2 | |
audio = audio.reshape((-1,ac)).transpose(0,1).unsqueeze(0) | |
return {'waveform': audio, 'sample_rate': ar} | |
class LazyAudioMap(Mapping): | |
def __init__(self, file, start_time, duration): | |
self.file = file | |
self.start_time=start_time | |
self.duration=duration | |
self._dict=None | |
def __getitem__(self, key): | |
if self._dict is None: | |
self._dict = get_audio(self.file, self.start_time, self.duration) | |
return self._dict[key] | |
def __iter__(self): | |
if self._dict is None: | |
self._dict = get_audio(self.file, self.start_time, self.duration) | |
return iter(self._dict) | |
def __len__(self): | |
if self._dict is None: | |
self._dict = get_audio(self.file, self.start_time, self.duration) | |
return len(self._dict) | |
def lazy_get_audio(file, start_time=0, duration=0): | |
return LazyAudioMap(file, start_time, duration) | |