python编写webRTC推拉流脚本,推自定义音频文件,获取音频流写入文件
import asyncio
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
async def send_sdp(e_sdp):
url = "https://xxxxx:xxxx/rtc/v1/whip/?app=live&stream=livestream"
async with aiohttp.ClientSession() as session:
async with session.post(
url,
data=e_sdp.sdp.encode(), # 将 SDP 字符串编码为字节
headers={
"Content-Type": "application/sdp",
"Content-Length": str(len(e_sdp.sdp))
},
ssl=False # 忽略 SSL 证书验证(不推荐在生产环境中使用)
) as response:
response_data = await response.text()
print("对方的SDP:", response_data)
return RTCSessionDescription(sdp=response_data, type='answer')
async def send_candidate(candidate):
if candidate:
print("收集到的候选:", candidate) # 处理候选,例如打印候选信息
async def run():
pc = RTCPeerConnection()
# 添加本地媒体
player = MediaPlayer('D:\\ceshi\\guoqing.mp4')
# player = MediaPlayer('D:\\ceshi\\guoqing.mp4')
pc.addTrack(player.video)
pc.addTrack(player.audio) # 确保使用 audio
# 监听 ICE 候选
pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))
# 创建 offer
offer = await pc.createOffer()
print("本地生成的SDP:", offer.sdp) # 打印本地 SDP
await pc.setLocalDescription(offer)
# 发送 offer 并接收 answer
answer = await send_sdp(offer)
# 设置远程描述
await pc.setRemoteDescription(answer)
# 监听 ICE 连接状态变化
def on_connection_state_change():
print("连接状态:", pc.connectionState)
print("ICE 连接状态:", pc.iceConnectionState)
if pc.connectionState == "connected":
print("连接已建立!")
elif pc.connectionState == "disconnected":
print("连接已断开!")
elif pc.connectionState == "failed":
print("连接失败!")
elif pc.connectionState == "closed":
print("连接已关闭!")
pc.onconnectionstatechange = on_connection_state_change
# 保持连接活跃
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(run())
上面为推流,下面为拉流
# webrtc_client.py
import asyncio
import time
import wave
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration
from aiortc.mediastreams import MediaStreamTrack
class AudioTrack(MediaStreamTrack):
kind = "audio"
def __init__(self):
super().__init__()
self.frames = []
async def recv(self):
frame = await super().recv()
return frame
async def save_audio_to_file(frames, filename):
print("---------------")
with wave.open(filename, 'wb') as wf:
wf.setnchannels(1) # Mono
wf.setsampwidth(2) # 16-bit PCM
wf.setframerate(44100) # Sample rate
print(frames.__sizeof__())
print(print(type(frames)))
for frame in frames:
wf.writeframes(frame.tobytes())
print(f"Saved {len(frames)} audio frames to {filename}.")
async def run_client():
# 创建一个 WebRTC 连接
rtc_conf = RTCConfiguration()
rtc_conf.iceServers = []
pc = RTCPeerConnection(rtc_conf)
audio_track = AudioTrack() # 创建音频轨道
pc.addTrack(audio_track) # 将音频轨道添加到连接中
wav_file = wave.open("output.wav", "wb")
wav_file.setnchannels(2)
wav_file.setsampwidth(2) # 16位音频
wav_file.setframerate(48000)
@pc.on("track")
async def on_track(track):
if track.kind == "audio":
print("Receiving audio track")
while True:
frame = await track.recv()
print(f"Received audio frame of size {len(frame.to_ndarray())}")
audio_track.frames.append(frame)
wav_file.writeframes(frame.to_ndarray())
start_time = time.time()
# 创建SDP Offer
offer = await pc.createOffer() # 创建 SDP Offer
print("本地生成的SDP:", "offer.sdp") # 打印本地 SDP
offer_end_time = time.time()
await pc.setLocalDescription(offer) # 设置本地描述
end_time = time.time()
# 计算各个阶段的运行时长
offer_duration = offer_end_time - start_time
set_local_description_duration = end_time - offer_end_time
total_duration = end_time - start_time
print(f"创建 Offer 耗时: {offer_duration} 秒")
print(f"设置本地描述耗时: {set_local_description_duration} 秒")
print(f"总运行时长: {total_duration} 秒")
async with aiohttp.ClientSession() as session:
async with session.post(
"whep地址",
data=offer.sdp.encode(), # 将 SDP 字符串编码为字节
headers={
"Content-Type": "application/sdp",
"Content-Length": str(len(offer.sdp))
},
ssl=False # 忽略 SSL 证书验证(不推荐在生产环境中使用)
) as response:
response_data = await response.text()
print("对方的SDP:", response_data)
await pc.setRemoteDescription(RTCSessionDescription(sdp=response_data, type='answer'))
while True:
await asyncio.sleep(0.1)
async def main():
"""主函数,启动客户端。"""
await run_client() # 启动 WebRTC 客户端
if __name__ == '__main__':
asyncio.run(main()) # 运行主函数
作者:眉梢i