From 9ab0089a831ddc9c1b613b82b0aa7d6089bb5b8e Mon Sep 17 00:00:00 2001 From: wds Date: Thu, 1 Jan 2026 21:37:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20:sparkles:=20LLM=E5=B0=81=E8=A3=85?= =?UTF-8?q?=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Module/llm/llm.py | 262 ++++++++++++++++++++++++++++++++++++ src/Module/llm/llmconfig.py | 57 ++++++++ test/llm/test_llm.py | 95 +++++++++++++ 3 files changed, 414 insertions(+) create mode 100644 src/Module/llm/llm.py create mode 100644 src/Module/llm/llmconfig.py create mode 100644 test/llm/test_llm.py diff --git a/src/Module/llm/llm.py b/src/Module/llm/llm.py new file mode 100644 index 0000000..74ae84d --- /dev/null +++ b/src/Module/llm/llm.py @@ -0,0 +1,262 @@ +""" +流式 LLM 模块 - 基于 DeepSeek 模型 +核心特性:使用 yield 生成器实现流式返回 +""" + +import os +from typing import Generator, Optional, List, Dict, Any +from pathlib import Path +from dataclasses import dataclass, field +from dotenv import load_dotenv +from openai import OpenAI + +from . 
@dataclass
class ChatChunk:
    """One chunk of a streaming chat response."""
    content: str = ""                       # answer text carried by this chunk
    reasoning_content: str = ""             # thinking text (thinking mode only)
    is_final: bool = False                  # True for the terminating chunk
    is_reasoning: bool = False              # True when this chunk carries reasoning
    error: Optional[str] = None             # error message if the stream failed
    usage: Optional[Dict[str, Any]] = None  # token usage, only on the final chunk


@dataclass
class ChatResponse:
    """Aggregated (non-streaming) chat response."""
    content: str = ""            # full answer text
    reasoning_content: str = ""  # full thinking text
    success: bool = True
    error: Optional[str] = None
    usage: Optional[Dict[str, Any]] = None  # token consumption reported by the API


@dataclass
class Message:
    """A single conversation message."""
    role: str  # 'system', 'user' or 'assistant'
    content: str


