perf: 双向流式

This commit is contained in:
DongShengWu 2026-01-01 21:14:17 +08:00
parent 7f9ae0e036
commit 48fe2f37ae
5 changed files with 195 additions and 0 deletions

View File

@ -267,6 +267,113 @@ class StreamingTTS:
pass pass
self._client = None self._client = None
def stream_from_generator(
self,
text_generator: Generator[str, None, None],
voice: str = None,
language: str = None,
speech_rate: float = None,
volume: int = None,
pitch_rate: float = None,
) -> Generator[AudioChunk, None, None]:
"""
双向流式合成 - 边发送文本边接收音频
Args:
text_generator: 文本生成器逐块产生文本
voice: 音色
language: 语言
speech_rate: 语速
volume: 音量
pitch_rate: 语调
Yields:
AudioChunk: 音频数据块
Example:
>>> def get_text():
... yield "你好,"
... yield "世界!"
>>> tts = StreamingTTS()
>>> for chunk in tts.stream_from_generator(get_text()):
... play_audio(chunk.data)
"""
_voice = voice or self.voice
_language = language or self.language
_speech_rate = self._clamp(speech_rate or self.speech_rate, 0.5, 2.0)
_volume = self._clamp(volume or self.volume, 0, 100)
_pitch_rate = self._clamp(pitch_rate or self.pitch_rate, 0.5, 2.0)
self._callback = _StreamingCallback()
self._client = QwenTtsRealtime(
model=self.model,
callback=self._callback,
url=config.WS_URL
)
# 启动发送线程
send_thread = threading.Thread(
target=self._send_text_generator,
args=(
text_generator,
_voice,
_language,
_speech_rate,
_volume,
_pitch_rate),
daemon=True)
try:
self._client.connect()
send_thread.start()
while True:
chunk = self._callback.get_chunk()
if chunk is None:
break
yield chunk
if chunk.is_final:
break
finally:
self._cleanup()
def _send_text_generator(
self,
text_generator: Generator[str, None, None],
voice: str,
language: str,
speech_rate: float,
volume: int,
pitch_rate: float
) -> None:
"""后台逐块发送文本"""
try:
if not self._callback.wait_connected():
return
self._client.update_session(
voice=voice,
response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
mode=config.MODE,
language_type=language,
speech_rate=speech_rate,
volume=volume,
pitch_rate=pitch_rate
)
# 逐块发送文本
for text_chunk in text_generator:
if text_chunk:
self._client.append_text(text_chunk)
self._client.finish()
except Exception as e:
self._callback._audio_queue.put(
AudioChunk(data=b'', is_final=True, error=str(e))
)
def stream_to_file(self, text: str, output_file: str, **kwargs) -> bool: def stream_to_file(self, text: str, output_file: str, **kwargs) -> bool:
""" """
流式合成并保存到文件 流式合成并保存到文件

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,88 @@
"""
TTS 双向流测试 - 边发送文本边接收音频
"""
import time
from pathlib import Path
from src.Module.tts.tts import StreamingTTS
def simulate_llm_output():
"""
模拟 LLM 流式输出
一句话分成多个部分逐步发送
"""
text_chunks = [
"你好,",
"我是通义千问",
"语音合成系统。",
"现在正在进行",
"双向流测试!"
]
for chunk in text_chunks:
print(f" [发送文本] -> {chunk}")
yield chunk
time.sleep(0.1) # 模拟 LLM 生成延迟
def test_bidirectional_stream():
"""测试双向流式合成"""
print("=" * 60)
print(" TTS 双向流测试")
print("=" * 60)
print("\n一句话分为多个部分发送,同时接收音频\n")
tts = StreamingTTS(voice='Cherry')
total_bytes = 0
chunk_count = 0
audio_data = bytearray()
print("开始双向流传输...\n")
# 使用 stream_from_generator 实现双向流
for audio_chunk in tts.stream_from_generator(simulate_llm_output()):
if audio_chunk.error:
print(f" [错误] {audio_chunk.error}")
break
if audio_chunk.data:
chunk_count += 1
total_bytes += len(audio_chunk.data)
audio_data.extend(audio_chunk.data)
print(
f" [收到音频] Chunk {
chunk_count:02d}: {
len(
audio_chunk.data):5d} 字节 | 累计: {
total_bytes:6d} 字节")
if audio_chunk.is_final:
print("\n传输完成!")
break
# 保存音频
output_dir = Path(__file__).parent / 'output'
output_dir.mkdir(exist_ok=True)
wav_file = output_dir / 'bidirectional_test.wav'
import wave
with wave.open(str(wav_file), 'wb') as wav:
wav.setnchannels(1)
wav.setsampwidth(2)
wav.setframerate(24000)
wav.writeframes(audio_data)
print(f"\n结果统计:")
print(f" 音频块: {chunk_count}")
print(f" 总字节: {total_bytes} 字节")
print(f" 音频时长: {total_bytes / (24000 * 2):.2f}")
print(f" 已保存: {wav_file}")
return chunk_count > 0
if __name__ == '__main__':
success = test_bidirectional_stream()
print(f"\n测试结果: {'✓ 通过' if success else '✗ 失败'}")