Spaces:
Running
Running
import os | |
import torchaudio | |
import time | |
import json | |
import shutil | |
from pyannote.audio import Pipeline | |
import whisper | |
from kalbos_nustatymas import ( | |
transcribe_text, | |
transcribe_text_wav2vec, | |
recognize_language | |
) | |
def _transkribuoti_whisper(whisper_modelis, kelias):
    """Transcribe one segment file with Whisper and return ``(text, lang)``.

    On any transcription failure the pair
    ``("[KLAIDA Whisper transkripcijoje]", "unknown")`` is returned instead of
    raising, so one bad segment does not abort the whole analysis.
    """
    try:
        result = whisper_modelis.transcribe(kelias)
        text = result["text"].strip()
        lang = result["language"].strip().lower()
        if lang == "ru":
            # A "ru" detection is re-decoded with Lithuanian forced and tagged
            # "lt (forced)". NOTE(review): presumably Lithuanian speech tends to
            # be misdetected as Russian - confirm with the author.
            retry = whisper_modelis.transcribe(kelias, language="lt")
            text = retry["text"].strip()
            lang = "lt (forced)"
    except Exception:
        return "[KLAIDA Whisper transkripcijoje]", "unknown"
    return text, lang


def _transkribuoti_wav2vec(kelias, kalbos=("lt", "en", "de")):
    """Transcribe one segment with Wav2Vec2 in each language; keep the longest.

    Returns ``(text, lang)``. The language whose transcription is the longest
    string wins (ties keep the earlier language in ``kalbos``). If every
    attempt raises, ``("[KLAIDA Wav2Vec2 transkripcijoje]", "unknown")`` is
    returned; if attempts succeed but all produce empty text, ``("", "unknown")``
    is returned.
    """
    best_text = ""
    best_lang = "unknown"
    pavyko = False  # True once at least one language attempt did not raise
    for kalba in kalbos:
        try:
            txt = transcribe_text_wav2vec(kelias, kalba=kalba).strip()
        except Exception:
            # Best effort: a failure in one language must not block the others.
            continue
        pavyko = True
        if len(txt) > len(best_text):
            best_text = txt
            best_lang = kalba
    if not pavyko:
        return "[KLAIDA Wav2Vec2 transkripcijoje]", "unknown"
    return best_text, best_lang


