diff --git a/k230/02test_servo.py b/k230/02test_servo.py index f3820ba..24cd1b0 100644 --- a/k230/02test_servo.py +++ b/k230/02test_servo.py @@ -10,7 +10,8 @@ sys.path.append("/sdcard") # 创建舵机对象 (GPIO47, 角度范围0-270度) -servo = ServoController(gpio_pin=47, min_angle=0, max_angle=360, servo_range=360) +servo = ServoController(gpio_pin=46, min_angle=0, max_angle=360, servo_range=360) +servo = ServoController(gpio_pin=61, min_angle=0, max_angle=360, servo_range=360) print("舵机测试开始...") @@ -42,5 +43,5 @@ print("舵机测试开始...") # 360度舵机需要一直发! sg90。 0-180为顺(速度变小) 180-360为逆(速度变大)。 while True: - servo.set_angle(360) + servo.set_angle(100) time.sleep(1) diff --git a/k230/05test_state_detection.py b/k230/05test_state_detection.py index 19c6cb9..228b669 100644 --- a/k230/05test_state_detection.py +++ b/k230/05test_state_detection.py @@ -11,11 +11,14 @@ from state_detection import StateDetector # ========== 配置参数 ========== # WiFi配置 -WIFI_SSID = "dongshengwu" +#WIFI_SSID = "dongshengwu" +#WIFI_PASSWORD = "wds666666" +WIFI_SSID = "ZTE_969121" WIFI_PASSWORD = "wds666666" # 服务器配置 -SERVER_HOST = "172.20.10.9" +#SERVER_HOST = "172.20.10.9" +SERVER_HOST = "192.168.0.21" SERVER_PORT = 8081 API_PATH = "/api/detection/analyze" diff --git a/k230/HaveFace.wav b/k230/HaveFace.wav new file mode 100644 index 0000000..ed4f69a Binary files /dev/null and b/k230/HaveFace.wav differ diff --git a/k230/hello_24k.wav b/k230/hello_24k.wav deleted file mode 100644 index e68783a..0000000 Binary files a/k230/hello_24k.wav and /dev/null differ diff --git a/k230/k230_readme.md b/k230/k230_readme.md index 10d7849..51156b9 100644 --- a/k230/k230_readme.md +++ b/k230/k230_readme.md @@ -16,4 +16,9 @@ 2. 检测人脸( 超过5s) 3. 请求模型端( 情绪检测以及皮肤状态打分,注意时间戳) 4. 保存到数据库( 图像数据 ,心情,皮肤状态) -5. 判断时候状态不好! 然后发起语音对话 \ No newline at end of file +5. 判断时候状态不好! 然后发起语音对话 + + +## 其他 +### 音频格式转换 +ffmpeg -y -i hello_24k.wav -ar 44100 -ac 1 -sample_fmt s16 hello.wav diff --git a/k230/main.py b/k230/main.py index e69de29..87e9e07 100644 --- a/k230/main.py +++ b/k230/main.py @@ -0,0 +1,547 @@ +# main.py +# 人脸检测与状态分析主程序 +# 功能:实时人脸检测 -> 持续3秒触发情绪和皮肤状态检测 -> 输出结果 +# 适用于庐山派 K230-CanMV 开发板 + +import os +import time +import gc +import sys +from face_detect_module import FaceDetector +from state_detection import StateDetector +from audio_module import AudioPlayer +import camera_module + +# 音频播放相关导入 +from media.media import * +from media.pyaudio import * +import media.wave as wave + +# ========== 配置参数 ========== + +# WiFi 配置 +WIFI_SSID = "ZTE_969121" +WIFI_PASSWORD = "wds666666" + +# 服务器配置 +SERVER_HOST = "192.168.0.21" +SERVER_PORT = 8081 +API_PATH = "/api/detection/analyze" + +# 检测参数 +DETECTION_THRESHOLD = 3.0 # 人脸持续3秒后触发检测 +COOLDOWN_PERIOD = 10.0 # 检测完成后10秒冷却期(避免频繁检测) + +# 显示和音频配置 +DISPLAY_MODE = "lcd" # LCD显示模式 +AUDIO_FILE = "/sdcard/HaveFace.wav" # 检测到人脸时的提示音 + +# 摄像头配置 +DETECTION_IMAGE_WIDTH = 640 +DETECTION_IMAGE_HEIGHT = 480 + +# ========== 状态管理类 ========== + + +class FaceDetectionState: + """ + 人脸检测状态管理 + + 追踪: + - 人脸首次出现时间 + - 上次检测触发时间 + - 是否正在检测中 + """ + + def __init__(self): + self.face_start_time = None # 人脸首次出现的时间戳(None表示无人脸) + self.last_detection_time = None # 上次触发检测的时间戳 + self.is_detecting = False # 是否正在执行检测中 + + def on_face_detected(self): + """ + 人脸检测到时调用(有人脸) + """ + if self.face_start_time is None: + self.face_start_time = time.time() + + def on_face_lost(self): + """ + 人脸丢失时调用(无人脸) + 重置人脸计时器 + """ + self.face_start_time = None + + def get_face_duration(self): + """ + 获取人脸持续时间(秒) + + 返回: + float: 人脸持续的秒数,无人脸时返回0 + """ + if self.face_start_time is None: + return 0.0 + return time.time() - self.face_start_time + + def should_trigger_detection( + self, + threshold=DETECTION_THRESHOLD, + cooldown=COOLDOWN_PERIOD): + """ + 判断是否应该触发检测 + + 条件: + 1. 人脸持续时间 >= threshold + 2. 不在检测中 + 3. 超过冷却期(避免重复检测) + + 参数: + threshold: 触发检测的时间阈值(秒) + cooldown: 冷却期(秒) + + 返回: + bool: 是否应该触发检测 + """ + # 如果正在检测中,不再触发 + if self.is_detecting: + return False + + # 人脸持续时间不足,不触发 + if self.get_face_duration() < threshold: + return False + + # 检查冷却期 + if self.last_detection_time is not None: + time_since_last = time.time() - self.last_detection_time + if time_since_last < cooldown: + return False + + return True + + def start_detection(self): + """标记开始执行检测""" + self.is_detecting = True + + def end_detection(self): + """标记检测完成,记录时间用于冷却期计算""" + self.is_detecting = False + self.last_detection_time = time.time() + + def has_active_face(self): + """是否有活跃人脸(人脸出现过)""" + return self.face_start_time is not None + + +# ========== 辅助函数 ========== + +def exit_check(): + """ + 检查退出信号(参考boot.py) + """ + try: + os.exitpoint() + except KeyboardInterrupt as e: + print("[用户] 中断: {}".format(e)) + return True + return False + + +def play_audio_safe(filename): + """ + 安全播放音频文件(完全参考boot.py实现) + + 特点: + - 完整播放音频 + - 正确初始化和释放资源 + - 支持用户中断 + - 参考 boot.py 的实现方式 + + 参数: + filename: WAV文件路径 + + 返回: + bool: True成功, False失败 + """ + try: + # 打开wav文件 + wf = wave.open(filename, 'rb') + chunk = int(wf.get_framerate() / 25) + + # 初始化PyAudio + p = PyAudio() + p.initialize(chunk) + MediaManager.init() # vb buffer初始化(重要!) + + # 创建音频输出流,设置的音频参数均为wave中获取到的参数 + stream = p.open( + format=p.get_format_from_width(wf.get_sampwidth()), + channels=wf.get_channels(), + rate=wf.get_framerate(), + output=True, + frames_per_buffer=chunk + ) + + # 从wav文件中读取第一帧数据 + data = wf.read_frames(chunk) + + # 逐帧读取并播放 + while data: + stream.write(data) # 将帧数据写入到音频输出流中 + data = wf.read_frames(chunk) # 从wav文件中读取数一帧数据 + if exit_check(): # 检查退出信号 + break + + return True + + except BaseException as e: + print("[错误] 音频播放异常: {}".format(e)) + return False + + finally: + # 清理资源 + try: + stream.stop_stream() # 停止音频输出流 + stream.close() # 关闭音频输出流 + except: + pass + + try: + p.terminate() # 释放音频对象 + except: + pass + + try: + wf.close() # 关闭wav文件 + except: + pass + + try: + MediaManager.deinit() # 释放vb buffer + except: + pass + + +def image_to_rgb888(img): + """ + 将图像转换为RGB888格式的bytes数据 + + 支持多种Image对象格式(兼容不同的K230库版本) + + 参数: + img: Image对象 + + 返回: + bytes: RGB888格式的图像数据 + 转换失败返回 None + """ + try: + if hasattr(img, 'to_bytes'): + return img.to_bytes() + if hasattr(img, 'tobytes'): + return img.tobytes() + return bytes(img) + except Exception as e: + print("[错误] 图像转换错误: {}".format(e)) + return None + + +def initialize(): + """ + 初始化程序 + + 步骤: + 1. 连接 WiFi(重试3次) + 2. 初始化 StateDetector + 3. 初始化 FaceDetector + + 返回: + tuple: (face_detector, state_detector) + 初始化失败返回 (None, None) + """ + print("=" * 50) + print("程序启动 - 人脸检测与状态分析系统") + print("=" * 50) + + # ===== 第1步:连接WiFi ===== + print("\n[初始化] 连接WiFi: {}".format(WIFI_SSID)) + + state_detector = StateDetector(SERVER_HOST, SERVER_PORT, API_PATH) + + for attempt in range(3): + try: + state_detector.connect_wifi(WIFI_SSID, WIFI_PASSWORD) + print("[初始化] WiFi连接成功") + break + except Exception as e: + print("[初始化] WiFi连接失败 (尝试 {}/3): {}".format(attempt + 1, e)) + if attempt < 2: + time.sleep(2) + else: + print("[错误] WiFi连接失败,程序退出") + return None, None + + # ===== 第2步:初始化人脸检测器 ===== + print("[初始化] 启动人脸检测器...") + + try: + face_detector = FaceDetector( + display_mode=DISPLAY_MODE, + confidence_threshold=0.5, + debug_mode=0 + ) + face_detector.start() + print("[初始化] 人脸检测器启动成功") + except Exception as e: + print("[错误] 人脸检测器启动失败: {}".format(e)) + state_detector.disconnect() + return None, None + + print("\n[初始化] 初始化完成,开始检测...\n") + + return face_detector, state_detector + + +def capture_detection_image(): + """ + 捕获用于状态检测的图像(640x480) + + 由于 FaceDetector 使用摄像头的 1920x1080 分辨率, + 而状态检测需要 640x480 的图像, + 此函数会临时停止 FaceDetector,使用 camera_module 快速拍摄。 + + 返回: + bytes: RGB888 格式的图像数据 + 失败返回 None + """ + try: + # 初始化 640x480 摄像头 + camera_module.camera_init(DETECTION_IMAGE_WIDTH, DETECTION_IMAGE_HEIGHT) + camera_module.camera_start() + + # 等待摄像头稳定(参考 05test_state_detection.py) + time.sleep(1) + + # 拍照 + print("[检测] 拍照中...") + img = camera_module.camera_snapshot() + + # 转换为RGB888格式 + rgb888_data = image_to_rgb888(img) + + # 清理摄像头资源 + camera_module.camera_stop() + camera_module.camera_deinit() + + return rgb888_data + + except Exception as e: + print("[错误] 图像捕获失败: {}".format(e)) + try: + camera_module.camera_stop() + camera_module.camera_deinit() + except BaseException: + pass + return None + + +def trigger_detection(face_detector, state_detector): + """ + 触发状态检测的完整流程 + + 步骤: + 1. 暂停人脸检测显示(屏幕会冻结) + 2. 播放提示音 + 3. 捕获 640x480 图像 + 4. 发送状态检测请求(情绪+皮肤) + 5. 输出结果 + 6. 恢复人脸检测显示 + + 参数: + face_detector: FaceDetector 实例 + state_detector: StateDetector 实例 + + 注意: + 此过程会阻塞 3-5 秒,期间屏幕显示会冻结 + """ + print("\n" + "=" * 50) + print("触发状态检测...") + print("=" * 50) + + try: + # 暂停人脸检测 + face_detector.stop() + print("[检测] 暂停人脸显示") + + # ===== 步骤1:播放提示音 ===== + print("[检测] 播放提示音...") + try: + if play_audio_safe(AUDIO_FILE): + print("[检测] 提示音播放完成") + else: + print("[警告] 播放音频失败") + except Exception as e: + print("[警告] 播放音频异常: {}".format(e)) + + # ===== 步骤2:捕获图像 ===== + print("[检测] 捕获图像...") + rgb888_data = capture_detection_image() + + if rgb888_data is None: + print("[错误] 无法获取图像数据,检测失败") + return + + print("[检测] 图像数据大小: {} bytes".format(len(rgb888_data))) + + # ===== 步骤3:发送检测请求 ===== + print("[检测] 发送状态检测请求...") + result = state_detector.detect( + rgb888_data, + width=DETECTION_IMAGE_WIDTH, + height=DETECTION_IMAGE_HEIGHT + ) + + # ===== 步骤4:输出结果 ===== + print("\n" + "=" * 45 + " 检测结果 " + "=" * 45) + + if result.get("success"): + emotion = result.get('emotion', 'unknown') + confidence = result.get('emotion_confidence', 0) + skin = result.get('skin_status', {}) + + print("情绪: {} (置信度: {:.2f})".format(emotion, confidence)) + print("皮肤状态:") + print(" - 痘痘: {}".format(skin.get('acne', 0))) + print(" - 皱纹: {}".format(skin.get('wrinkles', 0))) + print(" - 毛孔: {}".format(skin.get('pores', 0))) + print(" - 黑眼圈: {}".format(skin.get('dark_circles', 0))) + else: + print("检测失败: {}".format(result.get('error', '未知错误'))) + + print("=" * 100) + + except Exception as e: + print("[错误] 检测过程异常: {}".format(e)) + try: + sys.print_exception(e) + except BaseException: + pass + + finally: + # 恢复人脸检测显示 + print("\n[检测] 恢复人脸检测...") + try: + face_detector.start() + print("[检测] 人脸检测已恢复\n") + except Exception as e: + print("[错误] 恢复人脸检测失败: {}".format(e)) + + +def main_loop(face_detector, state_detector): + """ + 主检测循环 + + 流程: + 1. 检测人脸 + 2. 更新状态(出现/消失) + 3. 判断是否触发检测 + 4. 绘制和显示 + 5. 垃圾回收 + + 参数: + face_detector: FaceDetector 实例 + state_detector: StateDetector 实例 + """ + state = FaceDetectionState() + frame_count = 0 + + while True: + try: + os.exitpoint() + + # ===== 步骤1:检测人脸 ===== + img, faces = face_detector.detect() + frame_count += 1 + + # ===== 步骤2:更新状态 ===== + if faces: + state.on_face_detected() + + # 仅每秒打印一次(减少日志输出) + if frame_count % 25 == 0: + print("[检测] 检测到 {} 个人脸,持续时间: {:.1f} 秒".format( + len(faces), state.get_face_duration() + )) + else: + state.on_face_lost() + + # ===== 步骤3:判断是否触发检测 ===== + if state.should_trigger_detection(): + state.start_detection() + trigger_detection(face_detector, state_detector) + state.end_detection() + + # ===== 步骤4:绘制和显示 ===== + face_detector.draw_boxes() + face_detector.show() + + # ===== 步骤5:垃圾回收 ===== + gc.collect() + + except KeyboardInterrupt: + print("\n[用户] 用户停止") + break + except Exception as e: + print("[错误] 主循环异常: {}".format(e)) + try: + sys.print_exception(e) + except BaseException: + pass + time.sleep(1) + + +# ========== 主程序入口 ========== + +if __name__ == "__main__": + os.exitpoint(os.EXITPOINT_ENABLE) + + face_detector = None + state_detector = None + + try: + # 初始化 + face_detector, state_detector = initialize() + + # 启动主循环 + if face_detector and state_detector: + main_loop(face_detector, state_detector) + else: + print("[错误] 初始化失败,程序退出") + + except KeyboardInterrupt: + print("\n[用户] 用户中断") + except Exception as e: + print("[错误] 程序异常: {}".format(e)) + try: + sys.print_exception(e) + except BaseException: + pass + + finally: + # 清理资源 + print("\n[清理] 释放资源...") + + if face_detector: + try: + face_detector.stop() + print("[清理] 人脸检测器已停止") + except BaseException: + pass + + if state_detector: + try: + state_detector.disconnect() + print("[清理] WiFi已断开") + except BaseException: + pass + + print("[清理] 程序退出") diff --git a/k230/servo.py b/k230/servo.py new file mode 100644 index 0000000..1ef30ac --- /dev/null +++ b/k230/servo.py @@ -0,0 +1,28 @@ +# 02test_servo.py +# 舵机测试程序:最大 -> 最小 -> 中间 +# 适用于庐山派 K230-CanMV 开发板 + +from servo_module import ServoController +import time +import sys + +sys.path.append("/sdcard") + + +# 俯仰角 +servo1 = ServoController(gpio_pin=42, min_angle=45, max_angle=225, servo_range=270) +# 横滚 +servo2 = ServoController(gpio_pin=52, min_angle=45, max_angle=225, servo_range=270) +print("舵机测试开始...") + +try: + servo1.set_angle(200) + servo2.set_angle(135) + time.sleep(1) + + +except KeyboardInterrupt: + print("用户停止") +finally: + servo1.deinit() + print("舵机资源已释放")