feat: ✨ 音频播放以及录制模块!
This commit is contained in:
parent
8efb28d725
commit
20ea76dccb
124
k230/03test_audio.py
Normal file
124
k230/03test_audio.py
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
# # 03test_audio.py
|
||||||
|
# # 音频测试程序:先录制再播放
|
||||||
|
# # 适用于庐山派 K230-CanMV 开发板
|
||||||
|
|
||||||
|
# from audio_module import AudioRecorder, AudioPlayer
|
||||||
|
# import os
|
||||||
|
# import sys
|
||||||
|
# import time
|
||||||
|
|
||||||
|
# sys.path.append("/sdcard")
|
||||||
|
# os.exitpoint(os.EXITPOINT_ENABLE)
|
||||||
|
|
||||||
|
|
||||||
|
# # 配置
|
||||||
|
# AUDIO_FILE = '/sdcard/test_record.wav'
|
||||||
|
# RECORD_DURATION = 5 # 录制5秒
|
||||||
|
|
||||||
|
# print("=" * 40)
|
||||||
|
# print("音频录制和播放测试")
|
||||||
|
# print("=" * 40)
|
||||||
|
|
||||||
|
# # ===== 录制音频 =====
|
||||||
|
# print("\n[1] 开始录制音频...")
|
||||||
|
# print(" 时长: {}秒".format(RECORD_DURATION))
|
||||||
|
# print(" 请对着麦克风说话...")
|
||||||
|
|
||||||
|
# recorder = AudioRecorder()
|
||||||
|
# result = recorder.record_to_file(AUDIO_FILE, RECORD_DURATION)
|
||||||
|
|
||||||
|
# if result:
|
||||||
|
# print(" 录制完成!")
|
||||||
|
# else:
|
||||||
|
# print(" 录制失败!")
|
||||||
|
|
||||||
|
# # 等待一下
|
||||||
|
# time.sleep(1)
|
||||||
|
|
||||||
|
# # ===== 播放音频 =====
|
||||||
|
# print("\n[2] 开始播放录制的音频...")
|
||||||
|
# print(" 请确保已插入3.5mm耳机")
|
||||||
|
|
||||||
|
# player = AudioPlayer()
|
||||||
|
# result = player.play_file(AUDIO_FILE)
|
||||||
|
|
||||||
|
# if result:
|
||||||
|
# print(" 播放完成!")
|
||||||
|
# else:
|
||||||
|
# print(" 播放失败!")
|
||||||
|
|
||||||
|
# print("\n" + "=" * 40)
|
||||||
|
# print("测试结束")
|
||||||
|
# print("=" * 40)
|
||||||
|
|
||||||
|
|
||||||
|
# ===== 同时录播测试 =====
|
||||||
|
# 使用流式接口实现:麦克风录制 -> 耳机播放
|
||||||
|
|
||||||
|
from media.pyaudio import *
|
||||||
|
from media.media import *
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
sys.path.append("/sdcard")
|
||||||
|
os.exitpoint(os.EXITPOINT_ENABLE)
|
||||||
|
|
||||||
|
|
||||||
|
DURATION = 15 # 持续15秒
|
||||||
|
RATE = 44100
|
||||||
|
CHANNELS = 1
|
||||||
|
CHUNK = int(RATE / 25)
|
||||||
|
FORMAT = paInt16
|
||||||
|
|
||||||
|
print("=" * 40)
|
||||||
|
print("同时录播测试 ({}秒)".format(DURATION))
|
||||||
|
print("请插入耳机并对着麦克风说话")
|
||||||
|
print("=" * 40)
|
||||||
|
|
||||||
|
try:
|
||||||
|
p = PyAudio()
|
||||||
|
p.initialize(CHUNK)
|
||||||
|
MediaManager.init()
|
||||||
|
|
||||||
|
# 创建输入流 (录制)
|
||||||
|
input_stream = p.open(
|
||||||
|
format=FORMAT,
|
||||||
|
channels=CHANNELS,
|
||||||
|
rate=RATE,
|
||||||
|
input=True,
|
||||||
|
frames_per_buffer=CHUNK
|
||||||
|
)
|
||||||
|
|
||||||
|
# 创建输出流 (播放)
|
||||||
|
output_stream = p.open(
|
||||||
|
format=FORMAT,
|
||||||
|
channels=CHANNELS,
|
||||||
|
rate=RATE,
|
||||||
|
output=True,
|
||||||
|
frames_per_buffer=CHUNK
|
||||||
|
)
|
||||||
|
|
||||||
|
print("开始录播...")
|
||||||
|
|
||||||
|
# 实时录播
|
||||||
|
total_chunks = int(RATE / CHUNK * DURATION)
|
||||||
|
for i in range(total_chunks):
|
||||||
|
data = input_stream.read()
|
||||||
|
output_stream.write(data)
|
||||||
|
try:
|
||||||
|
os.exitpoint()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("用户停止")
|
||||||
|
break
|
||||||
|
|
||||||
|
print("录播完成!")
|
||||||
|
|
||||||
|
except BaseException as e:
|
||||||
|
print("异常: ", e)
|
||||||
|
finally:
|
||||||
|
input_stream.stop_stream()
|
||||||
|
output_stream.stop_stream()
|
||||||
|
input_stream.close()
|
||||||
|
output_stream.close()
|
||||||
|
p.terminate()
|
||||||
|
MediaManager.deinit()
|
||||||
|
print("资源已释放")
|
||||||
314
k230/audio_module.py
Normal file
314
k230/audio_module.py
Normal file
@ -0,0 +1,314 @@
|
|||||||
|
# audio_module.py
|
||||||
|
# 音频录制和播放模块
|
||||||
|
# 适用于庐山派 K230-CanMV 开发板
|
||||||
|
|
||||||
|
import os
|
||||||
|
from media.media import *
|
||||||
|
from media.pyaudio import *
|
||||||
|
import media.wave as wave
|
||||||
|
|
||||||
|
|
||||||
|
class AudioRecorder:
|
||||||
|
"""
|
||||||
|
音频录制类 - 使用板载麦克风录制音频
|
||||||
|
|
||||||
|
参数:
|
||||||
|
rate: 采样率 (默认44100Hz)
|
||||||
|
channels: 声道数 (默认1,单声道)
|
||||||
|
format: 采样精度 (默认paInt16,16位)
|
||||||
|
|
||||||
|
使用方式1 - 直接录制到文件:
|
||||||
|
recorder = AudioRecorder()
|
||||||
|
recorder.record_to_file('/sdcard/test.wav', duration=5)
|
||||||
|
recorder.deinit()
|
||||||
|
|
||||||
|
使用方式2 - 流式录制:
|
||||||
|
recorder = AudioRecorder()
|
||||||
|
recorder.start()
|
||||||
|
for i in range(100):
|
||||||
|
data = recorder.read() # 获取一帧数据
|
||||||
|
# 处理数据...
|
||||||
|
recorder.save('/sdcard/test.wav', frames)
|
||||||
|
recorder.stop()
|
||||||
|
recorder.deinit()
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, rate=44100, channels=1, format=paInt16):
|
||||||
|
"""
|
||||||
|
初始化音频录制器
|
||||||
|
|
||||||
|
参数:
|
||||||
|
rate: 采样率,默认44100Hz
|
||||||
|
channels: 声道数,默认1(单声道)
|
||||||
|
format: 采样精度,默认paInt16(16位)
|
||||||
|
"""
|
||||||
|
self._rate = rate
|
||||||
|
self._channels = channels
|
||||||
|
self._format = format
|
||||||
|
self._chunk = int(rate / 25)
|
||||||
|
|
||||||
|
self._pyaudio = None
|
||||||
|
self._stream = None
|
||||||
|
self._is_running = False
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""
|
||||||
|
启动录制流(流式录制模式)
|
||||||
|
调用后可以使用 read() 方法获取音频数据
|
||||||
|
"""
|
||||||
|
if self._is_running:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._pyaudio = PyAudio()
|
||||||
|
self._pyaudio.initialize(self._chunk)
|
||||||
|
MediaManager.init()
|
||||||
|
|
||||||
|
self._stream = self._pyaudio.open(
|
||||||
|
format=self._format,
|
||||||
|
channels=self._channels,
|
||||||
|
rate=self._rate,
|
||||||
|
input=True,
|
||||||
|
frames_per_buffer=self._chunk
|
||||||
|
)
|
||||||
|
self._is_running = True
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
"""
|
||||||
|
读取一帧音频数据(流式录制模式)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
bytes: 一帧音频数据
|
||||||
|
"""
|
||||||
|
if not self._is_running:
|
||||||
|
raise RuntimeError("录制器未启动,请先调用 start()")
|
||||||
|
return self._stream.read()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止录制流"""
|
||||||
|
if self._stream:
|
||||||
|
self._stream.stop_stream()
|
||||||
|
self._stream.close()
|
||||||
|
self._stream = None
|
||||||
|
self._is_running = False
|
||||||
|
|
||||||
|
def deinit(self):
|
||||||
|
"""释放所有资源"""
|
||||||
|
self.stop()
|
||||||
|
if self._pyaudio:
|
||||||
|
self._pyaudio.terminate()
|
||||||
|
self._pyaudio = None
|
||||||
|
MediaManager.deinit()
|
||||||
|
|
||||||
|
def save(self, filename, frames):
|
||||||
|
"""
|
||||||
|
将音频帧数据保存为WAV文件
|
||||||
|
|
||||||
|
参数:
|
||||||
|
filename: 保存的文件路径
|
||||||
|
frames: 音频帧数据列表
|
||||||
|
"""
|
||||||
|
wf = wave.open(filename, 'wb')
|
||||||
|
wf.set_channels(self._channels)
|
||||||
|
wf.set_sampwidth(self._pyaudio.get_sample_size(self._format))
|
||||||
|
wf.set_framerate(self._rate)
|
||||||
|
wf.write_frames(b''.join(frames))
|
||||||
|
wf.close()
|
||||||
|
|
||||||
|
def record_to_file(self, filename, duration):
|
||||||
|
"""
|
||||||
|
直接录制音频到WAV文件
|
||||||
|
|
||||||
|
参数:
|
||||||
|
filename: 保存的文件路径 (如 '/sdcard/test.wav')
|
||||||
|
duration: 录制时长 (秒)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
bool: True成功, False失败
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
frames = []
|
||||||
|
total_chunks = int(self._rate / self._chunk * duration)
|
||||||
|
|
||||||
|
for i in range(total_chunks):
|
||||||
|
data = self.read()
|
||||||
|
frames.append(data)
|
||||||
|
# 检查退出信号
|
||||||
|
try:
|
||||||
|
os.exitpoint()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
break
|
||||||
|
|
||||||
|
self.save(filename, frames)
|
||||||
|
return True
|
||||||
|
|
||||||
|
except BaseException as e:
|
||||||
|
print("录制异常: ", e)
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
self.stop()
|
||||||
|
self.deinit()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_running(self):
|
||||||
|
"""是否正在录制"""
|
||||||
|
return self._is_running
|
||||||
|
|
||||||
|
@property
|
||||||
|
def chunk_size(self):
|
||||||
|
"""每帧数据大小"""
|
||||||
|
return self._chunk
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rate(self):
|
||||||
|
"""采样率"""
|
||||||
|
return self._rate
|
||||||
|
|
||||||
|
|
||||||
|
class AudioPlayer:
|
||||||
|
"""
|
||||||
|
音频播放类 - 通过3.5mm耳机接口播放音频
|
||||||
|
|
||||||
|
使用方式1 - 直接播放文件:
|
||||||
|
player = AudioPlayer()
|
||||||
|
player.play_file('/sdcard/test.wav')
|
||||||
|
player.deinit()
|
||||||
|
|
||||||
|
使用方式2 - 流式播放:
|
||||||
|
player = AudioPlayer(rate=44100, channels=1)
|
||||||
|
player.start()
|
||||||
|
player.write(audio_data) # 写入音频数据
|
||||||
|
player.stop()
|
||||||
|
player.deinit()
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, rate=44100, channels=1, format=paInt16):
|
||||||
|
"""
|
||||||
|
初始化音频播放器
|
||||||
|
|
||||||
|
参数:
|
||||||
|
rate: 采样率,默认44100Hz
|
||||||
|
channels: 声道数,默认1(单声道)
|
||||||
|
format: 采样精度,默认paInt16(16位)
|
||||||
|
"""
|
||||||
|
self._rate = rate
|
||||||
|
self._channels = channels
|
||||||
|
self._format = format
|
||||||
|
self._chunk = int(rate / 25)
|
||||||
|
|
||||||
|
self._pyaudio = None
|
||||||
|
self._stream = None
|
||||||
|
self._is_running = False
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""
|
||||||
|
启动播放流(流式播放模式)
|
||||||
|
调用后可以使用 write() 方法写入音频数据
|
||||||
|
"""
|
||||||
|
if self._is_running:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._pyaudio = PyAudio()
|
||||||
|
self._pyaudio.initialize(self._chunk)
|
||||||
|
MediaManager.init()
|
||||||
|
|
||||||
|
self._stream = self._pyaudio.open(
|
||||||
|
format=self._format,
|
||||||
|
channels=self._channels,
|
||||||
|
rate=self._rate,
|
||||||
|
output=True,
|
||||||
|
frames_per_buffer=self._chunk
|
||||||
|
)
|
||||||
|
self._is_running = True
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
"""
|
||||||
|
写入音频数据进行播放(流式播放模式)
|
||||||
|
|
||||||
|
参数:
|
||||||
|
data: 音频数据 (bytes)
|
||||||
|
"""
|
||||||
|
if not self._is_running:
|
||||||
|
raise RuntimeError("播放器未启动,请先调用 start()")
|
||||||
|
self._stream.write(data)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止播放流"""
|
||||||
|
if self._stream:
|
||||||
|
self._stream.stop_stream()
|
||||||
|
self._stream.close()
|
||||||
|
self._stream = None
|
||||||
|
self._is_running = False
|
||||||
|
|
||||||
|
def deinit(self):
|
||||||
|
"""释放所有资源"""
|
||||||
|
self.stop()
|
||||||
|
if self._pyaudio:
|
||||||
|
self._pyaudio.terminate()
|
||||||
|
self._pyaudio = None
|
||||||
|
MediaManager.deinit()
|
||||||
|
|
||||||
|
def play_file(self, filename):
|
||||||
|
"""
|
||||||
|
播放WAV音频文件
|
||||||
|
|
||||||
|
参数:
|
||||||
|
filename: WAV文件路径 (如 '/sdcard/test.wav')
|
||||||
|
|
||||||
|
返回:
|
||||||
|
bool: True成功, False失败
|
||||||
|
"""
|
||||||
|
wf = None
|
||||||
|
try:
|
||||||
|
wf = wave.open(filename, 'rb')
|
||||||
|
chunk = int(wf.get_framerate() / 25)
|
||||||
|
|
||||||
|
self._pyaudio = PyAudio()
|
||||||
|
self._pyaudio.initialize(chunk)
|
||||||
|
MediaManager.init()
|
||||||
|
|
||||||
|
self._stream = self._pyaudio.open(
|
||||||
|
format=self._pyaudio.get_format_from_width(wf.get_sampwidth()),
|
||||||
|
channels=wf.get_channels(),
|
||||||
|
rate=wf.get_framerate(),
|
||||||
|
output=True,
|
||||||
|
frames_per_buffer=chunk
|
||||||
|
)
|
||||||
|
self._is_running = True
|
||||||
|
|
||||||
|
data = wf.read_frames(chunk)
|
||||||
|
while data:
|
||||||
|
self._stream.write(data)
|
||||||
|
data = wf.read_frames(chunk)
|
||||||
|
# 检查退出信号
|
||||||
|
try:
|
||||||
|
os.exitpoint()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
break
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except BaseException as e:
|
||||||
|
print("播放异常: ", e)
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
if wf:
|
||||||
|
wf.close()
|
||||||
|
self.stop()
|
||||||
|
self.deinit()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_running(self):
|
||||||
|
"""是否正在播放"""
|
||||||
|
return self._is_running
|
||||||
|
|
||||||
|
@property
|
||||||
|
def chunk_size(self):
|
||||||
|
"""每帧数据大小"""
|
||||||
|
return self._chunk
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rate(self):
|
||||||
|
"""采样率"""
|
||||||
|
return self._rate
|
||||||
Loading…
x
Reference in New Issue
Block a user