diff --git a/src/Module/tts/tts.py b/src/Module/tts/tts.py
index b4bb74e..a43896e 100644
--- a/src/Module/tts/tts.py
+++ b/src/Module/tts/tts.py
@@ -267,6 +267,138 @@ class StreamingTTS:
                 pass
             self._client = None
 
+    def stream_from_generator(
+        self,
+        text_generator: Generator[str, None, None],
+        voice: str = None,
+        language: str = None,
+        speech_rate: float = None,
+        volume: int = None,
+        pitch_rate: float = None,
+    ) -> Generator[AudioChunk, None, None]:
+        """
+        Bidirectional streaming synthesis: send text while receiving audio.
+
+        Text is sent from a background thread while this generator yields
+        the audio chunks handed over by the streaming callback.
+
+        Args:
+            text_generator: Generator producing the text piece by piece.
+            voice: Voice; falls back to ``self.voice`` if omitted or empty.
+            language: Language; falls back to ``self.language`` if omitted
+                or empty.
+            speech_rate: Speech rate, clamped to [0.5, 2.0]; ``None`` uses
+                ``self.speech_rate``.
+            volume: Volume, clamped to [0, 100]; ``None`` uses
+                ``self.volume``. An explicit ``0`` is honoured.
+            pitch_rate: Pitch rate, clamped to [0.5, 2.0]; ``None`` uses
+                ``self.pitch_rate``.
+
+        Yields:
+            AudioChunk: Audio data chunks. Iteration ends after a chunk
+            whose ``is_final`` is set, or when the callback returns ``None``.
+
+        Example:
+            >>> def get_text():
+            ...     yield "你好,"
+            ...     yield "世界!"
+            >>> tts = StreamingTTS()
+            >>> for chunk in tts.stream_from_generator(get_text()):
+            ...     play_audio(chunk.data)
+        """
+        _voice = voice or self.voice
+        _language = language or self.language
+        # Test against None instead of using ``or``: a falsy but valid value
+        # such as ``volume=0`` must not be replaced by the instance default.
+        _speech_rate = self._clamp(
+            self.speech_rate if speech_rate is None else speech_rate,
+            0.5, 2.0)
+        _volume = self._clamp(
+            self.volume if volume is None else volume, 0, 100)
+        _pitch_rate = self._clamp(
+            self.pitch_rate if pitch_rate is None else pitch_rate,
+            0.5, 2.0)
+
+        self._callback = _StreamingCallback()
+        self._client = QwenTtsRealtime(
+            model=self.model,
+            callback=self._callback,
+            url=config.WS_URL
+        )
+
+        # Sender thread: pushes text while this generator reads audio.
+        send_thread = threading.Thread(
+            target=self._send_text_generator,
+            args=(
+                text_generator,
+                _voice,
+                _language,
+                _speech_rate,
+                _volume,
+                _pitch_rate),
+            daemon=True)
+
+        try:
+            self._client.connect()
+            send_thread.start()
+
+            while True:
+                chunk = self._callback.get_chunk()
+                if chunk is None:
+                    break
+                yield chunk
+                if chunk.is_final:
+                    break
+
+        finally:
+            self._cleanup()
+
+    def _send_text_generator(
+        self,
+        text_generator: Generator[str, None, None],
+        voice: str,
+        language: str,
+        speech_rate: float,
+        volume: int,
+        pitch_rate: float
+    ) -> None:
+        """Send the text chunk by chunk; runs in the background thread.
+
+        Any failure is handed to the consumer as a final ``AudioChunk``
+        carrying the error text, so the reading loop can terminate.
+        """
+        # Bind the session objects once: ``self._client`` can be reset to
+        # None while this thread is still sending.
+        callback = self._callback
+        client = self._client
+        try:
+            if not callback.wait_connected():
+                # Report the failure instead of returning silently, so the
+                # consumer is not left without a final chunk.
+                raise ConnectionError('TTS connection was not established')
+
+            client.update_session(
+                voice=voice,
+                response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
+                mode=config.MODE,
+                language_type=language,
+                speech_rate=speech_rate,
+                volume=volume,
+                pitch_rate=pitch_rate
+            )
+
+            # Forward each non-empty text chunk as soon as it is produced.
+            for text_chunk in text_generator:
+                if text_chunk:
+                    client.append_text(text_chunk)
+
+            client.finish()
+
+        except Exception as e:
+            callback._audio_queue.put(
+                AudioChunk(data=b'', is_final=True, error=str(e))
+            )
+
     def stream_to_file(self, text: str, output_file: str, **kwargs) -> bool:
         """
         流式合成并保存到文件
diff --git a/test/asr/mirror_hello.mp3 b/test/asr/mirror_hello.mp3
deleted file mode 100644
index 3ec77af..0000000
Binary files a/test/asr/mirror_hello.mp3 and /dev/null differ
diff --git a/test/tts/output/bidirectional_test.wav b/test/tts/output/bidirectional_test.wav
new file mode 100644
index 0000000..b328524
Binary files /dev/null and b/test/tts/output/bidirectional_test.wav differ
diff --git a/test/tts/output/stream_test.wav b/test/tts/output/stream_test.wav
index 6995057..ff169c3 100644
Binary files a/test/tts/output/stream_test.wav and b/test/tts/output/stream_test.wav differ
diff --git a/test/tts/test_bidirectional.py b/test/tts/test_bidirectional.py
new file mode 100644
index 0000000..fe726a9
--- /dev/null
+++ b/test/tts/test_bidirectional.py
@@ -0,0 +1,105 @@
+"""
+TTS bidirectional streaming test: send text while receiving audio.
+"""
+
+import time
+import wave
+from pathlib import Path
+
+from src.Module.tts.tts import StreamingTTS
+
+
+def simulate_llm_output():
+    """
+    Simulate streaming LLM output.
+
+    One sentence is split into several parts that are yielded one by one.
+    """
+    text_chunks = [
+        "你好,",
+        "我是通义千问",
+        "语音合成系统。",
+        "现在正在进行",
+        "双向流测试!"
+    ]
+
+    for chunk in text_chunks:
+        print(f" [发送文本] -> {chunk}")
+        yield chunk
+        time.sleep(0.1)  # simulate LLM generation latency
+
+
+def run_bidirectional_stream():
+    """
+    Run the bidirectional synthesis and save the audio as a WAV file.
+
+    Returns:
+        bool: True when at least one audio chunk arrived and no error
+        chunk was reported.
+    """
+    print("=" * 60)
+    print(" TTS 双向流测试")
+    print("=" * 60)
+    print("\n一句话分为多个部分发送,同时接收音频\n")
+
+    tts = StreamingTTS(voice='Cherry')
+
+    total_bytes = 0
+    chunk_count = 0
+    failed = False
+    audio_data = bytearray()
+
+    print("开始双向流传输...\n")
+
+    # stream_from_generator provides the bidirectional stream.
+    for audio_chunk in tts.stream_from_generator(simulate_llm_output()):
+        if audio_chunk.error:
+            print(f" [错误] {audio_chunk.error}")
+            failed = True
+            break
+
+        if audio_chunk.data:
+            chunk_count += 1
+            chunk_bytes = len(audio_chunk.data)
+            total_bytes += chunk_bytes
+            audio_data.extend(audio_chunk.data)
+            # Keep each replacement field on one line: a newline inside
+            # f-string braces is a syntax error before Python 3.12.
+            print(
+                f" [收到音频] Chunk {chunk_count:02d}: "
+                f"{chunk_bytes:5d} 字节 | 累计: {total_bytes:6d} 字节")
+
+        if audio_chunk.is_final:
+            print("\n传输完成!")
+            break
+
+    # Save the audio as 16-bit mono PCM at 24 kHz.
+    output_dir = Path(__file__).parent / 'output'
+    output_dir.mkdir(exist_ok=True)
+    wav_file = output_dir / 'bidirectional_test.wav'
+
+    with wave.open(str(wav_file), 'wb') as wav:
+        wav.setnchannels(1)
+        wav.setsampwidth(2)
+        wav.setframerate(24000)
+        wav.writeframes(audio_data)
+
+    print("\n结果统计:")
+    print(f" 音频块: {chunk_count} 个")
+    print(f" 总字节: {total_bytes} 字节")
+    print(f" 音频时长: {total_bytes / (24000 * 2):.2f} 秒")
+    print(f" 已保存: {wav_file}")
+
+    return chunk_count > 0 and not failed
+
+
+def test_bidirectional_stream():
+    """Test bidirectional streaming synthesis."""
+    # Assert rather than return: pytest does not check return values, so a
+    # returned False would not fail the test.
+    assert run_bidirectional_stream()
+
+
+if __name__ == '__main__':
+    success = run_bidirectional_stream()
+    print(f"\n测试结果: {'✓ 通过' if success else '✗ 失败'}")