feat: LLM封装类

This commit is contained in:
DongShengWu 2026-01-01 21:37:02 +08:00
parent 35c9b9eb58
commit 9ab0089a83
3 changed files with 414 additions and 0 deletions

262
src/Module/llm/llm.py Normal file
View File

@ -0,0 +1,262 @@
"""
流式 LLM 模块 - 基于 DeepSeek 模型
核心特性使用 yield 生成器实现流式返回
"""
import os
from typing import Generator, Optional, List, Dict, Any
from pathlib import Path
from dataclasses import dataclass, field
from dotenv import load_dotenv
from openai import OpenAI
from . import llmconfig as config
@dataclass
class ChatChunk:
    """One incremental chunk of a streaming chat response."""
    content: str = ""  # reply text carried by this chunk (may be empty)
    reasoning_content: str = ""  # chain-of-thought text (thinking mode only)
    is_final: bool = False  # True on the terminal chunk of the stream
    is_reasoning: bool = False  # True when this chunk carries reasoning text
    error: Optional[str] = None  # error message when the request failed, else None
@dataclass
class ChatResponse:
    """Aggregated result of a complete (non-streaming) chat exchange."""
    content: str = ""  # full reply text
    reasoning_content: str = ""  # full chain-of-thought text (thinking mode)
    success: bool = True  # False when the underlying stream reported an error
    error: Optional[str] = None  # error message on failure, else None
    usage: Optional[Dict[str, Any]] = None  # token usage — NOTE(review): never populated by chat_complete(); confirm intended
@dataclass
class Message:
    """A single chat message. NOTE(review): defined but unused in this module — confirm it is part of the intended public API."""
    role: str  # one of 'system', 'user', 'assistant'
    content: str  # message text
