projektas1-demo-hf1 / kalbetojai_analize.py
Elanas's picture
Upload kalbetojai_analize.py
9db06b6 verified
raw
history blame
5.56 kB
import os
import torchaudio
import time
import json
import shutil
from pyannote.audio import Pipeline
import whisper
from kalbos_nustatymas import (
transcribe_text,
transcribe_text_wav2vec,
recognize_language
)
def analizuoti_kalbetojus(pasirinktas_modelis="Wav2Vec2", failas="/tmp/ivestis.wav"):
start_time = time.time()
tekstas = []
lietuviskas_tekstas = ""
visi_segmentai = []
tekstas.append("🔁 Įkeliama diarizacijos sistema...")
diar_pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization",
use_auth_token=os.getenv("HF_TOKEN")
)
tekstas.append("✅ Diarizacijos modelis paruoštas.")
tekstas.append(f"🔁 Įkeliamas modelis: {pasirinktas_modelis}...")
whisper_modelis = None
if pasirinktas_modelis == "Whisper":
whisper_modelis = whisper.load_model("medium")
tekstas.append(f"✅ Modelis {pasirinktas_modelis} paruoštas.")
tekstas.append("🧠 Atliekama diarizacija...")
diar_result = diar_pipeline(failas)
speaker_segments = list(diar_result.itertracks(yield_label=True))
skaiciuokle = {}
transkripcijos = {}
kalbos_visos = {}
waveform, sample_rate = torchaudio.load(failas)
TEMP_FOLDER = "/tmp/temp_segmentai"
if os.path.exists(TEMP_FOLDER):
shutil.rmtree(TEMP_FOLDER)
os.makedirs(TEMP_FOLDER, exist_ok=True)
for i, (segment, _, speaker) in enumerate(speaker_segments):
if (segment.end - segment.start) < 1.0:
continue
output_path = os.path.join(TEMP_FOLDER, f"segment_{i}_{speaker}.wav")
start_sample = int(segment.start * sample_rate)
end_sample = int(segment.end * sample_rate)
torchaudio.save(output_path, waveform[:, start_sample:end_sample], sample_rate)
if pasirinktas_modelis == "Whisper":
try:
result = whisper_modelis.transcribe(output_path)
text = result["text"].strip()
lang = result["language"].strip().lower()
if lang == "ru":
retry = whisper_modelis.transcribe(output_path, language="lt")
text = retry["text"].strip()
lang = "lt (forced)"
except Exception:
text = "[KLAIDA Whisper transkripcijoje]"
lang = "unknown"
elif pasirinktas_modelis == "Wav2Vec2":
try:
best_text = ""
best_lang = "unknown"
longest = 0
for kalba in ["lt", "en", "de"]:
try:
txt = transcribe_text_wav2vec(output_path, kalba=kalba).strip()
if len(txt) > longest:
best_text = txt
best_lang = kalba
longest = len(txt)
except:
continue
text = best_text
lang = best_lang
except Exception:
text = "[KLAIDA Wav2Vec2 transkripcijoje]"
lang = "unknown"
else:
text = "[Nežinomas modelis]"
lang = "unknown"
trukme = round(segment.end - segment.start, 2)
if speaker not in skaiciuokle:
skaiciuokle[speaker] = {}
lang_clean = lang.replace(" (forced)", "")
skaiciuokle[speaker][lang_clean] = skaiciuokle[speaker].get(lang_clean, 0) + trukme
if speaker not in transkripcijos:
transkripcijos[speaker] = []
transkripcijos[speaker].append({"tekstas": text, "kalba": lang, "trukme": trukme})
if speaker not in kalbos_visos:
kalbos_visos[speaker] = set()
kalbos_visos[speaker].add(lang_clean)
max_lt_seconds = 0
kalbetojas_lt = None
for speaker, kalbos in skaiciuokle.items():
lt_trukme = kalbos.get("lt", 0)
if lt_trukme > max_lt_seconds:
max_lt_seconds = lt_trukme
kalbetojas_lt = speaker
sorted_speakers = sorted(skaiciuokle.keys())
for idx, speaker in enumerate(sorted_speakers):
etikete = chr(65 + idx) # A, B, C, ...
for sak in transkripcijos[speaker]:
if speaker == kalbetojas_lt and "lt" in sak["kalba"]:
lietuviskas_tekstas += sak["tekstas"] + " "
visi_segmentai.append({
"kalbetojas": etikete,
"modelis": pasirinktas_modelis,
"kalba": sak["kalba"],
"tekstas": sak["tekstas"],
"trukme": sak["trukme"]
})
elapsed_time = round(time.time() - start_time, 2)
minutes = int(elapsed_time) // 60
seconds = int(elapsed_time) % 60
formatted_time = f"{minutes} min. {seconds} sek."
os.makedirs("rezultatai", exist_ok=True)
failas = os.path.join("rezultatai", f"{pasirinktas_modelis.lower()}.json")
try:
with open(failas, "w", encoding="utf-8") as f:
json.dump({
"modelis": pasirinktas_modelis,
"apdorojimo_laikas": elapsed_time,
"apdorojimo_laikas_tekstu": formatted_time,
"segmentai": visi_segmentai
}, f, ensure_ascii=False, indent=2)
tekstas.append(f"✅ JSON failas įrašytas: {failas}")
except Exception as e:
tekstas.append(f"❌ Nepavyko įrašyti JSON: {str(e)}")
tekstas.append(f"⏱️ Programos vykdymo trukmė: {formatted_time}")
return "\n".join(tekstas), lietuviskas_tekstas.strip(), visi_segmentai