feat: update

This commit is contained in:
DongShengWu 2026-01-02 09:45:27 +08:00
parent 9f2f086cdf
commit b223b32fb4
7 changed files with 589 additions and 5 deletions

View File

@ -10,7 +10,8 @@ sys.path.append("/sdcard")
# 创建舵机对象 (GPIO47, 角度范围0-270度) # 创建舵机对象 (GPIO47, 角度范围0-270度)
servo = ServoController(gpio_pin=47, min_angle=0, max_angle=360, servo_range=360) servo = ServoController(gpio_pin=46, min_angle=0, max_angle=360, servo_range=360)
servo = ServoController(gpio_pin=61, min_angle=0, max_angle=360, servo_range=360)
print("舵机测试开始...") print("舵机测试开始...")
@ -42,5 +43,5 @@ print("舵机测试开始...")
# 360度舵机需要一直发 sg90。 0-180为顺(速度变小) 180-360为逆速度变大 # 360度舵机需要一直发 sg90。 0-180为顺(速度变小) 180-360为逆速度变大
while True: while True:
servo.set_angle(360) servo.set_angle(100)
time.sleep(1) time.sleep(1)

View File

@ -11,11 +11,14 @@ from state_detection import StateDetector
# ========== 配置参数 ========== # ========== 配置参数 ==========
# WiFi配置 # WiFi配置
WIFI_SSID = "dongshengwu" #WIFI_SSID = "dongshengwu"
#WIFI_PASSWORD = "wds666666"
WIFI_SSID = "ZTE_969121"
WIFI_PASSWORD = "wds666666" WIFI_PASSWORD = "wds666666"
# 服务器配置 # 服务器配置
SERVER_HOST = "172.20.10.9" #SERVER_HOST = "172.20.10.9"
SERVER_HOST = "192.168.0.21"
SERVER_PORT = 8081 SERVER_PORT = 8081
API_PATH = "/api/detection/analyze" API_PATH = "/api/detection/analyze"

BIN
k230/HaveFace.wav Normal file

Binary file not shown.

Binary file not shown.

View File

@ -17,3 +17,8 @@
3. 请求模型端( 情绪检测以及皮肤状态打分,注意时间戳) 3. 请求模型端( 情绪检测以及皮肤状态打分,注意时间戳)
4. 保存到数据库( 图像数据 ,心情,皮肤状态) 4. 保存到数据库( 图像数据 ,心情,皮肤状态)
5. 判断时候状态不好! 然后发起语音对话 5. 判断时候状态不好! 然后发起语音对话
## 其他
### 音频格式转换
ffmpeg -y -i hello_24k.wav -ar 44100 -ac 1 -sample_fmt s16 hello.wav

View File

