Compare commits
10 Commits
d96a7dc599
...
b223b32fb4
| Author | SHA1 | Date | |
|---|---|---|---|
| b223b32fb4 | |||
| 9f2f086cdf | |||
| 91efe70da0 | |||
| 3b49a8d776 | |||
| 2b808dd35b | |||
| bda636b0ef | |||
| d49748b516 | |||
| 20ea76dccb | |||
| 8efb28d725 | |||
| 92f40c1cf4 |
47
k230/02test_servo.py
Normal file
47
k230/02test_servo.py
Normal file
@ -0,0 +1,47 @@
|
||||
# 02test_servo.py
|
||||
# 舵机测试程序:最大 -> 最小 -> 中间
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
from servo_module import ServoController
|
||||
import time
|
||||
import sys
|
||||
|
||||
sys.path.append("/sdcard")
|
||||
|
||||
|
||||
# 创建舵机对象 (GPIO47, 角度范围0-270度)
|
||||
servo = ServoController(gpio_pin=46, min_angle=0, max_angle=360, servo_range=360)
|
||||
servo = ServoController(gpio_pin=61, min_angle=0, max_angle=360, servo_range=360)
|
||||
|
||||
print("舵机测试开始...")
|
||||
|
||||
# try:
|
||||
# # 1. 移动到最大角度
|
||||
# print("移动到最大角度")
|
||||
# servo.set_angle(360)
|
||||
# time.sleep(1)
|
||||
|
||||
# # 2. 移动到最小角度 (0度)
|
||||
# print("移动到最小角度: 0")
|
||||
# servo.set_angle(0)
|
||||
# time.sleep(1)
|
||||
|
||||
# # 3. 回到中间位置 (135度)
|
||||
# print("回到中间位置: 135")
|
||||
# servo.center()
|
||||
# time.sleep(1)
|
||||
|
||||
# print("测试完成!")
|
||||
|
||||
|
||||
# except KeyboardInterrupt:
|
||||
# print("用户停止")
|
||||
# finally:
|
||||
# servo.deinit()
|
||||
# print("舵机资源已释放")
|
||||
|
||||
|
||||
# 360度舵机需要一直发! sg90。 0-180为顺(速度变小) 180-360为逆(速度变大)。
|
||||
while True:
|
||||
servo.set_angle(100)
|
||||
time.sleep(1)
|
||||
124
k230/03test_audio.py
Normal file
124
k230/03test_audio.py
Normal file
@ -0,0 +1,124 @@
|
||||
# # 03test_audio.py
|
||||
# # 音频测试程序:先录制再播放
|
||||
# # 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
# from audio_module import AudioRecorder, AudioPlayer
|
||||
# import os
|
||||
# import sys
|
||||
# import time
|
||||
|
||||
# sys.path.append("/sdcard")
|
||||
# os.exitpoint(os.EXITPOINT_ENABLE)
|
||||
|
||||
|
||||
# # 配置
|
||||
# AUDIO_FILE = '/sdcard/test_record.wav'
|
||||
# RECORD_DURATION = 5 # 录制5秒
|
||||
|
||||
# print("=" * 40)
|
||||
# print("音频录制和播放测试")
|
||||
# print("=" * 40)
|
||||
|
||||
# # ===== 录制音频 =====
|
||||
# print("\n[1] 开始录制音频...")
|
||||
# print(" 时长: {}秒".format(RECORD_DURATION))
|
||||
# print(" 请对着麦克风说话...")
|
||||
|
||||
# recorder = AudioRecorder()
|
||||
# result = recorder.record_to_file(AUDIO_FILE, RECORD_DURATION)
|
||||
|
||||
# if result:
|
||||
# print(" 录制完成!")
|
||||
# else:
|
||||
# print(" 录制失败!")
|
||||
|
||||
# # 等待一下
|
||||
# time.sleep(1)
|
||||
|
||||
# # ===== 播放音频 =====
|
||||
# print("\n[2] 开始播放录制的音频...")
|
||||
# print(" 请确保已插入3.5mm耳机")
|
||||
|
||||
# player = AudioPlayer()
|
||||
# result = player.play_file(AUDIO_FILE)
|
||||
|
||||
# if result:
|
||||
# print(" 播放完成!")
|
||||
# else:
|
||||
# print(" 播放失败!")
|
||||
|
||||
# print("\n" + "=" * 40)
|
||||
# print("测试结束")
|
||||
# print("=" * 40)
|
||||
|
||||
|
||||
# ===== 同时录播测试 =====
|
||||
# 使用流式接口实现:麦克风录制 -> 耳机播放
|
||||
|
||||
from media.pyaudio import *
|
||||
from media.media import *
|
||||
import os
|
||||
import sys
|
||||
sys.path.append("/sdcard")
|
||||
os.exitpoint(os.EXITPOINT_ENABLE)
|
||||
|
||||
|
||||
DURATION = 15 # 持续15秒
|
||||
RATE = 44100
|
||||
CHANNELS = 1
|
||||
CHUNK = int(RATE / 25)
|
||||
FORMAT = paInt16
|
||||
|
||||
print("=" * 40)
|
||||
print("同时录播测试 ({}秒)".format(DURATION))
|
||||
print("请插入耳机并对着麦克风说话")
|
||||
print("=" * 40)
|
||||
|
||||
try:
|
||||
p = PyAudio()
|
||||
p.initialize(CHUNK)
|
||||
MediaManager.init()
|
||||
|
||||
# 创建输入流 (录制)
|
||||
input_stream = p.open(
|
||||
format=FORMAT,
|
||||
channels=CHANNELS,
|
||||
rate=RATE,
|
||||
input=True,
|
||||
frames_per_buffer=CHUNK
|
||||
)
|
||||
|
||||
# 创建输出流 (播放)
|
||||
output_stream = p.open(
|
||||
format=FORMAT,
|
||||
channels=CHANNELS,
|
||||
rate=RATE,
|
||||
output=True,
|
||||
frames_per_buffer=CHUNK
|
||||
)
|
||||
|
||||
print("开始录播...")
|
||||
|
||||
# 实时录播
|
||||
total_chunks = int(RATE / CHUNK * DURATION)
|
||||
for i in range(total_chunks):
|
||||
data = input_stream.read()
|
||||
output_stream.write(data)
|
||||
try:
|
||||
os.exitpoint()
|
||||
except KeyboardInterrupt:
|
||||
print("用户停止")
|
||||
break
|
||||
|
||||
print("录播完成!")
|
||||
|
||||
except BaseException as e:
|
||||
print("异常: ", e)
|
||||
finally:
|
||||
input_stream.stop_stream()
|
||||
output_stream.stop_stream()
|
||||
input_stream.close()
|
||||
output_stream.close()
|
||||
p.terminate()
|
||||
MediaManager.deinit()
|
||||
print("资源已释放")
|
||||
65
k230/04test_face_detect.py
Normal file
65
k230/04test_face_detect.py
Normal file
@ -0,0 +1,65 @@
|
||||
# 04test_face_detect.py
|
||||
# 人脸检测测试程序
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
from face_detect_module import FaceDetector
|
||||
import os
|
||||
import sys
|
||||
import gc
|
||||
|
||||
sys.path.append("/sdcard")
|
||||
os.exitpoint(os.EXITPOINT_ENABLE)
|
||||
|
||||
|
||||
# ============ 配置 ============
|
||||
DISPLAY_MODE = "lcd" # 显示模式: "lcd" 或 "hdmi"
|
||||
CONFIDENCE = 0.5 # 置信度阈值
|
||||
# ==============================
|
||||
|
||||
print("=" * 40)
|
||||
print("人脸检测测试")
|
||||
print("显示模式: {}".format(DISPLAY_MODE))
|
||||
print("=" * 40)
|
||||
|
||||
# 创建人脸检测器
|
||||
detector = FaceDetector(
|
||||
display_mode=DISPLAY_MODE,
|
||||
confidence_threshold=CONFIDENCE
|
||||
)
|
||||
|
||||
try:
|
||||
# 启动检测器
|
||||
print("正在初始化...")
|
||||
detector.start()
|
||||
print("初始化完成,开始检测...")
|
||||
|
||||
while True:
|
||||
os.exitpoint()
|
||||
|
||||
# 检测人脸
|
||||
img, faces = detector.detect()
|
||||
|
||||
# 打印检测结果
|
||||
if faces:
|
||||
print("检测到 {} 个人脸:".format(len(faces)))
|
||||
for i, (x, y, w, h, score) in enumerate(faces):
|
||||
print(" 人脸{}: x={}, y={}, w={}, h={}, score={:.2f}".format(
|
||||
i + 1, x, y, w, h, score
|
||||
))
|
||||
|
||||
# 绘制检测框
|
||||
detector.draw_boxes(color=(255, 255, 0, 255), thickness=2)
|
||||
|
||||
# 显示结果
|
||||
detector.show()
|
||||
|
||||
gc.collect()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n用户停止")
|
||||
except Exception as e:
|
||||
print("异常: ", e)
|
||||
sys.print_exception(e)
|
||||
finally:
|
||||
detector.stop()
|
||||
print("检测器已停止")
|
||||
110
k230/05test_state_detection.py
Normal file
110
k230/05test_state_detection.py
Normal file
@ -0,0 +1,110 @@
|
||||
# 05test_state_detection.py
|
||||
# 状态检测测试 - 简化版:仅发送一次请求后退出
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
import time
|
||||
import os
|
||||
import gc
|
||||
import camera_module
|
||||
from state_detection import StateDetector
|
||||
|
||||
# ========== 配置参数 ==========
|
||||
|
||||
# WiFi配置
|
||||
#WIFI_SSID = "dongshengwu"
|
||||
#WIFI_PASSWORD = "wds666666"
|
||||
WIFI_SSID = "ZTE_969121"
|
||||
WIFI_PASSWORD = "wds666666"
|
||||
|
||||
# 服务器配置
|
||||
#SERVER_HOST = "172.20.10.9"
|
||||
SERVER_HOST = "192.168.0.21"
|
||||
SERVER_PORT = 8081
|
||||
API_PATH = "/api/detection/analyze"
|
||||
|
||||
# 摄像头配置
|
||||
CAMERA_WIDTH = 640
|
||||
CAMERA_HEIGHT = 480
|
||||
|
||||
# ========== 辅助函数 ==========
|
||||
|
||||
|
||||
def image_to_rgb888(img):
|
||||
"""将图像转换为RGB888格式的bytes数据"""
|
||||
try:
|
||||
if hasattr(img, 'to_bytes'):
|
||||
return img.to_bytes()
|
||||
if hasattr(img, 'tobytes'):
|
||||
return img.tobytes()
|
||||
return bytes(img)
|
||||
except Exception as e:
|
||||
print("图像转换错误: " + str(e))
|
||||
return None
|
||||
|
||||
|
||||
def test_single_detection():
|
||||
"""执行单次状态检测测试"""
|
||||
print("=" * 50)
|
||||
print("状态检测测试 (单次运行模式)")
|
||||
print("=" * 50)
|
||||
|
||||
state_detector = StateDetector(SERVER_HOST, SERVER_PORT, API_PATH)
|
||||
|
||||
try:
|
||||
# 1. 初始化并连接网络
|
||||
print("连接WiFi: " + WIFI_SSID)
|
||||
state_detector.connect_wifi(WIFI_SSID, WIFI_PASSWORD)
|
||||
|
||||
# 2. 初始化摄像头
|
||||
print("启动摄像头...")
|
||||
camera_module.camera_init(CAMERA_WIDTH, CAMERA_HEIGHT)
|
||||
camera_module.camera_start()
|
||||
|
||||
# 等待摄像头稳定
|
||||
time.sleep(1)
|
||||
|
||||
# 3. 拍照并转换
|
||||
print("正在拍照...")
|
||||
img = camera_module.camera_snapshot()
|
||||
rgb888_data = image_to_rgb888(img)
|
||||
|
||||
if rgb888_data:
|
||||
# 4. 发送请求
|
||||
print("发送图像到服务器 (" + str(len(rgb888_data)) + " bytes)...")
|
||||
result = state_detector.detect(
|
||||
rgb888_data, width=CAMERA_WIDTH, height=CAMERA_HEIGHT)
|
||||
|
||||
# 5. 输出结果
|
||||
print("\n" + "=" * 15 + " 检测结果 " + "=" * 15)
|
||||
if result.get("success"):
|
||||
print(
|
||||
"心情: %s (置信度: %.2f)" %
|
||||
(result.get('emotion'), result.get(
|
||||
'emotion_confidence', 0)))
|
||||
skin = result.get("skin_status", {})
|
||||
print(
|
||||
"皮肤状态: 痘痘:%d, 皱纹:%d, 毛孔:%d, 黑眼圈:%d" %
|
||||
(skin.get(
|
||||
'acne', 0), skin.get(
|
||||
'wrinkles', 0), skin.get(
|
||||
'pores', 0), skin.get(
|
||||
'dark_circles', 0)))
|
||||
else:
|
||||
print("检测失败: " + str(result.get('error', '未知错误')))
|
||||
print("=" * 40)
|
||||
else:
|
||||
print("错误: 无法获取图像数据")
|
||||
|
||||
except Exception as e:
|
||||
print("测试出错: " + str(e))
|
||||
import sys
|
||||
sys.print_exception(e)
|
||||
finally:
|
||||
print("\n正在清理资源...")
|
||||
state_detector.disconnect()
|
||||
camera_module.camera_deinit()
|
||||
print("测试完成,程序退出")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_single_detection()
|
||||
BIN
k230/HaveFace.wav
Normal file
BIN
k230/HaveFace.wav
Normal file
Binary file not shown.
248
k230/agent_client_http.py
Normal file
248
k230/agent_client_http.py
Normal file
@ -0,0 +1,248 @@
|
||||
import socket
|
||||
import json
|
||||
import time
|
||||
import os
|
||||
import network
|
||||
import struct
|
||||
import gc
|
||||
from audio_module import AudioPlayer
|
||||
|
||||
# Try importing wave, handle different environments
|
||||
try:
|
||||
import media.wave as wave
|
||||
except ImportError:
|
||||
try:
|
||||
import wave
|
||||
except ImportError:
|
||||
wave = None
|
||||
|
||||
|
||||
class AgentClientHTTP:
|
||||
"""
|
||||
K230 Agent Client (HTTP)
|
||||
"""
|
||||
|
||||
def __init__(self, server_host, server_port=8000):
|
||||
self._server_host = server_host
|
||||
self._server_port = server_port
|
||||
self._sta = None
|
||||
self._is_connected = False
|
||||
|
||||
def connect_wifi(self, ssid, password=None, timeout=15):
|
||||
"""
|
||||
Connect to WiFi
|
||||
"""
|
||||
self._sta = network.WLAN(network.STA_IF)
|
||||
self._sta.active(True)
|
||||
|
||||
if password:
|
||||
self._sta.connect(ssid, password)
|
||||
else:
|
||||
self._sta.connect(ssid)
|
||||
|
||||
print("正在连接WiFi: " + ssid + "...")
|
||||
|
||||
start_time = time.time()
|
||||
while not self._sta.isconnected():
|
||||
if time.time() - start_time > timeout:
|
||||
raise RuntimeError("WiFi连接超时: " + ssid)
|
||||
time.sleep(1)
|
||||
try:
|
||||
os.exitpoint()
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
ip = self._sta.ifconfig()[0]
|
||||
print("WiFi连接成功! IP: " + ip)
|
||||
self._is_connected = True
|
||||
return ip
|
||||
|
||||
def disconnect(self):
|
||||
"""断开网络连接"""
|
||||
if self._sta:
|
||||
if self._sta.isconnected():
|
||||
self._sta.disconnect()
|
||||
self._sta.active(False)
|
||||
print("WiFi已断开,网卡已关闭。")
|
||||
self._is_connected = False
|
||||
|
||||
def trigger_abnormal_state(self, reason, context_data=None):
|
||||
"""
|
||||
Trigger abnormal state and save response as WAV.
|
||||
"""
|
||||
url_path = "/abnormal_trigger"
|
||||
|
||||
payload = {
|
||||
"type": "abnormal_trigger",
|
||||
"trigger_reason": reason,
|
||||
"enable_streaming": True,
|
||||
"context_data": context_data or {}
|
||||
}
|
||||
|
||||
body_json = json.dumps(payload)
|
||||
print(f"[AgentHTTP] 请求服务器: {self._server_host}:{self._server_port}...")
|
||||
|
||||
try:
|
||||
# Send Request
|
||||
response_data = self._send_http_request(url_path, body_json)
|
||||
|
||||
# Parse Header/Body
|
||||
header_end = response_data.find(b"\r\n\r\n")
|
||||
if header_end == -1:
|
||||
print("[AgentHTTP] 响应格式错误")
|
||||
return None
|
||||
|
||||
body_bytes = response_data[header_end + 4:]
|
||||
|
||||
if len(body_bytes) == 0:
|
||||
print("[AgentHTTP] 响应正文为空")
|
||||
return None
|
||||
|
||||
print(f"[AgentHTTP] 成功接收音频数据: {len(body_bytes)} 字节")
|
||||
|
||||
# Save as WAV file
|
||||
# TTS server returns 24000Hz, 1 channel, 16bit (2 bytes) PCM
|
||||
wav_file = "/sdcard/agent_response.wav"
|
||||
self._save_wav(wav_file, body_bytes, rate=24000, channels=1, sampwidth=2)
|
||||
|
||||
return wav_file
|
||||
|
||||
except Exception as e:
|
||||
print(f"[AgentHTTP] 请求失败: {e}")
|
||||
return None
|
||||
|
||||
def _save_wav(self, filename, pcm_data, rate=24000, channels=1, sampwidth=2):
|
||||
"""
|
||||
Save PCM data with WAV header
|
||||
"""
|
||||
print(f"[AgentHTTP] 保存 WAV 文件: {filename} (Rate: {rate}, Ch: {channels})")
|
||||
try:
|
||||
if wave:
|
||||
# Use wave module if available
|
||||
wf = wave.open(filename, 'wb')
|
||||
wf.set_channels(channels)
|
||||
wf.set_sampwidth(sampwidth)
|
||||
wf.set_framerate(rate)
|
||||
wf.write_frames(pcm_data)
|
||||
wf.close()
|
||||
else:
|
||||
# Manual WAV header creation
|
||||
total_len = len(pcm_data) + 36
|
||||
header = struct.pack(
|
||||
'<4sI4s4sIHHIIHH4sI',
|
||||
b'RIFF',
|
||||
total_len,
|
||||
b'WAVE',
|
||||
b'fmt ',
|
||||
16,
|
||||
1,
|
||||
channels,
|
||||
rate,
|
||||
rate * channels * sampwidth,
|
||||
channels * sampwidth,
|
||||
sampwidth * 8,
|
||||
b'data',
|
||||
len(pcm_data))
|
||||
with open(filename, 'wb') as f:
|
||||
f.write(header)
|
||||
f.write(pcm_data)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[AgentHTTP] 保存 WAV 失败: {e}")
|
||||
|
||||
def play_audio_file(self, file_path):
|
||||
"""
|
||||
Play audio file using AudioPlayer.play_file (handles WAV)
|
||||
"""
|
||||
if not file_path:
|
||||
return
|
||||
|
||||
print(f"[AgentHTTP] 正在播放: {file_path}...")
|
||||
|
||||
player = None
|
||||
try:
|
||||
player = AudioPlayer()
|
||||
# play_file will init and start stream, write frames, and then deinit in
|
||||
# finally block
|
||||
player.play_file(file_path)
|
||||
|
||||
print("[AgentHTTP] 播放完成。")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[AgentHTTP] 播放出错: {e}")
|
||||
finally:
|
||||
# Double check deinit to prevent noise/resource leak
|
||||
if player:
|
||||
try:
|
||||
if player.is_running: # Check if flag exists
|
||||
player.deinit()
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
def _send_http_request(self, path, body_json):
|
||||
"""
|
||||
Send HTTP POST
|
||||
"""
|
||||
addr_info = socket.getaddrinfo(self._server_host, self._server_port)
|
||||
addr = addr_info[0][-1]
|
||||
|
||||
s = socket.socket()
|
||||
s.settimeout(10)
|
||||
|
||||
try:
|
||||
s.connect(addr)
|
||||
body_bytes = body_json.encode('utf-8')
|
||||
|
||||
req = f"POST {path} HTTP/1.1\r\n"
|
||||
req += f"Host: {self._server_host}:{self._server_port}\r\n"
|
||||
req += "Content-Type: application/json\r\n"
|
||||
req += f"Content-Length: {len(body_bytes)}\r\n"
|
||||
req += "Connection: close\r\n\r\n"
|
||||
|
||||
s.send(req.encode('utf-8'))
|
||||
s.send(body_bytes)
|
||||
|
||||
response = bytearray()
|
||||
while True:
|
||||
chunk = s.recv(4096)
|
||||
if not chunk:
|
||||
break
|
||||
response.extend(chunk)
|
||||
|
||||
return response
|
||||
|
||||
finally:
|
||||
s.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# ========== CONFIG ==========
|
||||
# WIFI_SSID = "dongshengwu"
|
||||
WIFI_SSID = "ZTE_969121"
|
||||
WIFI_PASSWORD = "wds666666"
|
||||
# SERVER_HOST = "172.20.10.2"
|
||||
SERVER_HOST = "192.168.0.93"
|
||||
SERVER_PORT = 8000
|
||||
# ============================
|
||||
|
||||
client = AgentClientHTTP(server_host=SERVER_HOST, server_port=SERVER_PORT)
|
||||
|
||||
try:
|
||||
# 1. Connect
|
||||
client.connect_wifi(WIFI_SSID, WIFI_PASSWORD)
|
||||
|
||||
# 2. Trigger & Save
|
||||
print("\n[测试] 触发皮肤状态异常提示...")
|
||||
wav_file = client.trigger_abnormal_state("poor_skin")
|
||||
|
||||
# 3. Play
|
||||
if wav_file:
|
||||
client.play_audio_file(wav_file)
|
||||
|
||||
except Exception as e:
|
||||
print("程序异常: " + str(e))
|
||||
finally:
|
||||
# 4. Clean up resources
|
||||
print("\n[资源清理] 断开 WiFi 并回收资源...")
|
||||
gc.collect()
|
||||
print("测试结束,已安全退出。")
|
||||
314
k230/audio_module.py
Normal file
314
k230/audio_module.py
Normal file
@ -0,0 +1,314 @@
|
||||
# audio_module.py
|
||||
# 音频录制和播放模块
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
import os
|
||||
from media.media import *
|
||||
from media.pyaudio import *
|
||||
import media.wave as wave
|
||||
|
||||
|
||||
class AudioRecorder:
|
||||
"""
|
||||
音频录制类 - 使用板载麦克风录制音频
|
||||
|
||||
参数:
|
||||
rate: 采样率 (默认44100Hz)
|
||||
channels: 声道数 (默认1,单声道)
|
||||
format: 采样精度 (默认paInt16,16位)
|
||||
|
||||
使用方式1 - 直接录制到文件:
|
||||
recorder = AudioRecorder()
|
||||
recorder.record_to_file('/sdcard/test.wav', duration=5)
|
||||
recorder.deinit()
|
||||
|
||||
使用方式2 - 流式录制:
|
||||
recorder = AudioRecorder()
|
||||
recorder.start()
|
||||
for i in range(100):
|
||||
data = recorder.read() # 获取一帧数据
|
||||
# 处理数据...
|
||||
recorder.save('/sdcard/test.wav', frames)
|
||||
recorder.stop()
|
||||
recorder.deinit()
|
||||
"""
|
||||
|
||||
def __init__(self, rate=44100, channels=1, format=paInt16):
|
||||
"""
|
||||
初始化音频录制器
|
||||
|
||||
参数:
|
||||
rate: 采样率,默认44100Hz
|
||||
channels: 声道数,默认1(单声道)
|
||||
format: 采样精度,默认paInt16(16位)
|
||||
"""
|
||||
self._rate = rate
|
||||
self._channels = channels
|
||||
self._format = format
|
||||
self._chunk = int(rate / 25)
|
||||
|
||||
self._pyaudio = None
|
||||
self._stream = None
|
||||
self._is_running = False
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
启动录制流(流式录制模式)
|
||||
调用后可以使用 read() 方法获取音频数据
|
||||
"""
|
||||
if self._is_running:
|
||||
return
|
||||
|
||||
self._pyaudio = PyAudio()
|
||||
self._pyaudio.initialize(self._chunk)
|
||||
MediaManager.init()
|
||||
|
||||
self._stream = self._pyaudio.open(
|
||||
format=self._format,
|
||||
channels=self._channels,
|
||||
rate=self._rate,
|
||||
input=True,
|
||||
frames_per_buffer=self._chunk
|
||||
)
|
||||
self._is_running = True
|
||||
|
||||
def read(self):
|
||||
"""
|
||||
读取一帧音频数据(流式录制模式)
|
||||
|
||||
返回:
|
||||
bytes: 一帧音频数据
|
||||
"""
|
||||
if not self._is_running:
|
||||
raise RuntimeError("录制器未启动,请先调用 start()")
|
||||
return self._stream.read()
|
||||
|
||||
def stop(self):
|
||||
"""停止录制流"""
|
||||
if self._stream:
|
||||
self._stream.stop_stream()
|
||||
self._stream.close()
|
||||
self._stream = None
|
||||
self._is_running = False
|
||||
|
||||
def deinit(self):
|
||||
"""释放所有资源"""
|
||||
self.stop()
|
||||
if self._pyaudio:
|
||||
self._pyaudio.terminate()
|
||||
self._pyaudio = None
|
||||
MediaManager.deinit()
|
||||
|
||||
def save(self, filename, frames):
|
||||
"""
|
||||
将音频帧数据保存为WAV文件
|
||||
|
||||
参数:
|
||||
filename: 保存的文件路径
|
||||
frames: 音频帧数据列表
|
||||
"""
|
||||
wf = wave.open(filename, 'wb')
|
||||
wf.set_channels(self._channels)
|
||||
wf.set_sampwidth(self._pyaudio.get_sample_size(self._format))
|
||||
wf.set_framerate(self._rate)
|
||||
wf.write_frames(b''.join(frames))
|
||||
wf.close()
|
||||
|
||||
def record_to_file(self, filename, duration):
|
||||
"""
|
||||
直接录制音频到WAV文件
|
||||
|
||||
参数:
|
||||
filename: 保存的文件路径 (如 '/sdcard/test.wav')
|
||||
duration: 录制时长 (秒)
|
||||
|
||||
返回:
|
||||
bool: True成功, False失败
|
||||
"""
|
||||
try:
|
||||
self.start()
|
||||
|
||||
frames = []
|
||||
total_chunks = int(self._rate / self._chunk * duration)
|
||||
|
||||
for i in range(total_chunks):
|
||||
data = self.read()
|
||||
frames.append(data)
|
||||
# 检查退出信号
|
||||
try:
|
||||
os.exitpoint()
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
self.save(filename, frames)
|
||||
return True
|
||||
|
||||
except BaseException as e:
|
||||
print("录制异常: ", e)
|
||||
return False
|
||||
finally:
|
||||
self.stop()
|
||||
self.deinit()
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
"""是否正在录制"""
|
||||
return self._is_running
|
||||
|
||||
@property
|
||||
def chunk_size(self):
|
||||
"""每帧数据大小"""
|
||||
return self._chunk
|
||||
|
||||
@property
|
||||
def rate(self):
|
||||
"""采样率"""
|
||||
return self._rate
|
||||
|
||||
|
||||
class AudioPlayer:
|
||||
"""
|
||||
音频播放类 - 通过3.5mm耳机接口播放音频
|
||||
|
||||
使用方式1 - 直接播放文件:
|
||||
player = AudioPlayer()
|
||||
player.play_file('/sdcard/test.wav')
|
||||
player.deinit()
|
||||
|
||||
使用方式2 - 流式播放:
|
||||
player = AudioPlayer(rate=44100, channels=1)
|
||||
player.start()
|
||||
player.write(audio_data) # 写入音频数据
|
||||
player.stop()
|
||||
player.deinit()
|
||||
"""
|
||||
|
||||
def __init__(self, rate=44100, channels=1, format=paInt16):
|
||||
"""
|
||||
初始化音频播放器
|
||||
|
||||
参数:
|
||||
rate: 采样率,默认44100Hz
|
||||
channels: 声道数,默认1(单声道)
|
||||
format: 采样精度,默认paInt16(16位)
|
||||
"""
|
||||
self._rate = rate
|
||||
self._channels = channels
|
||||
self._format = format
|
||||
self._chunk = int(rate / 25)
|
||||
|
||||
self._pyaudio = None
|
||||
self._stream = None
|
||||
self._is_running = False
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
启动播放流(流式播放模式)
|
||||
调用后可以使用 write() 方法写入音频数据
|
||||
"""
|
||||
if self._is_running:
|
||||
return
|
||||
|
||||
self._pyaudio = PyAudio()
|
||||
self._pyaudio.initialize(self._chunk)
|
||||
MediaManager.init()
|
||||
|
||||
self._stream = self._pyaudio.open(
|
||||
format=self._format,
|
||||
channels=self._channels,
|
||||
rate=self._rate,
|
||||
output=True,
|
||||
frames_per_buffer=self._chunk
|
||||
)
|
||||
self._is_running = True
|
||||
|
||||
def write(self, data):
|
||||
"""
|
||||
写入音频数据进行播放(流式播放模式)
|
||||
|
||||
参数:
|
||||
data: 音频数据 (bytes)
|
||||
"""
|
||||
if not self._is_running:
|
||||
raise RuntimeError("播放器未启动,请先调用 start()")
|
||||
self._stream.write(data)
|
||||
|
||||
def stop(self):
|
||||
"""停止播放流"""
|
||||
if self._stream:
|
||||
self._stream.stop_stream()
|
||||
self._stream.close()
|
||||
self._stream = None
|
||||
self._is_running = False
|
||||
|
||||
def deinit(self):
|
||||
"""释放所有资源"""
|
||||
self.stop()
|
||||
if self._pyaudio:
|
||||
self._pyaudio.terminate()
|
||||
self._pyaudio = None
|
||||
MediaManager.deinit()
|
||||
|
||||
def play_file(self, filename):
|
||||
"""
|
||||
播放WAV音频文件
|
||||
|
||||
参数:
|
||||
filename: WAV文件路径 (如 '/sdcard/test.wav')
|
||||
|
||||
返回:
|
||||
bool: True成功, False失败
|
||||
"""
|
||||
wf = None
|
||||
try:
|
||||
wf = wave.open(filename, 'rb')
|
||||
chunk = int(wf.get_framerate() / 25)
|
||||
|
||||
self._pyaudio = PyAudio()
|
||||
self._pyaudio.initialize(chunk)
|
||||
MediaManager.init()
|
||||
|
||||
self._stream = self._pyaudio.open(
|
||||
format=self._pyaudio.get_format_from_width(wf.get_sampwidth()),
|
||||
channels=wf.get_channels(),
|
||||
rate=wf.get_framerate(),
|
||||
output=True,
|
||||
frames_per_buffer=chunk
|
||||
)
|
||||
self._is_running = True
|
||||
|
||||
data = wf.read_frames(chunk)
|
||||
while data:
|
||||
self._stream.write(data)
|
||||
data = wf.read_frames(chunk)
|
||||
# 检查退出信号
|
||||
try:
|
||||
os.exitpoint()
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
return True
|
||||
|
||||
except BaseException as e:
|
||||
print("播放异常: ", e)
|
||||
return False
|
||||
finally:
|
||||
if wf:
|
||||
wf.close()
|
||||
self.stop()
|
||||
self.deinit()
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
"""是否正在播放"""
|
||||
return self._is_running
|
||||
|
||||
@property
|
||||
def chunk_size(self):
|
||||
"""每帧数据大小"""
|
||||
return self._chunk
|
||||
|
||||
@property
|
||||
def rate(self):
|
||||
"""采样率"""
|
||||
return self._rate
|
||||
59
k230/boot.py
Normal file
59
k230/boot.py
Normal file
@ -0,0 +1,59 @@
|
||||
# audio input and output example
|
||||
#
|
||||
# Note: You will need an SD card to run this example.
|
||||
#
|
||||
# You can play wav files or capture audio to save as wav
|
||||
|
||||
import os
|
||||
from media.media import * # 导入media模块,用于初始化vb buffer
|
||||
from media.pyaudio import * # 导入pyaudio模块,用于采集和播放音频
|
||||
import media.wave as wave # 导入wav模块,用于保存和加载wav音频文件
|
||||
|
||||
|
||||
def exit_check():
|
||||
try:
|
||||
os.exitpoint()
|
||||
except KeyboardInterrupt as e:
|
||||
print("user stop: ", e)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def play_audio(filename):
|
||||
try:
|
||||
wf = wave.open(filename, 'rb') # 打开wav文件
|
||||
CHUNK = int(wf.get_framerate() / 25) # 设置音频chunk值
|
||||
|
||||
p = PyAudio()
|
||||
p.initialize(CHUNK) # 初始化PyAudio对象
|
||||
MediaManager.init() # vb buffer初始化
|
||||
|
||||
# 创建音频输出流,设置的音频参数均为wave中获取到的参数
|
||||
stream = p.open(format=p.get_format_from_width(wf.get_sampwidth()),
|
||||
channels=wf.get_channels(),
|
||||
rate=wf.get_framerate(),
|
||||
output=True, frames_per_buffer=CHUNK)
|
||||
|
||||
data = wf.read_frames(CHUNK) # 从wav文件中读取数一帧数据
|
||||
|
||||
while data:
|
||||
stream.write(data) # 将帧数据写入到音频输出流中
|
||||
data = wf.read_frames(CHUNK) # 从wav文件中读取数一帧数据
|
||||
if exit_check():
|
||||
break
|
||||
except BaseException as e:
|
||||
print(f"Exception {e}")
|
||||
finally:
|
||||
stream.stop_stream() # 停止音频输出流
|
||||
stream.close() # 关闭音频输出流
|
||||
p.terminate() # 释放音频对象
|
||||
wf.close() # 关闭wav文件
|
||||
|
||||
MediaManager.deinit() # 释放vb buffer
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
os.exitpoint(os.EXITPOINT_ENABLE)
|
||||
print("play_audio sample start")
|
||||
play_audio('/sdcard/hello.wav') # 播放wav文件
|
||||
print("play_audio sample done")
|
||||
@ -40,8 +40,8 @@ def camera_init(width, height, chn=CAM_CHN_ID_0, hmirror=False, vflip=False):
|
||||
|
||||
# 设置通道的输出尺寸
|
||||
sensor.set_framesize(width=width, height=height, chn=chn)
|
||||
# 设置通道的输出像素格式为RGB565
|
||||
sensor.set_pixformat(Sensor.RGB565, chn=chn)
|
||||
# 设置通道的输出像素格式为RGB888
|
||||
sensor.set_pixformat(Sensor.RGB888, chn=chn)
|
||||
|
||||
# 初始化媒体管理器
|
||||
MediaManager.init()
|
||||
|
||||
280
k230/face_detect_module.py
Normal file
280
k230/face_detect_module.py
Normal file
@ -0,0 +1,280 @@
|
||||
# face_detect_module.py
|
||||
# 人脸检测模块
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
from libs.PipeLine import PipeLine, ScopedTiming
|
||||
from libs.AIBase import AIBase
|
||||
from libs.AI2D import Ai2d
|
||||
import os
|
||||
import nncase_runtime as nn
|
||||
import ulab.numpy as np
|
||||
import aidemo
|
||||
import gc
|
||||
|
||||
|
||||
class FaceDetector:
|
||||
"""
|
||||
人脸检测类 - 使用K230的AI能力检测人脸
|
||||
|
||||
功能:
|
||||
- 检测图像中的人脸
|
||||
- 返回人脸边界框坐标 (x, y, w, h, score)
|
||||
- 支持在图像上绘制检测框
|
||||
|
||||
参数:
|
||||
display_mode: 显示模式 "lcd" 或 "hdmi"
|
||||
confidence_threshold: 置信度阈值 (默认0.5)
|
||||
nms_threshold: 非极大值抑制阈值 (默认0.2)
|
||||
debug_mode: 调试模式开关 (默认0关闭)
|
||||
|
||||
使用示例:
|
||||
detector = FaceDetector(display_mode="lcd")
|
||||
detector.start()
|
||||
while True:
|
||||
img, faces = detector.detect()
|
||||
# faces = [(x, y, w, h, score), ...]
|
||||
detector.show()
|
||||
detector.stop()
|
||||
"""
|
||||
|
||||
# 模型配置
|
||||
KMODEL_PATH = "/sdcard/examples/kmodel/face_detection_320.kmodel"
|
||||
ANCHORS_PATH = "/sdcard/examples/utils/prior_data_320.bin"
|
||||
MODEL_INPUT_SIZE = [320, 320]
|
||||
ANCHOR_LEN = 4200
|
||||
DET_DIM = 4
|
||||
|
||||
# 显示配置
|
||||
DISPLAY_CONFIGS = {
|
||||
"lcd": {
|
||||
"display_size": [800, 480],
|
||||
"rgb888p_size": [1920, 1080],
|
||||
},
|
||||
"hdmi": {
|
||||
"display_size": [1920, 1080],
|
||||
"rgb888p_size": [1920, 1080],
|
||||
}
|
||||
}
|
||||
|
||||
def __init__(self, display_mode="lcd", confidence_threshold=0.5,
|
||||
nms_threshold=0.2, debug_mode=0):
|
||||
"""
|
||||
初始化人脸检测器
|
||||
|
||||
参数:
|
||||
display_mode: 显示模式 "lcd" 或 "hdmi"
|
||||
confidence_threshold: 置信度阈值,值越高过滤越严格
|
||||
nms_threshold: NMS阈值,防止重复检测
|
||||
debug_mode: 调试模式,1开启计时输出
|
||||
"""
|
||||
if display_mode not in self.DISPLAY_CONFIGS:
|
||||
raise ValueError("display_mode必须是 'lcd' 或 'hdmi'")
|
||||
|
||||
self._display_mode = display_mode
|
||||
self._confidence_threshold = confidence_threshold
|
||||
self._nms_threshold = nms_threshold
|
||||
self._debug_mode = debug_mode
|
||||
|
||||
# 获取显示配置
|
||||
config = self.DISPLAY_CONFIGS[display_mode]
|
||||
self._display_size = config["display_size"]
|
||||
self._rgb888p_size = config["rgb888p_size"]
|
||||
|
||||
# 加载锚点数据
|
||||
self._anchors = np.fromfile(self.ANCHORS_PATH, dtype=np.float)
|
||||
self._anchors = self._anchors.reshape((self.ANCHOR_LEN, self.DET_DIM))
|
||||
|
||||
self._pipeline = None
|
||||
self._face_det = None
|
||||
self._is_running = False
|
||||
self._last_faces = []
|
||||
|
||||
def start(self):
|
||||
"""启动人脸检测器,初始化摄像头和模型"""
|
||||
if self._is_running:
|
||||
return
|
||||
|
||||
# 初始化Pipeline
|
||||
self._pipeline = PipeLine(
|
||||
rgb888p_size=self._rgb888p_size,
|
||||
display_size=self._display_size,
|
||||
display_mode=self._display_mode
|
||||
)
|
||||
self._pipeline.create()
|
||||
|
||||
# 初始化人脸检测模型
|
||||
self._face_det = _FaceDetectionApp(
|
||||
kmodel_path=self.KMODEL_PATH,
|
||||
model_input_size=self.MODEL_INPUT_SIZE,
|
||||
anchors=self._anchors,
|
||||
confidence_threshold=self._confidence_threshold,
|
||||
nms_threshold=self._nms_threshold,
|
||||
rgb888p_size=self._rgb888p_size,
|
||||
display_size=self._display_size,
|
||||
debug_mode=self._debug_mode
|
||||
)
|
||||
self._face_det.config_preprocess()
|
||||
|
||||
self._is_running = True
|
||||
|
||||
def detect(self):
|
||||
"""
|
||||
检测当前帧中的人脸
|
||||
|
||||
返回:
|
||||
tuple: (img, faces)
|
||||
- img: 当前帧图像
|
||||
- faces: 人脸列表,每个元素为 (x, y, w, h, score)
|
||||
坐标已转换为显示分辨率下的实际坐标
|
||||
"""
|
||||
if not self._is_running:
|
||||
raise RuntimeError("检测器未启动,请先调用 start()")
|
||||
|
||||
os.exitpoint()
|
||||
|
||||
# 获取当前帧
|
||||
img = self._pipeline.get_frame()
|
||||
|
||||
# 推理
|
||||
res = self._face_det.run(img)
|
||||
|
||||
# 转换坐标为显示分辨率
|
||||
faces = []
|
||||
if res:
|
||||
for det in res:
|
||||
x, y, w, h = map(lambda v: int(round(v, 0)), det[:4])
|
||||
score = det[4] if len(det) > 4 else 1.0
|
||||
# 转换为显示坐标
|
||||
x = x * self._display_size[0] // self._rgb888p_size[0]
|
||||
y = y * self._display_size[1] // self._rgb888p_size[1]
|
||||
w = w * self._display_size[0] // self._rgb888p_size[0]
|
||||
h = h * self._display_size[1] // self._rgb888p_size[1]
|
||||
faces.append((x, y, w, h, score))
|
||||
|
||||
self._last_faces = faces
|
||||
return img, faces
|
||||
|
||||
def draw_boxes(self, color=(255, 255, 0, 255), thickness=2):
|
||||
"""
|
||||
在OSD层绘制人脸检测框
|
||||
|
||||
参数:
|
||||
color: 框颜色 (R, G, B, A)
|
||||
thickness: 线条粗细
|
||||
"""
|
||||
if not self._is_running:
|
||||
return
|
||||
|
||||
self._pipeline.osd_img.clear()
|
||||
for (x, y, w, h, score) in self._last_faces:
|
||||
self._pipeline.osd_img.draw_rectangle(
|
||||
x, y, w, h,
|
||||
color=color,
|
||||
thickness=thickness
|
||||
)
|
||||
|
||||
def show(self):
|
||||
"""显示当前帧(需要先调用detect和draw_boxes)"""
|
||||
if self._is_running:
|
||||
self._pipeline.show_image()
|
||||
gc.collect()
|
||||
|
||||
def stop(self):
|
||||
"""停止人脸检测器,释放资源"""
|
||||
if self._face_det:
|
||||
self._face_det.deinit()
|
||||
self._face_det = None
|
||||
if self._pipeline:
|
||||
self._pipeline.destroy()
|
||||
self._pipeline = None
|
||||
self._is_running = False
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
"""是否正在运行"""
|
||||
return self._is_running
|
||||
|
||||
@property
|
||||
def display_size(self):
|
||||
"""显示分辨率"""
|
||||
return self._display_size
|
||||
|
||||
@property
|
||||
def last_faces(self):
|
||||
"""上次检测到的人脸列表"""
|
||||
return self._last_faces
|
||||
|
||||
|
||||
class _FaceDetectionApp(AIBase):
|
||||
"""内部人脸检测应用类,继承自AIBase"""
|
||||
|
||||
def __init__(self, kmodel_path, model_input_size, anchors,
|
||||
confidence_threshold=0.5, nms_threshold=0.2,
|
||||
rgb888p_size=[224, 224], display_size=[1920, 1080],
|
||||
debug_mode=0):
|
||||
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
|
||||
self.kmodel_path = kmodel_path
|
||||
self.model_input_size = model_input_size
|
||||
self.confidence_threshold = confidence_threshold
|
||||
self.nms_threshold = nms_threshold
|
||||
self.anchors = anchors
|
||||
# 对宽度进行16字节对齐
|
||||
self.rgb888p_size = [self._align_up(rgb888p_size[0], 16), rgb888p_size[1]]
|
||||
self.display_size = [self._align_up(display_size[0], 16), display_size[1]]
|
||||
self.debug_mode = debug_mode
|
||||
self.ai2d = Ai2d(debug_mode)
|
||||
self.ai2d.set_ai2d_dtype(
|
||||
nn.ai2d_format.NCHW_FMT,
|
||||
nn.ai2d_format.NCHW_FMT,
|
||||
np.uint8,
|
||||
np.uint8
|
||||
)
|
||||
|
||||
def _align_up(self, value, alignment):
|
||||
"""向上对齐"""
|
||||
return ((value + alignment - 1) // alignment) * alignment
|
||||
|
||||
def config_preprocess(self, input_image_size=None):
|
||||
"""配置预处理"""
|
||||
with ScopedTiming("set preprocess config", self.debug_mode > 0):
|
||||
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
|
||||
top, bottom, left, right = self.get_padding_param()
|
||||
self.ai2d.pad([0, 0, 0, 0, top, bottom, left, right], 0, [104, 117, 123])
|
||||
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
|
||||
self.ai2d.build(
|
||||
[1, 3, ai2d_input_size[1], ai2d_input_size[0]],
|
||||
[1, 3, self.model_input_size[1], self.model_input_size[0]]
|
||||
)
|
||||
|
||||
def postprocess(self, results):
|
||||
"""后处理"""
|
||||
with ScopedTiming("postprocess", self.debug_mode > 0):
|
||||
post_ret = aidemo.face_det_post_process(
|
||||
self.confidence_threshold,
|
||||
self.nms_threshold,
|
||||
self.model_input_size[1],
|
||||
self.anchors,
|
||||
self.rgb888p_size,
|
||||
results
|
||||
)
|
||||
if len(post_ret) == 0:
|
||||
return post_ret
|
||||
else:
|
||||
return post_ret[0]
|
||||
|
||||
def get_padding_param(self):
|
||||
"""计算填充参数"""
|
||||
dst_w = self.model_input_size[0]
|
||||
dst_h = self.model_input_size[1]
|
||||
ratio_w = dst_w / self.rgb888p_size[0]
|
||||
ratio_h = dst_h / self.rgb888p_size[1]
|
||||
ratio = min(ratio_w, ratio_h)
|
||||
new_w = int(ratio * self.rgb888p_size[0])
|
||||
new_h = int(ratio * self.rgb888p_size[1])
|
||||
dw = (dst_w - new_w) / 2
|
||||
dh = (dst_h - new_h) / 2
|
||||
top = int(round(0))
|
||||
bottom = int(round(dh * 2 + 0.1))
|
||||
left = int(round(0))
|
||||
right = int(round(dw * 2 - 0.1))
|
||||
return top, bottom, left, right
|
||||
BIN
k230/hello.wav
Normal file
BIN
k230/hello.wav
Normal file
Binary file not shown.
24
k230/k230_readme.md
Normal file
24
k230/k230_readme.md
Normal file
@ -0,0 +1,24 @@
|
||||
## k230模块
|
||||
1. 3.1寸显示驱动 ---> 已实现
|
||||
2. 摄像头图像获取 ---> 已实现
|
||||
3. 舵机控制 ---> 已实现
|
||||
4. 麦克风 ---> 已实现
|
||||
5. 喇叭播放 ---> 已实现
|
||||
|
||||
## 软件驱动
|
||||
1. 人脸检测接口 ---> 已实现
|
||||
2. ws请求接口
|
||||
3. 播放本地音频文件接口(提前内置,直接使用前面实现了的喇叭播放)
|
||||
4. 人脸追踪
|
||||
|
||||
## 流程
|
||||
1. 获取图像数据
|
||||
2. 检测人脸( 超过5s)
|
||||
3. 请求模型端( 情绪检测以及皮肤状态打分,注意时间戳)
|
||||
4. 保存到数据库( 图像数据 ,心情,皮肤状态)
|
||||
5. 判断时候状态不好! 然后发起语音对话
|
||||
|
||||
|
||||
## 其他
|
||||
### 音频格式转换
|
||||
ffmpeg -y -i hello_24k.wav -ar 44100 -ac 1 -sample_fmt s16 hello.wav
|
||||
BIN
k230/logo.jpg
Normal file
BIN
k230/logo.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 41 KiB |
BIN
k230/logo_3s.mp4
Normal file
BIN
k230/logo_3s.mp4
Normal file
Binary file not shown.
547
k230/main.py
Normal file
547
k230/main.py
Normal file
@ -0,0 +1,547 @@
|
||||
# main.py
|
||||
# 人脸检测与状态分析主程序
|
||||
# 功能:实时人脸检测 -> 持续3秒触发情绪和皮肤状态检测 -> 输出结果
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
import os
|
||||
import time
|
||||
import gc
|
||||
import sys
|
||||
from face_detect_module import FaceDetector
|
||||
from state_detection import StateDetector
|
||||
from audio_module import AudioPlayer
|
||||
import camera_module
|
||||
|
||||
# 音频播放相关导入
|
||||
from media.media import *
|
||||
from media.pyaudio import *
|
||||
import media.wave as wave
|
||||
|
||||
# ========== 配置参数 ==========
|
||||
|
||||
# WiFi 配置
|
||||
WIFI_SSID = "ZTE_969121"
|
||||
WIFI_PASSWORD = "wds666666"
|
||||
|
||||
# 服务器配置
|
||||
SERVER_HOST = "192.168.0.21"
|
||||
SERVER_PORT = 8081
|
||||
API_PATH = "/api/detection/analyze"
|
||||
|
||||
# 检测参数
|
||||
DETECTION_THRESHOLD = 3.0 # 人脸持续3秒后触发检测
|
||||
COOLDOWN_PERIOD = 10.0 # 检测完成后10秒冷却期(避免频繁检测)
|
||||
|
||||
# 显示和音频配置
|
||||
DISPLAY_MODE = "lcd" # LCD显示模式
|
||||
AUDIO_FILE = "/sdcard/HaveFace.wav" # 检测到人脸时的提示音
|
||||
|
||||
# 摄像头配置
|
||||
DETECTION_IMAGE_WIDTH = 640
|
||||
DETECTION_IMAGE_HEIGHT = 480
|
||||
|
||||
# ========== 状态管理类 ==========
|
||||
|
||||
|
||||
class FaceDetectionState:
|
||||
"""
|
||||
人脸检测状态管理
|
||||
|
||||
追踪:
|
||||
- 人脸首次出现时间
|
||||
- 上次检测触发时间
|
||||
- 是否正在检测中
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.face_start_time = None # 人脸首次出现的时间戳(None表示无人脸)
|
||||
self.last_detection_time = None # 上次触发检测的时间戳
|
||||
self.is_detecting = False # 是否正在执行检测中
|
||||
|
||||
def on_face_detected(self):
|
||||
"""
|
||||
人脸检测到时调用(有人脸)
|
||||
"""
|
||||
if self.face_start_time is None:
|
||||
self.face_start_time = time.time()
|
||||
|
||||
def on_face_lost(self):
|
||||
"""
|
||||
人脸丢失时调用(无人脸)
|
||||
重置人脸计时器
|
||||
"""
|
||||
self.face_start_time = None
|
||||
|
||||
def get_face_duration(self):
|
||||
"""
|
||||
获取人脸持续时间(秒)
|
||||
|
||||
返回:
|
||||
float: 人脸持续的秒数,无人脸时返回0
|
||||
"""
|
||||
if self.face_start_time is None:
|
||||
return 0.0
|
||||
return time.time() - self.face_start_time
|
||||
|
||||
def should_trigger_detection(
|
||||
self,
|
||||
threshold=DETECTION_THRESHOLD,
|
||||
cooldown=COOLDOWN_PERIOD):
|
||||
"""
|
||||
判断是否应该触发检测
|
||||
|
||||
条件:
|
||||
1. 人脸持续时间 >= threshold
|
||||
2. 不在检测中
|
||||
3. 超过冷却期(避免重复检测)
|
||||
|
||||
参数:
|
||||
threshold: 触发检测的时间阈值(秒)
|
||||
cooldown: 冷却期(秒)
|
||||
|
||||
返回:
|
||||
bool: 是否应该触发检测
|
||||
"""
|
||||
# 如果正在检测中,不再触发
|
||||
if self.is_detecting:
|
||||
return False
|
||||
|
||||
# 人脸持续时间不足,不触发
|
||||
if self.get_face_duration() < threshold:
|
||||
return False
|
||||
|
||||
# 检查冷却期
|
||||
if self.last_detection_time is not None:
|
||||
time_since_last = time.time() - self.last_detection_time
|
||||
if time_since_last < cooldown:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def start_detection(self):
|
||||
"""标记开始执行检测"""
|
||||
self.is_detecting = True
|
||||
|
||||
def end_detection(self):
|
||||
"""标记检测完成,记录时间用于冷却期计算"""
|
||||
self.is_detecting = False
|
||||
self.last_detection_time = time.time()
|
||||
|
||||
def has_active_face(self):
|
||||
"""是否有活跃人脸(人脸出现过)"""
|
||||
return self.face_start_time is not None
|
||||
|
||||
|
||||
# ========== 辅助函数 ==========
|
||||
|
||||
def exit_check():
|
||||
"""
|
||||
检查退出信号(参考boot.py)
|
||||
"""
|
||||
try:
|
||||
os.exitpoint()
|
||||
except KeyboardInterrupt as e:
|
||||
print("[用户] 中断: {}".format(e))
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def play_audio_safe(filename):
|
||||
"""
|
||||
安全播放音频文件(完全参考boot.py实现)
|
||||
|
||||
特点:
|
||||
- 完整播放音频
|
||||
- 正确初始化和释放资源
|
||||
- 支持用户中断
|
||||
- 参考 boot.py 的实现方式
|
||||
|
||||
参数:
|
||||
filename: WAV文件路径
|
||||
|
||||
返回:
|
||||
bool: True成功, False失败
|
||||
"""
|
||||
try:
|
||||
# 打开wav文件
|
||||
wf = wave.open(filename, 'rb')
|
||||
chunk = int(wf.get_framerate() / 25)
|
||||
|
||||
# 初始化PyAudio
|
||||
p = PyAudio()
|
||||
p.initialize(chunk)
|
||||
MediaManager.init() # vb buffer初始化(重要!)
|
||||
|
||||
# 创建音频输出流,设置的音频参数均为wave中获取到的参数
|
||||
stream = p.open(
|
||||
format=p.get_format_from_width(wf.get_sampwidth()),
|
||||
channels=wf.get_channels(),
|
||||
rate=wf.get_framerate(),
|
||||
output=True,
|
||||
frames_per_buffer=chunk
|
||||
)
|
||||
|
||||
# 从wav文件中读取第一帧数据
|
||||
data = wf.read_frames(chunk)
|
||||
|
||||
# 逐帧读取并播放
|
||||
while data:
|
||||
stream.write(data) # 将帧数据写入到音频输出流中
|
||||
data = wf.read_frames(chunk) # 从wav文件中读取数一帧数据
|
||||
if exit_check(): # 检查退出信号
|
||||
break
|
||||
|
||||
return True
|
||||
|
||||
except BaseException as e:
|
||||
print("[错误] 音频播放异常: {}".format(e))
|
||||
return False
|
||||
|
||||
finally:
|
||||
# 清理资源
|
||||
try:
|
||||
stream.stop_stream() # 停止音频输出流
|
||||
stream.close() # 关闭音频输出流
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
p.terminate() # 释放音频对象
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
wf.close() # 关闭wav文件
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
MediaManager.deinit() # 释放vb buffer
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def image_to_rgb888(img):
|
||||
"""
|
||||
将图像转换为RGB888格式的bytes数据
|
||||
|
||||
支持多种Image对象格式(兼容不同的K230库版本)
|
||||
|
||||
参数:
|
||||
img: Image对象
|
||||
|
||||
返回:
|
||||
bytes: RGB888格式的图像数据
|
||||
转换失败返回 None
|
||||
"""
|
||||
try:
|
||||
if hasattr(img, 'to_bytes'):
|
||||
return img.to_bytes()
|
||||
if hasattr(img, 'tobytes'):
|
||||
return img.tobytes()
|
||||
return bytes(img)
|
||||
except Exception as e:
|
||||
print("[错误] 图像转换错误: {}".format(e))
|
||||
return None
|
||||
|
||||
|
||||
def initialize():
|
||||
"""
|
||||
初始化程序
|
||||
|
||||
步骤:
|
||||
1. 连接 WiFi(重试3次)
|
||||
2. 初始化 StateDetector
|
||||
3. 初始化 FaceDetector
|
||||
|
||||
返回:
|
||||
tuple: (face_detector, state_detector)
|
||||
初始化失败返回 (None, None)
|
||||
"""
|
||||
print("=" * 50)
|
||||
print("程序启动 - 人脸检测与状态分析系统")
|
||||
print("=" * 50)
|
||||
|
||||
# ===== 第1步:连接WiFi =====
|
||||
print("\n[初始化] 连接WiFi: {}".format(WIFI_SSID))
|
||||
|
||||
state_detector = StateDetector(SERVER_HOST, SERVER_PORT, API_PATH)
|
||||
|
||||
for attempt in range(3):
|
||||
try:
|
||||
state_detector.connect_wifi(WIFI_SSID, WIFI_PASSWORD)
|
||||
print("[初始化] WiFi连接成功")
|
||||
break
|
||||
except Exception as e:
|
||||
print("[初始化] WiFi连接失败 (尝试 {}/3): {}".format(attempt + 1, e))
|
||||
if attempt < 2:
|
||||
time.sleep(2)
|
||||
else:
|
||||
print("[错误] WiFi连接失败,程序退出")
|
||||
return None, None
|
||||
|
||||
# ===== 第2步:初始化人脸检测器 =====
|
||||
print("[初始化] 启动人脸检测器...")
|
||||
|
||||
try:
|
||||
face_detector = FaceDetector(
|
||||
display_mode=DISPLAY_MODE,
|
||||
confidence_threshold=0.5,
|
||||
debug_mode=0
|
||||
)
|
||||
face_detector.start()
|
||||
print("[初始化] 人脸检测器启动成功")
|
||||
except Exception as e:
|
||||
print("[错误] 人脸检测器启动失败: {}".format(e))
|
||||
state_detector.disconnect()
|
||||
return None, None
|
||||
|
||||
print("\n[初始化] 初始化完成,开始检测...\n")
|
||||
|
||||
return face_detector, state_detector
|
||||
|
||||
|
||||
def capture_detection_image():
|
||||
"""
|
||||
捕获用于状态检测的图像(640x480)
|
||||
|
||||
由于 FaceDetector 使用摄像头的 1920x1080 分辨率,
|
||||
而状态检测需要 640x480 的图像,
|
||||
此函数会临时停止 FaceDetector,使用 camera_module 快速拍摄。
|
||||
|
||||
返回:
|
||||
bytes: RGB888 格式的图像数据
|
||||
失败返回 None
|
||||
"""
|
||||
try:
|
||||
# 初始化 640x480 摄像头
|
||||
camera_module.camera_init(DETECTION_IMAGE_WIDTH, DETECTION_IMAGE_HEIGHT)
|
||||
camera_module.camera_start()
|
||||
|
||||
# 等待摄像头稳定(参考 05test_state_detection.py)
|
||||
time.sleep(1)
|
||||
|
||||
# 拍照
|
||||
print("[检测] 拍照中...")
|
||||
img = camera_module.camera_snapshot()
|
||||
|
||||
# 转换为RGB888格式
|
||||
rgb888_data = image_to_rgb888(img)
|
||||
|
||||
# 清理摄像头资源
|
||||
camera_module.camera_stop()
|
||||
camera_module.camera_deinit()
|
||||
|
||||
return rgb888_data
|
||||
|
||||
except Exception as e:
|
||||
print("[错误] 图像捕获失败: {}".format(e))
|
||||
try:
|
||||
camera_module.camera_stop()
|
||||
camera_module.camera_deinit()
|
||||
except BaseException:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def trigger_detection(face_detector, state_detector):
|
||||
"""
|
||||
触发状态检测的完整流程
|
||||
|
||||
步骤:
|
||||
1. 暂停人脸检测显示(屏幕会冻结)
|
||||
2. 播放提示音
|
||||
3. 捕获 640x480 图像
|
||||
4. 发送状态检测请求(情绪+皮肤)
|
||||
5. 输出结果
|
||||
6. 恢复人脸检测显示
|
||||
|
||||
参数:
|
||||
face_detector: FaceDetector 实例
|
||||
state_detector: StateDetector 实例
|
||||
|
||||
注意:
|
||||
此过程会阻塞 3-5 秒,期间屏幕显示会冻结
|
||||
"""
|
||||
print("\n" + "=" * 50)
|
||||
print("触发状态检测...")
|
||||
print("=" * 50)
|
||||
|
||||
try:
|
||||
# 暂停人脸检测
|
||||
face_detector.stop()
|
||||
print("[检测] 暂停人脸显示")
|
||||
|
||||
# ===== 步骤1:播放提示音 =====
|
||||
print("[检测] 播放提示音...")
|
||||
try:
|
||||
if play_audio_safe(AUDIO_FILE):
|
||||
print("[检测] 提示音播放完成")
|
||||
else:
|
||||
print("[警告] 播放音频失败")
|
||||
except Exception as e:
|
||||
print("[警告] 播放音频异常: {}".format(e))
|
||||
|
||||
# ===== 步骤2:捕获图像 =====
|
||||
print("[检测] 捕获图像...")
|
||||
rgb888_data = capture_detection_image()
|
||||
|
||||
if rgb888_data is None:
|
||||
print("[错误] 无法获取图像数据,检测失败")
|
||||
return
|
||||
|
||||
print("[检测] 图像数据大小: {} bytes".format(len(rgb888_data)))
|
||||
|
||||
# ===== 步骤3:发送检测请求 =====
|
||||
print("[检测] 发送状态检测请求...")
|
||||
result = state_detector.detect(
|
||||
rgb888_data,
|
||||
width=DETECTION_IMAGE_WIDTH,
|
||||
height=DETECTION_IMAGE_HEIGHT
|
||||
)
|
||||
|
||||
# ===== 步骤4:输出结果 =====
|
||||
print("\n" + "=" * 45 + " 检测结果 " + "=" * 45)
|
||||
|
||||
if result.get("success"):
|
||||
emotion = result.get('emotion', 'unknown')
|
||||
confidence = result.get('emotion_confidence', 0)
|
||||
skin = result.get('skin_status', {})
|
||||
|
||||
print("情绪: {} (置信度: {:.2f})".format(emotion, confidence))
|
||||
print("皮肤状态:")
|
||||
print(" - 痘痘: {}".format(skin.get('acne', 0)))
|
||||
print(" - 皱纹: {}".format(skin.get('wrinkles', 0)))
|
||||
print(" - 毛孔: {}".format(skin.get('pores', 0)))
|
||||
print(" - 黑眼圈: {}".format(skin.get('dark_circles', 0)))
|
||||
else:
|
||||
print("检测失败: {}".format(result.get('error', '未知错误')))
|
||||
|
||||
print("=" * 100)
|
||||
|
||||
except Exception as e:
|
||||
print("[错误] 检测过程异常: {}".format(e))
|
||||
try:
|
||||
sys.print_exception(e)
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
finally:
|
||||
# 恢复人脸检测显示
|
||||
print("\n[检测] 恢复人脸检测...")
|
||||
try:
|
||||
face_detector.start()
|
||||
print("[检测] 人脸检测已恢复\n")
|
||||
except Exception as e:
|
||||
print("[错误] 恢复人脸检测失败: {}".format(e))
|
||||
|
||||
|
||||
def main_loop(face_detector, state_detector):
|
||||
"""
|
||||
主检测循环
|
||||
|
||||
流程:
|
||||
1. 检测人脸
|
||||
2. 更新状态(出现/消失)
|
||||
3. 判断是否触发检测
|
||||
4. 绘制和显示
|
||||
5. 垃圾回收
|
||||
|
||||
参数:
|
||||
face_detector: FaceDetector 实例
|
||||
state_detector: StateDetector 实例
|
||||
"""
|
||||
state = FaceDetectionState()
|
||||
frame_count = 0
|
||||
|
||||
while True:
|
||||
try:
|
||||
os.exitpoint()
|
||||
|
||||
# ===== 步骤1:检测人脸 =====
|
||||
img, faces = face_detector.detect()
|
||||
frame_count += 1
|
||||
|
||||
# ===== 步骤2:更新状态 =====
|
||||
if faces:
|
||||
state.on_face_detected()
|
||||
|
||||
# 仅每秒打印一次(减少日志输出)
|
||||
if frame_count % 25 == 0:
|
||||
print("[检测] 检测到 {} 个人脸,持续时间: {:.1f} 秒".format(
|
||||
len(faces), state.get_face_duration()
|
||||
))
|
||||
else:
|
||||
state.on_face_lost()
|
||||
|
||||
# ===== 步骤3:判断是否触发检测 =====
|
||||
if state.should_trigger_detection():
|
||||
state.start_detection()
|
||||
trigger_detection(face_detector, state_detector)
|
||||
state.end_detection()
|
||||
|
||||
# ===== 步骤4:绘制和显示 =====
|
||||
face_detector.draw_boxes()
|
||||
face_detector.show()
|
||||
|
||||
# ===== 步骤5:垃圾回收 =====
|
||||
gc.collect()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n[用户] 用户停止")
|
||||
break
|
||||
except Exception as e:
|
||||
print("[错误] 主循环异常: {}".format(e))
|
||||
try:
|
||||
sys.print_exception(e)
|
||||
except BaseException:
|
||||
pass
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
# ========== 主程序入口 ==========
|
||||
|
||||
if __name__ == "__main__":
|
||||
os.exitpoint(os.EXITPOINT_ENABLE)
|
||||
|
||||
face_detector = None
|
||||
state_detector = None
|
||||
|
||||
try:
|
||||
# 初始化
|
||||
face_detector, state_detector = initialize()
|
||||
|
||||
# 启动主循环
|
||||
if face_detector and state_detector:
|
||||
main_loop(face_detector, state_detector)
|
||||
else:
|
||||
print("[错误] 初始化失败,程序退出")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n[用户] 用户中断")
|
||||
except Exception as e:
|
||||
print("[错误] 程序异常: {}".format(e))
|
||||
try:
|
||||
sys.print_exception(e)
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
finally:
|
||||
# 清理资源
|
||||
print("\n[清理] 释放资源...")
|
||||
|
||||
if face_detector:
|
||||
try:
|
||||
face_detector.stop()
|
||||
print("[清理] 人脸检测器已停止")
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
if state_detector:
|
||||
try:
|
||||
state_detector.disconnect()
|
||||
print("[清理] WiFi已断开")
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
print("[清理] 程序退出")
|
||||
28
k230/servo.py
Normal file
28
k230/servo.py
Normal file
@ -0,0 +1,28 @@
|
||||
# 02test_servo.py
|
||||
# 舵机测试程序:最大 -> 最小 -> 中间
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
from servo_module import ServoController
|
||||
import time
|
||||
import sys
|
||||
|
||||
sys.path.append("/sdcard")
|
||||
|
||||
|
||||
# 俯仰角
|
||||
servo1 = ServoController(gpio_pin=42, min_angle=45, max_angle=225, servo_range=270)
|
||||
# 横滚
|
||||
servo2 = ServoController(gpio_pin=52, min_angle=45, max_angle=225, servo_range=270)
|
||||
print("舵机测试开始...")
|
||||
|
||||
try:
|
||||
servo1.set_angle(200)
|
||||
servo2.set_angle(135)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("用户停止")
|
||||
finally:
|
||||
servo1.deinit()
|
||||
print("舵机资源已释放")
|
||||
141
k230/servo_module.py
Normal file
141
k230/servo_module.py
Normal file
@ -0,0 +1,141 @@
|
||||
# servo_module.py
|
||||
# 舵机控制模块
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
from machine import PWM, FPIOA
|
||||
|
||||
# GPIO引脚与PWM通道的固定映射 (40PIN排针可用)
|
||||
# 排针引脚号 | 芯片引脚号 | PWM通道号
|
||||
# 12 | GPIO 47 | PWM3
|
||||
# 26 | GPIO 61 | PWM1
|
||||
# 32 | GPIO 46 | PWM2
|
||||
# 33 | GPIO 52 | PWM4
|
||||
# 35 | GPIO 42 | PWM0
|
||||
GPIO_PWM_MAP = {
|
||||
42: 0, # GPIO42 -> PWM0 (排针35)
|
||||
46: 2, # GPIO46 -> PWM2 (排针32)
|
||||
47: 3, # GPIO47 -> PWM3 (排针12)
|
||||
52: 4, # GPIO52 -> PWM4 (排针33)
|
||||
61: 1, # GPIO61 -> PWM1 (排针26)
|
||||
}
|
||||
|
||||
# PWM通道对应的FPIOA功能
|
||||
PWM_FPIOA_MAP = {
|
||||
0: FPIOA.PWM0,
|
||||
1: FPIOA.PWM1,
|
||||
2: FPIOA.PWM2,
|
||||
3: FPIOA.PWM3,
|
||||
4: FPIOA.PWM4,
|
||||
}
|
||||
|
||||
|
||||
class ServoController:
|
||||
"""舵机控制类,支持角度控制和增量控制"""
|
||||
|
||||
def __init__(self, gpio_pin, min_angle=0, max_angle=270, servo_range=None):
|
||||
"""
|
||||
初始化舵机控制器
|
||||
|
||||
参数:
|
||||
gpio_pin: GPIO引脚号 (42, 46, 47, 52, 61)
|
||||
min_angle: 软件最小角度限制 (默认0)
|
||||
max_angle: 软件最大角度限制 (默认270)
|
||||
servo_range: 舵机物理最大角度范围 (默认与max_angle相同)
|
||||
用于脉宽计算,如180度舵机填180,270度舵机填270
|
||||
"""
|
||||
# 检查引脚是否支持
|
||||
if gpio_pin not in GPIO_PWM_MAP:
|
||||
raise ValueError("不支持的GPIO引脚,请使用42, 46, 47, 52, 61")
|
||||
|
||||
self._gpio_pin = gpio_pin
|
||||
self._pwm_channel = GPIO_PWM_MAP[gpio_pin]
|
||||
self._min_angle = min_angle
|
||||
self._max_angle = max_angle
|
||||
# 舵机物理角度范围,用于脉宽计算
|
||||
self._servo_range = servo_range if servo_range is not None else max_angle
|
||||
self._current_angle = (min_angle + max_angle) / 2 # 默认中间位置
|
||||
|
||||
# 配置GPIO为PWM功能
|
||||
self._fpioa = FPIOA()
|
||||
self._fpioa.set_function(gpio_pin, PWM_FPIOA_MAP[self._pwm_channel])
|
||||
|
||||
# 初始化PWM,频率50Hz
|
||||
self._pwm = PWM(self._pwm_channel)
|
||||
self._pwm.freq(50)
|
||||
|
||||
# 移动到初始位置
|
||||
self._update_pwm()
|
||||
|
||||
def _angle_to_duty(self, angle):
|
||||
"""
|
||||
将角度转换为PWM占空比值
|
||||
|
||||
舵机PWM信号: 周期20ms
|
||||
- 0.5ms脉宽 -> 0度
|
||||
- 2.5ms脉宽 -> 舵机最大角度
|
||||
根据servo_range动态计算
|
||||
"""
|
||||
# 脉宽范围: 0.5ms - 2.5ms,根据舵机物理范围计算
|
||||
pulse_ms = 0.5 + (angle / self._servo_range) * 2.0
|
||||
# 占空比: pulse_ms / 20ms * 65535
|
||||
duty = int((pulse_ms / 20.0) * 65535)
|
||||
return duty
|
||||
|
||||
def _clamp_angle(self, angle):
|
||||
"""限制角度在有效范围内"""
|
||||
if angle < self._min_angle:
|
||||
return self._min_angle
|
||||
if angle > self._max_angle:
|
||||
return self._max_angle
|
||||
return angle
|
||||
|
||||
def _update_pwm(self):
|
||||
"""更新PWM输出"""
|
||||
duty = self._angle_to_duty(self._current_angle)
|
||||
self._pwm.duty_u16(duty)
|
||||
|
||||
def set_angle(self, angle):
|
||||
"""
|
||||
设置舵机到指定角度
|
||||
|
||||
参数:
|
||||
angle: 目标角度
|
||||
"""
|
||||
self._current_angle = self._clamp_angle(angle)
|
||||
self._update_pwm()
|
||||
|
||||
def get_angle(self):
|
||||
"""
|
||||
获取当前角度
|
||||
|
||||
返回:
|
||||
当前角度值
|
||||
"""
|
||||
return self._current_angle
|
||||
|
||||
def rotate(self, direction, step):
|
||||
"""
|
||||
增量控制舵机
|
||||
|
||||
参数:
|
||||
direction: 方向,1为顺时针(角度增大),-1为逆时针(角度减小)
|
||||
step: 步进角度 (如0.1, 1, 5等)
|
||||
|
||||
返回:
|
||||
转动后的角度
|
||||
"""
|
||||
new_angle = self._current_angle + (direction * step)
|
||||
self._current_angle = self._clamp_angle(new_angle)
|
||||
self._update_pwm()
|
||||
return self._current_angle
|
||||
|
||||
def center(self):
|
||||
"""舵机归中,移动到角度范围的中间位置"""
|
||||
center_angle = (self._min_angle + self._max_angle) / 2
|
||||
self.set_angle(center_angle)
|
||||
|
||||
def deinit(self):
|
||||
"""释放PWM资源"""
|
||||
if self._pwm:
|
||||
self._pwm.deinit()
|
||||
self._pwm = None
|
||||
375
k230/state_detection.py
Normal file
375
k230/state_detection.py
Normal file
@ -0,0 +1,375 @@
|
||||
# state_detection.py
|
||||
# 状态检测模块 - 通过HTTP请求发送图像进行心情和皮肤状态检测
|
||||
# 适用于庐山派 K230-CanMV 开发板
|
||||
|
||||
import network
|
||||
import socket
|
||||
import time
|
||||
import os
|
||||
|
||||
try:
|
||||
import ubinascii as binascii
|
||||
except ImportError:
|
||||
import binascii
|
||||
|
||||
try:
|
||||
import ujson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
try:
|
||||
import ussl as ssl
|
||||
except ImportError:
|
||||
try:
|
||||
import ssl
|
||||
except ImportError:
|
||||
ssl = None
|
||||
|
||||
|
||||
class StateDetector:
|
||||
"""
|
||||
状态检测类 - 通过HTTP POST请求发送图像数据到服务器进行心情和皮肤状态检测
|
||||
|
||||
功能:
|
||||
- 连接WiFi网络
|
||||
- 发送图像数据到检测服务器 (POST /api/detection/analyze)
|
||||
- 接收并解析心情和皮肤状态检测结果
|
||||
|
||||
参数:
|
||||
server_host: 检测服务器主机地址
|
||||
server_port: 检测服务器端口 (默认80)
|
||||
api_path: API路径 (默认 "/api/detection/analyze")
|
||||
|
||||
使用示例:
|
||||
detector = StateDetector(server_host="192.168.0.21", server_port=8081)
|
||||
detector.connect_wifi("SSID", "password")
|
||||
result = detector.detect(image_data)
|
||||
print(result["emotion"])
|
||||
detector.disconnect()
|
||||
"""
|
||||
|
||||
# 情绪类型定义
|
||||
EMOTIONS = ["angry", "disgust", "fear", "happy", "sad", "surprise", "neutral"]
|
||||
|
||||
def __init__(self, server_host, server_port=80, api_path="/api/detection/analyze"):
|
||||
"""
|
||||
初始化状态检测器
|
||||
|
||||
参数:
|
||||
server_host: 检测服务器主机地址
|
||||
server_port: 检测服务器端口
|
||||
api_path: API路径
|
||||
"""
|
||||
self._server_host = server_host
|
||||
self._server_port = server_port
|
||||
self._api_path = api_path
|
||||
|
||||
self._sta = None
|
||||
self._is_connected = False
|
||||
|
||||
def connect_wifi(self, ssid, password=None, timeout=15):
|
||||
"""
|
||||
连接WiFi网络
|
||||
|
||||
参数:
|
||||
ssid: WiFi SSID
|
||||
password: WiFi密码 (无密码时可为None)
|
||||
timeout: 连接超时时间(秒)
|
||||
|
||||
返回:
|
||||
str: 获取的IP地址
|
||||
"""
|
||||
self._sta = network.WLAN(network.STA_IF)
|
||||
|
||||
if password:
|
||||
self._sta.connect(ssid, password)
|
||||
else:
|
||||
self._sta.connect(ssid)
|
||||
|
||||
print("正在连接WiFi: " + ssid + "...")
|
||||
|
||||
start_time = time.time()
|
||||
while not self._sta.isconnected():
|
||||
if time.time() - start_time > timeout:
|
||||
raise RuntimeError("WiFi连接超时: " + ssid)
|
||||
time.sleep(1)
|
||||
os.exitpoint()
|
||||
|
||||
ip = self._sta.ifconfig()[0]
|
||||
print("WiFi连接成功! IP: " + ip)
|
||||
self._is_connected = True
|
||||
return ip
|
||||
|
||||
def detect(self, image_data, width=640, height=480, timestamp=None):
|
||||
"""
|
||||
发送图像数据进行状态检测
|
||||
|
||||
参数:
|
||||
image_data: 图像数据 (bytes类型,RGB888格式)
|
||||
width: 图像宽度 (默认640)
|
||||
height: 图像高度 (默认480)
|
||||
timestamp: 图像采集时间 (可选,格式: "YYYY-MM-DD HH:MM:SS")
|
||||
|
||||
返回:
|
||||
dict: 检测结果
|
||||
"""
|
||||
if not self._is_connected:
|
||||
raise RuntimeError("网络未连接,请先调用 connect_wifi() 或 connect_lan()")
|
||||
|
||||
# 添加BMP头,将原始RGB数据包装为BMP图片
|
||||
bmp_image = self._add_bmp_header(image_data, width, height)
|
||||
|
||||
# 将BMP图像数据编码为base64
|
||||
image_base64 = binascii.b2a_base64(bmp_image).decode('utf-8').strip()
|
||||
|
||||
# 构建请求体
|
||||
request_body = {
|
||||
"type": "status_detection",
|
||||
"image": image_base64
|
||||
}
|
||||
|
||||
if timestamp:
|
||||
request_body["timestamp"] = timestamp
|
||||
else:
|
||||
# 使用当前时间
|
||||
t = time.localtime()
|
||||
ts = "%04d-%02d-%02d %02d:%02d:%02d" % (t[0], t[1], t[2], t[3], t[4], t[5])
|
||||
request_body["timestamp"] = ts
|
||||
|
||||
body_json = json.dumps(request_body)
|
||||
|
||||
# 创建socket并发送HTTP请求
|
||||
response = self._send_http_request(body_json)
|
||||
|
||||
# 解析响应
|
||||
return self._parse_response(response)
|
||||
|
||||
def _add_bmp_header(self, rgb_data, width, height):
|
||||
"""
|
||||
为RGB888数据添加BMP文件头
|
||||
"""
|
||||
# BMP文件头 (14 bytes)
|
||||
# 0x42, 0x4D ("BM")
|
||||
# File size (4 bytes)
|
||||
# Reserved (4 bytes)
|
||||
# Data offset (4 bytes) -> 54
|
||||
|
||||
# DIB Header (40 bytes)
|
||||
# Header size (4 bytes) -> 40
|
||||
# Width (4 bytes)
|
||||
# Height (4 bytes) -> -height for top-down
|
||||
# Planes (2 bytes) -> 1
|
||||
# BPP (2 bytes) -> 24
|
||||
# Compression (4 bytes) -> 0
|
||||
# Image size (4 bytes)
|
||||
# X ppm (4 bytes)
|
||||
# Y ppm (4 bytes)
|
||||
# Colors used (4 bytes)
|
||||
# Colors important (4 bytes)
|
||||
|
||||
file_size = 54 + len(rgb_data)
|
||||
|
||||
# 构建头部
|
||||
header = bytearray(54)
|
||||
|
||||
# BM
|
||||
header[0], header[1] = 0x42, 0x4D
|
||||
|
||||
# File Size
|
||||
header[2] = file_size & 0xFF
|
||||
header[3] = (file_size >> 8) & 0xFF
|
||||
header[4] = (file_size >> 16) & 0xFF
|
||||
header[5] = (file_size >> 24) & 0xFF
|
||||
|
||||
# Data Offset (54)
|
||||
header[10] = 54
|
||||
|
||||
# DIB Header Size (40)
|
||||
header[14] = 40
|
||||
|
||||
# Width
|
||||
header[18] = width & 0xFF
|
||||
header[19] = (width >> 8) & 0xFF
|
||||
header[20] = (width >> 16) & 0xFF
|
||||
header[21] = (width >> 24) & 0xFF
|
||||
|
||||
# Height (Use negative for top-down RGB)
|
||||
# 2's complement for negative number
|
||||
h = -height
|
||||
header[22] = h & 0xFF
|
||||
header[23] = (h >> 8) & 0xFF
|
||||
header[24] = (h >> 16) & 0xFF
|
||||
header[25] = (h >> 24) & 0xFF
|
||||
|
||||
# Planes (1)
|
||||
header[26] = 1
|
||||
|
||||
# BPP (24)
|
||||
header[28] = 24
|
||||
|
||||
# Image Size
|
||||
data_len = len(rgb_data)
|
||||
header[34] = data_len & 0xFF
|
||||
header[35] = (data_len >> 8) & 0xFF
|
||||
header[36] = (data_len >> 16) & 0xFF
|
||||
header[37] = (data_len >> 24) & 0xFF
|
||||
|
||||
return header + rgb_data
|
||||
|
||||
def _send_http_request(self, body, max_retries=3):
|
||||
"""
|
||||
发送HTTP POST请求
|
||||
|
||||
参数:
|
||||
body: 请求体(JSON字符串)
|
||||
max_retries: 最大重试次数
|
||||
|
||||
返回:
|
||||
str: HTTP响应内容
|
||||
"""
|
||||
# 获取服务器地址
|
||||
addr_info = None
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
addr_info = socket.getaddrinfo(self._server_host, self._server_port)
|
||||
break
|
||||
except BaseException:
|
||||
print("DNS解析重试 (" + str(attempt + 1) + "/" + str(max_retries) + ")")
|
||||
time.sleep(1)
|
||||
|
||||
if not addr_info:
|
||||
raise RuntimeError("无法解析服务器地址: " + self._server_host)
|
||||
|
||||
addr = addr_info[0][-1]
|
||||
print("连接服务器: " + str(addr))
|
||||
|
||||
# 确保body是bytes类型
|
||||
if isinstance(body, str):
|
||||
body_bytes = body.encode('utf-8')
|
||||
else:
|
||||
body_bytes = body
|
||||
|
||||
# 创建socket并连接
|
||||
s = socket.socket()
|
||||
|
||||
try:
|
||||
s.connect(addr)
|
||||
|
||||
# 构建HTTP请求头
|
||||
host_header = self._server_host
|
||||
if self._server_port != 80:
|
||||
host_header = self._server_host + ":" + str(self._server_port)
|
||||
|
||||
# 构建头部
|
||||
header = "POST " + self._api_path + " HTTP/1.1\r\n"
|
||||
header += "Host: " + host_header + "\r\n"
|
||||
header += "Content-Type: application/json\r\n"
|
||||
header += "Content-Length: " + str(len(body_bytes)) + "\r\n"
|
||||
header += "Connection: close\r\n"
|
||||
header += "\r\n"
|
||||
|
||||
# 发送头部 (确保全部发送)
|
||||
self._send_all(s, header.encode('utf-8'))
|
||||
|
||||
# 发送body (确保全部发送)
|
||||
self._send_all(s, body_bytes)
|
||||
|
||||
# 接收响应
|
||||
response = b""
|
||||
while True:
|
||||
chunk = s.recv(4096)
|
||||
if not chunk:
|
||||
break
|
||||
response += chunk
|
||||
|
||||
return response.decode('utf-8')
|
||||
|
||||
finally:
|
||||
s.close()
|
||||
|
||||
def _send_all(self, s, data):
|
||||
"""
|
||||
辅助函数:确保数据全部发送
|
||||
使用memoryview避免内存复制,分块发送
|
||||
"""
|
||||
# 使用memoryview避免切片时的内存复制
|
||||
mv = memoryview(data)
|
||||
total_len = len(data)
|
||||
total_sent = 0
|
||||
chunk_size = 4096 # 每次发送4KB
|
||||
|
||||
while total_sent < total_len:
|
||||
try:
|
||||
# 计算本次发送的切片
|
||||
remaining = total_len - total_sent
|
||||
to_send = min(chunk_size, remaining)
|
||||
|
||||
# 发送数据
|
||||
# mv[start:end] 创建新的memoryview切片,不复制数据
|
||||
sent = s.send(mv[total_sent : total_sent + to_send])
|
||||
|
||||
if sent == 0:
|
||||
raise RuntimeError("Socket连接断开 (sent=0)")
|
||||
|
||||
total_sent += sent
|
||||
|
||||
except OSError as e:
|
||||
# EAGAIN (11) or EWOULDBLOCK
|
||||
if e.args[0] == 11:
|
||||
time.sleep(0.01)
|
||||
continue
|
||||
# 重新抛出其他错误 (如 ECONNRESET)
|
||||
raise e
|
||||
|
||||
def _parse_response(self, response):
|
||||
"""
|
||||
解析HTTP响应
|
||||
|
||||
参数:
|
||||
response: HTTP响应字符串
|
||||
|
||||
返回:
|
||||
dict: 解析后的JSON响应体
|
||||
"""
|
||||
# 分离HTTP头部和body
|
||||
if "\r\n\r\n" in response:
|
||||
header, body = response.split("\r\n\r\n", 1)
|
||||
elif "\n\n" in response:
|
||||
header, body = response.split("\n\n", 1)
|
||||
else:
|
||||
raise RuntimeError("无效的HTTP响应格式")
|
||||
|
||||
# 解析JSON
|
||||
try:
|
||||
result = json.loads(body)
|
||||
return result
|
||||
except Exception as e:
|
||||
print("JSON解析错误: " + str(e))
|
||||
print("响应内容: " + body)
|
||||
return {
|
||||
"type": "status_detection_response",
|
||||
"success": False,
|
||||
"error": "JSON解析失败: " + str(e)
|
||||
}
|
||||
|
||||
def disconnect(self):
|
||||
"""断开网络连接"""
|
||||
if self._sta and self._sta.isconnected():
|
||||
self._sta.disconnect()
|
||||
print("WiFi已断开")
|
||||
self._is_connected = False
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
"""是否已连接网络"""
|
||||
return self._is_connected
|
||||
|
||||
@property
|
||||
def server_host(self):
|
||||
"""服务器主机地址"""
|
||||
return self._server_host
|
||||
|
||||
@property
|
||||
def server_port(self):
|
||||
"""服务器端口"""
|
||||
return self._server_port
|
||||
3
k230/test.py
Normal file
3
k230/test.py
Normal file
@ -0,0 +1,3 @@
|
||||
# 查看/sdcard下的文件
|
||||
import os
|
||||
print(os.listdir('/sdcard'))
|
||||
47
k230/心情检测接口.md
Normal file
47
k230/心情检测接口.md
Normal file
@ -0,0 +1,47 @@
|
||||
**接口描述**: 发送图像数据,检测用户的心情和皮肤状态
|
||||
|
||||
**请求方式**: `POST /api/detection/analyze`
|
||||
|
||||
**请求参数**:
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "status_detection", // 类型:状态检测
|
||||
"image": "string", // 图像数据,base64编码的RGB888格式
|
||||
"timestamp": "2024-01-01 12:30:45" // 可选,图像采集时间
|
||||
}
|
||||
```
|
||||
|
||||
**字段说明**:
|
||||
|
||||
- `type`: 固定值 "status_detection"
|
||||
- `image`: base64编码的RGB888格式图像数据
|
||||
- `timestamp`: 图像采集的时间(可选),精确到秒
|
||||
|
||||
**响应参数**:
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "status_detection_response",
|
||||
"success": true,
|
||||
"emotion": "happy", // 检测到的心情,7选1
|
||||
"emotion_confidence": 0.92, // 可选,情绪识别置信度(0-1)
|
||||
"skin_status": {
|
||||
"acne": 0-100, // 是否有痘痘
|
||||
"wrinkles": 0-100, // 是否有皱纹
|
||||
"pores": 0-100, // 是否有毛孔粗大
|
||||
"dark_circles": 0-100 // 是否有黑眼圈
|
||||
},
|
||||
"timestamp": "2024-01-01 12:30:46" // 检测完成时间
|
||||
}
|
||||
```
|
||||
|
||||
**emotion可选值**:
|
||||
|
||||
- `angry`: 愤怒
|
||||
- `disgust`: 厌恶
|
||||
- `fear`: 恐惧
|
||||
- `happy`: 快乐
|
||||
- `sad`: 悲伤
|
||||
- `surprise`: 惊讶
|
||||
- `neutral`: 中性
|
||||
Loading…
x
Reference in New Issue
Block a user