Compare commits

..

10 Commits

Author SHA1 Message Date
wds
b223b32fb4 feat: update 2026-01-02 09:45:27 +08:00
wds
9f2f086cdf feat: 添加启动提示声音 2026-01-02 07:38:17 +08:00
wds
91efe70da0 feat: http简单上报异常以及对话 2026-01-02 05:13:03 +08:00
wds
3b49a8d776 feat: boot测试 2026-01-02 03:01:21 +08:00
wds
2b808dd35b feat: 人物状态检测 2026-01-01 23:52:26 +08:00
wds
bda636b0ef feat: 人脸识别类以及使用示例! 2026-01-01 15:41:58 +08:00
wds
d49748b516 build: 🏗️ 添加人脸识别功能! 2026-01-01 15:13:08 +08:00
wds
20ea76dccb feat: 音频播放以及录制模块! 2026-01-01 15:06:31 +08:00
wds
8efb28d725 build: 🏗 添加k230部分简单的软件架构 2026-01-01 14:33:24 +08:00
wds
92f40c1cf4 feat: 舵机控制模块 2026-01-01 14:27:05 +08:00
20 changed files with 2414 additions and 2 deletions

47
k230/02test_servo.py Normal file
View File

@ -0,0 +1,47 @@
# 02test_servo.py
# Servo test program for the LuShan Pi K230-CanMV board.
# Continuously commands a 360-degree (continuous rotation) servo.
import sys
import time

# Make /sdcard importable BEFORE importing local modules (the original
# appended the path AFTER the import, which only worked if /sdcard was
# already on sys.path).
sys.path.append("/sdcard")

from servo_module import ServoController

# Create the servo object on GPIO61, logical angle range 0-360 degrees.
# NOTE(review): the original also constructed an orphaned controller on
# GPIO46 that was immediately overwritten (leaking that PWM init), and the
# comment claimed GPIO47/0-270; both cleaned up here.
servo = ServoController(gpio_pin=61, min_angle=0, max_angle=360, servo_range=360)

print("舵机测试开始...")

# A 360-degree servo needs a continuous signal (SG90-style control):
# 0-180 = one direction (speed decreasing toward 180),
# 180-360 = the other direction (speed increasing toward 360).
while True:
    servo.set_angle(100)
    time.sleep(1)

124
k230/03test_audio.py Normal file
View File

@ -0,0 +1,124 @@
# # 03test_audio.py
# # 音频测试程序:先录制再播放
# # 适用于庐山派 K230-CanMV 开发板
# from audio_module import AudioRecorder, AudioPlayer
# import os
# import sys
# import time
# sys.path.append("/sdcard")
# os.exitpoint(os.EXITPOINT_ENABLE)
# # 配置
# AUDIO_FILE = '/sdcard/test_record.wav'
# RECORD_DURATION = 5 # 录制5秒
# print("=" * 40)
# print("音频录制和播放测试")
# print("=" * 40)
# # ===== 录制音频 =====
# print("\n[1] 开始录制音频...")
# print(" 时长: {}秒".format(RECORD_DURATION))
# print(" 请对着麦克风说话...")
# recorder = AudioRecorder()
# result = recorder.record_to_file(AUDIO_FILE, RECORD_DURATION)
# if result:
# print(" 录制完成!")
# else:
# print(" 录制失败!")
# # 等待一下
# time.sleep(1)
# # ===== 播放音频 =====
# print("\n[2] 开始播放录制的音频...")
# print(" 请确保已插入3.5mm耳机")
# player = AudioPlayer()
# result = player.play_file(AUDIO_FILE)
# if result:
# print(" 播放完成!")
# else:
# print(" 播放失败!")
# print("\n" + "=" * 40)
# print("测试结束")
# print("=" * 40)
# ===== Simultaneous record + playback test =====
# Streaming loopback: microphone -> headphone output.
from media.pyaudio import *
from media.media import *
import os
import sys
sys.path.append("/sdcard")
os.exitpoint(os.EXITPOINT_ENABLE)

DURATION = 15            # run for 15 seconds
RATE = 44100             # sample rate (Hz)
CHANNELS = 1             # mono
CHUNK = int(RATE / 25)   # frames per buffer (~40 ms)
FORMAT = paInt16         # 16-bit samples

print("=" * 40)
print("同时录播测试 ({}秒)".format(DURATION))
print("请插入耳机并对着麦克风说话")
print("=" * 40)

# Pre-declare resources so the finally block cannot raise NameError when
# initialization fails part-way through (the original crashed on cleanup
# if e.g. PyAudio() or MediaManager.init() threw).
p = None
input_stream = None
output_stream = None
try:
    p = PyAudio()
    p.initialize(CHUNK)
    MediaManager.init()
    # Input stream (recording)
    input_stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK
    )
    # Output stream (playback)
    output_stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        output=True,
        frames_per_buffer=CHUNK
    )
    print("开始录播...")
    # Real-time loopback, one chunk at a time.
    total_chunks = int(RATE / CHUNK * DURATION)
    for i in range(total_chunks):
        data = input_stream.read()
        output_stream.write(data)
        try:
            os.exitpoint()
        except KeyboardInterrupt:
            print("用户停止")
            break
    print("录播完成!")
except BaseException as e:
    print("异常: ", e)
finally:
    # Release only the resources that were actually created.
    if input_stream:
        input_stream.stop_stream()
        input_stream.close()
    if output_stream:
        output_stream.stop_stream()
        output_stream.close()
    if p:
        p.terminate()
    MediaManager.deinit()
    print("资源已释放")

View File

@ -0,0 +1,65 @@
# 04test_face_detect.py
# Face-detection test program for the LuShan Pi K230-CanMV board.
import os
import sys
import gc

# Make /sdcard importable BEFORE importing local modules (the original
# appended the path after the import had already happened).
sys.path.append("/sdcard")

from face_detect_module import FaceDetector

os.exitpoint(os.EXITPOINT_ENABLE)
# ============ Configuration ============
DISPLAY_MODE = "lcd"  # display mode: "lcd" or "hdmi"
CONFIDENCE = 0.5      # confidence threshold
# =======================================
print("=" * 40)
print("人脸检测测试")
print("显示模式: {}".format(DISPLAY_MODE))
print("=" * 40)
# Create the face detector
detector = FaceDetector(
    display_mode=DISPLAY_MODE,
    confidence_threshold=CONFIDENCE
)
try:
    # Start the detector (camera pipeline + model init)
    print("正在初始化...")
    detector.start()
    print("初始化完成,开始检测...")
    while True:
        os.exitpoint()
        # Run one detection pass
        img, faces = detector.detect()
        # Report what was found
        if faces:
            print("检测到 {} 个人脸:".format(len(faces)))
            for i, (x, y, w, h, score) in enumerate(faces):
                print(" 人脸{}: x={}, y={}, w={}, h={}, score={:.2f}".format(
                    i + 1, x, y, w, h, score
                ))
        # Draw bounding boxes on the OSD layer
        detector.draw_boxes(color=(255, 255, 0, 255), thickness=2)
        # Present the frame
        detector.show()
        gc.collect()
except KeyboardInterrupt:
    print("\n用户停止")
except Exception as e:
    print("异常: ", e)
    sys.print_exception(e)
finally:
    detector.stop()
    print("检测器已停止")

View File

@ -0,0 +1,110 @@
# 05test_state_detection.py
# 状态检测测试 - 简化版:仅发送一次请求后退出
# 适用于庐山派 K230-CanMV 开发板
import time
import os
import gc
import camera_module
from state_detection import StateDetector
# ========== Configuration ==========
# WiFi credentials (previous network kept below for reference)
#WIFI_SSID = "dongshengwu"
#WIFI_PASSWORD = "wds666666"
WIFI_SSID = "ZTE_969121"
WIFI_PASSWORD = "wds666666"
# Analysis server endpoint
#SERVER_HOST = "172.20.10.9"
SERVER_HOST = "192.168.0.21"
SERVER_PORT = 8081
API_PATH = "/api/detection/analyze"
# Camera capture size sent to the server
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
# ========== 辅助函数 ==========
def image_to_rgb888(img):
    """Convert an image object to raw RGB888 bytes.

    Tries, in order: ``to_bytes()``, ``tobytes()``, then ``bytes(img)``.
    Returns None when the conversion raises.
    """
    try:
        for method_name in ('to_bytes', 'tobytes'):
            converter = getattr(img, method_name, None)
            if converter is not None:
                return converter()
        return bytes(img)
    except Exception as e:
        print("图像转换错误: " + str(e))
        return None
