Dreamspire's picture
custom_nodes
f2dbf59
import re
import os
import torch
import shutil
import subprocess
import folder_paths
from torch import Tensor
from collections.abc import Mapping
# File extensions (without the leading dot) recognised per media kind.
# Note that "mp4" appears in both lists.
audio_extensions = [
    "mp3",
    "mp4",
    "wav",
    "ogg",
]
video_extensions = [
    "webm",
    "mp4",
    "mov",
]
def ffmpeg_suitability(path):
    """Score an ffmpeg executable so several candidates can be ranked.

    Runs ``path -version`` and adds a fixed weight for each encoder name
    found in the output, plus the three digits that follow ``2000-2`` in the
    copyright line (e.g. ``2000-2023`` adds 23) as a rough build-age bonus.

    Args:
        path: Path of the executable to probe.

    Returns:
        int: The accumulated score (higher is better), or 0 when the
        executable cannot be started or exits with a non-zero status.
    """
    try:
        probe = subprocess.run([path, "-version"], check=True,
                               capture_output=True)
    except Exception:
        # Any failure to run the candidate makes it unsuitable. Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        return 0
    # errors="replace": one undecodable byte in the banner must not turn an
    # otherwise working binary into a zero score.
    version = probe.stdout.decode("utf-8", errors="replace")
    score = 0
    #rough layout of the importance of various features
    simple_criterion = [("libvpx", 20), ("264", 10), ("265", 3),
                        ("svtav1", 5), ("libopus", 1)]
    for needle, weight in simple_criterion:
        if needle in version:
            score += weight
    #obtain rough compile year from copyright information
    copyright_index = version.find('2000-2')
    if copyright_index >= 0:
        copyright_year = version[copyright_index+6:copyright_index+9]
        # isdecimal() matches exactly what int() can parse; isnumeric() also
        # accepts characters such as superscripts that int() rejects.
        if copyright_year.isdecimal():
            score += int(copyright_year)
    return score
# Register the "video_formats" directory that sits next to this file under
# the "VHS_video_formats" key. The value is a (directories, extensions)
# pair; how folder_paths consumes it is defined outside this file.
folder_paths.folder_names_and_paths["VHS_video_formats"] = (
    [os.path.join(os.path.dirname(os.path.abspath(__file__)), "video_formats")],
    [".json"],
)
# Pick the ffmpeg executable stored in the module-level ``ffmpeg_path``.
# Precedence:
#   1. VHS_FORCE_FFMPEG_PATH, taken verbatim without any checks.
#   2. The imageio_ffmpeg binary when VHS_USE_IMAGEIO_FFMPEG is set
#      (a failure to obtain it is re-raised in that case).
#   3. Otherwise the best candidate among imageio_ffmpeg, ``ffmpeg`` on the
#      PATH, and ``ffmpeg`` / ``ffmpeg.exe`` in the current directory.
# ``ffmpeg_path`` is None when no candidate exists.
if "VHS_FORCE_FFMPEG_PATH" in os.environ:
    ffmpeg_path = os.environ["VHS_FORCE_FFMPEG_PATH"]
else:
    ffmpeg_paths = []
    try:
        from imageio_ffmpeg import get_ffmpeg_exe
        imageio_ffmpeg_path = get_ffmpeg_exe()
        ffmpeg_paths.append(imageio_ffmpeg_path)
    except Exception:
        # Exception rather than a bare except: an interrupt or SystemExit
        # during start-up must not be reported as a failed import.
        if "VHS_USE_IMAGEIO_FFMPEG" in os.environ:
            raise
        print("Failed to import imageio_ffmpeg")
    if "VHS_USE_IMAGEIO_FFMPEG" in os.environ:
        # Reaching here means the try block succeeded, so the name is bound.
        ffmpeg_path = imageio_ffmpeg_path
    else:
        system_ffmpeg = shutil.which("ffmpeg")
        if system_ffmpeg is not None:
            ffmpeg_paths.append(system_ffmpeg)
        if os.path.isfile("ffmpeg"):
            ffmpeg_paths.append(os.path.abspath("ffmpeg"))
        if os.path.isfile("ffmpeg.exe"):
            ffmpeg_paths.append(os.path.abspath("ffmpeg.exe"))
        if not ffmpeg_paths:
            print("No valid ffmpeg found.")
            ffmpeg_path = None
        elif len(ffmpeg_paths) == 1:
            #Evaluation of suitability isn't required, can take sole option
            #to reduce startup time
            ffmpeg_path = ffmpeg_paths[0]
        else:
            ffmpeg_path = max(ffmpeg_paths, key=ffmpeg_suitability)
