python下的无线电台输出音频降噪代码
python下的无线电台输出音频降噪代码
前言
由于业余无线电短波电台的工作特点,在短波通讯过程中常常受强大的白噪音和背噪音干扰影响收听体验,于是编写了 audio_denoise.py代码文件,使用电脑来对电台输出的音频信号进行降噪处理,然后通过电脑扬声器来输出降噪后的音频,并自动记录音频文件,运行结束时生成以"output_时间"命名的音频文件。
代码运行时,在保持语音清晰度的同时,可有效抑制以下噪声:
空调/风扇等稳态噪声
键盘敲击等瞬时噪声
50/60Hz电源干扰
高频嘶声(hiss)
低频噪声
一、audio_denoise是什么?
audio_denoise.py代码文件是基于python 的一种工具,该工具是为了解决业余短波无线电台输出音频时白噪和背噪音而创建的。
主要特点包括:
多级滤波系统:
使用Chebyshev II型滤波器进行预处理(150Hz高通)
Butterworth低通滤波器后处理(3000Hz低通)
级联滤波显著降低带外噪声
降噪算法:
noisereduce参数(n_fft=1024,hop_length=256)
增加预处理和后处理阶段
采用严格的prop_decrease=1.0
智能增益控制:
自动增益控制(AGC)保持目标电平
10帧RMS平滑处理避免增益突变
增益限制在20倍以内防止过载
多级噪声门:
基于平均RMS的智能门限控制
噪声抑制可达-20dB(0.1倍衰减)
二、使用步骤
1.运行前安装所需要的库
运行前请确保安装所需库:
bash
复制
pip install pyaudio noisereduce scipy matplotlib numpy
引入库:
import pyaudio
import numpy as np
import noisereduce as nr
import matplotlib.pyplot as plt
import threading
import wave
import time
from queue import Queue
from scipy.signal import butter, lfilter, filtfilt
2.参数配置
代码如下:
======================== 参数配置 ========================
FORMAT = pyaudio.paInt16 # 音频格式
CHANNELS = 1 # 单声道
RATE = 16000 # 采样率
CHUNK = 8192 # 音频块大小
NOISE_DURATION = 3 # 噪声采样时长
SAVE_FILE = “output.wav” # 初始文件名(将被覆盖)
GAIN = 1.5 # 调整增益避免削波
NOISE_GATE_THRESH = 0.025 # 噪声门限阈值
========================================================
3.数据代入
设置支持中文的字体
plt.rcParams[‘font.sans-serif’] = [‘SimHei’] # 使用黑体
plt.rcParams[‘axes.unicode_minus’] = False # 解决负号显示问题
初始化PyAudio
p = pyaudio.PyAudio()
——————— 1. 复合滤波器设计 ——————
def butter_bandpass(lowcut, highcut, fs, order=4):
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype=‘band’)
return b, a
def butter_lowpass(cutoff, fs, order=3):
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype=‘low’)
return b, a
初始化滤波器组
b_band, a_band = butter_bandpass(80, 4000, RATE)
b_low, a_low = butter_lowpass(3500, RATE)
——————- 2. 录制噪声样本 ———————
print(“请保持安静,正在录制噪声样本…”)
try:
input_stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK
)
except Exception as e:
print(f"音频设备初始化失败: {e}")
p.terminate()
exit(1)
noise_frames = []
for _ in range(int(RATE / CHUNK * NOISE_DURATION)):
noise_frames.append(input_stream.read(CHUNK))
input_stream.stop_stream()
input_stream.close()
noise_sample = np.frombuffer(b’'.join(noise_frames), dtype=np.int16).astype(np.float32) / 32768.0
print(“噪声样本录制完成,开始实时降噪和频谱监测…”)
——————- 3. 初始化绘图 ———————–
plt.ion() # 启用交互模式
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(8, 6))
plt.subplots_adjust(hspace=0.8)
波形图配置
x = np.arange(0, CHUNK)
line_wave, = ax1.plot([], [], color=‘deepskyblue’, lw=1)
ax1.set_title(‘实时波形’)
ax1.set_xlim(0, CHUNK)
ax1.set_ylim(-1, 1)
频谱图配置
fft_freq = np.fft.rfftfreq(CHUNK, 1/RATE)
line_spectrum, = ax2.plot([], [], color=‘cyan’, lw=1)
ax2.set_title(‘实时频谱(80-4000Hz)’)
ax2.set_xlim(80, 4000)
ax2.set_ylim(0, 1)
ax2.set_xscale(‘log’)
音量指示条配置
vol_bar = ax3.barh(0, 0, color=‘limegreen’)
ax3.set_title(f’实时音量(GAIN={GAIN})')
ax3.set_xlim(0, 1)
ax3.set_xticks([])
ax3.set_yticks([])
全局变量
processed_audio_buffer = np.zeros(CHUNK)
last_fft_norm = np.zeros(len(fft_freq))
audio_queue = Queue()
zi_band = np.zeros(max(len(a_band), len(b_band))-1)
zi_low = np.zeros(max(len(a_low), len(b_low))-1)
save_thread = None
stop_event = threading.Event()
—————– 4. 增强音频处理回调 ——————–
def audio_callback(in_data, frame_count, time_info, status):
global processed_audio_buffer, zi_band, zi_low
audio_data = np.frombuffer(in_data, dtype=np.int16).astype(np.float32) / 32768.0
# 多级降噪处理
processed_audio = nr.reduce_noise(
y=audio_data,
y_noise=noise_sample,
sr=RATE,
prop_decrease=1.0,
n_fft=2048,
win_length=2048,
hop_length=512,
stationary=True,
thresh_n_mult_nonstationary=1.5
)
# 相位补偿带通滤波
processed_audio, zi_band = lfilter(b_band, a_band, processed_audio, zi=zi_band)
# 动态增益控制+软限幅
rms = np.sqrt(np.mean(processed_audio**2))
dynamic_gain = GAIN * (1 - 0.5 * np.clip(rms, 0, 1))
processed_audio = np.tanh(processed_audio * dynamic_gain)
# 噪声门限处理
if rms < NOISE_GATE_THRESH:
processed_audio *= 0.2
# 零相位低通滤波
processed_audio = filtfilt(b_low, a_low, processed_audio)
audio_queue.put(processed_audio.copy())
processed_audio_buffer = processed_audio
processed_audio_bytes = (processed_audio * 32768).astype(np.int16).tobytes()
return (processed_audio_bytes, pyaudio.paContinue)
——————- 5. 绘图更新函数 ———————-
def update_plot():
global last_fft_norm
# 更新波形
line_wave.set_data(x, processed_audio_buffer)
# 计算平滑频谱
fft = np.abs(np.fft.rfft(processed_audio_buffer * np.hanning(CHUNK)))
max_fft = np.max(fft) if np.max(fft) != 0 else 1
fft_norm = fft / max_fft
# 频谱平滑处理
fft_smoothed = 0.7 * fft_norm + 0.3 * last_fft_norm
last_fft_norm = fft_smoothed
line_spectrum.set_data(fft_freq, fft_smoothed)
# 更新音量条
rms = np.sqrt(np.mean(processed_audio_buffer**2))
vol_bar[0].set_width(rms)
vol_bar[0].set_color('red' if rms > 0.6 else 'limegreen')
plt.pause(0.001)
——————- 6. 音频保存线程 ———————-
def save_audio():
try:
with wave.open(SAVE_FILE, ‘wb’) as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
while not stop_event.is_set():
data_list = []
while not audio_queue.empty():
data = audio_queue.get()
data_list.append(data)
if data_list:
combined_data = np.concatenate(data_list)
audio_bytes = (combined_data * 32768).astype(np.int16).tobytes()
wf.writeframes(audio_bytes)
time.sleep(0.1)
except Exception as e:
print(f"音频保存失败: {e}")
——————- 7. 主程序 ————————–
if name == “main”:
# 生成带时间的文件名
SAVE_FILE = f"output_{time.strftime(‘%Y%m%d_%H%M’)}.wav"
try:
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
output=True,
frames_per_buffer=CHUNK,
stream_callback=audio_callback
)
stream.start_stream()
save_thread = threading.Thread(target=save_audio, daemon=True)
save_thread.start()
# 手动更新绘图循环
while stream.is_active() and plt.fignum_exists(fig.number):
update_plot()
plt.gcf().canvas.flush_events()
except KeyboardInterrupt:
print("检测到键盘中断,停止程序...")
except Exception as e:
print(f"程序运行出错: {e}")
finally:
stop_event.set()
if save_thread is not None:
save_thread.join()
if 'stream' in locals():
stream.stop_stream()
stream.close()
p.terminate()
plt.ioff()
plt.close()
print("实时降噪已停止。")
print(f"音频已保存到:{SAVE_FILE}")
作者:Mohists_Liu