# firefly-neko-stt-live2d-multi.py
import os
import time
import re
import wave
import pyaudio
import numpy as np
import concurrent.futures
import soundfile as sf
import sys
import nltk
from tools.i18n.i18n import I18nAuto
from funasr import AutoModel
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import pygame
from pygame.locals import *
import live2d.v3 as live2d
from live2d.v3 import StandardParams
from live2d.utils import log
from live2d.utils.lipsync import WavHandler
sys.path.append('./GPT-SoVITS-v2-240821/GPT_SoVITS')  # make GPT-SoVITS modules importable
from inference_webui import change_gpt_weights, change_sovits_weights, get_tts_wav
i18n = I18nAuto()
nltk.download('averaged_perceptron_tagger')
nltk.download('averaged_perceptron_tagger_eng')
live2d.setLogEnable(False)
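# Pipeline: microphone -> FunASR STT -> Qwen2.5-7B-Instruct reply -> GPT-SoVITS TTS
# -> Live2D avatar with wav-driven lip sync. STT runs on a worker thread while the
# main thread renders; a pause in speech ends the user's turn.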
class QwenFireflyNeko:
def __init__(self):
print("初始化中...")
pygame.init()
pygame.mixer.init()
live2d.init()
#self.audioPlayed = True
self.bat_file_path = 'GPT-SoVITS-v2-240821\\go-cli.bat'
self.model_name = "model/Qwen2.5-7B-Instruct"
with open('background.txt', 'r', encoding='utf-8') as file:
self.background = file.read()
self.end_of_talk = False
self.cache = {}
self.result_text = ""
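        # Turn-taking knobs: a chunk whose peak amplitude exceeds sound_threshold
        # counts as speech; wait_time seconds of silence after speech ends the turn.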
self.sound_threshold = 500
self.wait_time = 1
self.no_sound_start_time = time.time()
self.running = True
        # 4-bit quantization config (NF4 with double quantization, fp16 compute)
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype="float16",
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True
)
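        # Rough estimate: NF4 4-bit weights bring a 7B model to around 4-5 GB of
        # GPU memory (vs ~15 GB in fp16), trading some quality for footprint.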
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
quantization_config=quantization_config,
torch_dtype="auto",
device_map="auto"
)
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
model_dir = "model"
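        # Three FunASR pieces: paraformer-large handles Chinese ASR, fsmn_vad does
        # voice activity detection, punc_ct-transformer restores punctuation.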
self.stt_model = AutoModel(
model=f"{model_dir}/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch",
vad_model=f"{model_dir}/speech_fsmn_vad_zh-cn-16k-common-pytorch",
punc_model=f"{model_dir}/punc_ct-transformer_cn-en-common-vocab471067-large",
disable_update=True,
            ngpu=0  # run the ASR stack on CPU
)
#live2d
def live2d_init(self):
display = (800, 600)
pygame.display.set_mode(display, DOUBLEBUF | OPENGL, vsync=1)
pygame.display.set_caption("pygame window")
if live2d.LIVE2D_VERSION == 3:
live2d.glewInit()
self.live2d_model = live2d.LAppModel()
if live2d.LIVE2D_VERSION == 3:
self.live2d_model.LoadModelJson(
"Firefly-desktop/Firefly.model3.json"
)
self.live2d_model.Resize(*display)
self.running = True
self.live2d_model.SetAutoBlinkEnable(False)
self.live2d_model.SetAutoBreathEnable(False)
self.dx: float = 0.0
self.dy: float = 0.0
self.scale: float = 1.0
self.wavHandler = WavHandler()
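        # lipSyncN amplifies the wav RMS before it drives ParamMouthOpenY in live2d_main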
self.lipSyncN = 2.5
        self.live2d_model.StartRandomMotion("TapBody", 300, None, None)  # no start/finish motion callbacks
for i in range(self.live2d_model.GetParameterCount()):
param = self.live2d_model.GetParameter(i)
log.Debug(
param.id, param.type, param.value, param.max, param.min, param.default
)
        # Set part opacity
# log.Debug(f"Part Count: {model.GetPartCount()}")
self.partIds = self.live2d_model.GetPartIds()
self.currentTopClickedPartId = None
def getHitFeedback(self, x, y):
t = time.time()
hitPartIds = self.live2d_model.HitPart(x, y, False)
#print(f"hit part cost: {time.time() - t}s")
#print(f"hit parts: {hitPartIds}")
if self.currentTopClickedPartId is not None:
pidx = self.partIds.index(self.currentTopClickedPartId)
self.live2d_model.SetPartOpacity(pidx, 1)
# model.SetPartScreenColor(pidx, 0.0, 0.0, 0.0, 1.0)
self.live2d_model.SetPartMultiplyColor(pidx, 1.0, 1.0, 1., 1)
# print("Part Screen Color:", model.GetPartScreenColor(pidx))
#print("Part Multiply Color:", self.live2d_model.GetPartMultiplyColor(pidx))
        if len(hitPartIds) > 0:
            return hitPartIds[0]
        return None
def live2d_main(self):
self.live2d_model.SetExpression("expression2.exp3")
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.running = False
return
if event.type == pygame.MOUSEBUTTONDOWN:
x, y = pygame.mouse.get_pos()
# currentTopClickedPartId = getHitFeedback(x, y)
# log.Info(f"Clicked Part: {currentTopClickedPartId}")
# model.Touch(x, y, onFinishMotionHandler=lambda : print("motion finished"), onStartMotionHandler=lambda group, no: print(f"started motion: {group} {no}"))
# model.StartRandomMotion(group="TapBody", onFinishMotionHandler=lambda : print("motion finished"), onStartMotionHandler=lambda group, no: print(f"started motion: {group} {no}"))
#model.SetRandomExpression()
self.live2d_model.StartRandomMotion(priority=3)
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
self.dx -= 0.1
elif event.key == pygame.K_RIGHT:
self.dx += 0.1
elif event.key == pygame.K_UP:
self.dy += 0.1
elif event.key == pygame.K_DOWN:
self.dy -= 0.1
elif event.key == pygame.K_i:
self.scale += 0.01
elif event.key == pygame.K_u:
self.scale -= 0.01
elif event.key == pygame.K_r:
self.live2d_model.StopAllMotions()
self.live2d_model.ResetPose()
elif event.key == pygame.K_e:
self.live2d_model.ResetExpression()
if event.type == pygame.MOUSEMOTION:
self.live2d_model.Drag(*pygame.mouse.get_pos())
self.currentTopClickedPartId = self.getHitFeedback(*pygame.mouse.get_pos())
self.live2d_model.Update()
if self.currentTopClickedPartId is not None:
pidx = self.partIds.index(self.currentTopClickedPartId)
self.live2d_model.SetPartOpacity(pidx, 0.5)
            # Screen color blend, taking 255 as the maximum gray level:
            # invert the original color and the screen color, multiply them, then invert again.
            # Red channel as an example: r = 255 - (255 - original.r) * (255 - screenColor.r) / 255
            # The larger a channel's value, the more it contributes to the result;
            # the call below would emphasize blue:
            # model.SetPartScreenColor(pidx, .0, 0., 1.0, 1)
            # Multiply color: r = multiplyColor.r * original.r / 255
            # The call below keeps only the blue channel:
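            # Worked example: multiply color (0, 0, 1) maps an original pixel of
            # (200, 100, 50) to (0, 0, 50): red and green zeroed, blue kept.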
self.live2d_model.SetPartMultiplyColor(pidx, .0, .0, 1., .9)
if self.wavHandler.Update():
            # Drive the mouth open/close parameter from the wav's loudness (lip sync)
self.live2d_model.AddParameterValue(
StandardParams.ParamMouthOpenY, self.wavHandler.GetRms() * self.lipSyncN
)
        # The model's watermark is usually removed by setting a parameter:
# model.SetParameterValue("Param14", 1, 1)
self.live2d_model.SetOffset(self.dx, self.dy)
self.live2d_model.SetScale(self.scale)
live2d.clearBuffer(1.0, 1.0, 1.0, 1)
self.live2d_model.Draw()
pygame.display.flip()
pygame.time.wait(10)
#tts
def synthesize(self, GPT_model_path, SoVITS_model_path, ref_audio_path, ref_text_path, ref_language, target_text_path, target_language, output_path):
# Read reference text
with open(ref_text_path, 'r', encoding='utf-8') as file:
ref_text = file.read()
# Read target text
with open(target_text_path, 'r', encoding='utf-8') as file:
target_text = file.read()
# Change model weights
change_gpt_weights(gpt_path=GPT_model_path)
change_sovits_weights(sovits_path=SoVITS_model_path)
# Synthesize audio
        synthesis_result = get_tts_wav(
            ref_wav_path=ref_audio_path,
            prompt_text=ref_text,
            prompt_language=i18n(ref_language),
            text=target_text,
            text_language=i18n(target_language),
            top_p=1,
            temperature=1,
        )
        # get_tts_wav yields (sampling_rate, audio_data) pairs; keep the last one
        result_list = list(synthesis_result)
        if result_list:
            last_sampling_rate, last_audio_data = result_list[-1]
            output_wav_path = os.path.join(output_path, "output.wav")
            sf.write(output_wav_path, last_audio_data, last_sampling_rate)
            print(f"Audio saved to {output_wav_path}")
    def extract_language(self, text):
        # Strip stage directions / emotes so they are not sent to TTS,
        # covering ASCII and fullwidth parentheses plus 【...】 action tags
        text = re.sub(r'\([^)]*\)', '', text)
        text = re.sub(r'（[^）]*）', '', text)
        text = re.sub(r'【[^】]*】', '', text)
        return text
def play_wav(self, file_path):
chunk_size = 1024
with wave.open(file_path, 'rb') as wf:
p = pyaudio.PyAudio()
        self.wavHandler.Start(file_path)  # start lip-sync loudness tracking for this file
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True)
data = wf.readframes(chunk_size)
while data:
stream.write(data)
data = wf.readframes(chunk_size)
stream.stop_stream()
stream.close()
p.terminate()
def stt(self):
p = pyaudio.PyAudio()
        chunk_size = 16000 * 3  # 3 seconds of 16 kHz samples
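        # Capture format matches the ASR models: 16 kHz, mono, 16-bit PCM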
stream = p.open(format=pyaudio.paInt16,
channels=1,
rate=16000,
input=True,
frames_per_buffer=chunk_size)
try:
while self.running:
audio_data = stream.read(chunk_size)
speech_chunk = np.frombuffer(audio_data, dtype=np.int16)
                if np.abs(speech_chunk).max() > self.sound_threshold:
                    # Save the audio chunk to a temporary file
self.end_of_talk = False
temp_wav_path = "temp_chunk.wav"
with wave.open(temp_wav_path, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(p.get_sample_size(pyaudio.paInt16))
wf.setframerate(16000)
wf.writeframes(speech_chunk.tobytes())
res = self.stt_model.generate(input=temp_wav_path, cache=self.cache, is_final=False, chunk_size=chunk_size)
os.remove(temp_wav_path)
if res and len(res[0]["text"]) > 0:
self.result_text += res[0]["text"]
print("STT 未修改:", self.result_text)
self.no_sound_start_time = time.time()
else:
                    if not self.end_of_talk and len(self.result_text) > 0 and time.time() - self.no_sound_start_time > self.wait_time:
                        print("Pause detected")
                        self.end_of_talk = True
                        self.no_sound_start_time = time.time()
                        # Hand the finished transcript back to main(), which feeds it to the LLM
                        return self.result_text
finally:
stream.stop_stream()
stream.close()
p.terminate()
#llm
def process_llm(self, prompt):
start_time = time.time()
messages = [
{"role": "system", "content": self.background},
{"role": "user", "content": prompt}
]
text = self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)
generated_ids = self.model.generate(
**model_inputs,
max_new_tokens=512
)
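        # Slice off the prompt tokens so only the newly generated reply is decoded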
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]
response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        response = response.replace("流萤猫酱:", "")  # strip the character-name prefix the model sometimes emits
        print("Generation finished, elapsed:", time.time() - start_time)
        print("Text generated, synthesizing speech...")
target_text = self.extract_language(response)
with open('target_text.txt', 'w', encoding='utf-8') as file:
file.write(target_text)
self.synthesize("GPT_weights_v2/流萤-e10.ckpt",
"SoVITS_weights_v2/流萤_e15_s810.pth",
"firefly/ref_audio/example.wav",
"ref_text.txt", "中文",
"target_text.txt", "中文",
"output"
)
print("LLM 流萤猫酱:", response)
self.play_wav("output/output.wav")
def main(self):
self.live2d_init()
print("初始化完成!")
        with concurrent.futures.ThreadPoolExecutor() as executor:  # workers for STT and LLM+TTS
future_stt = executor.submit(self.stt)
            while self.running:
                if future_stt.done():
                    prompt = future_stt.result()
                    self.result_text = ""
                    executor.submit(self.process_llm, prompt)  # LLM + TTS + playback off the render thread
                    future_stt = executor.submit(self.stt)  # immediately resume listening
                self.live2d_main()  # keep rendering on the main thread
        live2d.dispose()
        pygame.quit()
        sys.exit()
if __name__ == "__main__":
app = QwenFireflyNeko()
app.main()