Python RK3588 MPP 8路视频硬解码!全体目光向我看齐!
在进行图像相关的算法工作时,通常离不开拉取实时摄像头进行拉流工作。在python中,通常使用OPENCV完成解码工作,但OPENCV解码非常占用CPU资源,对于资源受限的边缘设备,占用大量CPU资源显然是我们不愿意看到的。
好在Rockchip官方针对自家的开发版设计了专门的硬件解码芯片,并通过mpp进行硬件解码。不过官方提供的是基于C++开发的示例,着实令没接触过C++的朋友们头疼。
因此,本文重写了官方demo、加入了Zlmediakit进行拉流并通过pybind11将代码封装成so库,现在,在python中直接通过import的方式,就能完成最多8路实时视频流的拉流解码操作!!!
目录
1. 结果演示
2. python 端 8 路拉流解码代码:
3. pybind11 封装 c++ 代码思路
4. 使用
1. 结果演示
output
上述结果演示是第一版,只保存了2路视频流的图片,8路视频流没有选择将图片保存到本地,而是采用计数的方式统计了10秒钟的时间内,从每个流中获得的数据数量。
2. python 端 8 路拉流解码代码:
import sys
import os
import cv2
import numpy as np
import time
import threading
# 使用 pybind11 编译得到的解码库
import mpp_decoder
rtsp_infos = {
"camera1": {"camera_id": 1,
"video_type": 264,
"rtsp_address": "rtsp://address_1/h264/ch1/main/av_stream"},
"camera2": {"camera_id": 2,
"video_type": 264,
"rtsp_address": "rtsp://address_2/h264/ch1/main/av_stream"},
"camera3": {"camera_id": 3,
"video_type": 264,
"rtsp_address": "rtsp://address_3/h264/ch1/main/av_stream"},
"camera4": {"camera_id": 4,
"video_type": 264,
"rtsp_address": "rtsp://address_4/h264/ch1/main/av_stream"},
"camera5": {"camera_id": 5,
"video_type": 264,
"rtsp_address": "rtsp://address_5/h264/ch1/main/av_stream"},
"camera6": {"camera_id": 6,
"video_type": 264,
"rtsp_address": "rtsp://address_6/h264/ch1/main/av_stream"},
"camera7": {"camera_id": 7,
"video_type": 264,
"rtsp_address": "rtsp://address_7/h264/ch1/main/av_stream"},
"camera8": {"camera_id": 8,
"video_type": 264,
"rtsp_address": "rtsp://address_8/h264/ch1/main/av_stream"}
}
# 记录每个 camera_id 的帧索引
frame_idx = {camera["camera_id"]: 0 for camera in rtsp_infos.values()}
mpp_decoder.start_processing_async(rtsp_infos)
stop_event = threading.Event() # 线程停止事件
def worker(camera_id, context):
while not stop_event.is_set(): # 运行10秒
frame = context.get_decoded_frame()
if frame.size > 0:
frame_idx[camera_id] += 1
else:
time.sleep(0.1)
if __name__ == "__main__":
threads = []
# 获取 camera对应的c++解码上下文信息
context_dict = mpp_decoder.get_all_camera_contexts()
for camera_id, context in context_dict.items():
t = threading.Thread(target=worker, args=(camera_id, context))
threads.append(t)
t.start()
time.sleep(10) # 主线程等待10秒
stop_event.set() # 设置停止事件
# 等待线程结束并打印结果
for t in threads:
t.join()
# 打印每个线程的获取帧数
for camera_info in rtsp_infos.values():
camera_id = camera_info["camera_id"]
print(f"Camera {camera_id} got {frame_idx[camera_id]} frames.")
通常视频流满帧为25帧,10秒下满帧应该是250帧,以下是8路解码统计结果,可以看出,8路视频基本都处于满帧状态:
3. pybind11 封装 c++ 代码思路
1. start_processing_async函数接收 rtsp 流地址以及流类型,为每个流地址启动一个线程进行拉流解码操作。
2. 调用 process_video_rtsp 函数使用 Zlmediakit 进行拉流操做
3. Zlmediakit 在监听事件on_track_frame_out进行解码操作:ctx->decoder->Decode()
4. 解码一帧数据后触发回调函数 mpp_decoder_frame_callback
5. 回调函数 mpp_decoder_frame_callback 负责取出数据、放入大小为30的缓冲区
6. pybind 通过 py::class_<rknn_zl_app_context_t>(m, "RKNNZLAppContext")
.def("get_decoded_frame", &rknn_zl_app_context_t::get_decoded_frame); 将上下文信息注册给python。
7. python 通过 get_all_camera_contexts 函数,获得流地址及其对应上下文信息,并通过 ctx.get_decoded_frame() 获得解码后的图片数据,格式为 RGB。
struct rknn_zl_app_context_t {
MppDecoder *decoder; // MPP解码器
mk_player player; // 播放器
mk_media media; // 媒体服务
uint64_t pts; // 显示时间戳
uint64_t dts; // 解码时间戳
std::queue<cv::Mat> frame_buffer; // 任务队列
std::mutex buffer_mutex; // 锁
rknn_zl_app_context_t() : decoder(nullptr), player(0), media(0), pusher(0), pts(0), dts(0) {}
py::array_t<uint8_t> get_decoded_frame()
{
std::lock_guard<std::mutex> lock(buffer_mutex);
if (frame_buffer.empty())
{
return py::array_t<uint8_t>();
}
cv::Mat frame_data = frame_buffer.front();
frame_buffer.pop();
// 将 cv::Mat 转换为 NumPy 数组
py::array_t<uint8_t> result = py::array_t<uint8_t>(
{frame_data.rows, frame_data.cols, 3},
{frame_data.cols * 3, 3, 1},
frame_data.data
);
return result;
}
};
std::map<int, rknn_zl_app_context_t*> camera_context_map;
std::mutex context_map_mutex;
rknn_zl_app_context_t* get_context_by_camera_id(int camera_id)
{
std::lock_guard<std::mutex> lock(context_map_mutex);
auto it = camera_context_map.find(camera_id);
if (it != camera_context_map.end())
{
return it->second;
}
return nullptr;
}
py::dict get_all_camera_contexts() {
std::lock_guard<std::mutex> lock(context_map_mutex);
py::dict context_dict;
for (const auto& entry : camera_context_map) {
int camera_id = entry.first;
rknn_zl_app_context_t* ctx = entry.second;
// 将上下文信息存入字典
context_dict[py::cast(camera_id)] = py::cast(ctx);
}
return context_dict;
}
void mpp_decoder_frame_callback(void *userdata, int width_stride, int height_stride, int width, int height, int format, int fd, void *data, int camera_id)
{
rknn_zl_app_context_t *ctx = get_context_by_camera_id(camera_id);
if (ctx == nullptr)
{
return;
}
rga_buffer_t origin;
// 创建 YUV420SP 格式的缓冲区
origin = wrapbuffer_fd(fd, width, height, RK_FORMAT_YCbCr_420_SP, width_stride, height_stride);
cv::Mat origin_mat = cv::Mat::zeros(height, width, CV_8UC3);
rga_buffer_t rgb_img = wrapbuffer_virtualaddr((void *)origin_mat.data, width, height, RK_FORMAT_RGB_888);
imcopy(origin, rgb_img);
{
std::lock_guard<std::mutex> lock(ctx->buffer_mutex);
if (ctx->frame_buffer.size() >= 30)
{
ctx->frame_buffer.pop(); // 丢弃旧帧
}
ctx->frame_buffer.emplace(origin_mat); // 存储新帧
}
}
void API_CALL on_track_frame_out(void *user_data, mk_frame frame) {
rknn_zl_app_context_t *ctx = (rknn_zl_app_context_t *) user_data;
// printf("on_track_frame_out ctx=%p\n", ctx);
const char *data = mk_frame_get_data(frame); // 获取帧数据指针
ctx->dts = mk_frame_get_dts(frame); // 获取解码时间戳,单位毫秒
ctx->pts = mk_frame_get_pts(frame); // 获取显示时间戳,单位毫秒
size_t size = mk_frame_get_data_size(frame); // 获取帧数据指针长度
// printf("decoder=%p\n", ctx->decoder);
ctx->decoder->Decode((uint8_t *)data, size, 0); // 调用decoder解码器进行解码
}
void API_CALL on_mk_play_event_func(void *user_data, int err_code, const char *err_msg, mk_track tracks[], int track_count)
{
rknn_zl_app_context_t *ctx = (rknn_zl_app_context_t *) user_data; // 将 user_data 转换成 Zlmediakit Context 结构体指针
if (err_code == 0)
{
log_debug("play success!");
int i;
for (i = 0; i < track_count; ++i)
{
if (mk_track_is_video(tracks[i]))
{
mk_track_add_delegate(tracks[i], on_track_frame_out, user_data);
}
}
}
else
{
log_warn("play failed: %d %s", err_code, err_msg);
}
}
int process_video_rtsp(rknn_zl_app_context_t *ctx, const char *url)
{
mk_config config; // zlmediakit config配置,默认即可
memset(&config, 0, sizeof(mk_config));
config.log_mask = LOG_CONSOLE; // 设置日志输出的路径
mk_env_init(&config); // 使用 config 配置初始化 zlmediakit 环境
mk_player player = mk_player_create();
//设置播放器正常播放/播放中断的回调函数
mk_player_set_on_result(player, on_mk_play_event_func, ctx);
mk_player_set_on_shutdown(player, on_mk_play_event_func, ctx);
// 开始播放 url
mk_player_play(player, url);
while(true)
{
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
void start_processing_async(const py::dict& rtsp_streams)
{
for (const auto& camera : rtsp_streams) // 使用 items() 遍历 JSON 对象
{
// 提取每个摄像头的 RTSP 信息
py::dict rtsp_info = camera.second.cast<py::dict>();
int camera_id = rtsp_info["camera_id"].cast<int>();
std::string rtsp_address = rtsp_info["rtsp_address"].cast<std::string>();
int video_type = rtsp_info["video_type"].cast<int>();
// 创建新线程处理每个 RTSP 流
std::thread([camera_id, rtsp_address, video_type]()
{
rknn_zl_app_context_t app_ctx;
std::cerr << "init size: " << app_ctx.frame_buffer.size() << std::endl;
if (app_ctx.decoder == nullptr)
{
MppDecoder* decoder = new MppDecoder();
decoder->Init(video_type, 30, &app_ctx, camera_id);
decoder->SetCallback(mpp_decoder_frame_callback);
app_ctx.decoder = decoder;
}
{
std::lock_guard<std::mutex> lock(context_map_mutex);
camera_context_map[camera_id] = &app_ctx;
}
// 处理 RTSP 流
process_video_rtsp(&app_ctx, rtsp_address.c_str());
if (app_ctx.decoder != nullptr)
{
delete app_ctx.decoder;
app_ctx.decoder = nullptr;
}
}).detach(); // 分离线程,让它在后台执行
// 防止线程冲突,延迟一下
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
PYBIND11_MODULE(mpp_decoder, m)
{
// 注册 rknn_zl_app_context_t 类型
py::class_<rknn_zl_app_context_t>(m, "RKNNZLAppContext")
.def("get_decoded_frame", &rknn_zl_app_context_t::get_decoded_frame);
// 绑定 start_processing_async 函数
m.def("start_processing_async", [](py::dict rtsp_infos){
start_processing_async(rtsp_infos);
}, "Start processing RTSP stream with specified URL and video type");
// 绑定 get_decoded_frame 函数
m.def("get_all_camera_contexts", &get_all_camera_contexts, "Get the latest decoded frame");
}
4. 使用
上述所有代码都运行在docker容器中,在docker容器中只需要export 一下so库的路径即可。
export PYTHONPATH=/mnt/mpp_decoder/build:$PYTHONPATH
作者:不想起名字呢