IntuitionX_agent/WEBSOCKET_API.md
wds 5d2e2cfa6b feat: 实现2.1和2.3 WebSocket接口
- 2.1异常状态触发对话:皮肤状态异常/情绪低落时触发关怀对话
- 2.3双向音频流对话:K230和后端实时音频双向传输
- 核心模块:WebSocket服务器、2个消息处理器、提示词管理
- 异步架构:asyncio + 线程池,流式LLM→TTS
- 完整的测试套件和API文档

实现细节:
- 使用websockets库(15.0版本)
- asyncio.to_thread桥接同步模块
- 流式处理,低延迟
- 自动session管理和资源清理
- 完整的错误处理和日志

新增文件:
- src/MainServices.py: WebSocket服务器主入口(171行)
- src/handlers/abnormal_trigger.py: 2.1处理器(120行)
- src/handlers/audio_stream.py: 2.3处理器(250行)
- src/utils/prompts.py: 提示词管理(35行)
- test_ws.py: 完整的测试脚本(190行)
- WEBSOCKET_API.md: 完整的API文档
- IMPLEMENTATION_SUMMARY.md: 实现总结
2026-01-01 22:57:55 +08:00

9.0 KiB
Raw Permalink Blame History

WebSocket服务器API文档

概述

心镜Agent WebSocket服务器实现了2.1异常状态触发对话和2.3双向音频流对话两个核心接口用于与K230设备进行实时双向通信。

启动服务器

cd /Users/dsw/workspace/now/2025/wds/IntuitionX/agent
python src/MainServices.py

默认监听地址:ws://0.0.0.0:8765

接口详解

2.1 异常状态触发对话

用途: K230检测到皮肤状态差或悲伤情绪时触发Agent主动关怀对话

流程

  1. K230发送异常状态请求带trigger_reason
  2. 后端返回确认响应
  3. 拼接相应的提示词针对poor_skin或sad_emotion
  4. 流式调用LLM生成文本回复
  5. 流式调用TTS合成语音
  6. 发送音频块到K230base64编码

K230 → 后端

{
  "type": "abnormal_trigger",
  "trigger_reason": "poor_skin | sad_emotion",
  "enable_streaming": true,
  "context_data": {
    "emotion": "sad",
    "skin_status": {
      "acne": true,
      "dark_circles": true
    },
    "timestamp": "2024-01-01 12:30:45"
  }
}

字段说明

  • type: 固定值 "abnormal_trigger"
  • trigger_reason:
    • "poor_skin": 皮肤状态差(痘痘、黑眼圈等)
    • "sad_emotion": 悲伤情绪
  • enable_streaming: 是否启用流式响应推荐true
  • context_data: 可选提供给LLM的上下文信息

后端 → K230响应

初始确认

{
  "type": "abnormal_trigger_response",
  "success": true
}

音频流(流式发送)

{
  "type": "audio_stream_download",
  "session_id": "abnormal_trigger",
  "data": "base64-encoded-audio",
  "is_final": false
}

字段说明

  • data: base64编码的PCM音频数据24kHz采样率16bit
  • is_final: 是否为最后一个音频块

提示词策略

poor_skin皮肤状态差

检测到用户皮肤状态不佳(可能有痘痘、黑眼圈等)。 请用温柔、关心的语气简短地1-2句话询问用户最近是否休息不好或提供简单的护肤建议。 不要说教,语气要像朋友般温暖。

sad_emotion悲伤情绪

检测到用户情绪低落或悲伤。 请用温暖、共情的语气简短地1-2句话表达你察觉到了用户的情绪询问是否遇到了困扰。 不要追问细节,语气要温柔、理解、不带评判。


2.3 双向音频流对话

用途: K230和Agent后端通过同一WebSocket连接实现实时音频双向传输

流程

  1. K230发送初始化请求进行握手
  2. K230持续发送音频流
  3. 后端使用VAD检测用户停止说话
  4. 调用ASR识别音频
  5. 调用LLM生成回复
  6. 流式调用TTS合成音频
  7. 发送音频到K230
  8. 循环处理

握手阶段

K230 → 后端(初始化)

{
  "type": "audio_stream_init",
  "session_id": "unique-session-id",
  "audio_config": {
    "sample_rate": 16000,
    "bit_depth": 16,
    "channels": 1,
    "encoding": "pcm"
  },
  "timestamp": "2024-01-01 12:30:45"
}

后端 → K230响应

{
  "type": "audio_stream_init_response",
  "success": true,
  "message": "音频流连接已建立",
  "timestamp": "2024-01-01 12:30:45"
}

音频上传阶段

K230 → 后端(上行音频流)

{
  "type": "audio_stream_upload",
  "session_id": "unique-session-id",
  "data": "base64-encoded-audio",
  "timestamp": "2024-01-01 12:30:45",
  "sequence": 1
}

字段说明

  • data: base64编码的PCM音频数据
  • sequence: 音频块序列号(用于排序)

音频回复阶段

后端 → K230下行音频流

{
  "type": "audio_stream_download",
  "session_id": "unique-session-id",
  "data": "base64-encoded-audio",
  "timestamp": "2024-01-01 12:30:46",
  "is_final": false
}

控制消息