# gifski executable: VHS_GIFSKI, then JOV_GIFSKI, then a PATH lookup.
# Only an unset variable falls through; an empty value is kept as given.
gifski_path = os.environ.get("VHS_GIFSKI")
if gifski_path is None:
    gifski_path = os.environ.get("JOV_GIFSKI")
    if gifski_path is None:
        gifski_path = shutil.which("gifski")
# Downloader executable: VHS_YTDL if set and non-empty, else yt-dlp, else
# youtube-dl from the PATH (None when none of them is available).
ytdl_path = (
    os.environ.get("VHS_YTDL")
    or shutil.which('yt-dlp')
    or shutil.which('youtube-dl')
)
def get_audio(file, start_time=0, duration=0):
    """Decode the audio of ``file`` through ffmpeg into a float32 tensor.

    ffmpeg is asked for raw ``f32le`` samples on stdout; the sample rate and
    channel layout are scraped from the stream description on its stderr.

    Args:
        file: Input handed to ffmpeg after ``-i``.
        start_time: Passed to ffmpeg as ``-ss`` when greater than 0.
        duration: Passed to ffmpeg as ``-t`` when greater than 0.

    Returns:
        dict: ``'waveform'`` is a tensor shaped ``(1, channels, samples)``
        and ``'sample_rate'`` is an int. When no stream description is found
        on stderr, 44100 Hz and 2 channels are assumed.

    Raises:
        Exception: ffmpeg exited with a non-zero status; the message carries
            ffmpeg's stderr.
        KeyError: The reported channel layout is neither mono nor stereo.
    """
    args = [ffmpeg_path, "-i", file]
    if start_time > 0:
        args += ["-ss", str(start_time)]
    if duration > 0:
        args += ["-t", str(duration)]
    try:
        #TODO: scan for sample rate and maintain
        res = subprocess.run(args + ["-f", "f32le", "-"],
                             capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # stderr echoes file names and metadata, which need not be valid
        # UTF-8; a strict decode here would hide the real ffmpeg error
        # behind a UnicodeDecodeError.
        raise Exception(f"VHS failed to extract audio from {file}:\n"
                        + e.stderr.decode("utf-8", errors="replace")) from e
    # bytearray copies stdout into a writable buffer for frombuffer.
    audio = torch.frombuffer(bytearray(res.stdout), dtype=torch.float32)
    match = re.search(r', (\d+) Hz, (\w+), ',
                      res.stderr.decode("utf-8", errors="replace"))
    if match:
        ar = int(match.group(1))
        #NOTE: Just throwing an error for other channel types right now
        #Will deal with issues if they come
        ac = {"mono": 1, "stereo": 2}[match.group(2)]
    else:
        ar = 44100
        ac = 2
    # Samples arrive interleaved: (samples, channels) -> (1, channels, samples)
    audio = audio.reshape((-1, ac)).transpose(0, 1).unsqueeze(0)
    return {'waveform': audio, 'sample_rate': ar}
class LazyAudioMap(Mapping):
    """Read-only mapping whose contents come from a deferred get_audio call.

    Construction only records the arguments. The first item lookup,
    iteration or ``len()`` calls ``get_audio(file, start_time, duration)``
    and keeps the result in ``_dict`` for every later access.
    """

    def __init__(self, file, start_time, duration):
        self.file = file
        self.start_time = start_time
        self.duration = duration
        # None until the audio has been decoded.
        self._dict = None

    def _loaded(self):
        """Return the decoded audio dict, decoding it on first use."""
        if self._dict is None:
            self._dict = get_audio(self.file, self.start_time, self.duration)
        return self._dict

    def __getitem__(self, key):
        return self._loaded()[key]

    def __iter__(self):
        return iter(self._loaded())

    def __len__(self):
        return len(self._loaded())
def lazy_get_audio(file, start_time=0, duration=0):
    """Return a LazyAudioMap for ``file`` without decoding anything yet.

    The arguments are stored on the returned mapping and only handed to
    get_audio when the mapping is first read.
    """
    return LazyAudioMap(file=file, start_time=start_time, duration=duration)