def analizuoti_kalbetojus(pasirinktas_modelis="Wav2Vec2", failas="/tmp/ivestis.wav"):
    """Diarize an audio file, transcribe each speaker segment, and summarise.

    Steps performed:
      1. Load the ``pyannote/speaker-diarization`` pipeline (the auth token is
         read from the ``HF_TOKEN`` environment variable) and run it on
         ``failas``.
      2. Cut every segment of at least 1.0 s out of the waveform into
         ``/tmp/temp_segmentai`` (the folder is wiped and recreated on each
         call) and transcribe it with the selected model.
      3. Accumulate, per speaker, the seconds spoken in each detected language
         and pick the speaker with the most Lithuanian ("lt") time.
      4. Write all segments to ``rezultatai/<model name lower-cased>.json``.

    Args:
        pasirinktas_modelis: ``"Whisper"`` or ``"Wav2Vec2"``. Any other value
            still produces segments, but with the placeholder text
            ``"[Nežinomas modelis]"``.
        failas: Path of the input audio file.

    Returns:
        A tuple ``(log, lietuviskas_tekstas, visi_segmentai)`` where ``log`` is
        the newline-joined progress messages, ``lietuviskas_tekstas`` is the
        space-joined Lithuanian text of the speaker with the most "lt" time
        (empty if there is none), and ``visi_segmentai`` is a list of dicts
        with keys ``kalbetojas``, ``modelis``, ``kalba``, ``tekstas``,
        ``trukme``.

    A failure to write the JSON file is reported in ``log`` rather than raised.
    """
    start_time = time.time()
    tekstas = []
    visi_segmentai = []

    tekstas.append("🔁 Įkeliama diarizacijos sistema...")
    diar_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization",
        use_auth_token=os.getenv("HF_TOKEN"),
    )
    tekstas.append("✅ Diarizacijos modelis paruoštas.")

    tekstas.append(f"🔁 Įkeliamas modelis: {pasirinktas_modelis}...")
    whisper_modelis = None
    if pasirinktas_modelis == "Whisper":
        whisper_modelis = whisper.load_model("medium")
    tekstas.append(f"✅ Modelis {pasirinktas_modelis} paruoštas.")

    tekstas.append("🧠 Atliekama diarizacija...")
    diar_result = diar_pipeline(failas)

    skaiciuokle = {}     # speaker -> {language -> total seconds}
    transkripcijos = {}  # speaker -> list of per-segment result dicts

    waveform, sample_rate = torchaudio.load(failas)

    # Start from an empty scratch folder so stale segments never leak in.
    TEMP_FOLDER = "/tmp/temp_segmentai"
    if os.path.exists(TEMP_FOLDER):
        shutil.rmtree(TEMP_FOLDER)
    os.makedirs(TEMP_FOLDER, exist_ok=True)

    for i, (segment, _, speaker) in enumerate(diar_result.itertracks(yield_label=True)):
        # Segments shorter than one second are skipped entirely.
        if (segment.end - segment.start) < 1.0:
            continue

        output_path = os.path.join(TEMP_FOLDER, f"segment_{i}_{speaker}.wav")
        start_sample = int(segment.start * sample_rate)
        end_sample = int(segment.end * sample_rate)
        torchaudio.save(output_path, waveform[:, start_sample:end_sample], sample_rate)

        if pasirinktas_modelis == "Whisper":
            text, lang = _transkribuoti_whisper(whisper_modelis, output_path)
        elif pasirinktas_modelis == "Wav2Vec2":
            text, lang = _transkribuoti_wav2vec(output_path)
        else:
            text = "[Nežinomas modelis]"
            lang = "unknown"

        trukme = round(segment.end - segment.start, 2)
        # Statistics are keyed by the bare language code; the "(forced)" tag is
        # kept only in the per-segment record.
        lang_clean = lang.replace(" (forced)", "")

        kalbu_trukmes = skaiciuokle.setdefault(speaker, {})
        kalbu_trukmes[lang_clean] = kalbu_trukmes.get(lang_clean, 0) + trukme

        transkripcijos.setdefault(speaker, []).append(
            {"tekstas": text, "kalba": lang, "trukme": trukme}
        )

    # Speaker with the largest accumulated Lithuanian time (None if nobody
    # has any); ties keep the speaker encountered first.
    max_lt_seconds = 0
    kalbetojas_lt = None
    for speaker, kalbos in skaiciuokle.items():
        lt_trukme = kalbos.get("lt", 0)
        if lt_trukme > max_lt_seconds:
            max_lt_seconds = lt_trukme
            kalbetojas_lt = speaker

    lietuviski_gabalai = []
    for idx, speaker in enumerate(sorted(skaiciuokle)):
        etikete = chr(65 + idx)  # A, B, C, ...
        for sak in transkripcijos[speaker]:
            # Substring test so that "lt (forced)" segments count as well.
            if speaker == kalbetojas_lt and "lt" in sak["kalba"]:
                lietuviski_gabalai.append(sak["tekstas"])
            visi_segmentai.append({
                "kalbetojas": etikete,
                "modelis": pasirinktas_modelis,
                "kalba": sak["kalba"],
                "tekstas": sak["tekstas"],
                "trukme": sak["trukme"],
            })
    lietuviskas_tekstas = " ".join(lietuviski_gabalai).strip()

    elapsed_time = round(time.time() - start_time, 2)
    minutes, seconds = divmod(int(elapsed_time), 60)
    formatted_time = f"{minutes} min. {seconds} sek."

    os.makedirs("rezultatai", exist_ok=True)
    # Separate name: do not overwrite the ``failas`` input-path parameter.
    json_kelias = os.path.join("rezultatai", f"{pasirinktas_modelis.lower()}.json")
    try:
        with open(json_kelias, "w", encoding="utf-8") as f:
            json.dump({
                "modelis": pasirinktas_modelis,
                "apdorojimo_laikas": elapsed_time,
                "apdorojimo_laikas_tekstu": formatted_time,
                "segmentai": visi_segmentai,
            }, f, ensure_ascii=False, indent=2)
        tekstas.append(f"✅ JSON failas įrašytas: {json_kelias}")
    except Exception as e:
        tekstas.append(f"❌ Nepavyko įrašyti JSON: {str(e)}")

    tekstas.append(f"⏱️ Programos vykdymo trukmė: {formatted_time}")
    return "\n".join(tekstas), lietuviskas_tekstas, visi_segmentai