class StreamingLLM:
    """
    Streaming large-language-model client (DeepSeek via the DashScope
    OpenAI-compatible endpoint).

    Usage:
        llm = StreamingLLM()
        for chunk in llm.chat("你好"):
            print(chunk.content, end="")
    """

    def __init__(
        self,
        model: Optional[str] = None,
        enable_thinking: Optional[bool] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        system_prompt: Optional[str] = None,
    ):
        """
        Initialize the streaming LLM.

        Args:
            model: Model name (defaults to config.MODEL).
            enable_thinking: Enable "thinking" (reasoning) mode.
            temperature: Sampling temperature.
            top_p: Top-p (nucleus) sampling.
            max_tokens: Maximum output tokens.
            system_prompt: System prompt.

        Raises:
            ValueError: If DASHSCOPE_API_KEY cannot be located.
        """
        self._load_api_key()
        # Bug fix: use explicit `is not None` checks for every override
        # (the original used truthiness for model/max_tokens/system_prompt,
        # silently discarding falsy-but-valid values such as an empty
        # system prompt).
        self.model = model if model is not None else config.MODEL
        self.enable_thinking = enable_thinking if enable_thinking is not None else config.ENABLE_THINKING
        self.temperature = temperature if temperature is not None else config.TEMPERATURE
        self.top_p = top_p if top_p is not None else config.TOP_P
        self.max_tokens = max_tokens if max_tokens is not None else config.MAX_TOKENS
        self.system_prompt = system_prompt if system_prompt is not None else config.SYSTEM_PROMPT
        # OpenAI-compatible client pointed at the DashScope endpoint.
        self._client = OpenAI(
            api_key=os.environ.get('DASHSCOPE_API_KEY'),
            base_url=config.API_BASE_URL
        )
        # Conversation history: alternating user/assistant turns only
        # (the system message is re-built on every request).
        self._messages: List[Dict[str, str]] = []

    def _load_api_key(self) -> None:
        """Walk up to 5 parent directories looking for a .env file to load.

        Raises:
            ValueError: If DASHSCOPE_API_KEY is still unset afterwards.
        """
        current_dir = Path(__file__).parent
        for _ in range(5):
            env_path = current_dir / '.env'
            if env_path.exists():
                load_dotenv(env_path)
                break
            current_dir = current_dir.parent
        if not os.environ.get('DASHSCOPE_API_KEY'):
            raise ValueError('未找到 DASHSCOPE_API_KEY')

    def chat(
        self,
        message: str,
        enable_thinking: Optional[bool] = None,
        temperature: Optional[float] = None,
        system_prompt: Optional[str] = None,
    ) -> Generator[ChatChunk, None, None]:
        """
        Streaming chat — the core interface.

        Args:
            message: User message.
            enable_thinking: Per-call override of thinking mode.
            temperature: Per-call override of the temperature.
            system_prompt: Per-call override of the system prompt.

        Yields:
            ChatChunk: Response chunks. Exactly one terminal chunk with
                is_final=True is yielded — after the stream completes, or
                carrying the error message if the request failed.
        """
        _enable_thinking = enable_thinking if enable_thinking is not None else self.enable_thinking
        _temperature = temperature if temperature is not None else self.temperature
        _system_prompt = system_prompt if system_prompt is not None else self.system_prompt
        # Build the request: system prompt + prior history + new user turn.
        messages = [{"role": "system", "content": _system_prompt}]
        messages.extend(self._messages)
        messages.append({"role": "user", "content": message})
        try:
            completion = self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=_temperature,
                top_p=self.top_p,
                max_tokens=self.max_tokens,
                extra_body={"enable_thinking": _enable_thinking},
                stream=True,
                stream_options={"include_usage": True}
            )
            full_content = ""
            full_reasoning = ""
            for chunk in completion:
                # Chunks without choices only carry usage metadata. Bug fix:
                # the original yielded is_final=True here, so consumers never
                # saw a final chunk when the server omitted the usage chunk
                # (and could see a premature one mid-stream). The single
                # terminal chunk is now emitted after the loop.
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta
                # Reasoning ("thinking") tokens.
                if hasattr(delta, 'reasoning_content') and delta.reasoning_content:
                    full_reasoning += delta.reasoning_content
                    yield ChatChunk(
                        content="",
                        reasoning_content=delta.reasoning_content,
                        is_reasoning=True
                    )
                # Answer tokens.
                if hasattr(delta, 'content') and delta.content:
                    full_content += delta.content
                    yield ChatChunk(
                        content=delta.content,
                        is_reasoning=False
                    )
            # Persist the completed turn in the conversation history.
            self._messages.append({"role": "user", "content": message})
            self._messages.append({"role": "assistant", "content": full_content})
            yield ChatChunk(content="", is_final=True)
        except Exception as e:
            # Surface the failure as a terminal chunk instead of raising,
            # so streaming consumers can handle it inline. History is left
            # untouched on failure.
            yield ChatChunk(
                content="",
                is_final=True,
                error=str(e)
            )

    def chat_complete(
        self,
        message: str,
        **kwargs
    ) -> ChatResponse:
        """
        Non-streaming chat: drain the stream and return one full response.

        Args:
            message: User message.
            **kwargs: Forwarded to chat().

        Returns:
            ChatResponse: Aggregated response; success is False and error
                is set when the underlying stream reported a failure.
        """
        content = ""
        reasoning = ""
        error = None
        for chunk in self.chat(message, **kwargs):
            if chunk.error:
                error = chunk.error
                break
            if chunk.reasoning_content:
                reasoning += chunk.reasoning_content
            if chunk.content:
                content += chunk.content
        return ChatResponse(
            content=content,
            reasoning_content=reasoning,
            success=error is None,
            error=error
        )

    def clear_history(self) -> None:
        """Clear the conversation history."""
        self._messages.clear()

    def get_history(self) -> List[Dict[str, str]]:
        """Return a shallow copy of the conversation history."""
        return self._messages.copy()
# ============================================================
# 便捷函数
# ============================================================
def chat(message: str, **kwargs) -> Generator[ChatChunk, None, None]:
    """Convenience wrapper: stream a one-shot chat on a fresh StreamingLLM.

    Args:
        message: User message.
        **kwargs: Forwarded to the StreamingLLM constructor.

    Yields:
        ChatChunk: Response chunks.
    """
    yield from StreamingLLM(**kwargs).chat(message)
def chat_complete(message: str, **kwargs) -> ChatResponse:
    """Convenience wrapper: one-shot non-streaming chat on a fresh StreamingLLM.

    Args:
        message: User message.
        **kwargs: Forwarded to the StreamingLLM constructor.

    Returns:
        ChatResponse: The aggregated response.
    """
    return StreamingLLM(**kwargs).chat_complete(message)

