Spaces:
Sleeping
Sleeping
import os | |
from pathlib import Path | |
import gradio as gr | |
import numpy as np | |
from pydub import AudioSegment | |
from components.header import Header | |
from utils.logger import Logger | |
log = Logger() | |
# اگر فایلهای صوتی در پوشهٔ خاصی هستند این را عوض کنید | |
AUDIO_DIR = Path("audio") # <project_root>/audio/<filename>.wav | |
class DashboardPage: | |
"""صفحهٔ داشبورد شامل اطلاعات متنی (چپ) و پخشکنندهٔ صوت (راست).""" | |
# ───────── ساخت UI ───────── # | |
def __init__(self) -> None: | |
with gr.Column(visible=False) as self.container: | |
# هدر | |
self.header = Header() | |
# بدنهٔ دو ستونه | |
with gr.Row(): | |
# -------- ستونهٔ چپ : متادیتا -------- # | |
with gr.Column(scale=3) as self.left_col: | |
with gr.Row(): | |
self.tts_id = gr.Textbox(label="ID", interactive=False) | |
self.filename = gr.Textbox(label="Filename", interactive=False) | |
self.sentence = gr.Textbox( | |
label="Sentence", interactive=False, max_lines=5, rtl=True | |
) | |
self.ann_sentence = gr.Textbox( | |
label="Annotated Sentence", | |
interactive=True, | |
max_lines=5, | |
rtl=True, | |
) | |
with gr.Row(): | |
self.ann_at = gr.Textbox( | |
label="Annotation Time", | |
interactive=False, | |
) | |
self.validated = gr.Checkbox( | |
label="Annotation is Validate", | |
interactive=False, | |
) | |
# دکمههای پیمایش زیر اطلاعات متنی | |
with gr.Row(): | |
self.btn_prev = gr.Button("⬅️ Previous") | |
self.btn_next = gr.Button("Next ➡️") | |
# -------- ستونهٔ راست : پخشکننده -------- # | |
with gr.Column(scale=2) as self.right_col: | |
self.audio = gr.Audio(label="🔊 Audio", interactive=False) | |
# stateهای مخفی | |
self.items_state = gr.State([]) # list[dict] | |
self.idx_state = gr.State(0) # اندیس فعلی | |
# ───────── wiring ───────── # | |
def register_callbacks( | |
self, | |
login_page, | |
session_state: gr.State, # dict درون gr.State | |
root_blocks: gr.Blocks, | |
) -> None: | |
# رویداد خروج | |
self.header.register_callbacks(login_page, self, session_state) | |
# ---------- helpers ---------- # | |
def _audio_path(filename: str) -> str: | |
"""مسیر کامل فایل صوتی روی دیسک.""" | |
return str(AUDIO_DIR / filename) | |
def _duration_seconds(wav_path: str) -> float: | |
"""طول فایل صوتی به ثانیه (برای اسلایدرها).""" | |
try: | |
dur = len(AudioSegment.from_file(wav_path)) / 1000.0 | |
return round(dur, 2) | |
except Exception as e: | |
log.warning(f"Cannot read duration for '{wav_path}': {e}") | |
return 0.0 | |
def show_current(items: list, idx: int): | |
"""دادههای رکورد idx را برای خروجیها تولید میکند.""" | |
if not items: | |
# 6 فیلد متنی + 3 فیلد صوت + validated | |
return [ | |
"", | |
"", | |
"", | |
"", | |
"", | |
False, | |
None, | |
gr.update(minimum=0, maximum=0, value=0), | |
gr.update(minimum=0, maximum=0, value=0), | |
] | |
data = items[idx] | |
wav_path = _audio_path(data["filename"]) | |
dur = _duration_seconds(wav_path) | |
return [ | |
data["id"], | |
data["filename"], | |
data["sentence"], | |
data.get("annotated_sentence", ""), | |
data.get("annotated_at", ""), | |
bool(data.get("validated", False)), | |
wav_path, # audio | |
gr.update(minimum=0, maximum=dur, value=0), # start slider | |
gr.update(minimum=0, maximum=dur, value=dur), # end slider | |
] | |
def next_idx(items: list, idx: int): | |
return min(idx + 1, max(len(items) - 1, 0)) | |
def prev_idx(items: list, idx: int): | |
return max(idx - 1, 0) | |
# ---------- initial load ---------- # | |
def load_items(sess: dict): | |
items = sess.get("dashboard_items", []) | |
return ( | |
items, | |
0, | |
*show_current(items, 0), | |
) | |
root_blocks.load( | |
fn=load_items, | |
inputs=[session_state], | |
outputs=[ | |
self.items_state, | |
self.idx_state, | |
self.tts_id, | |
self.filename, | |
self.sentence, | |
self.ann_sentence, | |
self.ann_at, | |
self.validated, | |
self.audio, | |
self.start_slider, | |
self.end_slider, | |
], | |
) | |
# ---------- prev / next buttons ---------- # | |
for btn, fn_nav in [(self.btn_prev, prev_idx), (self.btn_next, next_idx)]: | |
( | |
btn.click( | |
fn=fn_nav, | |
inputs=[self.items_state, self.idx_state], | |
outputs=self.idx_state, | |
).then( | |
fn=show_current, | |
inputs=[self.items_state, self.idx_state], | |
outputs=[ | |
self.tts_id, | |
self.filename, | |
self.sentence, | |
self.ann_sentence, | |
self.ann_at, | |
self.validated, | |
self.audio, | |
self.start_slider, | |
self.end_slider, | |
], | |
) | |
) | |
# ---------- Play-Selection button ---------- # | |
def play_selection(wav_path: str, start: float, end: float): | |
""" | |
بخش انتخابشده از فایل را جدا میکند و بهصورت | |
(sr, np.array) برمیگرداند تا در Player پخش شود. | |
""" | |
if not wav_path or not os.path.exists(wav_path): | |
return None | |
try: | |
seg = AudioSegment.from_file(wav_path) | |
start_ms = int(max(start, 0) * 1000) | |
end_ms = int(min(end, len(seg) / 1000) * 1000) | |
if start_ms >= end_ms: | |
end_ms = start_ms + 1000 # حداقل ۱ ثانیه | |
clip = seg[start_ms:end_ms] | |
samples = np.array(clip.get_array_of_samples()).astype(np.float32) | |
samples /= np.iinfo(samples.dtype).max # نرمالسازی | |
return (clip.frame_rate, samples) | |
except Exception as e: | |
log.error(f"Cannot slice audio '{wav_path}': {e}") | |
return None | |
self.play_btn.click( | |
fn=play_selection, | |
inputs=[self.audio, self.start_slider, self.end_slider], | |
outputs=self.audio, | |
) | |