@ -0,0 +1,547 @@
# main.py
# 人脸检测与状态分析主程序
# 功能:实时人脸检测 -> 持续3秒触发情绪和皮肤状态检测 -> 输出结果
# 适用于庐山派 K230-CanMV 开发板
import os
import time
import gc
import sys
from face_detect_module import FaceDetector
from state_detection import StateDetector
from audio_module import AudioPlayer
import camera_module
# 音频播放相关导入
from media.media import *
from media.pyaudio import *
import media.wave as wave
# ========== 配置参数 ==========
# WiFi 配置
WIFI_SSID = "ZTE_969121"
WIFI_PASSWORD = "wds666666"
# 服务器配置
SERVER_HOST = "192.168.0.21"
SERVER_PORT = 8081
API_PATH = "/api/detection/analyze"
# 检测参数
DETECTION_THRESHOLD = 3.0 # 人脸持续3秒后触发检测
COOLDOWN_PERIOD = 10.0 # 检测完成后10秒冷却期避免频繁检测
# 显示和音频配置
DISPLAY_MODE = "lcd" # LCD显示模式
AUDIO_FILE = "/sdcard/HaveFace.wav" # 检测到人脸时的提示音
# 摄像头配置
DETECTION_IMAGE_WIDTH = 640
DETECTION_IMAGE_HEIGHT = 480
# ========== 状态管理类 ==========
class FaceDetectionState:
"""
人脸检测状态管理
追踪
- 人脸首次出现时间
- 上次检测触发时间
- 是否正在检测中
"""
def __init__(self):
self.face_start_time = None # 人脸首次出现的时间戳None表示无人脸
self.last_detection_time = None # 上次触发检测的时间戳
self.is_detecting = False # 是否正在执行检测中
def on_face_detected(self):
"""
人脸检测到时调用有人脸
"""
if self.face_start_time is None:
self.face_start_time = time.time()
def on_face_lost(self):
"""
人脸丢失时调用无人脸
重置人脸计时器
"""
self.face_start_time = None
def get_face_duration(self):
"""
获取人脸持续时间
返回
float: 人脸持续的秒数无人脸时返回0
"""
if self.face_start_time is None:
return 0.0
return time.time() - self.face_start_time
def should_trigger_detection(
self,
threshold=DETECTION_THRESHOLD,
cooldown=COOLDOWN_PERIOD):
"""
判断是否应该触发检测
条件
1. 人脸持续时间 >= threshold
2. 不在检测中
3. 超过冷却期避免重复检测
参数
threshold: 触发检测的时间阈值
cooldown: 冷却期
返回
bool: 是否应该触发检测
"""
# 如果正在检测中,不再触发
if self.is_detecting:
return False
# 人脸持续时间不足,不触发
if self.get_face_duration() < threshold:
return False
# 检查冷却期
if self.last_detection_time is not None:
time_since_last = time.time() - self.last_detection_time
if time_since_last < cooldown:
return False
return True
def start_detection(self):
"""标记开始执行检测"""
self.is_detecting = True
def end_detection(self):
"""标记检测完成,记录时间用于冷却期计算"""
self.is_detecting = False
self.last_detection_time = time.time()
def has_active_face(self):
"""是否有活跃人脸(人脸出现过)"""
return self.face_start_time is not None
# ========== 辅助函数 ==========
def exit_check():
"""
检查退出信号参考boot.py
"""
try:
os.exitpoint()
except KeyboardInterrupt as e:
print("[用户] 中断: {}".format(e))
return True
return False
def play_audio_safe(filename):
"""
安全播放音频文件完全参考boot.py实现
特点
- 完整播放音频
- 正确初始化和释放资源
- 支持用户中断
- 参考 boot.py 的实现方式
参数
filename: WAV文件路径
返回
bool: True成功, False失败
"""
try:
# 打开wav文件
wf = wave.open(filename, 'rb')
chunk = int(wf.get_framerate() / 25)
# 初始化PyAudio
p = PyAudio()
p.initialize(chunk)
MediaManager.init() # vb buffer初始化重要
# 创建音频输出流设置的音频参数均为wave中获取到的参数
stream = p.open(
format=p.get_format_from_width(wf.get_sampwidth()),
channels=wf.get_channels(),
rate=wf.get_framerate(),
output=True,
frames_per_buffer=chunk
)
# 从wav文件中读取第一帧数据
data = wf.read_frames(chunk)
# 逐帧读取并播放
while data:
stream.write(data) # 将帧数据写入到音频输出流中
data = wf.read_frames(chunk) # 从wav文件中读取数一帧数据
if exit_check(): # 检查退出信号
break
return True
except BaseException as e:
print("[错误] 音频播放异常: {}".format(e))
return False
finally:
# 清理资源
try:
stream.stop_stream() # 停止音频输出流
stream.close() # 关闭音频输出流
except:
pass
try:
p.terminate() # 释放音频对象
except:
pass
try:
wf.close() # 关闭wav文件
except:
pass
try:
MediaManager.deinit() # 释放vb buffer
except:
pass
def image_to_rgb888(img):
"""
将图像转换为RGB888格式的bytes数据
支持多种Image对象格式兼容不同的K230库版本
参数
img: Image对象
返回
bytes: RGB888格式的图像数据
转换失败返回 None
"""
try:
if hasattr(img, 'to_bytes'):
return img.to_bytes()
if hasattr(img, 'tobytes'):
return img.tobytes()
return bytes(img)
except Exception as e:
print("[错误] 图像转换错误: {}".format(e))
return None
def initialize():
"""
初始化程序
步骤
1. 连接 WiFi重试3次
2. 初始化 StateDetector
3. 初始化 FaceDetector
返回
tuple: (face_detector, state_detector)
初始化失败返回 (None, None)
"""
print("=" * 50)
print("程序启动 - 人脸检测与状态分析系统")
print("=" * 50)
# ===== 第1步连接WiFi =====
print("\n[初始化] 连接WiFi: {}".format(WIFI_SSID))
state_detector = StateDetector(SERVER_HOST, SERVER_PORT, API_PATH)
for attempt in range(3):
try:
state_detector.connect_wifi(WIFI_SSID, WIFI_PASSWORD)
print("[初始化] WiFi连接成功")
break
except Exception as e:
print("[初始化] WiFi连接失败 (尝试 {}/3): {}".format(attempt + 1, e))
if attempt < 2:
time.sleep(2)
else:
print("[错误] WiFi连接失败程序退出")
return None, None
# ===== 第2步初始化人脸检测器 =====
print("[初始化] 启动人脸检测器...")
try:
face_detector = FaceDetector(
display_mode=DISPLAY_MODE,
confidence_threshold=0.5,
debug_mode=0
)
face_detector.start()
print("[初始化] 人脸检测器启动成功")
except Exception as e:
print("[错误] 人脸检测器启动失败: {}".format(e))
state_detector.disconnect()
return None, None
print("\n[初始化] 初始化完成,开始检测...\n")
return face_detector, state_detector
def capture_detection_image():
"""
捕获用于状态检测的图像640x480
由于 FaceDetector 使用摄像头的 1920x1080 分辨率
而状态检测需要 640x480 的图像
此函数会临时停止 FaceDetector使用 camera_module 快速拍摄
返回
bytes: RGB888 格式的图像数据
失败返回 None
"""
try:
# 初始化 640x480 摄像头
camera_module.camera_init(DETECTION_IMAGE_WIDTH, DETECTION_IMAGE_HEIGHT)
camera_module.camera_start()
# 等待摄像头稳定(参考 05test_state_detection.py
time.sleep(1)
# 拍照
print("[检测] 拍照中...")
img = camera_module.camera_snapshot()
# 转换为RGB888格式
rgb888_data = image_to_rgb888(img)
# 清理摄像头资源
camera_module.camera_stop()
camera_module.camera_deinit()
return rgb888_data
except Exception as e:
print("[错误] 图像捕获失败: {}".format(e))
try:
camera_module.camera_stop()
camera_module.camera_deinit()
except BaseException:
pass
return None
def trigger_detection(face_detector, state_detector):
"""
触发状态检测的完整流程
步骤
1. 暂停人脸检测显示屏幕会冻结
2. 播放提示音
3. 捕获 640x480 图像
4. 发送状态检测请求情绪+皮肤
5. 输出结果
6. 恢复人脸检测显示
参数
face_detector: FaceDetector 实例
state_detector: StateDetector 实例
注意
此过程会阻塞 3-5 期间屏幕显示会冻结
"""
print("\n" + "=" * 50)
print("触发状态检测...")
print("=" * 50)
try:
# 暂停人脸检测
face_detector.stop()
print("[检测] 暂停人脸显示")
# ===== 步骤1播放提示音 =====
print("[检测] 播放提示音...")
try:
if play_audio_safe(AUDIO_FILE):
print("[检测] 提示音播放完成")
else:
print("[警告] 播放音频失败")
except Exception as e:
print("[警告] 播放音频异常: {}".format(e))
# ===== 步骤2捕获图像 =====
print("[检测] 捕获图像...")
rgb888_data = capture_detection_image()
if rgb888_data is None:
print("[错误] 无法获取图像数据,检测失败")
return
print("[检测] 图像数据大小: {} bytes".format(len(rgb888_data)))
# ===== 步骤3发送检测请求 =====
print("[检测] 发送状态检测请求...")
result = state_detector.detect(
rgb888_data,
width=DETECTION_IMAGE_WIDTH,
height=DETECTION_IMAGE_HEIGHT
)
# ===== 步骤4输出结果 =====
print("\n" + "=" * 45 + " 检测结果 " + "=" * 45)
if result.get("success"):
emotion = result.get('emotion', 'unknown')
confidence = result.get('emotion_confidence', 0)
skin = result.get('skin_status', {})
print("情绪: {} (置信度: {:.2f})".format(emotion, confidence))
print("皮肤状态:")
print(" - 痘痘: {}".format(skin.get('acne', 0)))
print(" - 皱纹: {}".format(skin.get('wrinkles', 0)))
print(" - 毛孔: {}".format(skin.get('pores', 0)))
print(" - 黑眼圈: {}".format(skin.get('dark_circles', 0)))
else:
print("检测失败: {}".format(result.get('error', '未知错误')))
print("=" * 100)
except Exception as e:
print("[错误] 检测过程异常: {}".format(e))
try:
sys.print_exception(e)
except BaseException:
pass
finally:
# 恢复人脸检测显示
print("\n[检测] 恢复人脸检测...")
try:
face_detector.start()
print("[检测] 人脸检测已恢复\n")
except Exception as e:
print("[错误] 恢复人脸检测失败: {}".format(e))
def main_loop(face_detector, state_detector):
"""
主检测循环
流程
1. 检测人脸
2. 更新状态出现/消失
3. 判断是否触发检测
4. 绘制和显示
5. 垃圾回收
参数
face_detector: FaceDetector 实例
state_detector: StateDetector 实例
"""
state = FaceDetectionState()
frame_count = 0
while True:
try:
os.exitpoint()
# ===== 步骤1检测人脸 =====
img, faces = face_detector.detect()
frame_count += 1
# ===== 步骤2更新状态 =====
if faces:
state.on_face_detected()
# 仅每秒打印一次(减少日志输出)
if frame_count % 25 == 0:
print("[检测] 检测到 {} 个人脸,持续时间: {:.1f}".format(
len(faces), state.get_face_duration()
))
else:
state.on_face_lost()
# ===== 步骤3判断是否触发检测 =====
if state.should_trigger_detection():
state.start_detection()
trigger_detection(face_detector, state_detector)
state.end_detection()
# ===== 步骤4绘制和显示 =====
face_detector.draw_boxes()
face_detector.show()
# ===== 步骤5垃圾回收 =====
gc.collect()
except KeyboardInterrupt:
print("\n[用户] 用户停止")
break
except Exception as e:
print("[错误] 主循环异常: {}".format(e))
try:
sys.print_exception(e)
except BaseException:
pass
time.sleep(1)
# ========== 主程序入口 ==========
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
face_detector = None
state_detector = None
try:
# 初始化
face_detector, state_detector = initialize()
# 启动主循环
if face_detector and state_detector:
main_loop(face_detector, state_detector)
else:
print("[错误] 初始化失败,程序退出")
except KeyboardInterrupt:
print("\n[用户] 用户中断")
except Exception as e:
print("[错误] 程序异常: {}".format(e))
try:
sys.print_exception(e)
except BaseException:
pass
finally:
# 清理资源
print("\n[清理] 释放资源...")
if face_detector:
try:
face_detector.stop()
print("[清理] 人脸检测器已停止")
except BaseException:
pass
if state_detector:
try:
state_detector.disconnect()
print("[清理] WiFi已断开")
except BaseException:
pass
print("[清理] 程序退出")

28
k230/servo.py Normal file
View File

@ -0,0 +1,28 @@
# 02test_servo.py
# 舵机测试程序:最大 -> 最小 -> 中间
# 适用于庐山派 K230-CanMV 开发板
from servo_module import ServoController
import time
import sys
sys.path.append("/sdcard")
# 俯仰角
servo1 = ServoController(gpio_pin=42, min_angle=45, max_angle=225, servo_range=270)
# 横滚
servo2 = ServoController(gpio_pin=52, min_angle=45, max_angle=225, servo_range=270)
print("舵机测试开始...")
try:
servo1.set_angle(200)
servo2.set_angle(135)
time.sleep(1)
except KeyboardInterrupt:
print("用户停止")
finally:
servo1.deinit()
print("舵机资源已释放")