class StreamingLLM:
    """
    Streaming large-language-model wrapper (DeepSeek models via the
    DashScope OpenAI-compatible endpoint).

    Usage:
        llm = StreamingLLM()
        for chunk in llm.chat("hello"):
            print(chunk.content, end="")
    """

    def __init__(
        self,
        model: Optional[str] = None,
        enable_thinking: Optional[bool] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        system_prompt: Optional[str] = None,
    ):
        """
        Initialize the streaming LLM.

        Args:
            model: model name (defaults to config.MODEL).
            enable_thinking: whether thinking mode is on (defaults to config).
            temperature: sampling temperature (defaults to config).
            top_p: top-p sampling (defaults to config).
            max_tokens: maximum output tokens (defaults to config).
            system_prompt: system prompt (defaults to config).

        Raises:
            ValueError: if DASHSCOPE_API_KEY cannot be found.
        """
        # Imported lazily so the module can be loaded without package context
        # (e.g. by standalone tests); only __init__ reads the config.
        from . import llmconfig as config

        self._load_api_key()

        self.model = model or config.MODEL
        self.enable_thinking = (
            enable_thinking if enable_thinking is not None else config.ENABLE_THINKING
        )
        self.temperature = temperature if temperature is not None else config.TEMPERATURE
        self.top_p = top_p if top_p is not None else config.TOP_P
        self.max_tokens = max_tokens or config.MAX_TOKENS
        self.system_prompt = system_prompt or config.SYSTEM_PROMPT

        # OpenAI-compatible client pointed at the DashScope endpoint.
        self._client = OpenAI(
            api_key=os.environ.get('DASHSCOPE_API_KEY'),
            base_url=config.API_BASE_URL,
        )

        # Conversation history (user/assistant turns, no system message).
        self._messages: List[Dict[str, str]] = []

    def _load_api_key(self) -> None:
        """Load DASHSCOPE_API_KEY from the nearest .env (up to 5 levels up).

        Raises:
            ValueError: if the key is still absent from the environment.
        """
        current_dir = Path(__file__).parent
        for _ in range(5):
            env_path = current_dir / '.env'
            if env_path.exists():
                load_dotenv(env_path)
                break
            parent = current_dir.parent
            if parent == current_dir:
                # Reached the filesystem root; climbing further is a no-op.
                break
            current_dir = parent

        if not os.environ.get('DASHSCOPE_API_KEY'):
            raise ValueError('未找到 DASHSCOPE_API_KEY')

    def chat(
        self,
        message: str,
        enable_thinking: Optional[bool] = None,
        temperature: Optional[float] = None,
        system_prompt: Optional[str] = None,
    ) -> Generator[ChatChunk, None, None]:
        """
        Streaming chat — the core interface.

        Args:
            message: user message.
            enable_thinking: per-call override of the thinking mode.
            temperature: per-call override of the temperature.
            system_prompt: per-call override of the system prompt.

        Yields:
            ChatChunk: response chunks. The terminating chunk has
                is_final=True and carries token usage when reported.
        """
        _enable_thinking = enable_thinking if enable_thinking is not None else self.enable_thinking
        _temperature = temperature if temperature is not None else self.temperature
        _system_prompt = system_prompt or self.system_prompt

        # Build the request: system prompt + prior history + new user turn.
        messages = [{"role": "system", "content": _system_prompt}]
        messages.extend(self._messages)
        messages.append({"role": "user", "content": message})

        try:
            completion = self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=_temperature,
                top_p=self.top_p,
                max_tokens=self.max_tokens,
                extra_body={"enable_thinking": _enable_thinking},
                stream=True,
                stream_options={"include_usage": True},
            )

            full_content = ""
            full_reasoning = ""

            for chunk in completion:
                # With include_usage=True the server sends one final chunk
                # with empty choices that carries the usage stats.
                if not chunk.choices:
                    usage = getattr(chunk, 'usage', None)
                    if usage is not None and hasattr(usage, 'model_dump'):
                        usage = usage.model_dump()  # pydantic model -> plain dict
                    yield ChatChunk(content="", is_final=True, usage=usage)
                    continue

                delta = chunk.choices[0].delta

                # Thinking content (only present in thinking mode).
                if hasattr(delta, 'reasoning_content') and delta.reasoning_content:
                    full_reasoning += delta.reasoning_content
                    yield ChatChunk(
                        content="",
                        reasoning_content=delta.reasoning_content,
                        is_reasoning=True,
                    )

                # Answer content.
                if hasattr(delta, 'content') and delta.content:
                    full_content += delta.content
                    yield ChatChunk(content=delta.content, is_reasoning=False)

            # Persist the completed turn into the conversation history.
            self._messages.append({"role": "user", "content": message})
            self._messages.append({"role": "assistant", "content": full_content})

        except Exception as e:
            # Surface the failure as a terminal chunk instead of raising,
            # so streaming consumers see a clean end-of-stream.
            yield ChatChunk(content="", is_final=True, error=str(e))

    def chat_complete(self, message: str, **kwargs) -> ChatResponse:
        """
        Non-streaming chat: drain chat() and return the aggregate response.

        Args:
            message: user message.
            **kwargs: forwarded to chat().

        Returns:
            ChatResponse: full answer, reasoning, token usage and error state.
        """
        content = ""
        reasoning = ""
        error = None
        usage = None

        for chunk in self.chat(message, **kwargs):
            if chunk.error:
                error = chunk.error
                break
            if chunk.reasoning_content:
                reasoning += chunk.reasoning_content
            if chunk.content:
                content += chunk.content
            # The final chunk carries token usage when the server reports it.
            if getattr(chunk, 'usage', None):
                usage = chunk.usage

        return ChatResponse(
            content=content,
            reasoning_content=reasoning,
            success=error is None,
            error=error,
            usage=usage,
        )

    def clear_history(self) -> None:
        """Clear the conversation history."""
        self._messages.clear()

    def get_history(self) -> List[Dict[str, str]]:
        """Return a copy of the conversation history."""
        return self._messages.copy()


# ============================================================
# Convenience functions
# ============================================================

def chat(message: str, **kwargs) -> Generator[ChatChunk, None, None]:
    """
    Convenience streaming chat with a throwaway StreamingLLM.

    Args:
        message: user message.
        **kwargs: forwarded to StreamingLLM().

    Yields:
        ChatChunk: response chunks.
    """
    llm = StreamingLLM(**kwargs)
    yield from llm.chat(message)


