PrinzPesia's picture
Update app.py
b8ea041 verified
import gradio as gr
import subprocess
import os
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
import pytz # Für Zeitzonen-Management (nicht direkt verwendet, aber gute Praxis)
# --- Konfiguration ---
OUTPUT_DIR = "recordings"
# Erstelle das Verzeichnis, falls es nicht existiert
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Initialisiere den Scheduler
scheduler = BackgroundScheduler()
scheduler.start()
# --- Funktionen für die Aufnahme und Planung ---
def record_radio_stream(stream_url: str, output_filename: str, duration_seconds: int):
"""
Startet die Aufnahme eines Webradio-Streams mit ffmpeg.
"""
full_output_path = os.path.join(OUTPUT_DIR, output_filename)
print(f"🎬 Starte Aufnahme von {stream_url} für {duration_seconds} Sekunden nach {full_output_path}...")
# ffmpeg Kommando:
# -i: Input URL
# -c:a copy: Audio-Codec kopieren (kein Re-Encoding), das spart CPU und Zeit
# -map 0:a: Stelle sicher, dass nur der Audio-Stream gemappt wird
# -t: Dauer der Aufnahme in Sekunden
command = [
"ffmpeg",
"-i", stream_url,
"-c:a", "copy",
"-map", "0:a",
"-t", str(duration_seconds),
full_output_path
]
try:
# Führe den ffmpeg-Befehl aus
# check=True wird eine CalledProcessError erzeugen, wenn ffmpeg fehlschlägt
subprocess.run(command, check=True, capture_output=True)
print(f"✅ Aufnahme abgeschlossen: {full_output_path}")
return full_output_path
except subprocess.CalledProcessError as e:
print(f"❌ Fehler bei der Aufnahme von {stream_url}:")
print(f"Stdout: {e.stdout.decode()}")
print(f"Stderr: {e.stderr.decode()}")
# Lösche unvollständige Datei, falls vorhanden
if os.path.exists(full_output_path):
os.remove(full_output_path)
return None # Signalisiere Fehler
def schedule_recording(stream_url: str, start_datetime_obj: datetime, end_datetime_obj: datetime):
"""
Plant eine Webradio-Aufnahme basierend auf Start- und Endzeit.
Die Zeiten kommen direkt als datetime-Objekte von gr.DateTime.
"""
try:
# Berechne die Dauer der Aufnahme in Sekunden
duration = (end_datetime_obj - start_datetime_obj).total_seconds()
if duration <= 0:
return "❌ Fehler: Die Endzeit muss nach der Startzeit liegen."
# Generiere einen eindeutigen Dateinamen
timestamp = start_datetime_obj.strftime("%Y%m%d_%H%M%S")
output_filename = f"radio_recording_{timestamp}.mp3"
# Füge den Job zum Scheduler hinzu
# 'date' trigger bedeutet, dass der Job zu einem spezifischen Datum und Uhrzeit ausgeführt wird
scheduler.add_job(
record_radio_stream,
'date',
run_date=start_datetime_obj,
args=[stream_url, output_filename, int(duration)] # Dauer als Integer übergeben
)
# Zeige geplante Jobs an (optional, zur Fehlersuche)
# for job in scheduler.get_jobs():
# print(f"Geplanter Job: {job.id} - Nächste Ausführung: {job.next_run_time}")
return f"✅ Aufnahme von **{stream_url}** erfolgreich geplant.\nStart: **{start_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')}** | Ende: **{end_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')}**.\nDatei: **{output_filename}**\nBitte aktualisiere die Dateiliste, nachdem die Aufnahme abgeschlossen ist."
except Exception as e:
return f"❌ Ein unerwarteter Fehler ist aufgetreten: {e}"
def get_recorded_files():
"""
Gibt eine Liste der Pfade zu allen aufgenommenen MP3-Dateien zurück.
"""
files = [os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR) if f.endswith(".mp3")]
# Gradio gr.Files erwartet eine Liste von Pfaden.
# Wenn keine Dateien da sind, gibt eine leere Liste zurück.
return files
# --- Gradio UI Definition ---
with gr.Blocks() as demo:
gr.Markdown("# 📻 Webradio Recorder")
gr.Markdown(
"Plane deine Webradio-Aufnahmen! Gib die Stream-URL, Start- und Endzeit an, "
"und die App nimmt den Stream für dich auf. Die fertige Datei kannst du dann herunterladen."
)
with gr.Tab("Aufnahme planen"):
with gr.Row():
stream_url_input = gr.Textbox(
label="Stream URL",
placeholder="z.B. http://stream.laut.fm/meinstream (MP3- oder AAC-Stream)"
)
with gr.Row():
# HIER WURDE GR.DATETIME VERWENDET
start_datetime_input = gr.DateTime(
label="Start Datum & Uhrzeit",
value=datetime.now() + timedelta(minutes=1), # Voreinstellung: 1 Minute in der Zukunft
info="Wähle das Datum und die Uhrzeit, wann die Aufnahme beginnen soll."
)
# HIER WURDE GR.DATETIME VERWENDET
end_datetime_input = gr.DateTime(
label="End Datum & Uhrzeit",
value=datetime.now() + timedelta(minutes=10), # Voreinstellung: 10 Minuten in der Zukunft
info="Wähle das Datum und die Uhrzeit, wann die Aufnahme enden soll."
)
schedule_button = gr.Button("▶️ Aufnahme planen", variant="primary")
schedule_output_message = gr.Textbox(label="Status der Planung", interactive=False)
schedule_button.click(
fn=schedule_recording,
inputs=[stream_url_input, start_datetime_input, end_datetime_input],
outputs=schedule_output_message
)
with gr.Tab("Verfügbare Aufnahmen"):
gr.Markdown("Hier siehst du alle bisher aufgenommenen Dateien.")
refresh_files_button = gr.Button("🔄 Aufnahmen aktualisieren", variant="secondary")
# gr.Files ermöglicht das Herunterladen der Dateien
# HIER WURDE DER PARAMETER 'type' GEÄNDERT
downloadable_files = gr.Files(label="Deine Aufnahmen zum Herunterladen", type="filepath")
# Wenn der "Aktualisieren"-Button geklickt wird, rufe die Funktion auf, um die Dateien zu listen
refresh_files_button.click(
fn=get_recorded_files,
outputs=downloadable_files
)
# Beim Laden der App auch die Dateien einmal anzeigen
demo.load(
fn=get_recorded_files,
outputs=downloadable_files
)
# --- App starten ---
if __name__ == "__main__":
demo.launch()