K230 → 后端(控制)

{
  "type": "audio_stream_control",
  "session_id": "unique-session-id",
  "action": "pause | resume | end",
  "reason": "optional reason",
  "timestamp": "2024-01-01 12:30:47"
}

action 说明

  • "pause": 暂停音频处理
  • "resume": 恢复音频处理
  • "end": 结束会话,清理资源

实现细节

核心模块集成

模块 功能 集成方式
LLM 流式文本生成 StreamingLLM.chat() 返回生成器
TTS 流式语音合成 StreamingTTS.stream_from_generator() 支持双向流
ASR 语音识别 需要临时WAV文件
VAD 语音活动检测 实时检测语音开始/结束

异步架构

  • WebSocket层: 完全异步asyncio
  • 核心模块: 同步阻塞asyncio.to_thread桥接
  • 优点: WebSocket能够及时处理连接事件模块处理在线程池中执行

音频处理

采样率转换

  • K230发送: 16kHz PCM
  • LLM处理: 文本
  • TTS输出: 24kHz PCM自动转换由TTS模块处理
  • K230接收: 24kHz PCM

临时文件

  • ASR需要文件路径
  • 使用 tempfile.NamedTemporaryFile 创建临时WAV
  • 自动清理with语句管理

文件结构

src/
├── MainServices.py                    # WebSocket服务器主入口
├── handlers/
│   ├── __init__.py
│   ├── abnormal_trigger.py           # 2.1处理器
│   └── audio_stream.py               # 2.3处理器
└── utils/
    └── prompts.py                    # 提示词管理

关键类和函数

MainServices.py

class WebSocketServer:
    """WebSocket服务器主类"""
    def __init__(self, host="0.0.0.0", port=8765)
    async def handler(self, websocket)  # 消息路由
    async def start()  # 启动服务器

handlers/abnormal_trigger.py

async def handle_abnormal_trigger(websocket, data)
    """处理2.1异常状态触发对话"""

async def send_audio_stream(websocket, system_prompt)
    """执行LLM→TTS双向流并发送音频"""

handlers/audio_stream.py

class AudioStreamHandler:
    """处理单个会话的双向音频流"""
    async def handle_audio_upload(data)
        """接收和处理音频上传"""

    async def process_user_speech()
        """完整的ASR→LLM→TTS处理流程"""

    async def generate_and_send_response(user_text)
        """生成响应并发送音频"""

utils/prompts.py

def get_trigger_prompt(trigger_reason: str) -> str
    """根据触发原因获取完整提示词"""

性能考虑

流式处理

  • LLM支持流式生成边生成边发送给TTS
  • TTS支持双向流边接收文本边合成音频
  • 实现真正的低延迟对话

并发处理

  • 每个会话独立的AudioStreamHandler
  • WebSocket支持多个并发连接
  • 核心模块在线程池执行,避免阻塞主循环

内存优化

  • 流式处理避免一次性加载整个响应
  • 临时文件自动清理
  • 音频块逐个发送,不存储在内存中

错误处理

WebSocket层

  • JSON解析错误返回错误消息保持连接
  • 会话不存在返回404错误
  • 连接断开自动清理session资源

模块层

  • LLM错误捕获异常记录日志返回错误消息
  • ASR错误记录日志跳过处理
  • TTS错误捕获异常返回错误消息

日志

  • 所有操作都有[标签]标记日志,便于调试
  • 格式: [标签] 消息

测试

运行测试

# 启动服务器
python src/MainServices.py

# 在另一个终端运行测试
python test_ws.py

测试覆盖

  • 2.1 异常状态触发对话poor_skin
  • 2.3 双向音频流初始化和握手
  • 音频流上传(虚拟数据)
  • 连接管理和清理

常见问题

Q: 为什么2.3没有音频响应?

A: 测试使用的是虚拟PCM数据全零VAD无法检测到有效的语音所以不会触发ASR→LLM→TTS流程。使用真实音频数据包含语音即可获得响应。

Q: 音频格式要求?

A:

  • 输入: PCM格式16bit采样16kHz采样率单声道
  • 输出: PCM格式16bit采样24kHz采样率单声道

Q: 如何确保音频流的顺序?

A:

  • 2.3使用sequence字段标记顺序
  • VAD确保完整的语音段被累积后再处理
  • TTS按顺序发送音频块

Q: 能否同时运行多个2.1会话?

A: 可以但需要顺序处理一个接一个。WebSocket支持并发连接但每个连接的处理是顺序的。

Q: 临时文件会自动删除吗?

A: 是的使用pathlib.Path.unlink()在ASR完成后立即删除。


部署建议

生产环境

  1. 使用反向代理nginx处理负载均衡
  2. 配置HTTPSWSS加密连接
  3. 监控日志和性能指标
  4. 设置合理的超时和重连机制

开发环境

  1. 本地启动服务器
  2. 使用test_ws.py进行功能测试
  3. 查看[标签]日志输出调试

相关文档

  • README.md - 项目总体介绍
  • src/Module/llm/llm.py - LLM模块文档
  • src/Module/tts/tts.py - TTS模块文档
  • src/Module/asr/asr.py - ASR模块文档
  • src/Module/vad/vad.py - VAD模块文档