使用Python FastAPI和WebSocket实现流式调用Minimax算法处理大模型
python + fastapi + websocket 流式调用minimax 大模型
基于fastapi,用websocket流式接收音频,经过语音识别后调用minimax大模型,再将大模型的流式返回进行音频合成后流式返回
前言
第一次写流式接口,真的是坑都踩了一圈
本文仅提供代码思路,文中的代码不完整,无法直接复制
本文仅提供代码思路,文中的代码不完整,无法直接复制
本文仅提供代码思路,文中的代码不完整,无法直接复制
websocket介绍
WebSocket 是一种网络通信协议,提供了浏览器和服务器之间的全双工通信能力。与传统的HTTP请求不同,WebSocket 协议允许数据在客户端和服务器之间进行实时双向传输,而无需频繁地建立新的连接。这种协议特别适合需要实时数据交换的应用场景,比如在线游戏、实时交易平台、聊天应用等。
说人话就是socket的web版本,两边建立了websocket连接之后,就可以全双工通信
在fastapi中编写websocket接口
在fastapi中,可以使用 @app.websocket("/url")
来创建websocket接口,
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/url")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() #接收websocket连接
......
此时,当客户端程序调用url,ws://ip:port/url
就可以调用了
流式接收音频并进行本地语音识别
发送格式
在接收音频之前需要先和发送方协商好发送格式,这是我定义的发送格式
{
text: Optional[str] = None #文本输入,若输入文本则默认非音频输入
audio: Optional[str] = None # 默认流式输入 base64
meta_info : {
stream: [bool] = True # 是否流式输出
voice_synthesize: [bool] = False #是否在大模型返回后进行音频合成
is_end: [bool] #是否结束,即音频发送是否完成
encoding: [str] = ['raw', ...] #编码格式,raw表示base64,可以在这里定义一些压缩格式
}
流式接收并进行本地语音识别
这边我们先初始化我们的asr(auto speech recognizer),我用的是开源的一个项目,然后while true开始循环接受消息,若文本输入不为空,则表示该输入为文本输入,我们将文本信息读取出来之后就直接退出循环
如果接受的是音频输入,则通过"is_end"字段判断是否是最后一帧,用asr进行语音识别后添加到current_message中,若是最后一帧则asr识别并添加之后,再记录一下后续返回类型就退出
response_type = RESPONSE_TEXT
asr_start_time = time.perf_counter()
if config["main"]["asr"] == LOCAL_ASR: #使用本地ASR
asr = FunAutoSpeechRecognizer() #替换成你自己的ASR
try:
while True:
data_json = json.loads(await ws.receive_text())
if data_json["text"]: #若文字输入不为空,则表示该输入为文字输入
if data_json["meta_info"]["voice_synthesize"]:
response_type = RESPONSE_AUDIO #查看voice_synthesize判断返回类型
current_message = data_json['text']
break
if not data_json['meta_info']['is_end']: #还在发
asr_result = asr.streaming_recognize(data_json["audio"])
current_message += ''.join(asr_result['text'])
else: #发完了
asr_result = asr.streaming_recognize(data_json["audio"],is_end=True)
session_id = data_json["meta_info"]["session_id"]
current_message += ''.join(asr_result['text'])
if data_json["meta_info"]["voice_synthesize"]:
response_type = RESPONSE_AUDIO #查看voice_synthesize判断返回类型
break
except Exception as e:
error_info = f"接收用户消息错误: {str(e)}"
error_message = {"type":"error","code":"500","msg":error_info}
logger.error(error_info)
await ws.send_text(json.dumps(error_message,ensure_ascii=False))
await ws.close()
return
识别后的文本被放在current_messgae中
流式接收并调用讯飞流式接口进行语音识别
讯飞流式语音识别接口
和本地比起来,调用接口用时不稳定,我调用讯飞的接口用时基本上在3,4秒左右,但是本地的话稳定能控制在1秒钟之内。
采取异步的方式,一边流式接受音频,一边流式将音频传给讯飞
usr_chat_recv()
是异步接受方法
user_chat_send()
是异步发送方法
在user_chat_send()
里面调用了讯飞的接口,解释一下就是先获取鉴权url,然后定义两个回调函数,再与讯飞开启websocket连接并使用这两个回调函数,这四个回调函数分别是on_open,在websocket连接建立的时候调用,在on_open里面发送音频数据,on_message,收到讯飞识别后的流式返回,在这个里面获取识别结果
这一版代码是根据讯飞的demo调出来的, 若是要用于生产环境,还需要调整
current_message = "" #用于存储用户消息
response_type = RESPONSE_TEXT #用于获取返回类型
session_id = ""
q_recv = queue.Queue() #一个函数往队列中存,一个函数同时从队列中取
chat_type = CHAT_UNCERTAIN #判断一下是语音还是文本信息
if config["main"]["asr"] == REMOTE_ASR:
logger.info("开始调用讯飞接口")
async def usr_chat_recv(): #定义函数用于接受流式输入,并存入队列
nonlocal current_message
nonlocal chat_type
nonlocal session_id
nonlocal response_type
try:
while True:
data_json = await ws.receive_json()
q_recv.put(data_json)
if data_json["text"]: #如果是文本则一轮直接退出
if data_json["meta_info"]["voice_synthesize"]:
response_type = RESPONSE_AUDIO
chat_type = CHAT_TEXT
current_message = data_json['text']
session_id = data_json["meta_info"]["session_id"]
break
else:
chat_type = CHAT_AUDIO
if data_json['meta_info']["is_end"]: #收到结束标志,退出
if data_json["meta_info"]["voice_synthesize"]:
response_type = RESPONSE_AUDIO
session_id = data_json["meta_info"]["session_id"]
break
except Exception as e:
error_message = {"type":"error","code":500,"msg":f"error occur when receiving data from front: {str(e)}"}
print(error_message)
ws.send_text(json.dumps(error_message))
async def user_chat_send():
url = generate_xf_satt_url()
def on_open(xfws): #定义on_open回调函数,在websocket建立时触发
def run(*args):
interval = 0.04
status = FIRST_FRAME
while True:
data_json = q_recv.get()
if data_json["meta_info"]["is_end"]: #收到结束标志位
status = LAST_FRAME
if status == FIRST_FRAME: #第一帧要带上common和business
d = {"common": {"app_id": config['xfapi']['APPID']},
"business": {"domain": config['satt']['domain'], "language": config['satt']['language'],"accent": config['satt']['accent'], "vad_eos": config['satt']['vad_eos']},
"data": {"status": 0, "format": "audio/L16;rate=16000",
"audio": data_json["audio"],
"encoding": "raw"}}
d = json.dumps(d)
xfws.send(d)
status = CONTINUE_FRAME
elif status == CONTINUE_FRAME:
d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
"audio": data_json["audio"],
"encoding": "raw"}}
xfws.send(json.dumps(d))
elif status == LAST_FRAME:
d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
"audio": data_json["audio"],
"encoding": "raw"}}
xfws.send(json.dumps(d))
time.sleep(0.05)
break;
time.sleep(interval)
xfws.close()
thread.start_new_thread(run,())
def on_message(xfws,message):
try:
nonlocal current_message
code = json.loads(message)["code"]
sid = json.loads(message)["sid"]
if code != 0:
errMsg = json.loads(message)["message"]
print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
else:
data = json.loads(message)["data"]["result"]["ws"]
# print(json.loads(message))
result = ""
for i in data:
for w in i["cw"]:
result += w["w"]
current_message += result #将讯飞接口返回保存
except Exception as e:
print("receive msg,but parse exception:", e)
websocket.enableTrace(False)
xfws = websocket.WebSocketApp(url,on_message=on_message)
xfws.on_open=on_open
xfws.run_forever()
await usr_chat_recv()
if chat_type==CHAT_AUDIO:
await user_chat_send()
调用minimax大模型
这里用的是minimax的ChatCompletion v2
minimaxAPI文档
定义pyload和header后创建request,在payload中将stream设置为True,在request中也把stream设置为True,使得可以获取大模型的流式返回
try:
http_send_start_time = time.perf_counter()
payload = json.dumps({
"model":"abab5.5-chat",
"stream":True,
"messages": messages,
"tool_choice":"auto",
"max_tokens":10000,
"temperature":0.9,
"top_p":1
})
headers={
'Authorization':f"Bearer {config['llm']['API_KEY']}",
'Content-Type':'application/json'
}
response = requests.request("POST",config["llm"]["url"],headers=headers,data=payload,stream=True)
http_send_end_time = time.perf_counter()
except Exception as e:
error_info = f"发送信息给大模型时发生错误: {str(e)}"
error_message ={"type":"error","code":500,"msg":error_info}
logger.error(error_info)
await ws.send_text(json.dumps(error_message,ensure_ascii=False))
await ws.close()
return
获取大模型流式返回并进行语音合成
首先定义一个split_string_with_punctuation(text)
函数,用于把大模型的返回根据标点符号拆分,这样可以让语音合成之后的音频更加自然,不然的话想想一下一句话“我们今天出去玩吧。”,大模型分两次返回给你,“我们今天”,“出去玩吧”,分成两次音频合成的语调就没有一次那么自然了
然后定义一个函数用于解析minimax返回后的chunk
利用response的迭代器(for chunk in response.iter_lines():
)来处理流式返回的chunk,对于每一个chunk我们都先对其进行解析后读出文本数据,在根据标点拆分后进行音频合成,最后利用await ws.send_text(json.dumps(text_response,ensure_ascii=False))
返回二进制流数据,await ws.send_bytes(audio)
返回文本数据
此处的tts
是我本地的vits,网上也有不少开源库,需自行配置
def split_string_with_punctuation(text):
punctuations = ",!?。"
result = []
current_sentence = ""
for char in text:
current_sentence += char
if char in punctuations:
result.append(current_sentence)
current_sentence = ""
# 判断最后一个字符是否为标点符号
if current_sentence and current_sentence[-1] not in punctuations:
# 如果最后一段不以标点符号结尾,则加入拆分数组
result.append(current_sentence)
return result
llm_response = ""
response_buf = ""
def parseChunkDelta(chunk) :
decoded_data = chunk.decode('utf-8')
parsed_data = json.loads(decoded_data[6:])
if 'delta' in parsed_data['choices'][0]:
delta_content = parsed_data['choices'][0]['delta']
return delta_content['content']
else:
return ""
try:
if config["main"]['tts'] == LOCAL_TTS:
if response.status_code == 200:
for chunk in response.iter_lines():
if chunk:
if response_type == RESPONSE_AUDIO: #若返回类型是音频则需要语音合成
chunk_data = parseChunkDelta(chunk)
llm_response += chunk_data
response_buf += chunk_data
split_buf = split_string_with_punctuation(response_buf)
response_buf = ""
if len(split_buf) != 0:
for sentence in split_buf:
sr,audio = tts.synthesize(sentence,0,103,0.1,0.668,1.2,return_bytes=True)
text_response = {"type":"text","code":200,"msg":sentence}
await ws.send_text(json.dumps(text_response,ensure_ascii=False)) #返回文本数据
await ws.send_bytes(audio) #返回二进制流数据
if response_type == RESPONSE_TEXT:
chunk_data = parseChunkDelta(chunk)
llm_response += chunk_data
text_response = {"type":"text","code":200,"msg":chunk_data}
await ws.send_text(json.dumps(text_response,ensure_ascii=False))
elif config["main"]['tts'] == REMOTE_TTS:
error_info = f"暂不支持远程音频合成"
error_message = {"type":"error","code":500,"msg":error_info}
logger.error(error_info)
await ws.send_text(json.dumps(error_message,ensure_ascii=False))
await ws.close()
return
logger.info(f"llm消息: {llm_response}")
except Exception as e:
error_info = f"音频合成与向前端返回时错误: {str(e)}"
error_message = {"type":"error","code":500,"msg":error_info}
logger.error(error_info)
await ws.send_text(json.dumps(error_message,ensure_ascii=False))
await ws.close()
return
receive_stt_end_time = time.perf_counter()
总结
本文利用利用fastapi接受websocket请求,接收客户端发送的流式音频数据,在本地进行语音识别或使用讯飞接口进行识别,再将识别出的文字发送给minimax大模型,接收minimax的流式返回并进行音频合成,最终把音频以及文本流式返回给客户端。
附录
附上一个客户端代码
import asyncio
import websockets
import json
import base64
from datetime import datetime
#你的音频文件
pcm_file_path = 'example_recording.wav'
def read_pcm_file_in_chunks(chunk_size): #将音频文件切割成一个一个chunk
with open(pcm_file_path, 'rb') as pcm_file:
while True:
data = pcm_file.read(chunk_size)
if not data:
break
yield data
data = {
"text": "",
"audio": "",
"meta_info": {
"session_id":"7d0546dd-36b6-4bc2-8008-fc77c78aaa14",
"stream": False,
"voice_synthesize": True,
"is_end": False,
"encoding": "raw"
}
}
async def send_audio_chunk(websocket, chunk):
# 将PCM数据进行Base64编码
encoded_data = base64.b64encode(chunk).decode('utf-8')
# 更新data字典中的"audio"键的值为Base64编码后的音频数据
data["audio"] = encoded_data
# 将JSON数据对象转换为JSON字符串
message = json.dumps(data)
# 发送JSON字符串到WebSocket接口
await websocket.send(message)
async def send_json():
async with websockets.connect('ws://ip:port/url') as websocket:
chunks = read_pcm_file_in_chunks(2048) # 读取PCM文件并生成数据块
for chunk in chunks:
await send_audio_chunk(websocket, chunk)
await asyncio.sleep(0.01) # 等待0.04秒
# 设置data字典中的"is_end"键为True,表示音频流结束
data["meta_info"]["is_end"] = True
# 发送最后一个数据块和流结束信号
await send_audio_chunk(websocket, b'') # 发送空数据块表示结束
# 等待并打印接收到的数据
print("等待接收:",datetime.now())
audio_bytes = b''
while True:
data_ws = await websocket.recv()
try:
message_json = json.loads(data_ws)
print(message_json) # 打印接收到的消息
if message_json["type"] == "close":
break # 如果没有接收到消息,则退出循环
except Exception as e:
audio_bytes += data_ws
print(e)
print("接收完毕:", datetime.now())
# 在此处播放二进制流数据,这里的注释的代码只是示意,用不了
# player = AudioPlayer(RATE=22050)
# player.play(audio_bytes)
await asyncio.sleep(0.04) # 等待0.04秒后断开连接
await websocket.close()
# 启动事件循环
try:
asyncio.run(send_json())
except websockets.exceptions.ConnectionClosedOK:
print("成功")
作者:Killua4396