From 20ea76dccbba3cef9ce42b114bc6af3d1788aa2c Mon Sep 17 00:00:00 2001 From: wds Date: Thu, 1 Jan 2026 15:06:31 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20:sparkles:=20=E9=9F=B3=E9=A2=91?= =?UTF-8?q?=E6=92=AD=E6=94=BE=E4=BB=A5=E5=8F=8A=E5=BD=95=E5=88=B6=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=EF=BC=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- k230/03test_audio.py | 124 +++++++++++++++++ k230/audio_module.py | 314 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 438 insertions(+) create mode 100644 k230/03test_audio.py create mode 100644 k230/audio_module.py diff --git a/k230/03test_audio.py b/k230/03test_audio.py new file mode 100644 index 0000000..8167402 --- /dev/null +++ b/k230/03test_audio.py @@ -0,0 +1,124 @@ +# # 03test_audio.py +# # 音频测试程序:先录制再播放 +# # 适用于庐山派 K230-CanMV 开发板 + +# from audio_module import AudioRecorder, AudioPlayer +# import os +# import sys +# import time + +# sys.path.append("/sdcard") +# os.exitpoint(os.EXITPOINT_ENABLE) + + +# # 配置 +# AUDIO_FILE = '/sdcard/test_record.wav' +# RECORD_DURATION = 5 # 录制5秒 + +# print("=" * 40) +# print("音频录制和播放测试") +# print("=" * 40) + +# # ===== 录制音频 ===== +# print("\n[1] 开始录制音频...") +# print(" 时长: {}秒".format(RECORD_DURATION)) +# print(" 请对着麦克风说话...") + +# recorder = AudioRecorder() +# result = recorder.record_to_file(AUDIO_FILE, RECORD_DURATION) + +# if result: +# print(" 录制完成!") +# else: +# print(" 录制失败!") + +# # 等待一下 +# time.sleep(1) + +# # ===== 播放音频 ===== +# print("\n[2] 开始播放录制的音频...") +# print(" 请确保已插入3.5mm耳机") + +# player = AudioPlayer() +# result = player.play_file(AUDIO_FILE) + +# if result: +# print(" 播放完成!") +# else: +# print(" 播放失败!") + +# print("\n" + "=" * 40) +# print("测试结束") +# print("=" * 40) + + +# ===== 同时录播测试 ===== +# 使用流式接口实现:麦克风录制 -> 耳机播放 + +from media.pyaudio import * +from media.media import * +import os +import sys +sys.path.append("/sdcard") +os.exitpoint(os.EXITPOINT_ENABLE) + + +DURATION = 15 # 持续15秒 +RATE = 44100 +CHANNELS = 1 +CHUNK = int(RATE / 25) +FORMAT = paInt16 + +print("=" * 40) +print("同时录播测试 ({}秒)".format(DURATION)) +print("请插入耳机并对着麦克风说话") +print("=" * 40) + +try: + p = PyAudio() + p.initialize(CHUNK) + MediaManager.init() + + # 创建输入流 (录制) + input_stream = p.open( + format=FORMAT, + channels=CHANNELS, + rate=RATE, + input=True, + frames_per_buffer=CHUNK + ) + + # 创建输出流 (播放) + output_stream = p.open( + format=FORMAT, + channels=CHANNELS, + rate=RATE, + output=True, + frames_per_buffer=CHUNK + ) + + print("开始录播...") + + # 实时录播 + total_chunks = int(RATE / CHUNK * DURATION) + for i in range(total_chunks): + data = input_stream.read() + output_stream.write(data) + try: + os.exitpoint() + except KeyboardInterrupt: + print("用户停止") + break + + print("录播完成!") + +except BaseException as e: + print("异常: ", e) +finally: + input_stream.stop_stream() + output_stream.stop_stream() + input_stream.close() + output_stream.close() + p.terminate() + MediaManager.deinit() + print("资源已释放") diff --git a/k230/audio_module.py b/k230/audio_module.py new file mode 100644 index 0000000..26ee699 --- /dev/null +++ b/k230/audio_module.py @@ -0,0 +1,314 @@ +# audio_module.py +# 音频录制和播放模块 +# 适用于庐山派 K230-CanMV 开发板 + +import os +from media.media import * +from media.pyaudio import * +import media.wave as wave + + +class AudioRecorder: + """ + 音频录制类 - 使用板载麦克风录制音频 + + 参数: + rate: 采样率 (默认44100Hz) + channels: 声道数 (默认1,单声道) + format: 采样精度 (默认paInt16,16位) + + 使用方式1 - 直接录制到文件: + recorder = AudioRecorder() + recorder.record_to_file('/sdcard/test.wav', duration=5) + recorder.deinit() + + 使用方式2 - 流式录制: + recorder = AudioRecorder() + recorder.start() + for i in range(100): + data = recorder.read() # 获取一帧数据 + # 处理数据... + recorder.save('/sdcard/test.wav', frames) + recorder.stop() + recorder.deinit() + """ + + def __init__(self, rate=44100, channels=1, format=paInt16): + """ + 初始化音频录制器 + + 参数: + rate: 采样率,默认44100Hz + channels: 声道数,默认1(单声道) + format: 采样精度,默认paInt16(16位) + """ + self._rate = rate + self._channels = channels + self._format = format + self._chunk = int(rate / 25) + + self._pyaudio = None + self._stream = None + self._is_running = False + + def start(self): + """ + 启动录制流(流式录制模式) + 调用后可以使用 read() 方法获取音频数据 + """ + if self._is_running: + return + + self._pyaudio = PyAudio() + self._pyaudio.initialize(self._chunk) + MediaManager.init() + + self._stream = self._pyaudio.open( + format=self._format, + channels=self._channels, + rate=self._rate, + input=True, + frames_per_buffer=self._chunk + ) + self._is_running = True + + def read(self): + """ + 读取一帧音频数据(流式录制模式) + + 返回: + bytes: 一帧音频数据 + """ + if not self._is_running: + raise RuntimeError("录制器未启动,请先调用 start()") + return self._stream.read() + + def stop(self): + """停止录制流""" + if self._stream: + self._stream.stop_stream() + self._stream.close() + self._stream = None + self._is_running = False + + def deinit(self): + """释放所有资源""" + self.stop() + if self._pyaudio: + self._pyaudio.terminate() + self._pyaudio = None + MediaManager.deinit() + + def save(self, filename, frames): + """ + 将音频帧数据保存为WAV文件 + + 参数: + filename: 保存的文件路径 + frames: 音频帧数据列表 + """ + wf = wave.open(filename, 'wb') + wf.set_channels(self._channels) + wf.set_sampwidth(self._pyaudio.get_sample_size(self._format)) + wf.set_framerate(self._rate) + wf.write_frames(b''.join(frames)) + wf.close() + + def record_to_file(self, filename, duration): + """ + 直接录制音频到WAV文件 + + 参数: + filename: 保存的文件路径 (如 '/sdcard/test.wav') + duration: 录制时长 (秒) + + 返回: + bool: True成功, False失败 + """ + try: + self.start() + + frames = [] + total_chunks = int(self._rate / self._chunk * duration) + + for i in range(total_chunks): + data = self.read() + frames.append(data) + # 检查退出信号 + try: + os.exitpoint() + except KeyboardInterrupt: + break + + self.save(filename, frames) + return True + + except BaseException as e: + print("录制异常: ", e) + return False + finally: + self.stop() + self.deinit() + + @property + def is_running(self): + """是否正在录制""" + return self._is_running + + @property + def chunk_size(self): + """每帧数据大小""" + return self._chunk + + @property + def rate(self): + """采样率""" + return self._rate + + +class AudioPlayer: + """ + 音频播放类 - 通过3.5mm耳机接口播放音频 + + 使用方式1 - 直接播放文件: + player = AudioPlayer() + player.play_file('/sdcard/test.wav') + player.deinit() + + 使用方式2 - 流式播放: + player = AudioPlayer(rate=44100, channels=1) + player.start() + player.write(audio_data) # 写入音频数据 + player.stop() + player.deinit() + """ + + def __init__(self, rate=44100, channels=1, format=paInt16): + """ + 初始化音频播放器 + + 参数: + rate: 采样率,默认44100Hz + channels: 声道数,默认1(单声道) + format: 采样精度,默认paInt16(16位) + """ + self._rate = rate + self._channels = channels + self._format = format + self._chunk = int(rate / 25) + + self._pyaudio = None + self._stream = None + self._is_running = False + + def start(self): + """ + 启动播放流(流式播放模式) + 调用后可以使用 write() 方法写入音频数据 + """ + if self._is_running: + return + + self._pyaudio = PyAudio() + self._pyaudio.initialize(self._chunk) + MediaManager.init() + + self._stream = self._pyaudio.open( + format=self._format, + channels=self._channels, + rate=self._rate, + output=True, + frames_per_buffer=self._chunk + ) + self._is_running = True + + def write(self, data): + """ + 写入音频数据进行播放(流式播放模式) + + 参数: + data: 音频数据 (bytes) + """ + if not self._is_running: + raise RuntimeError("播放器未启动,请先调用 start()") + self._stream.write(data) + + def stop(self): + """停止播放流""" + if self._stream: + self._stream.stop_stream() + self._stream.close() + self._stream = None + self._is_running = False + + def deinit(self): + """释放所有资源""" + self.stop() + if self._pyaudio: + self._pyaudio.terminate() + self._pyaudio = None + MediaManager.deinit() + + def play_file(self, filename): + """ + 播放WAV音频文件 + + 参数: + filename: WAV文件路径 (如 '/sdcard/test.wav') + + 返回: + bool: True成功, False失败 + """ + wf = None + try: + wf = wave.open(filename, 'rb') + chunk = int(wf.get_framerate() / 25) + + self._pyaudio = PyAudio() + self._pyaudio.initialize(chunk) + MediaManager.init() + + self._stream = self._pyaudio.open( + format=self._pyaudio.get_format_from_width(wf.get_sampwidth()), + channels=wf.get_channels(), + rate=wf.get_framerate(), + output=True, + frames_per_buffer=chunk + ) + self._is_running = True + + data = wf.read_frames(chunk) + while data: + self._stream.write(data) + data = wf.read_frames(chunk) + # 检查退出信号 + try: + os.exitpoint() + except KeyboardInterrupt: + break + + return True + + except BaseException as e: + print("播放异常: ", e) + return False + finally: + if wf: + wf.close() + self.stop() + self.deinit() + + @property + def is_running(self): + """是否正在播放""" + return self._is_running + + @property + def chunk_size(self): + """每帧数据大小""" + return self._chunk + + @property + def rate(self): + """采样率""" + return self._rate