def test_single_detection():
    """Run one state-detection round trip, then exit.

    Connects WiFi, grabs a single camera frame, posts it to the analysis
    server, and prints the emotion / skin-status result.
    """
    banner = "=" * 50
    print(banner)
    print("状态检测测试 (单次运行模式)")
    print(banner)
    state_detector = StateDetector(SERVER_HOST, SERVER_PORT, API_PATH)

    def show_result(result):
        # Pretty-print the server response.
        print("\n" + "=" * 15 + " 检测结果 " + "=" * 15)
        if result.get("success"):
            print(
                "心情: %s (置信度: %.2f)" %
                (result.get('emotion'), result.get('emotion_confidence', 0)))
            skin = result.get("skin_status", {})
            print(
                "皮肤状态: 痘痘:%d, 皱纹:%d, 毛孔:%d, 黑眼圈:%d" %
                (skin.get('acne', 0), skin.get('wrinkles', 0),
                 skin.get('pores', 0), skin.get('dark_circles', 0)))
        else:
            print("检测失败: " + str(result.get('error', '未知错误')))
        print("=" * 40)

    try:
        # 1. Bring up the network.
        print("连接WiFi: " + WIFI_SSID)
        state_detector.connect_wifi(WIFI_SSID, WIFI_PASSWORD)
        # 2. Bring up the camera and let the sensor settle.
        print("启动摄像头...")
        camera_module.camera_init(CAMERA_WIDTH, CAMERA_HEIGHT)
        camera_module.camera_start()
        time.sleep(1)
        # 3. Capture one frame and convert it to RGB888 bytes.
        print("正在拍照...")
        frame = camera_module.camera_snapshot()
        rgb888_data = image_to_rgb888(frame)
        if not rgb888_data:
            print("错误: 无法获取图像数据")
        else:
            # 4. Upload and 5. report.
            print("发送图像到服务器 (" + str(len(rgb888_data)) + " bytes)...")
            show_result(state_detector.detect(
                rgb888_data, width=CAMERA_WIDTH, height=CAMERA_HEIGHT))
    except Exception as e:
        print("测试出错: " + str(e))
        import sys
        sys.print_exception(e)
    finally:
        print("\n正在清理资源...")
        state_detector.disconnect()
        camera_module.camera_deinit()
        print("测试完成,程序退出")


if __name__ == "__main__":
    test_single_detection()

BIN
k230/HaveFace.wav Normal file

Binary file not shown.

248
k230/agent_client_http.py Normal file
View File

@ -0,0 +1,248 @@
import socket
import json
import time
import os
import network
import struct
import gc
from audio_module import AudioPlayer
# Try importing wave, handle different environments
try:
import media.wave as wave
except ImportError:
try:
import wave
except ImportError:
wave = None
class AgentClientHTTP:
    """
    K230 agent client speaking plain HTTP.

    Connects the board to WiFi, POSTs an "abnormal state" trigger to the
    agent server, saves the returned PCM reply as a WAV file and plays it
    through AudioPlayer.
    """

    def __init__(self, server_host, server_port=8000):
        # Server endpoint; the WLAN object is created lazily in connect_wifi().
        self._server_host = server_host
        self._server_port = server_port
        self._sta = None
        self._is_connected = False

    def connect_wifi(self, ssid, password=None, timeout=15):
        """
        Connect to WiFi in station mode.

        Blocks until connected; raises RuntimeError after *timeout* seconds.
        Returns the board's IP address string.
        """
        self._sta = network.WLAN(network.STA_IF)
        self._sta.active(True)
        if password:
            self._sta.connect(ssid, password)
        else:
            self._sta.connect(ssid)
        print("正在连接WiFi: " + ssid + "...")
        start_time = time.time()
        while not self._sta.isconnected():
            if time.time() - start_time > timeout:
                raise RuntimeError("WiFi连接超时: " + ssid)
            time.sleep(1)
            # exitpoint lets the IDE/user interrupt the busy-wait; it is
            # best-effort, so any failure from it is deliberately ignored.
            try:
                os.exitpoint()
            except BaseException:
                pass
        ip = self._sta.ifconfig()[0]
        print("WiFi连接成功! IP: " + ip)
        self._is_connected = True
        return ip

    def disconnect(self):
        """Disconnect from WiFi and power down the interface."""
        if self._sta:
            if self._sta.isconnected():
                self._sta.disconnect()
            self._sta.active(False)
            print("WiFi已断开网卡已关闭。")
        self._is_connected = False

    def trigger_abnormal_state(self, reason, context_data=None):
        """
        POST an abnormal-state trigger and save the audio reply as a WAV.

        Args:
            reason: trigger reason string sent to the server.
            context_data: optional dict of extra context (defaults to {}).
        Returns:
            Path of the saved WAV file, or None on any failure.
        """
        url_path = "/abnormal_trigger"
        payload = {
            "type": "abnormal_trigger",
            "trigger_reason": reason,
            "enable_streaming": True,
            "context_data": context_data or {}
        }
        body_json = json.dumps(payload)
        print(f"[AgentHTTP] 请求服务器: {self._server_host}:{self._server_port}...")
        try:
            # Send the request and collect the raw response bytes.
            response_data = self._send_http_request(url_path, body_json)
            # Split HTTP header from body at the blank line.
            header_end = response_data.find(b"\r\n\r\n")
            if header_end == -1:
                print("[AgentHTTP] 响应格式错误")
                return None
            body_bytes = response_data[header_end + 4:]
            if len(body_bytes) == 0:
                print("[AgentHTTP] 响应正文为空")
                return None
            print(f"[AgentHTTP] 成功接收音频数据: {len(body_bytes)} 字节")
            # Save as a WAV file.
            # The TTS server returns 24000 Hz, mono, 16-bit (2-byte) PCM.
            wav_file = "/sdcard/agent_response.wav"
            self._save_wav(wav_file, body_bytes, rate=24000, channels=1, sampwidth=2)
            return wav_file
        except Exception as e:
            print(f"[AgentHTTP] 请求失败: {e}")
            return None

    def _save_wav(self, filename, pcm_data, rate=24000, channels=1, sampwidth=2):
        """
        Write raw PCM data to *filename* with a WAV header.

        Uses the wave module when available; otherwise packs a 44-byte
        RIFF/WAVE header manually.
        """
        # NOTE(review): this log literal prints "(unknown)" instead of the
        # filename — looks like a lost f-string placeholder; confirm intent.
        print(f"[AgentHTTP] 保存 WAV 文件: (unknown) (Rate: {rate}, Ch: {channels})")
        try:
            if wave:
                # wave-module path
                wf = wave.open(filename, 'wb')
                wf.set_channels(channels)
                wf.set_sampwidth(sampwidth)
                wf.set_framerate(rate)
                wf.write_frames(pcm_data)
                wf.close()
            else:
                # Manual 44-byte RIFF/WAVE header (PCM, format tag 1).
                total_len = len(pcm_data) + 36
                header = struct.pack(
                    '<4sI4s4sIHHIIHH4sI',
                    b'RIFF',
                    total_len,
                    b'WAVE',
                    b'fmt ',
                    16,
                    1,
                    channels,
                    rate,
                    rate * channels * sampwidth,
                    channels * sampwidth,
                    sampwidth * 8,
                    b'data',
                    len(pcm_data))
                with open(filename, 'wb') as f:
                    f.write(header)
                    f.write(pcm_data)
        except Exception as e:
            print(f"[AgentHTTP] 保存 WAV 失败: {e}")

    def play_audio_file(self, file_path):
        """
        Play a WAV file via AudioPlayer.play_file (handles the WAV header).
        """
        if not file_path:
            return
        print(f"[AgentHTTP] 正在播放: {file_path}...")
        player = None
        try:
            player = AudioPlayer()
            # play_file inits and starts the stream, writes the frames, and
            # deinits in its own finally block.
            player.play_file(file_path)
            print("[AgentHTTP] 播放完成。")
        except Exception as e:
            print(f"[AgentHTTP] 播放出错: {e}")
        finally:
            # Double-check deinit to prevent noise / resource leaks.
            if player:
                try:
                    if player.is_running:  # check if flag exists
                        player.deinit()
                except BaseException:
                    pass

    def _send_http_request(self, path, body_json):
        """
        Send an HTTP/1.1 POST and return the raw response bytes
        (headers + body). The socket is always closed.
        """
        addr_info = socket.getaddrinfo(self._server_host, self._server_port)
        addr = addr_info[0][-1]
        s = socket.socket()
        s.settimeout(10)
        try:
            s.connect(addr)
            body_bytes = body_json.encode('utf-8')
            req = f"POST {path} HTTP/1.1\r\n"
            req += f"Host: {self._server_host}:{self._server_port}\r\n"
            req += "Content-Type: application/json\r\n"
            req += f"Content-Length: {len(body_bytes)}\r\n"
            req += "Connection: close\r\n\r\n"
            s.send(req.encode('utf-8'))
            s.send(body_bytes)
            response = bytearray()
            # Read until the server closes ("Connection: close" semantics).
            while True:
                chunk = s.recv(4096)
                if not chunk:
                    break
                response.extend(chunk)
            return response
        finally:
            s.close()
