- 2.1异常状态触发对话:皮肤状态异常/情绪低落时触发关怀对话 - 2.3双向音频流对话:K230和后端实时音频双向传输 - 核心模块:WebSocket服务器、2个消息处理器、提示词管理 - 异步架构:asyncio + 线程池,流式LLM→TTS - 完整的测试套件和API文档 实现细节: - 使用websockets库(15.0版本) - asyncio.to_thread桥接同步模块 - 流式处理,低延迟 - 自动session管理和资源清理 - 完整的错误处理和日志 新增文件: - src/MainServices.py: WebSocket服务器主入口(171行) - src/handlers/abnormal_trigger.py: 2.1处理器(120行) - src/handlers/audio_stream.py: 2.3处理器(250行) - src/utils/prompts.py: 提示词管理(35行) - test_ws.py: 完整的测试脚本(190行) - WEBSOCKET_API.md: 完整的API文档 - IMPLEMENTATION_SUMMARY.md: 实现总结
9.0 KiB
9.0 KiB
WebSocket服务器API文档
概述
心镜Agent WebSocket服务器实现了2.1(异常状态触发对话)和2.3(双向音频流对话)两个核心接口,用于与K230设备进行实时双向通信。
启动服务器
cd /Users/dsw/workspace/now/2025/wds/IntuitionX/agent
python src/MainServices.py
默认监听地址:ws://0.0.0.0:8765
接口详解
2.1 异常状态触发对话
用途: K230检测到皮肤状态差或悲伤情绪时,触发Agent主动关怀对话
流程:
- K230发送异常状态请求(带trigger_reason)
- 后端返回确认响应
- 拼接相应的提示词(针对poor_skin或sad_emotion)
- 流式调用LLM生成文本回复
- 流式调用TTS合成语音
- 发送音频块到K230(base64编码)
K230 → 后端
{
"type": "abnormal_trigger",
"trigger_reason": "poor_skin | sad_emotion",
"enable_streaming": true,
"context_data": {
"emotion": "sad",
"skin_status": {
"acne": true,
"dark_circles": true
},
"timestamp": "2024-01-01 12:30:45"
}
}
字段说明:
type: 固定值 "abnormal_trigger"trigger_reason:"poor_skin": 皮肤状态差(痘痘、黑眼圈等)"sad_emotion": 悲伤情绪
enable_streaming: 是否启用流式响应(推荐true)context_data: 可选,提供给LLM的上下文信息
后端 → K230(响应)
初始确认:
{
"type": "abnormal_trigger_response",
"success": true
}
音频流(流式发送):
{
"type": "audio_stream_download",
"session_id": "abnormal_trigger",
"data": "base64-encoded-audio",
"is_final": false
}
字段说明:
data: base64编码的PCM音频数据(24kHz采样率,16bit)is_final: 是否为最后一个音频块
提示词策略
poor_skin(皮肤状态差):
检测到用户皮肤状态不佳(可能有痘痘、黑眼圈等)。 请用温柔、关心的语气,简短地(1-2句话)询问用户最近是否休息不好,或提供简单的护肤建议。 不要说教,语气要像朋友般温暖。
sad_emotion(悲伤情绪):
检测到用户情绪低落或悲伤。 请用温暖、共情的语气,简短地(1-2句话)表达你察觉到了用户的情绪,询问是否遇到了困扰。 不要追问细节,语气要温柔、理解、不带评判。
2.3 双向音频流对话
用途: K230和Agent后端通过同一WebSocket连接实现实时音频双向传输
流程:
- K230发送初始化请求,进行握手
- K230持续发送音频流
- 后端使用VAD检测用户停止说话
- 调用ASR识别音频
- 调用LLM生成回复
- 流式调用TTS合成音频
- 发送音频到K230
- 循环处理
握手阶段
K230 → 后端(初始化):
{
"type": "audio_stream_init",
"session_id": "unique-session-id",
"audio_config": {
"sample_rate": 16000,
"bit_depth": 16,
"channels": 1,
"encoding": "pcm"
},
"timestamp": "2024-01-01 12:30:45"
}
后端 → K230(响应):
{
"type": "audio_stream_init_response",
"success": true,
"message": "音频流连接已建立",
"timestamp": "2024-01-01 12:30:45"
}
音频上传阶段
K230 → 后端(上行音频流):
{
"type": "audio_stream_upload",
"session_id": "unique-session-id",
"data": "base64-encoded-audio",
"timestamp": "2024-01-01 12:30:45",
"sequence": 1
}
字段说明:
data: base64编码的PCM音频数据sequence: 音频块序列号(用于排序)
音频回复阶段
后端 → K230(下行音频流):
{
"type": "audio_stream_download",
"session_id": "unique-session-id",
"data": "base64-encoded-audio",
"timestamp": "2024-01-01 12:30:46",
"is_final": false
}
控制消息
K230 → 后端(控制):
{
"type": "audio_stream_control",
"session_id": "unique-session-id",
"action": "pause | resume | end",
"reason": "optional reason",
"timestamp": "2024-01-01 12:30:47"
}
action 说明:
"pause": 暂停音频处理"resume": 恢复音频处理"end": 结束会话,清理资源
实现细节
核心模块集成
| 模块 | 功能 | 集成方式 |
|---|---|---|
| LLM | 流式文本生成 | StreamingLLM.chat() 返回生成器 |
| TTS | 流式语音合成 | StreamingTTS.stream_from_generator() 支持双向流 |
| ASR | 语音识别 | 需要临时WAV文件 |
| VAD | 语音活动检测 | 实时检测语音开始/结束 |
异步架构
- WebSocket层: 完全异步(asyncio)
- 核心模块: 同步阻塞(asyncio.to_thread桥接)
- 优点: WebSocket能够及时处理连接事件,模块处理在线程池中执行
音频处理
采样率转换:
- K230发送: 16kHz PCM
- LLM处理: 文本
- TTS输出: 24kHz PCM(自动转换,由TTS模块处理)
- K230接收: 24kHz PCM
临时文件:
- ASR需要文件路径
- 使用
tempfile.NamedTemporaryFile创建临时WAV - 自动清理(with语句管理)
文件结构
src/
├── MainServices.py # WebSocket服务器主入口
├── handlers/
│ ├── __init__.py
│ ├── abnormal_trigger.py # 2.1处理器
│ └── audio_stream.py # 2.3处理器
└── utils/
└── prompts.py # 提示词管理
关键类和函数
MainServices.py
class WebSocketServer:
"""WebSocket服务器主类"""
def __init__(self, host="0.0.0.0", port=8765)
async def handler(self, websocket) # 消息路由
async def start() # 启动服务器
handlers/abnormal_trigger.py
async def handle_abnormal_trigger(websocket, data)
"""处理2.1异常状态触发对话"""
async def send_audio_stream(websocket, system_prompt)
"""执行LLM→TTS双向流并发送音频"""
handlers/audio_stream.py
class AudioStreamHandler:
"""处理单个会话的双向音频流"""
async def handle_audio_upload(data)
"""接收和处理音频上传"""
async def process_user_speech()
"""完整的ASR→LLM→TTS处理流程"""
async def generate_and_send_response(user_text)
"""生成响应并发送音频"""
utils/prompts.py
def get_trigger_prompt(trigger_reason: str) -> str
"""根据触发原因获取完整提示词"""
性能考虑
流式处理
- LLM支持流式生成,边生成边发送给TTS
- TTS支持双向流,边接收文本边合成音频
- 实现真正的低延迟对话
并发处理
- 每个会话独立的AudioStreamHandler
- WebSocket支持多个并发连接
- 核心模块在线程池执行,避免阻塞主循环
内存优化
- 流式处理避免一次性加载整个响应
- 临时文件自动清理
- 音频块逐个发送,不存储在内存中
错误处理
WebSocket层
- JSON解析错误:返回错误消息,保持连接
- 会话不存在:返回404错误
- 连接断开:自动清理session资源
模块层
- LLM错误:捕获异常,记录日志,返回错误消息
- ASR错误:记录日志,跳过处理
- TTS错误:捕获异常,返回错误消息
日志
- 所有操作都有[标签]标记日志,便于调试
- 格式:
[标签] 消息
测试
运行测试
# 启动服务器
python src/MainServices.py
# 在另一个终端运行测试
python test_ws.py
测试覆盖
- 2.1 异常状态触发对话(poor_skin)
- 2.3 双向音频流初始化和握手
- 音频流上传(虚拟数据)
- 连接管理和清理
常见问题
Q: 为什么2.3没有音频响应?
A: 测试使用的是虚拟PCM数据(全零),VAD无法检测到有效的语音,所以不会触发ASR→LLM→TTS流程。使用真实音频数据(包含语音)即可获得响应。
Q: 音频格式要求?
A:
- 输入: PCM格式,16bit采样,16kHz采样率,单声道
- 输出: PCM格式,16bit采样,24kHz采样率,单声道
Q: 如何确保音频流的顺序?
A:
- 2.3使用sequence字段标记顺序
- VAD确保完整的语音段被累积后再处理
- TTS按顺序发送音频块
Q: 能否同时运行多个2.1会话?
A: 可以,但需要顺序处理(一个接一个)。WebSocket支持并发连接,但每个连接的处理是顺序的。
Q: 临时文件会自动删除吗?
A: 是的,使用pathlib.Path.unlink()在ASR完成后立即删除。
部署建议
生产环境
- 使用反向代理(nginx)处理负载均衡
- 配置HTTPS(WSS)加密连接
- 监控日志和性能指标
- 设置合理的超时和重连机制
开发环境
- 本地启动服务器
- 使用test_ws.py进行功能测试
- 查看[标签]日志输出调试
相关文档
- README.md - 项目总体介绍
- src/Module/llm/llm.py - LLM模块文档
- src/Module/tts/tts.py - TTS模块文档
- src/Module/asr/asr.py - ASR模块文档
- src/Module/vad/vad.py - VAD模块文档