View File

@ -0,0 +1,57 @@
"""
LLM 配置文件
定义大语言模型的默认参数
"""
# ============================================================
# 模型配置
# ============================================================
# 默认模型
MODEL = 'deepseek-v3.2'
# 可选模型列表
AVAILABLE_MODELS = [
'deepseek-v3.2', # 最新版,支持思考模式
'deepseek-v3.2-exp', # 实验版
'deepseek-v3.1', # 上一版本
'deepseek-r1', # 推理模型 (总是思考)
'deepseek-r1-0528', # 推理模型升级版
'deepseek-v3', # 基础版 (不思考)
]
# ============================================================
# API 配置
# ============================================================
# API Base URL (北京地域)
API_BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'
# ============================================================
# 生成参数
# ============================================================
# 是否启用思考模式 (仅 deepseek-v3.2/v3.1 等支持)
ENABLE_THINKING = False
# 温度 (0.0-2.0, 越高越随机)
TEMPERATURE = 1.0
# Top P 采样
TOP_P = 0.95
# 最大输出 Token 数
MAX_TOKENS = 4096
# ============================================================
# 上下文配置
# ============================================================
# 系统提示词
SYSTEM_PROMPT = "You are a helpful assistant."
# 上下文长度限制
MAX_CONTEXT_LENGTH = 131072

95
test/llm/test_llm.py Normal file
View File

@ -0,0 +1,95 @@
"""
LLM 流式调用测试
"""
from src.Module.llm.llm import StreamingLLM, chat
def test_streaming_chat():
    """Exercise the plain streaming chat path; succeed on a non-empty reply."""
    print("=" * 60)
    print(" LLM 流式对话测试")
    print("=" * 60)
    llm = StreamingLLM(
        model='deepseek-v3.2',
        enable_thinking=False,  # thinking mode disabled
    )
    message = "用一句话介绍你自己"
    print(f"\n用户: {message}\n")
    print("助手: ", end="", flush=True)
    pieces = []
    for chunk in llm.chat(message):
        if chunk.error:
            print(f"\n[错误] {chunk.error}")
            return False
        if chunk.content:
            print(chunk.content, end="", flush=True)
            pieces.append(chunk.content)
    full_response = "".join(pieces)
    print("\n")
    print(f"完整响应长度: {len(full_response)} 字符")
    return len(full_response) > 0
def test_thinking_mode():
    """Exercise thinking mode: stream reasoning tokens, then the answer."""
    print("\n" + "=" * 60)
    print("思考模式测试")
    print("=" * 60)
    llm = StreamingLLM(
        model='deepseek-v3.2',
        enable_thinking=True,  # thinking mode enabled
    )
    message = "1+1等于几简单回答"
    print(f"\n用户: {message}\n")
    reasoning_parts = []
    content_parts = []
    answering = False
    for chunk in llm.chat(message):
        if chunk.error:
            print(f"\n[错误] {chunk.error}")
            return False
        if chunk.reasoning_content:
            if not answering:
                if not reasoning_parts:
                    print("思考过程: ", end="", flush=True)
                print(chunk.reasoning_content, end="", flush=True)
            reasoning_parts.append(chunk.reasoning_content)
        if chunk.content:
            if not answering:
                print("\n\n回复: ", end="", flush=True)
                answering = True
            print(chunk.content, end="", flush=True)
            content_parts.append(chunk.content)
    reasoning = "".join(reasoning_parts)
    content = "".join(content_parts)
    print("\n")
    print(f"思考过程: {len(reasoning)} 字符")
    print(f"回复内容: {len(content)} 字符")
    return len(content) > 0
if __name__ == '__main__':
    import sys

    # Run both tests and collect (name, passed) pairs.
    results = []
    success1 = test_streaming_chat()
    results.append(("流式对话", success1))
    success2 = test_thinking_mode()
    results.append(("思考模式", success2))
    print("\n" + "=" * 60)
    print("测试结果:")
    for name, success in results:
        status = "✓ 通过" if success else "✗ 失败"
        print(f" {name}: {status}")
    # Bug fix: the original always exited 0 even when a test failed, so CI
    # could never detect a failing run. Propagate failure via the exit code.
    sys.exit(0 if all(ok for _, ok in results) else 1)