if __name__ == "__main__":
# ========== CONFIG ==========
# WIFI_SSID = "dongshengwu"
WIFI_SSID = "ZTE_969121"
WIFI_PASSWORD = "wds666666"
# SERVER_HOST = "172.20.10.2"
SERVER_HOST = "192.168.0.93"
SERVER_PORT = 8000
# ============================
client = AgentClientHTTP(server_host=SERVER_HOST, server_port=SERVER_PORT)
try:
# 1. Connect
client.connect_wifi(WIFI_SSID, WIFI_PASSWORD)
# 2. Trigger & Save
print("\n[测试] 触发皮肤状态异常提示...")
wav_file = client.trigger_abnormal_state("poor_skin")
# 3. Play
if wav_file:
client.play_audio_file(wav_file)
except Exception as e:
print("程序异常: " + str(e))
finally:
# 4. Clean up resources
print("\n[资源清理] 断开 WiFi 并回收资源...")
gc.collect()
print("测试结束,已安全退出。")

314
k230/audio_module.py Normal file
View File

@ -0,0 +1,314 @@
# audio_module.py
# 音频录制和播放模块
# 适用于庐山派 K230-CanMV 开发板
import os
from media.media import *
from media.pyaudio import *
import media.wave as wave
class AudioRecorder:
    """
    Audio recorder using the on-board microphone.

    Args:
        rate: sample rate in Hz (default 44100)
        channels: channel count (default 1, mono)
        format: sample format (default paInt16, 16-bit)

    Usage 1 - record straight to a file:
        recorder = AudioRecorder()
        recorder.record_to_file('/sdcard/test.wav', duration=5)
        recorder.deinit()

    Usage 2 - streaming capture:
        recorder = AudioRecorder()
        recorder.start()
        for i in range(100):
            data = recorder.read()  # fetch one frame
            # process data...
        recorder.save('/sdcard/test.wav', frames)
        recorder.stop()
        recorder.deinit()
    """

    def __init__(self, rate=44100, channels=1, format=paInt16):
        """
        Initialize the recorder.

        Args:
            rate: sample rate, default 44100 Hz
            channels: channel count, default 1 (mono)
            format: sample format, default paInt16 (16-bit)
        """
        self._rate = rate
        self._channels = channels
        self._format = format
        # One chunk is 1/25 s of audio (~40 ms per buffer).
        self._chunk = int(rate / 25)
        self._pyaudio = None
        self._stream = None
        self._is_running = False

    def start(self):
        """
        Open the capture stream (streaming mode).

        After this, read() returns one frame of audio per call.
        """
        if self._is_running:
            return
        self._pyaudio = PyAudio()
        self._pyaudio.initialize(self._chunk)
        MediaManager.init()
        self._stream = self._pyaudio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=True,
            frames_per_buffer=self._chunk
        )
        self._is_running = True

    def read(self):
        """
        Read one frame of audio (streaming mode).

        Returns:
            bytes: one frame of audio data
        Raises:
            RuntimeError: if start() has not been called.
        """
        if not self._is_running:
            raise RuntimeError("录制器未启动,请先调用 start()")
        return self._stream.read()

    def stop(self):
        """Stop and close the capture stream."""
        if self._stream:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None
        self._is_running = False

    def deinit(self):
        """Release every resource (stream, PyAudio, media buffers)."""
        self.stop()
        if self._pyaudio:
            self._pyaudio.terminate()
            self._pyaudio = None
        MediaManager.deinit()

    def save(self, filename, frames):
        """
        Save captured frames to a WAV file.

        Args:
            filename: destination path
            frames: list of audio frames (bytes)
        """
        wf = wave.open(filename, 'wb')
        wf.set_channels(self._channels)
        wf.set_sampwidth(self._pyaudio.get_sample_size(self._format))
        wf.set_framerate(self._rate)
        wf.write_frames(b''.join(frames))
        wf.close()

    def record_to_file(self, filename, duration):
        """
        Record straight to a WAV file.

        Args:
            filename: destination path (e.g. '/sdcard/test.wav')
            duration: length in seconds
        Returns:
            bool: True on success, False on failure
        """
        try:
            self.start()
            frames = []
            total_chunks = int(self._rate / self._chunk * duration)
            for i in range(total_chunks):
                data = self.read()
                frames.append(data)
                # Allow the user/IDE to interrupt the recording.
                try:
                    os.exitpoint()
                except KeyboardInterrupt:
                    break
            self.save(filename, frames)
            return True
        except BaseException as e:
            print("录制异常: ", e)
            return False
        finally:
            self.stop()
            self.deinit()

    @property
    def is_running(self):
        """Whether the recorder is currently capturing."""
        return self._is_running

    @property
    def chunk_size(self):
        """Frame size in samples (rate / 25)."""
        return self._chunk

    @property
    def rate(self):
        """Sample rate in Hz."""
        return self._rate
class AudioPlayer:
    """
    Audio player that outputs through the 3.5 mm headphone jack.

    Usage 1 - play a file directly:
        player = AudioPlayer()
        player.play_file('/sdcard/test.wav')
        player.deinit()

    Usage 2 - streaming playback:
        player = AudioPlayer(rate=44100, channels=1)
        player.start()
        player.write(audio_data)  # push audio data
        player.stop()
        player.deinit()
    """

    def __init__(self, rate=44100, channels=1, format=paInt16):
        """
        Initialize the player.

        Args:
            rate: sample rate, default 44100 Hz
            channels: channel count, default 1 (mono)
            format: sample format, default paInt16 (16-bit)
        """
        self._rate = rate
        self._channels = channels
        self._format = format
        # One chunk is 1/25 s of audio (~40 ms per buffer).
        self._chunk = int(rate / 25)
        self._pyaudio = None
        self._stream = None
        self._is_running = False

    def start(self):
        """
        Open the output stream (streaming mode).

        After this, write() pushes audio data to the headphones.
        """
        if self._is_running:
            return
        self._pyaudio = PyAudio()
        self._pyaudio.initialize(self._chunk)
        MediaManager.init()
        self._stream = self._pyaudio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            output=True,
            frames_per_buffer=self._chunk
        )
        self._is_running = True

    def write(self, data):
        """
        Write audio data to the output stream (streaming mode).

        Args:
            data: audio bytes
        Raises:
            RuntimeError: if start() has not been called.
        """
        if not self._is_running:
            raise RuntimeError("播放器未启动,请先调用 start()")
        self._stream.write(data)

    def stop(self):
        """Stop and close the output stream."""
        if self._stream:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None
        self._is_running = False

    def deinit(self):
        """Release every resource (stream, PyAudio, media buffers)."""
        self.stop()
        if self._pyaudio:
            self._pyaudio.terminate()
            self._pyaudio = None
        MediaManager.deinit()

    def play_file(self, filename):
        """
        Play a WAV audio file to completion.

        Stream parameters (format, channels, rate) are taken from the WAV
        header, so this opens its own stream instead of calling start().

        Args:
            filename: WAV path (e.g. '/sdcard/test.wav')
        Returns:
            bool: True on success, False on failure
        """
        wf = None
        try:
            wf = wave.open(filename, 'rb')
            chunk = int(wf.get_framerate() / 25)
            self._pyaudio = PyAudio()
            self._pyaudio.initialize(chunk)
            MediaManager.init()
            self._stream = self._pyaudio.open(
                format=self._pyaudio.get_format_from_width(wf.get_sampwidth()),
                channels=wf.get_channels(),
                rate=wf.get_framerate(),
                output=True,
                frames_per_buffer=chunk
            )
            self._is_running = True
            data = wf.read_frames(chunk)
            while data:
                self._stream.write(data)
                data = wf.read_frames(chunk)
                # Allow the user/IDE to interrupt playback.
                try:
                    os.exitpoint()
                except KeyboardInterrupt:
                    break
            return True
        except BaseException as e:
            print("播放异常: ", e)
            return False
        finally:
            if wf:
                wf.close()
            self.stop()
            self.deinit()

    @property
    def is_running(self):
        """Whether the player is currently active."""
        return self._is_running

    @property
    def chunk_size(self):
        """Frame size in samples (rate / 25)."""
        return self._chunk

    @property
    def rate(self):
        """Sample rate in Hz."""
        return self._rate

59
k230/boot.py Normal file
View File

@ -0,0 +1,59 @@
# audio input and output example
#
# Note: You will need an SD card to run this example.
#
# You can play wav files or capture audio to save as wav
import os
from media.media import * # 导入media模块用于初始化vb buffer
from media.pyaudio import * # 导入pyaudio模块用于采集和播放音频
import media.wave as wave # 导入wav模块用于保存和加载wav音频文件
def exit_check():
    """Poll the firmware exit point; return True when the user interrupted."""
    stopped = False
    try:
        os.exitpoint()
    except KeyboardInterrupt as e:
        print("user stop: ", e)
        stopped = True
    return stopped
