Opus——基于Python实现opus音频编解码

文章目录

  • 安装环境
  • 编解码程序
  • 安装环境

  • 安装环境:
  • sudo apt-get update
    sudo apt-get install build-essential
    # pyaudio
    sudo apt-get install libportaudio2 libportaudiocpp0 portaudio19-dev
    # python opuslib
    sudo apt-get install libopus-dev
    
  • 安装python包:
  • pip install opuslib pyaudio numpy  # wave 是 Python 标准库自带模块,无需通过 pip 安装
    

    编解码程序

    – 超级容易踩坑的注意事项:

    1. opus编码时读取音频的采样位宽(sample width,每个采样的字节数)默认是2,即16位PCM,且不可修改;
    2. 送入encoder的音频长度(单声道、以字节计)为codec.frame_duration_size * codec.sample_width,例如frame_duration_size是960时,真正送入encoder的pcm_data长度为1920字节;
    3. 当最后一帧不满足长度要求时,需要进行静默帧padding,补充到该长度;在padding最后一帧时,也需要按上述的策略执行。
    import opuslib
    import wave
    import struct
    import numpy as np
    import opuslib
    class OpusCodec:
        """Pair an opuslib encoder and decoder that work on fixed-size frames.

        Attributes:
            sample_rate: Sampling rate in Hz handed to opuslib.
            channels: Number of interleaved channels.
            sample_width: Bytes per sample; only used for size bookkeeping here.
            frame_duration_size: Samples per channel in one frame.
            input_pcm_chunk_size: Bytes of PCM that make up one full frame
                (frame_duration_size * channels * sample_width).
            encoder: The opuslib.Encoder, or None when encoding is disabled.
            decoder: The opuslib.Decoder, or None when decoding is disabled.
        """

        def __init__(self, sample_rate=16000, channels=1, sample_width=2, application='audio', frame_duration_ms=60, encode=True, decode=True):
            """
            Set up frame-size bookkeeping and create the requested codecs.

            :param sample_rate: Sampling rate in Hz.
            :param channels: Number of audio channels.
            :param sample_width: Bytes per PCM sample.
            :param application: Application mode forwarded to opuslib.Encoder.
            :param frame_duration_ms: Duration of one frame in milliseconds.
            :param encode: Create an encoder when true.
            :param decode: Create a decoder when true.
            """
            self.sample_rate = sample_rate
            self.channels = channels
            self.sample_width = sample_width
            # Samples per channel in one frame; this is the frame size passed
            # to the opuslib encode/decode calls.
            self.frame_duration_size = int((sample_rate * frame_duration_ms) / 1000)
            # Derive the byte size from the sample count so the two values can
            # never disagree because of floating-point truncation.
            self.input_pcm_chunk_size = self.frame_duration_size * channels * sample_width
            # A disabled codec must be None (not False) so the guards in
            # encode()/decode() actually trigger.
            self.encoder = None
            self.decoder = None
            if encode:
                self.encoder = opuslib.Encoder(fs=sample_rate, channels=channels, application=application)
            if decode:
                self.decoder = opuslib.Decoder(fs=sample_rate, channels=channels)

        def encode(self, pcm_data):
            """
            Encode PCM audio data to Opus format.

            :param pcm_data: PCM audio data as a byte string holding one frame.
            :return: Opus encoded data as a byte string.
            :raises ValueError: If the codec was created with encode=False.
            """
            if self.encoder is None:
                raise ValueError('Opus encoder is not initialized.')
            return self.encoder.encode(pcm_data, self.frame_duration_size)

        def decode(self, opus_data):
            """
            Decode Opus audio data to PCM format.

            :param opus_data: Opus encoded data as a byte string.
            :return: PCM audio data as a byte string.
            :raises ValueError: If the codec was created with decode=False.
            """
            if self.decoder is None:
                raise ValueError('Opus decoder is not initialized.')
            return self.decoder.decode(opus_data, self.frame_duration_size)
    
    
    if __name__ == '__main__':
        # Usage example: round-trip a mono 16-bit WAV file through Opus one
        # frame at a time and write the decoded PCM to a new WAV file.
        # For a 16 kHz recording, set RATE = 16000 and point the two file names
        # at the matching input/output files instead.
        RATE = 24000
        input_filename = "bytedance_tts_output_24k.wav"
        output_filename = "bytedance_tts_output_24k_opus.wav"

        codec = OpusCodec(sample_rate=RATE, channels=1, application='audio', frame_duration_ms=60, encode=True, decode=True)

        # readframes() counts sample frames, while the encoder needs a full
        # frame's worth of bytes; keep both sizes at hand.
        chunk_size = codec.frame_duration_size
        frame_bytes = codec.input_pcm_chunk_size

        # Context managers close both files even if encoding/decoding raises.
        with wave.open(input_filename, "rb") as wave_read, wave.open(output_filename, "wb") as wave_write:
            wave_write.setnchannels(1)
            wave_write.setframerate(RATE)
            wave_write.setsampwidth(2)

            while True:
                # Read one frame of raw audio data.
                pcm_data = wave_read.readframes(chunk_size)
                print(f"Read {len(pcm_data)} bytes.")
                if len(pcm_data) == 0:
                    break
                if len(pcm_data) < frame_bytes:
                    # The last frame is short: pad it with digital silence
                    # (zero bytes) up to a full frame before encoding.
                    # TODO: the original author noted a problem decoding the
                    # last frame; the padded tail is also written to the output.
                    print(f"Warning: Read {len(pcm_data)} bytes, expected {frame_bytes}.")
                    pcm_data += b'\x00' * (frame_bytes - len(pcm_data))
                    print(f"Padded to {len(pcm_data)} bytes.")
                encoded_data = codec.encode(pcm_data)
                decoded_data = codec.decode(encoded_data)

                # Write the decoded data.
                wave_write.writeframes(decoded_data)
    
    

    作者:Irving.Gao

    物联沃分享整理
    物联沃-IOTWORD物联网 » Opus——基于Python实现opus音频编解码

    发表回复