import gradio as gr import subprocess import os from datetime import datetime, timedelta from apscheduler.schedulers.background import BackgroundScheduler import pytz # Für Zeitzonen-Management (nicht direkt verwendet, aber gute Praxis) # --- Konfiguration --- OUTPUT_DIR = "recordings" # Erstelle das Verzeichnis, falls es nicht existiert os.makedirs(OUTPUT_DIR, exist_ok=True) # Initialisiere den Scheduler scheduler = BackgroundScheduler() scheduler.start() # --- Funktionen für die Aufnahme und Planung --- def record_radio_stream(stream_url: str, output_filename: str, duration_seconds: int): """ Startet die Aufnahme eines Webradio-Streams mit ffmpeg. """ full_output_path = os.path.join(OUTPUT_DIR, output_filename) print(f"🎬 Starte Aufnahme von {stream_url} für {duration_seconds} Sekunden nach {full_output_path}...") # ffmpeg Kommando: # -i: Input URL # -c:a copy: Audio-Codec kopieren (kein Re-Encoding), das spart CPU und Zeit # -map 0:a: Stelle sicher, dass nur der Audio-Stream gemappt wird # -t: Dauer der Aufnahme in Sekunden command = [ "ffmpeg", "-i", stream_url, "-c:a", "copy", "-map", "0:a", "-t", str(duration_seconds), full_output_path ] try: # Führe den ffmpeg-Befehl aus # check=True wird eine CalledProcessError erzeugen, wenn ffmpeg fehlschlägt subprocess.run(command, check=True, capture_output=True) print(f"✅ Aufnahme abgeschlossen: {full_output_path}") return full_output_path except subprocess.CalledProcessError as e: print(f"❌ Fehler bei der Aufnahme von {stream_url}:") print(f"Stdout: {e.stdout.decode()}") print(f"Stderr: {e.stderr.decode()}") # Lösche unvollständige Datei, falls vorhanden if os.path.exists(full_output_path): os.remove(full_output_path) return None # Signalisiere Fehler def schedule_recording(stream_url: str, start_datetime_obj: datetime, end_datetime_obj: datetime): """ Plant eine Webradio-Aufnahme basierend auf Start- und Endzeit. Die Zeiten kommen direkt als datetime-Objekte von gr.DateTime. """ try: # Berechne die Dauer der Aufnahme in Sekunden duration = (end_datetime_obj - start_datetime_obj).total_seconds() if duration <= 0: return "❌ Fehler: Die Endzeit muss nach der Startzeit liegen." # Generiere einen eindeutigen Dateinamen timestamp = start_datetime_obj.strftime("%Y%m%d_%H%M%S") output_filename = f"radio_recording_{timestamp}.mp3" # Füge den Job zum Scheduler hinzu # 'date' trigger bedeutet, dass der Job zu einem spezifischen Datum und Uhrzeit ausgeführt wird scheduler.add_job( record_radio_stream, 'date', run_date=start_datetime_obj, args=[stream_url, output_filename, int(duration)] # Dauer als Integer übergeben ) # Zeige geplante Jobs an (optional, zur Fehlersuche) # for job in scheduler.get_jobs(): # print(f"Geplanter Job: {job.id} - Nächste Ausführung: {job.next_run_time}") return f"✅ Aufnahme von **{stream_url}** erfolgreich geplant.\nStart: **{start_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')}** | Ende: **{end_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')}**.\nDatei: **{output_filename}**\nBitte aktualisiere die Dateiliste, nachdem die Aufnahme abgeschlossen ist." except Exception as e: return f"❌ Ein unerwarteter Fehler ist aufgetreten: {e}" def get_recorded_files(): """ Gibt eine Liste der Pfade zu allen aufgenommenen MP3-Dateien zurück. """ files = [os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR) if f.endswith(".mp3")] # Gradio gr.Files erwartet eine Liste von Pfaden. # Wenn keine Dateien da sind, gibt eine leere Liste zurück. return files # --- Gradio UI Definition --- with gr.Blocks() as demo: gr.Markdown("# 📻 Webradio Recorder") gr.Markdown( "Plane deine Webradio-Aufnahmen! Gib die Stream-URL, Start- und Endzeit an, " "und die App nimmt den Stream für dich auf. Die fertige Datei kannst du dann herunterladen." ) with gr.Tab("Aufnahme planen"): with gr.Row(): stream_url_input = gr.Textbox( label="Stream URL", placeholder="z.B. http://stream.laut.fm/meinstream (MP3- oder AAC-Stream)" ) with gr.Row(): # HIER WURDE GR.DATETIME VERWENDET start_datetime_input = gr.DateTime( label="Start Datum & Uhrzeit", value=datetime.now() + timedelta(minutes=1), # Voreinstellung: 1 Minute in der Zukunft info="Wähle das Datum und die Uhrzeit, wann die Aufnahme beginnen soll." ) # HIER WURDE GR.DATETIME VERWENDET end_datetime_input = gr.DateTime( label="End Datum & Uhrzeit", value=datetime.now() + timedelta(minutes=10), # Voreinstellung: 10 Minuten in der Zukunft info="Wähle das Datum und die Uhrzeit, wann die Aufnahme enden soll." ) schedule_button = gr.Button("▶️ Aufnahme planen", variant="primary") schedule_output_message = gr.Textbox(label="Status der Planung", interactive=False) schedule_button.click( fn=schedule_recording, inputs=[stream_url_input, start_datetime_input, end_datetime_input], outputs=schedule_output_message ) with gr.Tab("Verfügbare Aufnahmen"): gr.Markdown("Hier siehst du alle bisher aufgenommenen Dateien.") refresh_files_button = gr.Button("🔄 Aufnahmen aktualisieren", variant="secondary") # gr.Files ermöglicht das Herunterladen der Dateien # HIER WURDE DER PARAMETER 'type' GEÄNDERT downloadable_files = gr.Files(label="Deine Aufnahmen zum Herunterladen", type="filepath") # Wenn der "Aktualisieren"-Button geklickt wird, rufe die Funktion auf, um die Dateien zu listen refresh_files_button.click( fn=get_recorded_files, outputs=downloadable_files ) # Beim Laden der App auch die Dateien einmal anzeigen demo.load( fn=get_recorded_files, outputs=downloadable_files ) # --- App starten --- if __name__ == "__main__": demo.launch()