def play_audio(filename):
    """Play a WAV file through the audio output.

    Opens *filename*, streams it chunk by chunk to a PyAudio output
    stream, and releases every resource afterwards. Resources are
    pre-declared as None so a failure during setup (e.g. a missing file)
    no longer raises NameError in the finally block, which previously
    masked the original error.
    """
    wf = None
    p = None
    stream = None
    try:
        wf = wave.open(filename, 'rb')            # open the wav file
        CHUNK = int(wf.get_framerate() / 25)      # ~40 ms of audio per chunk
        p = PyAudio()
        p.initialize(CHUNK)                       # init the PyAudio object
        MediaManager.init()                       # init the vb buffer
        # Output-stream parameters come straight from the wav header.
        stream = p.open(format=p.get_format_from_width(wf.get_sampwidth()),
                        channels=wf.get_channels(),
                        rate=wf.get_framerate(),
                        output=True, frames_per_buffer=CHUNK)
        data = wf.read_frames(CHUNK)              # read the first frame
        while data:
            stream.write(data)                    # push the frame to the stream
            data = wf.read_frames(CHUNK)          # read the next frame
            if exit_check():
                break
    except BaseException as e:
        print(f"Exception {e}")
    finally:
        # Release only what was successfully created.
        if stream:
            stream.stop_stream()
            stream.close()
        if p:
            p.terminate()
        if wf:
            wf.close()
        MediaManager.deinit()                     # release the vb buffer
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
print("play_audio sample start")
play_audio('/sdcard/hello.wav') # 播放wav文件
print("play_audio sample done")

View File

@ -40,8 +40,8 @@ def camera_init(width, height, chn=CAM_CHN_ID_0, hmirror=False, vflip=False):
# 设置通道的输出尺寸 # 设置通道的输出尺寸
sensor.set_framesize(width=width, height=height, chn=chn) sensor.set_framesize(width=width, height=height, chn=chn)
# 设置通道的输出像素格式为RGB565 # 设置通道的输出像素格式为RGB888
sensor.set_pixformat(Sensor.RGB565, chn=chn) sensor.set_pixformat(Sensor.RGB888, chn=chn)
# 初始化媒体管理器 # 初始化媒体管理器
MediaManager.init() MediaManager.init()

280
k230/face_detect_module.py Normal file
View File

@ -0,0 +1,280 @@
# face_detect_module.py
# 人脸检测模块
# 适用于庐山派 K230-CanMV 开发板
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import nncase_runtime as nn
import ulab.numpy as np
import aidemo
import gc
class FaceDetector:
    """
    Face-detection wrapper around the K230 AI pipeline.

    Features:
        - detect faces in the camera stream
        - return face bounding boxes as (x, y, w, h, score)
        - draw detection boxes on the OSD layer

    Args:
        display_mode: "lcd" or "hdmi"
        confidence_threshold: detection confidence cutoff (default 0.5)
        nms_threshold: non-maximum-suppression threshold (default 0.2)
        debug_mode: debug switch (default 0 = off)

    Example:
        detector = FaceDetector(display_mode="lcd")
        detector.start()
        while True:
            img, faces = detector.detect()
            # faces = [(x, y, w, h, score), ...]
            detector.show()
        detector.stop()
    """
    # Model configuration
    KMODEL_PATH = "/sdcard/examples/kmodel/face_detection_320.kmodel"
    ANCHORS_PATH = "/sdcard/examples/utils/prior_data_320.bin"
    MODEL_INPUT_SIZE = [320, 320]
    ANCHOR_LEN = 4200
    DET_DIM = 4
    # Display configuration per output mode
    DISPLAY_CONFIGS = {
        "lcd": {
            "display_size": [800, 480],
            "rgb888p_size": [1920, 1080],
        },
        "hdmi": {
            "display_size": [1920, 1080],
            "rgb888p_size": [1920, 1080],
        }
    }

    def __init__(self, display_mode="lcd", confidence_threshold=0.5,
                 nms_threshold=0.2, debug_mode=0):
        """
        Initialize the face detector.

        Args:
            display_mode: "lcd" or "hdmi"
            confidence_threshold: higher filters more aggressively
            nms_threshold: NMS threshold to suppress duplicate boxes
            debug_mode: 1 enables timing output
        Raises:
            ValueError: for an unknown display_mode.
        """
        if display_mode not in self.DISPLAY_CONFIGS:
            raise ValueError("display_mode必须是 'lcd''hdmi'")
        self._display_mode = display_mode
        self._confidence_threshold = confidence_threshold
        self._nms_threshold = nms_threshold
        self._debug_mode = debug_mode
        # Resolve display configuration
        config = self.DISPLAY_CONFIGS[display_mode]
        self._display_size = config["display_size"]
        self._rgb888p_size = config["rgb888p_size"]
        # Load anchor (prior box) data for the detection head
        self._anchors = np.fromfile(self.ANCHORS_PATH, dtype=np.float)
        self._anchors = self._anchors.reshape((self.ANCHOR_LEN, self.DET_DIM))
        self._pipeline = None
        self._face_det = None
        self._is_running = False
        self._last_faces = []

    def start(self):
        """Start the detector: bring up the camera pipeline and the model."""
        if self._is_running:
            return
        # Initialize the camera/display pipeline
        self._pipeline = PipeLine(
            rgb888p_size=self._rgb888p_size,
            display_size=self._display_size,
            display_mode=self._display_mode
        )
        self._pipeline.create()
        # Initialize the face-detection model
        self._face_det = _FaceDetectionApp(
            kmodel_path=self.KMODEL_PATH,
            model_input_size=self.MODEL_INPUT_SIZE,
            anchors=self._anchors,
            confidence_threshold=self._confidence_threshold,
            nms_threshold=self._nms_threshold,
            rgb888p_size=self._rgb888p_size,
            display_size=self._display_size,
            debug_mode=self._debug_mode
        )
        self._face_det.config_preprocess()
        self._is_running = True

    def detect(self):
        """
        Detect faces in the current frame.

        Returns:
            tuple: (img, faces)
                - img: the current frame
                - faces: list of (x, y, w, h, score); coordinates already
                  scaled to the display resolution.
        Raises:
            RuntimeError: if start() has not been called.
        """
        if not self._is_running:
            raise RuntimeError("检测器未启动,请先调用 start()")
        os.exitpoint()
        # Grab the current frame
        img = self._pipeline.get_frame()
        # Run inference
        res = self._face_det.run(img)
        # Convert coordinates to display resolution
        faces = []
        if res:
            for det in res:
                x, y, w, h = map(lambda v: int(round(v, 0)), det[:4])
                score = det[4] if len(det) > 4 else 1.0
                # Map from sensor resolution to display resolution
                x = x * self._display_size[0] // self._rgb888p_size[0]
                y = y * self._display_size[1] // self._rgb888p_size[1]
                w = w * self._display_size[0] // self._rgb888p_size[0]
                h = h * self._display_size[1] // self._rgb888p_size[1]
                faces.append((x, y, w, h, score))
        self._last_faces = faces
        return img, faces

    def draw_boxes(self, color=(255, 255, 0, 255), thickness=2):
        """
        Draw the last detected face boxes on the OSD layer.

        Args:
            color: box color (R, G, B, A)
            thickness: line width
        """
        if not self._is_running:
            return
        self._pipeline.osd_img.clear()
        for (x, y, w, h, score) in self._last_faces:
            self._pipeline.osd_img.draw_rectangle(
                x, y, w, h,
                color=color,
                thickness=thickness
            )

    def show(self):
        """Present the current frame (call detect() and draw_boxes() first)."""
        if self._is_running:
            self._pipeline.show_image()
            gc.collect()

    def stop(self):
        """Stop the detector and release model/pipeline resources."""
        if self._face_det:
            self._face_det.deinit()
            self._face_det = None
        if self._pipeline:
            self._pipeline.destroy()
            self._pipeline = None
        self._is_running = False

    @property
    def is_running(self):
        """Whether the detector is running."""
        return self._is_running

    @property
    def display_size(self):
        """Display resolution [width, height]."""
        return self._display_size

    @property
    def last_faces(self):
        """Faces found by the most recent detect() call."""
        return self._last_faces