def chat_complete(message: str, **kwargs) -> ChatResponse:
    """
    Convenience non-streaming chat with a throwaway StreamingLLM.

    Args:
        message: user message.
        **kwargs: forwarded to StreamingLLM().

    Returns:
        ChatResponse: the complete response.
    """
    llm = StreamingLLM(**kwargs)
    return llm.chat_complete(message)


# ------------------------------------------------------------------
# NOTE(review): the source was a whitespace-mangled git patch; the
# fragment below belongs to src/Module/llm/llmconfig.py (first half).
# ------------------------------------------------------------------

# Default model
MODEL = 'deepseek-v3.2'

# Selectable models
AVAILABLE_MODELS = [
    'deepseek-v3.2',      # latest, supports thinking mode
    'deepseek-v3.2-exp',  # experimental build
    'deepseek-v3.1',      # previous release
    'deepseek-r1',        # reasoning model (always thinks)
    'deepseek-r1-0528',   # upgraded reasoning model
    'deepseek-v3',        # base model (no thinking)
]
# ============================================================
# API configuration
# ============================================================

# API base URL (Beijing region)
API_BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'


# ============================================================
# Generation parameters
# ============================================================

# Enable thinking mode (supported only by deepseek-v3.2/v3.1 etc.)
ENABLE_THINKING = False

# Temperature (0.0-2.0, higher means more random)
TEMPERATURE = 1.0

# Top-p sampling
TOP_P = 0.95

# Maximum output tokens
MAX_TOKENS = 4096


# ============================================================
# Context configuration
# ============================================================

# System prompt
SYSTEM_PROMPT = "You are a helpful assistant."

# Context length limit
MAX_CONTEXT_LENGTH = 131072


# ------------------------------------------------------------------
# NOTE(review): the source was a whitespace-mangled git patch; the
# code below belongs to test/llm/test_llm.py.
# ------------------------------------------------------------------

# LLM streaming-call smoke tests (hit the live API; run as a script).

from src.Module.llm.llm import StreamingLLM, chat


def test_streaming_chat():
    """Stream a plain (non-thinking) reply and report its length."""
    print("=" * 60)
    print(" LLM 流式对话测试")
    print("=" * 60)

    llm = StreamingLLM(
        model='deepseek-v3.2',
        enable_thinking=False,  # thinking mode off
    )

    message = "用一句话介绍你自己"
    print(f"\n用户: {message}\n")
    print("助手: ", end="", flush=True)

    pieces = []
    for chunk in llm.chat(message):
        if chunk.error:
            print(f"\n[错误] {chunk.error}")
            return False
        if chunk.content:
            print(chunk.content, end="", flush=True)
            pieces.append(chunk.content)

    full_response = "".join(pieces)
    print("\n")
    print(f"完整响应长度: {len(full_response)} 字符")

    return len(full_response) > 0


def test_thinking_mode():
    """Stream with thinking mode on; echo reasoning first, then the answer."""
    print("\n" + "=" * 60)
    print("思考模式测试")
    print("=" * 60)

    llm = StreamingLLM(
        model='deepseek-v3.2',
        enable_thinking=True,  # thinking mode on
    )

    message = "1+1等于几?简单回答"
    print(f"\n用户: {message}\n")

    reasoning = ""
    content = ""
    is_answering = False

    for chunk in llm.chat(message):
        if chunk.error:
            print(f"\n[错误] {chunk.error}")
            return False

        # Reasoning chunks arrive before the answer starts.
        if chunk.reasoning_content:
            if not is_answering:
                if not reasoning:
                    print("思考过程: ", end="", flush=True)
                print(chunk.reasoning_content, end="", flush=True)
                reasoning += chunk.reasoning_content

        # First answer chunk switches the display into answer mode.
        if chunk.content:
            if not is_answering:
                print(f"\n\n回复: ", end="", flush=True)
                is_answering = True
            print(chunk.content, end="", flush=True)
            content += chunk.content

    print("\n")
    print(f"思考过程: {len(reasoning)} 字符")
    print(f"回复内容: {len(content)} 字符")

    return len(content) > 0


if __name__ == '__main__':
    results = []
    results.append(("流式对话", test_streaming_chat()))
    results.append(("思考模式", test_thinking_mode()))

    print("\n" + "=" * 60)
    print("测试结果:")
    for name, ok in results:
        status = "✓ 通过" if ok else "✗ 失败"
        print(f" {name}: {status}")