class _FaceDetectionApp(AIBase):
    """Internal face-detection app built on AIBase.

    Handles the ai2d preprocessing (pad + resize to the model input) and the
    aidemo post-processing that turns raw model output into face boxes.
    """

    def __init__(self, kmodel_path, model_input_size, anchors,
                 confidence_threshold=0.5, nms_threshold=0.2,
                 rgb888p_size=[224, 224], display_size=[1920, 1080],
                 debug_mode=0):
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
        self.kmodel_path = kmodel_path
        self.model_input_size = model_input_size
        self.confidence_threshold = confidence_threshold
        self.nms_threshold = nms_threshold
        self.anchors = anchors
        # Align widths up to a multiple of 16 bytes
        self.rgb888p_size = [self._align_up(rgb888p_size[0], 16), rgb888p_size[1]]
        self.display_size = [self._align_up(display_size[0], 16), display_size[1]]
        self.debug_mode = debug_mode
        self.ai2d = Ai2d(debug_mode)
        self.ai2d.set_ai2d_dtype(
            nn.ai2d_format.NCHW_FMT,
            nn.ai2d_format.NCHW_FMT,
            np.uint8,
            np.uint8
        )

    def _align_up(self, value, alignment):
        """Round value up to the nearest multiple of alignment."""
        return ((value + alignment - 1) // alignment) * alignment

    def config_preprocess(self, input_image_size=None):
        """Configure the ai2d pad+resize preprocessing chain."""
        with ScopedTiming("set preprocess config", self.debug_mode > 0):
            ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
            top, bottom, left, right = self.get_padding_param()
            # Pad with the constant color (104, 117, 123), then resize.
            self.ai2d.pad([0, 0, 0, 0, top, bottom, left, right], 0, [104, 117, 123])
            self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
            self.ai2d.build(
                [1, 3, ai2d_input_size[1], ai2d_input_size[0]],
                [1, 3, self.model_input_size[1], self.model_input_size[0]]
            )

    def postprocess(self, results):
        """Run aidemo face post-processing; return the box list (or empty)."""
        with ScopedTiming("postprocess", self.debug_mode > 0):
            post_ret = aidemo.face_det_post_process(
                self.confidence_threshold,
                self.nms_threshold,
                self.model_input_size[1],
                self.anchors,
                self.rgb888p_size,
                results
            )
            if len(post_ret) == 0:
                return post_ret
            else:
                return post_ret[0]

    def get_padding_param(self):
        """Compute padding for the aspect-ratio-preserving resize.

        NOTE(review): top/left are forced to 0, so all padding goes to the
        bottom/right edges — presumably matches the reference demo; confirm
        against the CanMV face-detection example.
        """
        dst_w = self.model_input_size[0]
        dst_h = self.model_input_size[1]
        ratio_w = dst_w / self.rgb888p_size[0]
        ratio_h = dst_h / self.rgb888p_size[1]
        ratio = min(ratio_w, ratio_h)
        new_w = int(ratio * self.rgb888p_size[0])
        new_h = int(ratio * self.rgb888p_size[1])
        dw = (dst_w - new_w) / 2
        dh = (dst_h - new_h) / 2
        top = int(round(0))
        bottom = int(round(dh * 2 + 0.1))
        left = int(round(0))
        right = int(round(dw * 2 - 0.1))
        return top, bottom, left, right

BIN
k230/hello.wav Normal file

Binary file not shown.

24
k230/k230_readme.md Normal file
View File

@ -0,0 +1,24 @@
## k230模块
1. 3.1寸显示驱动 ---> 已实现
2. 摄像头图像获取 ---> 已实现
3. 舵机控制 ---> 已实现
4. 麦克风 ---> 已实现
5. 喇叭播放 ---> 已实现
## 软件驱动
1. 人脸检测接口 ---> 已实现
2. ws请求接口
3. 播放本地音频文件接口(提前内置,直接使用前面实现了的喇叭播放)
4. 人脸追踪
## 流程
1. 获取图像数据
2. 检测人脸(超过5s)
3. 请求模型端( 情绪检测以及皮肤状态打分,注意时间戳)
4. 保存到数据库( 图像数据 ,心情,皮肤状态)
5. 判断是否状态不好!然后发起语音对话
## 其他
### 音频格式转换
ffmpeg -y -i hello_24k.wav -ar 44100 -ac 1 -sample_fmt s16 hello.wav

BIN
k230/logo.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

BIN
k230/logo_3s.mp4 Normal file

Binary file not shown.

547
k230/main.py Normal file
View File

@ -0,0 +1,547 @@
# main.py
# 人脸检测与状态分析主程序
# 功能:实时人脸检测 -> 持续3秒触发情绪和皮肤状态检测 -> 输出结果
# 适用于庐山派 K230-CanMV 开发板
import os
import time
import gc
import sys
from face_detect_module import FaceDetector
from state_detection import StateDetector
from audio_module import AudioPlayer
import camera_module
# 音频播放相关导入
from media.media import *
from media.pyaudio import *
import media.wave as wave
# ========== Configuration ==========
# WiFi credentials
WIFI_SSID = "ZTE_969121"
WIFI_PASSWORD = "wds666666"
# Analysis server endpoint
SERVER_HOST = "192.168.0.21"
SERVER_PORT = 8081
API_PATH = "/api/detection/analyze"
# Detection parameters
DETECTION_THRESHOLD = 3.0  # trigger detection after a face persists 3 s
COOLDOWN_PERIOD = 10.0  # 10 s cooldown after detection to avoid spamming
# Display and audio
DISPLAY_MODE = "lcd"  # LCD display mode
AUDIO_FILE = "/sdcard/HaveFace.wav"  # prompt sound when a face is detected
# Camera capture size used for state detection
DETECTION_IMAGE_WIDTH = 640
DETECTION_IMAGE_HEIGHT = 480
# ========== 状态管理类 ==========
class FaceDetectionState:
    """
    Bookkeeping for face presence and detection triggering.

    Tracks:
        - when the current face first appeared
        - when the last detection fired (cooldown anchor)
        - whether a detection is currently in flight
    """

    def __init__(self):
        self.face_start_time = None      # first-seen timestamp; None = no face
        self.last_detection_time = None  # timestamp of the last detection
        self.is_detecting = False        # detection currently running?

    def on_face_detected(self):
        """Called on frames containing a face; starts the timer only once."""
        if self.face_start_time is None:
            self.face_start_time = time.time()

    def on_face_lost(self):
        """Called on frames without a face; resets the presence timer."""
        self.face_start_time = None

    def get_face_duration(self):
        """Seconds the current face has been continuously present (0.0 if none)."""
        started = self.face_start_time
        return 0.0 if started is None else time.time() - started

    def should_trigger_detection(
            self,
            threshold=DETECTION_THRESHOLD,
            cooldown=COOLDOWN_PERIOD):
        """
        Decide whether a detection should fire now.

        All of the following must hold:
            1. the face has persisted for at least `threshold` seconds
            2. no detection is currently running
            3. at least `cooldown` seconds passed since the last one

        Args:
            threshold: persistence required before triggering (s)
            cooldown: minimum gap between detections (s)
        Returns:
            bool
        """
        if self.is_detecting or self.get_face_duration() < threshold:
            return False
        last = self.last_detection_time
        if last is not None and (time.time() - last) < cooldown:
            return False
        return True

    def start_detection(self):
        """Mark a detection as in progress."""
        self.is_detecting = True

    def end_detection(self):
        """Mark the detection finished; timestamp it for the cooldown."""
        self.is_detecting = False
        self.last_detection_time = time.time()

    def has_active_face(self):
        """True while a face has appeared and not yet been lost."""
        return self.face_start_time is not None
# ========== 辅助函数 ==========
def exit_check():
    """
    Poll the firmware exit point (cf. boot.py).

    Returns True when the user interrupted via KeyboardInterrupt.
    """
    interrupted = False
    try:
        os.exitpoint()
    except KeyboardInterrupt as e:
        print("[用户] 中断: {}".format(e))
        interrupted = True
    return interrupted
def play_audio_safe(filename):
    """
    Safely play a WAV file (based on the boot.py reference implementation).

    - streams the file chunk by chunk to the audio output
    - initializes and releases all media resources
    - supports user interruption via exit_check()

    Resources are pre-declared as None, so cleanup can never hit an
    unbound name; the original relied on bare `except:` clauses to
    swallow the resulting NameError (and anything else).

    Args:
        filename: WAV file path
    Returns:
        bool: True on success, False on failure
    """
    wf = None
    p = None
    stream = None
    try:
        # Open the wav file; stream parameters come from its header.
        wf = wave.open(filename, 'rb')
        chunk = int(wf.get_framerate() / 25)
        # Init PyAudio and the vb buffer (required before opening streams).
        p = PyAudio()
        p.initialize(chunk)
        MediaManager.init()
        stream = p.open(
            format=p.get_format_from_width(wf.get_sampwidth()),
            channels=wf.get_channels(),
            rate=wf.get_framerate(),
            output=True,
            frames_per_buffer=chunk
        )
        # Stream the file frame by frame.
        data = wf.read_frames(chunk)
        while data:
            stream.write(data)
            data = wf.read_frames(chunk)
            if exit_check():
                break
        return True
    except BaseException as e:
        print("[错误] 音频播放异常: {}".format(e))
        return False
    finally:
        # Release only what was created; each step is best-effort so one
        # failure cannot prevent the rest of the cleanup.
        if stream:
            try:
                stream.stop_stream()
                stream.close()
            except Exception:
                pass
        if p:
            try:
                p.terminate()
            except Exception:
                pass
        if wf:
            try:
                wf.close()
            except Exception:
                pass
        try:
            MediaManager.deinit()
        except Exception:
            pass
def image_to_rgb888(img):
    """
    Convert an Image object to raw RGB888 bytes.

    Probes several K230 library variants in order: ``to_bytes()``,
    ``tobytes()``, and finally ``bytes(img)``.

    Args:
        img: Image object
    Returns:
        bytes on success, None when every conversion attempt fails.
    """
    try:
        to_bytes = getattr(img, 'to_bytes', None)
        if to_bytes is not None:
            return to_bytes()
        tobytes = getattr(img, 'tobytes', None)
        if tobytes is not None:
            return tobytes()
        return bytes(img)
    except Exception as e:
        print("[错误] 图像转换错误: {}".format(e))
        return None
def initialize():
    """
    Program initialization.

    Steps:
        1. connect WiFi (up to 3 attempts)
        2. create the StateDetector
        3. create and start the FaceDetector

    Returns:
        tuple: (face_detector, state_detector),
               or (None, None) when any step fails.
    """
    print("=" * 50)
    print("程序启动 - 人脸检测与状态分析系统")
    print("=" * 50)
    # ===== Step 1: connect WiFi =====
    print("\n[初始化] 连接WiFi: {}".format(WIFI_SSID))
    state_detector = StateDetector(SERVER_HOST, SERVER_PORT, API_PATH)
    for attempt in range(3):
        try:
            state_detector.connect_wifi(WIFI_SSID, WIFI_PASSWORD)
            print("[初始化] WiFi连接成功")
            break
        except Exception as e:
            print("[初始化] WiFi连接失败 (尝试 {}/3): {}".format(attempt + 1, e))
            if attempt < 2:
                time.sleep(2)
            else:
                # All retries exhausted — abort startup.
                print("[错误] WiFi连接失败程序退出")
                return None, None
    # ===== Step 2: start the face detector =====
    print("[初始化] 启动人脸检测器...")
    try:
        face_detector = FaceDetector(
            display_mode=DISPLAY_MODE,
            confidence_threshold=0.5,
            debug_mode=0
        )
        face_detector.start()
        print("[初始化] 人脸检测器启动成功")
    except Exception as e:
        # Roll back the WiFi connection before giving up.
        print("[错误] 人脸检测器启动失败: {}".format(e))
        state_detector.disconnect()
        return None, None
    print("\n[初始化] 初始化完成,开始检测...\n")
    return face_detector, state_detector
def capture_detection_image():
    """
    Capture a 640x480 frame for state detection.

    The FaceDetector pipeline drives the camera at 1920x1080, while state
    detection wants 640x480, so this temporarily drives the camera through
    camera_module at the smaller size (the FaceDetector must be stopped
    by the caller first).

    Returns:
        bytes: RGB888 image data, or None on failure.
    """
    try:
        # Initialize the 640x480 camera
        camera_module.camera_init(DETECTION_IMAGE_WIDTH, DETECTION_IMAGE_HEIGHT)
        camera_module.camera_start()
        # Let the sensor settle (cf. 05test_state_detection.py)
        time.sleep(1)
        # Take the snapshot
        print("[检测] 拍照中...")
        img = camera_module.camera_snapshot()
        # Convert to RGB888 bytes
        rgb888_data = image_to_rgb888(img)
        # Tear down the camera
        camera_module.camera_stop()
        camera_module.camera_deinit()
        return rgb888_data
    except Exception as e:
        print("[错误] 图像捕获失败: {}".format(e))
        # Best-effort teardown even when capture failed part-way.
        try:
            camera_module.camera_stop()
            camera_module.camera_deinit()
        except BaseException:
            pass
        return None
def _report_detection_result(result):
    """Pretty-print one state-detection response dict to the console."""
    print("\n" + "=" * 45 + " 检测结果 " + "=" * 45)
    if not result.get("success"):
        print("检测失败: {}".format(result.get('error', '未知错误')))
    else:
        emotion = result.get('emotion', 'unknown')
        confidence = result.get('emotion_confidence', 0)
        skin = result.get('skin_status', {})
        print("情绪: {} (置信度: {:.2f})".format(emotion, confidence))
        print("皮肤状态:")
        print(" - 痘痘: {}".format(skin.get('acne', 0)))
        print(" - 皱纹: {}".format(skin.get('wrinkles', 0)))
        print(" - 毛孔: {}".format(skin.get('pores', 0)))
        print(" - 黑眼圈: {}".format(skin.get('dark_circles', 0)))
    print("=" * 100)


def trigger_detection(face_detector, state_detector):
    """
    Run one full state-detection cycle.

    Sequence:
        1. Stop the face-detector display (screen freezes).
        2. Play a prompt sound.
        3. Capture a 640x480 frame.
        4. POST it for emotion + skin analysis.
        5. Print the parsed result.
        6. Restart the face-detector display (always, via finally).

    Args:
        face_detector: FaceDetector instance.
        state_detector: StateDetector instance.

    Note:
        Blocks for roughly 3-5 s; the display is frozen meanwhile.
    """
    print("\n" + "=" * 50)
    print("触发状态检测...")
    print("=" * 50)
    try:
        face_detector.stop()
        print("[检测] 暂停人脸显示")

        # Step 1: prompt sound (best effort - failures never abort the cycle).
        print("[检测] 播放提示音...")
        try:
            ok = play_audio_safe(AUDIO_FILE)
            print("[检测] 提示音播放完成" if ok else "[警告] 播放音频失败")
        except Exception as e:
            print("[警告] 播放音频异常: {}".format(e))

        # Step 2: capture a frame at the detection resolution.
        print("[检测] 捕获图像...")
        frame_bytes = capture_detection_image()
        if frame_bytes is None:
            print("[错误] 无法获取图像数据,检测失败")
            return
        print("[检测] 图像数据大小: {} bytes".format(len(frame_bytes)))

        # Step 3: remote analysis over HTTP.
        print("[检测] 发送状态检测请求...")
        result = state_detector.detect(
            frame_bytes,
            width=DETECTION_IMAGE_WIDTH,
            height=DETECTION_IMAGE_HEIGHT
        )

        # Step 4: report the outcome.
        _report_detection_result(result)
    except Exception as e:
        print("[错误] 检测过程异常: {}".format(e))
        try:
            sys.print_exception(e)
        except BaseException:
            pass
    finally:
        # Always hand the display back to the face detector.
        print("\n[检测] 恢复人脸检测...")
        try:
            face_detector.start()
            print("[检测] 人脸检测已恢复\n")
        except Exception as e:
            print("[错误] 恢复人脸检测失败: {}".format(e))
def main_loop(face_detector, state_detector):
    """
    Run the endless detect/react loop.

    Per iteration:
        1. Detect faces on the current frame.
        2. Update presence state (appeared / lost).
        3. Trigger a full state detection when the dwell condition is met.
        4. Draw boxes and refresh the display.
        5. Collect garbage.

    Args:
        face_detector: FaceDetector instance.
        state_detector: StateDetector instance.
    """
    state = FaceDetectionState()
    frames = 0
    while True:
        try:
            os.exitpoint()

            # Detect faces on the current frame.
            img, faces = face_detector.detect()
            frames += 1

            # Update presence state.
            if not faces:
                state.on_face_lost()
            else:
                state.on_face_detected()
                # Log once per ~second to keep the console quiet.
                if frames % 25 == 0:
                    print("[检测] 检测到 {} 个人脸,持续时间: {:.1f}秒".format(
                        len(faces), state.get_face_duration()
                    ))

            # Fire the full detection cycle when the dwell condition holds.
            if state.should_trigger_detection():
                state.start_detection()
                trigger_detection(face_detector, state_detector)
                state.end_detection()

            # Refresh the display.
            face_detector.draw_boxes()
            face_detector.show()

            gc.collect()
        except KeyboardInterrupt:
            print("\n[用户] 用户停止")
            break
        except Exception as e:
            print("[错误] 主循环异常: {}".format(e))
            try:
                sys.print_exception(e)
            except BaseException:
                pass
            time.sleep(1)
# ========== Program entry point ==========
if __name__ == "__main__":
    os.exitpoint(os.EXITPOINT_ENABLE)

    face_detector = None
    state_detector = None
    try:
        face_detector, state_detector = initialize()
        if not (face_detector and state_detector):
            print("[错误] 初始化失败,程序退出")
        else:
            main_loop(face_detector, state_detector)
    except KeyboardInterrupt:
        print("\n[用户] 用户中断")
    except Exception as e:
        print("[错误] 程序异常: {}".format(e))
        try:
            sys.print_exception(e)
        except BaseException:
            pass
    finally:
        # Release everything that was successfully brought up.
        print("\n[清理] 释放资源...")
        if face_detector:
            try:
                face_detector.stop()
                print("[清理] 人脸检测器已停止")
            except BaseException:
                pass
        if state_detector:
            try:
                state_detector.disconnect()
                print("[清理] WiFi已断开")
            except BaseException:
                pass
        print("[清理] 程序退出")

28
k230/servo.py Normal file
View File

@ -0,0 +1,28 @@
# servo.py
# Gimbal servo smoke test: drive the pitch and roll servos to a known pose.
# For the LuShan-Pi K230-CanMV development board.
import sys

# Must extend the import path BEFORE importing servo_module, which lives
# on the SD card (the original appended the path after the import).
sys.path.append("/sdcard")

import time

from servo_module import ServoController

# Pitch servo (header pin 35 / GPIO42), limited to 45-225 of a 270-degree servo.
servo1 = ServoController(gpio_pin=42, min_angle=45, max_angle=225, servo_range=270)
# Roll servo (header pin 33 / GPIO52), same limits.
servo2 = ServoController(gpio_pin=52, min_angle=45, max_angle=225, servo_range=270)

print("舵机测试开始...")
try:
    servo1.set_angle(200)
    servo2.set_angle(135)
    time.sleep(1)  # give the servos time to reach the target pose
except KeyboardInterrupt:
    print("用户停止")
finally:
    # Release BOTH PWM channels (the original leaked servo2's channel).
    servo1.deinit()
    servo2.deinit()
    print("舵机资源已释放")

141
k230/servo_module.py Normal file
View File

@ -0,0 +1,141 @@
# servo_module.py
# Servo control module
# For the LuShan-Pi K230-CanMV development board
from machine import PWM, FPIOA

# Fixed mapping from GPIO number to PWM channel (usable pins on the 40-pin header).
# Header pin | SoC GPIO | PWM channel
#     12     | GPIO 47  | PWM3
#     26     | GPIO 61  | PWM1
#     32     | GPIO 46  | PWM2
#     33     | GPIO 52  | PWM4
#     35     | GPIO 42  | PWM0
GPIO_PWM_MAP = {
    42: 0,  # GPIO42 -> PWM0 (header pin 35)
    46: 2,  # GPIO46 -> PWM2 (header pin 32)
    47: 3,  # GPIO47 -> PWM3 (header pin 12)
    52: 4,  # GPIO52 -> PWM4 (header pin 33)
    61: 1,  # GPIO61 -> PWM1 (header pin 26)
}

# FPIOA pin-mux function selector for each PWM channel.
PWM_FPIOA_MAP = {
    0: FPIOA.PWM0,
    1: FPIOA.PWM1,
    2: FPIOA.PWM2,
    3: FPIOA.PWM3,
    4: FPIOA.PWM4,
}
class ServoController:
    """Drive a hobby servo on one of the board's PWM-capable GPIOs.

    Supports absolute positioning, incremental moves and centering.
    """

    def __init__(self, gpio_pin, min_angle=0, max_angle=270, servo_range=None):
        """
        Create a controller bound to one GPIO/PWM channel.

        Args:
            gpio_pin: GPIO number (one of 42, 46, 47, 52, 61).
            min_angle: software lower angle limit (default 0).
            max_angle: software upper angle limit (default 270).
            servo_range: the servo's physical full travel, used for the
                pulse-width math (e.g. 180 for a 180-degree servo, 270 for
                a 270-degree one); defaults to max_angle.
        """
        if gpio_pin not in GPIO_PWM_MAP:
            raise ValueError("不支持的GPIO引脚请使用42, 46, 47, 52, 61")
        self._gpio_pin = gpio_pin
        self._pwm_channel = GPIO_PWM_MAP[gpio_pin]
        self._min_angle = min_angle
        self._max_angle = max_angle
        # Physical travel used when converting angle -> pulse width.
        self._servo_range = max_angle if servo_range is None else servo_range
        # Start at the middle of the allowed span.
        self._current_angle = (min_angle + max_angle) / 2

        # Route the pin to its PWM function and start a 50 Hz carrier.
        self._fpioa = FPIOA()
        self._fpioa.set_function(gpio_pin, PWM_FPIOA_MAP[self._pwm_channel])
        self._pwm = PWM(self._pwm_channel)
        self._pwm.freq(50)
        # Drive the initial position.
        self._update_pwm()

    def _angle_to_duty(self, angle):
        """
        Map an angle to a 16-bit PWM duty value.

        Standard servo timing over a 20 ms period:
            0.5 ms pulse -> 0 degrees
            2.5 ms pulse -> full physical travel (servo_range degrees)
        """
        pulse_ms = 0.5 + (angle / self._servo_range) * 2.0
        return int((pulse_ms / 20.0) * 65535)

    def _clamp_angle(self, angle):
        """Clip *angle* into [min_angle, max_angle]."""
        return max(self._min_angle, min(self._max_angle, angle))

    def _update_pwm(self):
        """Push the current angle out on the PWM pin."""
        self._pwm.duty_u16(self._angle_to_duty(self._current_angle))

    def set_angle(self, angle):
        """
        Move the servo to *angle*, clamped to the software limits.

        Args:
            angle: target angle in degrees.
        """
        self._current_angle = self._clamp_angle(angle)
        self._update_pwm()

    def get_angle(self):
        """
        Return the last commanded angle.

        Returns:
            The current angle value.
        """
        return self._current_angle

    def rotate(self, direction, step):
        """
        Nudge the servo by *step* degrees.

        Args:
            direction: 1 to increase the angle (clockwise),
                       -1 to decrease it (counter-clockwise).
            step: increment in degrees (e.g. 0.1, 1, 5).

        Returns:
            The angle after the move.
        """
        self.set_angle(self._current_angle + direction * step)
        return self._current_angle

    def center(self):
        """Move to the midpoint of the software angle range."""
        self.set_angle((self._min_angle + self._max_angle) / 2)

    def deinit(self):
        """Release the PWM channel."""
        if self._pwm:
            self._pwm.deinit()
            self._pwm = None

375
k230/state_detection.py Normal file
View File

@ -0,0 +1,375 @@
# state_detection.py
# State-detection module - posts camera frames over HTTP for emotion and
# skin-condition analysis.
# For the LuShan-Pi K230-CanMV development board
import network
import socket
import time
import os

# Prefer the MicroPython "u"-prefixed modules; fall back to the CPython names.
try:
    import ubinascii as binascii
except ImportError:
    import binascii
try:
    import ujson as json
except ImportError:
    import json
try:
    import ussl as ssl
except ImportError:
    try:
        import ssl
    except ImportError:
        # TLS support is optional; plain HTTP works without an ssl module.
        ssl = None
class StateDetector:
    """
    State detector - POSTs image data over HTTP to a server that analyses
    mood (emotion) and skin condition.

    Features:
        - Connect to a WiFi network.
        - Send image data to the detection server (POST /api/detection/analyze).
        - Receive and parse the emotion / skin-status detection result.

    Args:
        server_host: detection server host address.
        server_port: detection server port (default 80).
        api_path: API path (default "/api/detection/analyze").

    Example:
        detector = StateDetector(server_host="192.168.0.21", server_port=8081)
        detector.connect_wifi("SSID", "password")
        result = detector.detect(image_data)
        print(result["emotion"])
        detector.disconnect()
    """

    # The seven emotion labels the server may return.
    EMOTIONS = ["angry", "disgust", "fear", "happy", "sad", "surprise", "neutral"]

    def __init__(self, server_host, server_port=80, api_path="/api/detection/analyze"):
        """
        Initialise the state detector.

        Args:
            server_host: detection server host address.
            server_port: detection server port.
            api_path: API path.
        """
        self._server_host = server_host
        self._server_port = server_port
        self._api_path = api_path
        self._sta = None  # network.WLAN station interface, set by connect_wifi()
        self._is_connected = False

    def connect_wifi(self, ssid, password=None, timeout=15):
        """
        Connect to a WiFi network.

        Args:
            ssid: WiFi SSID.
            password: WiFi password (may be None for an open network).
            timeout: connection timeout in seconds.

        Returns:
            str: the IP address obtained.

        Raises:
            RuntimeError: if the link is not up within *timeout* seconds.
        """
        self._sta = network.WLAN(network.STA_IF)
        if password:
            self._sta.connect(ssid, password)
        else:
            self._sta.connect(ssid)
        print("正在连接WiFi: " + ssid + "...")
        start_time = time.time()
        while not self._sta.isconnected():
            if time.time() - start_time > timeout:
                raise RuntimeError("WiFi连接超时: " + ssid)
            time.sleep(1)
            # Let the MicroPython runtime service interrupts while we poll.
            os.exitpoint()
        ip = self._sta.ifconfig()[0]
        print("WiFi连接成功! IP: " + ip)
        self._is_connected = True
        return ip

    def detect(self, image_data, width=640, height=480, timestamp=None):
        """
        Send one frame for state detection.

        Args:
            image_data: raw image bytes (RGB888 format).
            width: image width (default 640).
            height: image height (default 480).
            timestamp: optional capture time ("YYYY-MM-DD HH:MM:SS").

        Returns:
            dict: the parsed detection result.

        Raises:
            RuntimeError: if the network has not been connected yet.
        """
        if not self._is_connected:
            raise RuntimeError("网络未连接,请先调用 connect_wifi() 或 connect_lan()")
        # Wrap the raw RGB data in a BMP container so the server receives a
        # self-describing image file.
        bmp_image = self._add_bmp_header(image_data, width, height)
        # base64-encode the BMP for embedding in the JSON payload.
        image_base64 = binascii.b2a_base64(bmp_image).decode('utf-8').strip()
        # Build the request body.
        request_body = {
            "type": "status_detection",
            "image": image_base64
        }
        if timestamp:
            request_body["timestamp"] = timestamp
        else:
            # Default to the current local time.
            t = time.localtime()
            ts = "%04d-%02d-%02d %02d:%02d:%02d" % (t[0], t[1], t[2], t[3], t[4], t[5])
            request_body["timestamp"] = ts
        body_json = json.dumps(request_body)
        # Open a socket and issue the HTTP request.
        response = self._send_http_request(body_json)
        # Parse the JSON out of the HTTP response.
        return self._parse_response(response)

    def _add_bmp_header(self, rgb_data, width, height):
        """
        Prepend a 54-byte BMP header to raw RGB888 pixel data.

        Layout (all multi-byte fields little-endian):
            File header (14 bytes): "BM" magic, file size, reserved,
                pixel-data offset (54).
            DIB header (40 bytes): header size (40), width, height
                (negative => top-down row order), planes (1), bpp (24),
                compression (0), image size; resolution/palette fields left 0.

        NOTE(review): 24-bit BMP pixel order is conventionally BGR; this
        sends the RGB buffer as-is — confirm the server decodes it that way.
        """
        file_size = 54 + len(rgb_data)
        # Build the header.
        header = bytearray(54)
        # "BM" magic.
        header[0], header[1] = 0x42, 0x4D
        # File size.
        header[2] = file_size & 0xFF
        header[3] = (file_size >> 8) & 0xFF
        header[4] = (file_size >> 16) & 0xFF
        header[5] = (file_size >> 24) & 0xFF
        # Pixel-data offset (54 = 14 + 40).
        header[10] = 54
        # DIB header size (40).
        header[14] = 40
        # Width.
        header[18] = width & 0xFF
        header[19] = (width >> 8) & 0xFF
        header[20] = (width >> 16) & 0xFF
        header[21] = (width >> 24) & 0xFF
        # Height: negative (two's complement) marks top-down row order,
        # matching the camera's natural scan order.
        h = -height
        header[22] = h & 0xFF
        header[23] = (h >> 8) & 0xFF
        header[24] = (h >> 16) & 0xFF
        header[25] = (h >> 24) & 0xFF
        # Colour planes (must be 1).
        header[26] = 1
        # Bits per pixel (24).
        header[28] = 24
        # Image data size.
        data_len = len(rgb_data)
        header[34] = data_len & 0xFF
        header[35] = (data_len >> 8) & 0xFF
        header[36] = (data_len >> 16) & 0xFF
        header[37] = (data_len >> 24) & 0xFF
        return header + rgb_data

    def _send_http_request(self, body, max_retries=3):
        """
        Send one HTTP POST request and return the raw response.

        Args:
            body: request body (JSON string or bytes).
            max_retries: max address-resolution retries.

        Returns:
            str: full HTTP response (headers + body) decoded as UTF-8.

        Raises:
            RuntimeError: if the server address cannot be resolved.
        """
        # Resolve the server address; retry because DNS can fail briefly
        # right after WiFi comes up.
        addr_info = None
        for attempt in range(max_retries):
            try:
                addr_info = socket.getaddrinfo(self._server_host, self._server_port)
                break
            except BaseException:
                print("DNS解析重试 (" + str(attempt + 1) + "/" + str(max_retries) + ")")
                time.sleep(1)
        if not addr_info:
            raise RuntimeError("无法解析服务器地址: " + self._server_host)
        addr = addr_info[0][-1]
        print("连接服务器: " + str(addr))
        # Make sure the body is bytes before measuring and sending it.
        if isinstance(body, str):
            body_bytes = body.encode('utf-8')
        else:
            body_bytes = body
        # Open the socket, send the request, read until the server closes.
        s = socket.socket()
        try:
            s.connect(addr)
            # Host header carries the port only when it is non-default.
            host_header = self._server_host
            if self._server_port != 80:
                host_header = self._server_host + ":" + str(self._server_port)
            # Assemble the request head.
            header = "POST " + self._api_path + " HTTP/1.1\r\n"
            header += "Host: " + host_header + "\r\n"
            header += "Content-Type: application/json\r\n"
            header += "Content-Length: " + str(len(body_bytes)) + "\r\n"
            header += "Connection: close\r\n"
            header += "\r\n"
            # Send the head (loops until fully written).
            self._send_all(s, header.encode('utf-8'))
            # Send the body (loops until fully written).
            self._send_all(s, body_bytes)
            # Read until EOF; "Connection: close" guarantees the server
            # closes the socket after the response.
            response = b""
            while True:
                chunk = s.recv(4096)
                if not chunk:
                    break
                response += chunk
            return response.decode('utf-8')
        finally:
            s.close()

    def _send_all(self, s, data):
        """
        Keep calling send() until *data* has been written in full.

        Uses a memoryview so the per-chunk slices never copy the payload.
        """
        # memoryview avoids copying when slicing.
        mv = memoryview(data)
        total_len = len(data)
        total_sent = 0
        chunk_size = 4096  # send 4 KB per call
        while total_sent < total_len:
            try:
                # Compute the extent of the next chunk.
                remaining = total_len - total_sent
                to_send = min(chunk_size, remaining)
                # mv[start:end] creates a new view without copying the bytes.
                sent = s.send(mv[total_sent : total_sent + to_send])
                if sent == 0:
                    raise RuntimeError("Socket连接断开 (sent=0)")
                total_sent += sent
            except OSError as e:
                # EAGAIN (11) / EWOULDBLOCK: TX buffer full, back off briefly.
                if e.args[0] == 11:
                    time.sleep(0.01)
                    continue
                # Re-raise anything else (e.g. ECONNRESET).
                raise e

    def _parse_response(self, response):
        """
        Split an HTTP response and parse its body as JSON.

        Args:
            response: full HTTP response string (headers + body).

        Returns:
            dict: the parsed JSON body, or an error dict when parsing fails.

        Raises:
            RuntimeError: if no header/body separator can be found.
        """
        # Split the head from the body (tolerate bare-LF servers).
        if "\r\n\r\n" in response:
            header, body = response.split("\r\n\r\n", 1)
        elif "\n\n" in response:
            header, body = response.split("\n\n", 1)
        else:
            raise RuntimeError("无效的HTTP响应格式")
        # Parse the JSON payload.
        # NOTE(review): the status line and Transfer-Encoding header are not
        # inspected; assumes the server replies with a plain, unchunked JSON
        # body — confirm against the server implementation.
        try:
            result = json.loads(body)
            return result
        except Exception as e:
            print("JSON解析错误: " + str(e))
            print("响应内容: " + body)
            return {
                "type": "status_detection_response",
                "success": False,
                "error": "JSON解析失败: " + str(e)
            }

    def disconnect(self):
        """Drop the WiFi connection (safe to call when not connected)."""
        if self._sta and self._sta.isconnected():
            self._sta.disconnect()
            print("WiFi已断开")
        self._is_connected = False

    @property
    def is_connected(self):
        """True once connect_wifi() has completed successfully."""
        return self._is_connected

    @property
    def server_host(self):
        """Detection server host address."""
        return self._server_host

    @property
    def server_port(self):
        """Detection server port."""
        return self._server_port

3
k230/test.py Normal file
View File

@ -0,0 +1,3 @@
# Show what is stored on the SD card root.
import os

sd_files = os.listdir('/sdcard')
print(sd_files)

View File

@ -0,0 +1,47 @@
**接口描述**: 发送图像数据,检测用户的心情和皮肤状态
**请求方式**: `POST /api/detection/analyze`
**请求参数**:
```json
{
"type": "status_detection", // 类型:状态检测
"image": "string", // 图像数据base64编码的RGB888格式
"timestamp": "2024-01-01 12:30:45" // 可选,图像采集时间
}
```
**字段说明**:
- `type`: 固定值 "status_detection"
- `image`: base64编码的RGB888格式图像数据
- `timestamp`: 图像采集的时间(可选),精确到秒
**响应参数**:
```json
{
"type": "status_detection_response",
"success": true,
"emotion": "happy", // 检测到的心情7选1
"emotion_confidence": 0.92, // 可选情绪识别置信度0-1
"skin_status": {
"acne": 0-100, // 是否有痘痘
"wrinkles": 0-100, // 是否有皱纹
"pores": 0-100, // 是否有毛孔粗大
"dark_circles": 0-100 // 是否有黑眼圈
},
"timestamp": "2024-01-01 12:30:46" // 检测完成时间
}
```
**emotion可选值**:
- `angry`: 愤怒
- `disgust`: 厌恶
- `fear`: 恐惧
- `happy`: 快乐
- `sad`: 悲伤
- `surprise`: 惊讶
- `neutral`: 中性