Exploring PaddleOCR Recognition Speed and Multithreading Efficiency in Python
PaddleOCR's recognition accuracy is high, but when used from Python it cannot be called concurrently from multiple threads, and large images are slow to recognize. To solve this I attacked the problem from the angles below (concurrent calls now basically work, and large-image recognition is more than 4x faster):
1. The multithreading problem. From reading the open-source code, PaddleOCR's pipeline has three stages: cls corrects text orientation, det detects and segments text regions, and rec, the core stage, recognizes the text. Calling rec alone from multiple threads produced no noticeable speedup.
2. Digging further into the code, rec does two kinds of work. The first step packs the incoming images into a uniform tensor, which is pure CPU work, and CPU-bound work gains nothing from Python threads because the GIL serializes it. Moving that step into worker processes solved the problem: calls can now run concurrently.
3. For large images, layering multithreading on top of the multiprocess preprocessing keeps both the CPU and the GPU busy, which yields a further speedup (a standalone sketch of this division of labor follows this list).
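The division of labor is easy to show in isolation. Below is a minimal, self-contained sketch (not the code from this post; cpu_prepare and gpu_infer are placeholder names) of the pattern: CPU-bound tensor preparation goes to a process pool, while the inference calls, which release the GIL inside the native runtime, run on plain threads.

import concurrent.futures
import threading

def cpu_prepare(batch):
    # CPU-bound: stand-in for resizing/normalizing images into a tensor
    return [x * 2 for x in batch]

def gpu_infer(tensor, results, i):
    # stand-in for predictor.run(); native inference releases the GIL,
    # so several of these threads can overlap on the GPU
    results[i] = sum(tensor)

if __name__ == '__main__':
    batches = [[1, 2], [3, 4], [5, 6]]
    results = [None] * len(batches)
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        tensors = list(pool.map(cpu_prepare, batches))  # processes do the CPU prep
    threads = [threading.Thread(target=gpu_infer, args=(t, results, i))
               for i, t in enumerate(tensors)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(results)  # [6, 14, 22]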
The full code follows (personal use; it has run stably for a month and works well for server deployment; it is messy but it runs, and I will tidy it up when I have time):
First, the imports and the number of worker processes:
import concurrent.futures
import threading
import paddle
from flask import Flask, request
import base64
import copy
import math
import queue
import time
import cv2
import numpy as np
import pyclipper
from paddle.inference import Config
from paddle.inference import create_predictor
from paddle.inference import PrecisionType
from shapely.geometry import Polygon  # import path that works on both shapely 1.x and 2.x
import re
# Number of worker processes; choose based on your CPU core count, ideally no more than 3/4 of the cores
executor = concurrent.futures.ProcessPoolExecutor(max_workers=10)
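One caveat the original post does not mention: ProcessPoolExecutor needs the submitted function (recfen below) to be importable at module top level, and on platforms that spawn workers (Windows, macOS) creating the pool at import time can recurse. A sketch of the usual guard, adapted to this script:

# Hypothetical guard; on spawn-based platforms create the pool (and start
# the server) only under __main__, not at import time.
if __name__ == '__main__':
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=10)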
Next, define the det and rec models (the cls model is not explored here; it can be wrapped the same way).
'''The det model segments the image, i.e., locates the text regions'''
class TextDetognizer(object):
    def __init__(self):
        self.det_predictor = None
        # Download the model files from the official PaddleOCR releases and adjust the paths
        config = Config('whl/det/ch/ch_PP-OCRv4_det_infer/inference.pdmodel',
                        'whl/det/ch/ch_PP-OCRv4_det_infer/inference.pdiparams')
        # Comment out the next line to run on CPU only
        config.enable_use_gpu(20, 0)
        config.disable_glog_info()
        config.switch_use_feed_fetch_ops(False)
        config.switch_specify_input_names(True)
        config.enable_memory_optim()  # enable memory/VRAM optimization
        config.enable_mkldnn()
        self.det_predictor = create_predictor(config)
        # Pool of det predictors used by predict(); size 5 mirrors the rec pool
        # below, tune it to your GPU memory
        self.det_model_pool = self.detModelPool(5)
    class detModelPool:
        def __init__(self, size):
            self.max_size = size
            self.pool = queue.Queue(size)
            self.jiazai()
        def jiazai(self):  # load the predictors into the pool
            n = 0
            for _ in range(self.max_size):
                config = Config('whl/det/ch/ch_PP-OCRv4_det_infer/inference.pdmodel',
                                'whl/det/ch/ch_PP-OCRv4_det_infer/inference.pdiparams')
                config.enable_use_gpu(20, 0)
                config.disable_glog_info()
                config.switch_use_feed_fetch_ops(False)
                config.switch_specify_input_names(True)
                config.enable_memory_optim()  # enable memory/VRAM optimization
                config.enable_mkldnn()
                predictor = create_predictor(config)
                self.pool.put(predictor)
                n += 1
            print('Loaded {n} det predictors'.format(n=n))
        def acquire(self):
            try:
                det_predictor = self.pool.get(block=False)
            except queue.Empty:
                print("No idle det predictor available, waiting...")
                det_predictor = self.pool.get()  # block until one is free
            return det_predictor
        def release(self, det_predictor):
            self.pool.put(det_predictor)
    def run(self, img_list):
        input_names = self.det_predictor.get_input_names()  # names of the input tensors
        for i, name in enumerate(input_names):  # configure each input
            input_tensor = self.det_predictor.get_input_handle(input_names[i])
            input_tensor.reshape(img_list[i].shape)
            input_tensor.copy_from_cpu(img_list[i].copy())
        self.det_predictor.run()  # run inference
        results = []
        lod_info = []
        output_names = self.det_predictor.get_output_names()
        # collect every output into results
        for i, name in enumerate(output_names):
            output_tensor = self.det_predictor.get_output_handle(output_names[i])
            output_data = output_tensor.copy_to_cpu()
            results.append(output_data)
            lod = output_tensor.lod()
            if len(lod) > 0:
                lod_info.append(lod[0])
        return results, lod_info
    def predict(self, img):
        if img is None:
            return None, 0
        st = time.time()
        ori_im = img.copy()
        det_img, ratio_list = det_preprocess(img)  # det_img already carries the batch dim
        det_predictor = self.det_model_pool.acquire()
        input_names = det_predictor.get_input_names()
        for i, name in enumerate(input_names):
            input_tensor = det_predictor.get_input_handle(input_names[i])
            # feed the preprocessed NCHW tensor, not the raw image
            input_tensor.reshape(det_img.shape)
            input_tensor.copy_from_cpu(det_img.copy())
        det_predictor.run()
        outputs = []
        output_names = det_predictor.get_output_names()
        # collect every output
        for i, name in enumerate(output_names):
            output_tensor = det_predictor.get_output_handle(output_names[i])
            output_data = output_tensor.copy_to_cpu()
            outputs.append(output_data)
        self.det_model_pool.release(det_predictor)  # release only after all outputs are read
        preds = {}
        preds['maps'] = outputs[0]
        postprocess_params = {}
        postprocess_params["thresh"] = 0.3
        postprocess_params["box_thresh"] = 0.5
        postprocess_params["max_candidates"] = 1000
        postprocess_params["unclip_ratio"] = 2.0
        postprocess_op = DBPostProcess(postprocess_params)
        # DBPostProcess expects a list of (ratio_h, ratio_w) pairs, one per batch item
        post_result = postprocess_op(preds, [ratio_list])
        dt_boxes = post_result[0]
img_height, img_width = ori_im.shape[0:2]
dt_boxes_new = []
for box in dt_boxes:
if type(box) is list:
box = np.array(box)
rect = np.zeros((4, 2), dtype="float32")
s = box.sum(axis=1)
rect[0] = box[np.argmin(s)]
rect[2] = box[np.argmax(s)]
tmp = np.delete(box, (np.argmin(s), np.argmax(s)), axis=0)
diff = np.diff(np.array(tmp), axis=1)
rect[1] = tmp[np.argmin(diff)]
rect[3] = tmp[np.argmax(diff)]
box = rect
for pno in range(box.shape[0]):
box[pno, 0] = int(min(max(box[pno, 0], 0), img_width - 1))
box[pno, 1] = int(min(max(box[pno, 1], 0), img_height - 1))
rect_width = int(np.linalg.norm(box[0] - box[1]))
rect_height = int(np.linalg.norm(box[0] - box[3]))
if rect_width <= 3 or rect_height <= 3:
continue
dt_boxes_new.append(box)
dt_boxes = np.array(dt_boxes_new)
et = time.time()
return dt_boxes, et - st
def text_detector(self, img):
MIN_BOUND_DISTANCE = 50
det_limit_side_len = 960
dt_boxes = np.zeros((0, 4, 2), dtype=np.float32)
elapse = 0
if img.shape[0] / img.shape[1] > 2 and img.shape[0] > det_limit_side_len:
start_h = 0
end_h = 0
while end_h <= img.shape[0]:
end_h = start_h + img.shape[1] * 3 // 4
subimg = img[start_h: end_h, :]
if len(subimg) == 0:
break
sub_dt_boxes, sub_elapse = self.predict(subimg)
offset = start_h
# To prevent text blocks from being cut off, roll back a certain buffer area.
if len(sub_dt_boxes) == 0 or img.shape[1] - max([x[-1][1] for x in sub_dt_boxes]) > MIN_BOUND_DISTANCE:
start_h = end_h
else:
sorted_indices = np.argsort(sub_dt_boxes[:, 2, 1])
sub_dt_boxes = sub_dt_boxes[sorted_indices]
bottom_line = 0 if len(sub_dt_boxes) <= 1 else int(np.max(sub_dt_boxes[:-1, 2, 1]))
if bottom_line > 0:
start_h += bottom_line
sub_dt_boxes = sub_dt_boxes[sub_dt_boxes[:, 2, 1] <= bottom_line]
else:
start_h = end_h
if len(sub_dt_boxes) > 0:
if dt_boxes.shape[0] == 0:
dt_boxes = sub_dt_boxes + np.array([0, offset], dtype=np.float32)
else:
dt_boxes = np.append(dt_boxes,
sub_dt_boxes + np.array([0, offset], dtype=np.float32),
axis=0)
elapse += sub_elapse
elif img.shape[1] / img.shape[0] > 3 and img.shape[1] > det_limit_side_len * 3:
start_w = 0
end_w = 0
while end_w <= img.shape[1]:
end_w = start_w + img.shape[0] * 3 // 4
subimg = img[:, start_w: end_w]
if len(subimg) == 0:
break
sub_dt_boxes, sub_elapse = self.predict(subimg)
offset = start_w
if len(sub_dt_boxes) == 0 or img.shape[0] - max([x[-1][0] for x in sub_dt_boxes]) > MIN_BOUND_DISTANCE:
start_w = end_w
else:
sorted_indices = np.argsort(sub_dt_boxes[:, 2, 0])
sub_dt_boxes = sub_dt_boxes[sorted_indices]
right_line = 0 if len(sub_dt_boxes) <= 1 else int(np.max(sub_dt_boxes[:-1, 1, 0]))
if right_line > 0:
start_w += right_line
sub_dt_boxes = sub_dt_boxes[sub_dt_boxes[:, 1, 0] <= right_line]
else:
start_w = end_w
if len(sub_dt_boxes) > 0:
if dt_boxes.shape[0] == 0:
dt_boxes = sub_dt_boxes + np.array([offset, 0], dtype=np.float32)
else:
dt_boxes = np.append(dt_boxes,
sub_dt_boxes + np.array([offset, 0], dtype=np.float32),
axis=0)
elapse += sub_elapse
else:
dt_boxes, elapse = self.predict(img)
return dt_boxes, elapse
    def __call__(self, img, cls=True):
        time_dict = {'det': 0, 'rec': 0, 'cls': 0, 'all': 0}
        if img is None:
            print("no valid image provided")
            return None, None, time_dict
        start = time.time()
        ori_im = img.copy()
        dt_boxes, elapse = self.text_detector(img)
        time_dict['det'] = elapse
        if dt_boxes is None:
            print("no dt_boxes found, elapsed : {}".format(elapse))
            end = time.time()
            time_dict['all'] = end - start
            return None, None, time_dict
        print("dt_boxes num : {}, elapsed : {}".format(len(dt_boxes), elapse))
        img_crop_list = []
        dt_boxes = sorted_boxes(dt_boxes)
        for bno in range(len(dt_boxes)):
            tmp_box = copy.deepcopy(dt_boxes[bno])
            img_crop = get_rotate_crop_image(ori_im, tmp_box)
            img_crop_list.append(img_crop)
        time_dict['all'] = time.time() - start
        # return a tuple of the same arity as the failure branches above
        return img_crop_list, dt_boxes, time_dict
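A minimal usage sketch for the detector defined above; the image path is a placeholder, and in practice you instantiate the class once (the deployment code at the end reuses a single detm):

detm_demo = TextDetognizer()
image = cv2.imread('example.jpg')  # placeholder path
crops, boxes, timings = detm_demo(image)
print(len(crops), timings)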
# The rec model performs the actual text recognition. Its predictors live in a model pool so that concurrent threads or processes never share one predictor, which would otherwise mix up recognition results
class TextRecognizers(object):
    def __init__(self):
        self.rec_image_shape = [3, 48, 320]
        self.rec_batch_num = 6
        self.rec_algorithm = 'SVTR_LCNet'
        self.benchmark = False
        self.use_onnx = False
        self.return_word_box = True
        # The predictors (and the optional TensorRT toggle) are created inside
        # the model pool below; see recModelPool.jiazai()
        self.rec_model_pool = self.recModelPool(5)
        self.postprocess_op = CTCLabelDecode('ppocr_keys_v1.txt', True)
    class recModelPool:
        def __init__(self, size):
            self.max_size = size
            self.pool = queue.Queue(size)
            self.jiazai()
        def jiazai(self):  # load the predictors into the pool
            n = 0
            for _ in range(self.max_size):
                config = Config('whl/rec/ch/ch_PP-OCRv4_rec_infer/inference.pdmodel',
                                'whl/rec/ch/ch_PP-OCRv4_rec_infer/inference.pdiparams')
                config.enable_use_gpu(200, 0)
                config.disable_glog_info()
                config.switch_use_feed_fetch_ops(False)
                config.switch_specify_input_names(True)
                config.enable_memory_optim()  # enable memory/VRAM optimization
                config.enable_mkldnn()
                tensorkai = False  # set True to enable TensorRT
                if tensorkai:
                    config.enable_tensorrt_engine(
                        workspace_size=1 << 30,
                        precision_mode=PrecisionType.Float32,
                        max_batch_size=10,
                        min_subgraph_size=15,  # skip the minimum TRT subgraph
                        use_calib_mode=False)
                predictor = create_predictor(config)
                self.pool.put(predictor)
                n += 1
            print('Loaded {n} rec predictors'.format(n=n))
        def acquire(self):
            try:
                rec_predictor = self.pool.get(block=False)
            except queue.Empty:
                print("No idle rec predictor available, waiting...")
                rec_predictor = self.pool.get()  # block until one is free
            return rec_predictor
        def release(self, rec_predictor):
            self.pool.put(rec_predictor)
    def predict(self, img_list, beg_img_no, batch_num, indices, zuobiaolist, rec_res):
        # The two commented lines offload tensor preparation to the process pool,
        # which greatly speeds up large images but needs a strong CPU; if you
        # enable them, comment out the direct recfen call below.
        # future = executor.submit(recfen, img_list, beg_img_no, batch_num, self.rec_image_shape, indices)
        # norm_img_batch, wh_ratio_list, max_wh_ratio = future.result()
        norm_img_batch, wh_ratio_list, max_wh_ratio = recfen(img_list, beg_img_no, batch_num, self.rec_image_shape, indices)
        norm_img_batch = norm_img_batch.copy()
        rec_predictor = self.rec_model_pool.acquire()
        try:
            input_names = rec_predictor.get_input_names()
            for name in input_names:
                input_tensor = rec_predictor.get_input_handle(name)
                input_tensor.copy_from_cpu(norm_img_batch)
            rec_predictor.run()
            output_names = rec_predictor.get_output_names()
            output_tensors = []
            for output_name in output_names:
                output_tensor = rec_predictor.get_output_handle(output_name)
                output_tensors.append(output_tensor)
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            if len(outputs) != 1:
                preds = outputs
            else:
                preds = outputs[0]
            rec_result = self.postprocess_op(
                preds,
                return_word_box=self.return_word_box,
                wh_ratio_list=wh_ratio_list,
                max_wh_ratio=max_wh_ratio)
            for rno in range(len(rec_result)):
                if zuobiaolist is not None:
                    zuobiao = zuobiaolist[indices[beg_img_no + rno]]  # the box for this crop
                    b = [zuobiao.astype(int).tolist(), rec_result[rno]]
                else:
                    b = [rec_result[rno]]
                rec_res[indices[beg_img_no + rno]] = b
        finally:
            self.rec_model_pool.release(rec_predictor)
    def __call__(self, img_list, zuobiaolist=None):
        img_num = len(img_list)
        # Calculate the aspect ratio of all text bars
        width_list = []
        for img in img_list:
            width_list.append(img.shape[1] / float(img.shape[0]))
        # Sorting can speed up the recognition process
        indices = np.argsort(np.array(width_list))
        rec_res = [['', 0.0]] * img_num
        batch_num = self.rec_batch_num
        st = time.time()
        tlist = []
        # One thread per batch; each thread acquires its own predictor from the pool
        for beg_img_no in range(0, img_num, batch_num):
            t = threading.Thread(target=self.predict, args=(img_list, beg_img_no, batch_num, indices, zuobiaolist, rec_res))
            tlist.append(t)
            t.start()
        for t in tlist:
            t.join()
        return rec_res, time.time() - st
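Assuming the model files are in place, the recognizer can be exercised on its own; the blank crop below is just a placeholder to run the pipeline end to end:

crop_list = [np.full((48, 160, 3), 255, dtype=np.uint8)]  # blank white 'text line'
recm_demo = TextRecognizers()
results, elapsed = recm_demo(crop_list)
print(results, elapsed)

The pool size (5) bounds how many batches truly run concurrently; sizing it to the number of recognition threads you expect avoids blocking in acquire().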
Below are the helper functions and post-processing classes, lifted from the PaddleOCR source, that det and rec rely on:
class BaseRecLabelDecode(object):
""" Convert between text-label and text-index """
def __init__(self, character_dict_path=None, use_space_char=False):
self.beg_str = "sos"
self.end_str = "eos"
self.reverse = False
self.character_str = []
if character_dict_path is None:
self.character_str = "0123456789abcdefghijklmnopqrstuvwxyz"
dict_character = list(self.character_str)
else:
with open(character_dict_path, "rb") as fin:
lines = fin.readlines()
for line in lines:
line = line.decode('utf-8').strip("\n").strip("\r\n")
self.character_str.append(line)
if use_space_char:
self.character_str.append(" ")
dict_character = list(self.character_str)
if 'arabic' in character_dict_path:
self.reverse = True
dict_character = self.add_special_char(dict_character)
self.dict = {}
for i, char in enumerate(dict_character):
self.dict[char] = i
self.character = dict_character
def pred_reverse(self, pred):
pred_re = []
c_current = ''
for c in pred:
if not bool(re.search('[a-zA-Z0-9 :*./%+-]', c)):
if c_current != '':
pred_re.append(c_current)
pred_re.append(c)
c_current = ''
else:
c_current += c
if c_current != '':
pred_re.append(c_current)
return ''.join(pred_re[::-1])
def add_special_char(self, dict_character):
return dict_character
def decode(self, text_index, text_prob=None, is_remove_duplicate=False):
""" convert text-index into text-label. """
result_list = []
ignored_tokens = self.get_ignored_tokens()
batch_size = len(text_index)
for batch_idx in range(batch_size):
selection = np.ones(len(text_index[batch_idx]), dtype=bool)
if is_remove_duplicate:
selection[1:] = text_index[batch_idx][1:] != text_index[
batch_idx][:-1]
for ignored_token in ignored_tokens:
selection &= text_index[batch_idx] != ignored_token
char_list = [
self.character[text_id]
for text_id in text_index[batch_idx][selection]
]
if text_prob is not None:
conf_list = text_prob[batch_idx][selection]
else:
conf_list = [1] * len(selection)
if len(conf_list) == 0:
conf_list = [0]
text = ''.join(char_list)
if self.reverse: # for arabic rec
text = self.pred_reverse(text)
result_list.append((text, np.mean(conf_list).tolist()))
return result_list
def get_ignored_tokens(self):
return [0] # for ctc blank
class CTCLabelDecode(BaseRecLabelDecode):
""" Convert between text-label and text-index """
def __init__(self, character_dict_path=None, use_space_char=False,
**kwargs):
super(CTCLabelDecode, self).__init__(character_dict_path,
use_space_char)
def __call__(self, preds, label=None, *args, **kwargs):
if isinstance(preds, tuple) or isinstance(preds, list):
preds = preds[-1]
if isinstance(preds, paddle.Tensor):
preds = preds.numpy()
preds_idx = preds.argmax(axis=2)
preds_prob = preds.max(axis=2)
text = self.decode(preds_idx, preds_prob, is_remove_duplicate=True)
if label is None:
return text
label = self.decode(label)
return text, label
def add_special_char(self, dict_character):
dict_character = ['blank'] + dict_character
return dict_character
def recfen(img_list, beg_img_no, batch_num, rec_image_shape, indices):
    img_num = len(img_list)
    end_img_no = min(img_num, beg_img_no + batch_num)
    norm_img_batch = []
    imgC, imgH, imgW = rec_image_shape[:3]
    max_wh_ratio = imgW / imgH
    wh_ratio_list = []
    for ino in range(beg_img_no, end_img_no):
        h, w = img_list[indices[ino]].shape[0:2]
        wh_ratio = w * 1.0 / h
        max_wh_ratio = max(max_wh_ratio, wh_ratio)
        wh_ratio_list.append(wh_ratio)
    for ino in range(beg_img_no, end_img_no):
        # pass rec_image_shape through so the resize height matches the model
        norm_img = resize_norm_img(img_list[indices[ino]], max_wh_ratio, rec_image_shape)
        norm_img = norm_img[np.newaxis, :]
        norm_img_batch.append(norm_img)
    norm_img_batch = np.concatenate(norm_img_batch)
    return norm_img_batch, wh_ratio_list, max_wh_ratio
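recfen is the pure-CPU step from point 2 of the introduction, which is why it is the unit handed to the process pool. A short sketch of that offload, assuming executor, img_list and indices from the code above (it mirrors the commented-out lines in TextRecognizers.predict):

future = executor.submit(recfen, img_list, 0, 6, [3, 48, 320], indices)
norm_img_batch, wh_ratio_list, max_wh_ratio = future.result()  # prepared in a worker process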
def det_resize_image(im, max_side_len=960):
h, w, _ = im.shape
resize_w = w
resize_h = h
# limit the max side
if max(resize_h, resize_w) > max_side_len:
if resize_h > resize_w:
ratio = float(max_side_len) / resize_h
else:
ratio = float(max_side_len) / resize_w
else:
ratio = 1.
resize_h = int(resize_h * ratio)
resize_w = int(resize_w * ratio)
if resize_h % 32 == 0:
resize_h = resize_h
elif resize_h // 32 <= 1:
resize_h = 32
else:
resize_h = (resize_h // 32 - 1) * 32
if resize_w % 32 == 0:
resize_w = resize_w
elif resize_w // 32 <= 1:
resize_w = 32
else:
resize_w = (resize_w // 32 - 1) * 32
if int(resize_w) <= 0 or int(resize_h) <= 0:
return None, (None, None)
im = cv2.resize(im, (int(resize_w), int(resize_h)))
ratio_h = resize_h / float(h)
ratio_w = resize_w / float(w)
return im, (ratio_h, ratio_w)
def det_normalize(im):
img_mean = [0.485, 0.456, 0.406]
img_std = [0.229, 0.224, 0.225]
im = im.astype(np.float32, copy=False)
im = im / 255
im -= img_mean
im /= img_std
channel_swap = (2, 0, 1)
im = im.transpose(channel_swap)
return im
def det_preprocess(im):
im, (ratio_h, ratio_w) = det_resize_image(im)
im = det_normalize(im)
im = im[np.newaxis, :]
return [im, (ratio_h, ratio_w)]
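As a quick check of what det_preprocess produces for a random test image: both sides are capped near 960 and snapped down to multiples of 32, and the result is a normalized NCHW float32 batch of one:

dummy = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
det_img, (ratio_h, ratio_w) = det_preprocess(dummy)
print(det_img.shape, det_img.dtype)  # (1, 3, 480, 960) float32
print(ratio_h, ratio_w)              # per-axis scale factors consumed by DBPostProcess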
class DBPostProcess(object):
"""
The post process for Differentiable Binarization (DB).
"""
def __init__(self, params):
self.thresh = params['thresh']
self.box_thresh = params['box_thresh']
self.max_candidates = params['max_candidates']
self.unclip_ratio = params['unclip_ratio']
self.min_size = 3
def boxes_from_bitmap(self, pred, _bitmap, dest_width, dest_height):
'''
_bitmap: single map with shape (1, H, W),
whose values are binarized as {0, 1}
'''
bitmap = _bitmap
height, width = bitmap.shape
outs = cv2.findContours((bitmap * 255).astype(np.uint8), cv2.RETR_LIST,
cv2.CHAIN_APPROX_SIMPLE)
if len(outs) == 3:
img, contours, _ = outs[0], outs[1], outs[2]
elif len(outs) == 2:
contours, _ = outs[0], outs[1]
num_contours = min(len(contours), self.max_candidates)
boxes = np.zeros((num_contours, 4, 2), dtype=np.int16)
scores = np.zeros((num_contours, ), dtype=np.float32)
for index in range(num_contours):
contour = contours[index]
points, sside = self.get_mini_boxes(contour)
if sside < self.min_size:
continue
points = np.array(points)
score = self.box_score_fast(pred, points.reshape(-1, 2))
if self.box_thresh > score:
continue
box = self.unclip(points).reshape(-1, 1, 2)
box, sside = self.get_mini_boxes(box)
if sside < self.min_size + 2:
continue
box = np.array(box)
if not isinstance(dest_width, int):
dest_width = dest_width.item()
dest_height = dest_height.item()
box[:, 0] = np.clip(
np.round(box[:, 0] / width * dest_width), 0, dest_width)
box[:, 1] = np.clip(
np.round(box[:, 1] / height * dest_height), 0, dest_height)
boxes[index, :, :] = box.astype(np.int16)
scores[index] = score
return boxes, scores
def unclip(self, box):
unclip_ratio = self.unclip_ratio
poly = Polygon(box)
distance = poly.area * unclip_ratio / poly.length
offset = pyclipper.PyclipperOffset()
offset.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
expanded = np.array(offset.Execute(distance))
return expanded
def get_mini_boxes(self, contour):
bounding_box = cv2.minAreaRect(contour)
points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])
index_1, index_2, index_3, index_4 = 0, 1, 2, 3
if points[1][1] > points[0][1]:
index_1 = 0
index_4 = 1
else:
index_1 = 1
index_4 = 0
if points[3][1] > points[2][1]:
index_2 = 2
index_3 = 3
else:
index_2 = 3
index_3 = 2
box = [
points[index_1], points[index_2], points[index_3], points[index_4]
]
return box, min(bounding_box[1])
def box_score_fast(self, bitmap, _box):
h, w = bitmap.shape[:2]
box = _box.copy()
xmin = np.clip(np.floor(box[:, 0].min()).astype(int), 0, w - 1)
xmax = np.clip(np.ceil(box[:, 0].max()).astype(int), 0, w - 1)
ymin = np.clip(np.floor(box[:, 1].min()).astype(int), 0, h - 1)
ymax = np.clip(np.ceil(box[:, 1].max()).astype(int), 0, h - 1)
mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
box[:, 0] = box[:, 0] - xmin
box[:, 1] = box[:, 1] - ymin
cv2.fillPoly(mask, box.reshape(1, -1, 2).astype(np.int32), 1)
return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0]
def __call__(self, outs_dict, ratio_list):
pred = outs_dict['maps']
pred = pred[:, 0, :, :]
segmentation = pred > self.thresh
boxes_batch = []
for batch_index in range(pred.shape[0]):
height, width = pred.shape[-2:]
tmp_boxes, tmp_scores = self.boxes_from_bitmap(
pred[batch_index], segmentation[batch_index], width, height)
boxes = []
for k in range(len(tmp_boxes)):
if tmp_scores[k] > self.box_thresh:
boxes.append(tmp_boxes[k])
if len(boxes) > 0:
boxes = np.array(boxes)
ratio_h, ratio_w = ratio_list[batch_index]
boxes[:, :, 0] = boxes[:, :, 0] / ratio_w
boxes[:, :, 1] = boxes[:, :, 1] / ratio_h
boxes_batch.append(boxes)
return boxes_batch
def order_points_clockwise(pts):
xSorted = pts[np.argsort(pts[:, 0]), :]
leftMost = xSorted[:2, :]
rightMost = xSorted[2:, :]
leftMost = leftMost[np.argsort(leftMost[:, 1]), :]
(tl, bl) = leftMost
rightMost = rightMost[np.argsort(rightMost[:, 1]), :]
(tr, br) = rightMost
rect = np.array([tl, tr, br, bl], dtype="float32")
return rect
def clip_det_res(points, img_height, img_width):
for pno in range(4):
points[pno, 0] = int(min(max(points[pno, 0], 0), img_width - 1))
points[pno, 1] = int(min(max(points[pno, 1], 0), img_height - 1))
return points
def filter_tag_det_res(dt_boxes, image_shape):
img_height, img_width = image_shape[0:2]
dt_boxes_new = []
for box in dt_boxes:
box = order_points_clockwise(box)
box = clip_det_res(box, img_height, img_width)
rect_width = int(np.linalg.norm(box[0] - box[1]))
rect_height = int(np.linalg.norm(box[0] - box[3]))
if rect_width <= 10 or rect_height <= 10:
continue
dt_boxes_new.append(box)
dt_boxes = np.array(dt_boxes_new)
return dt_boxes
def sorted_boxes( dt_boxes):
num_boxes = dt_boxes.shape[0]
sorted_boxes = sorted(dt_boxes, key=lambda x: (x[0][1], x[0][0]))
_boxes = list(sorted_boxes)
for i in range(num_boxes - 1):
if abs(_boxes[i+1][0][1] - _boxes[i][0][1]) < 10 and \
(_boxes[i + 1][0][0] < _boxes[i][0][0]):
tmp = _boxes[i]
_boxes[i] = _boxes[i + 1]
_boxes[i + 1] = tmp
return _boxes
def get_rotate_crop_image(img, points):
img_height, img_width = img.shape[0:2]
left = int(np.min(points[:, 0]))
right = int(np.max(points[:, 0]))
top = int(np.min(points[:, 1]))
bottom = int(np.max(points[:, 1]))
img_crop = img[top:bottom, left:right, :].copy()
points[:, 0] = points[:, 0] - left
points[:, 1] = points[:, 1] - top
img_crop_width = int(np.linalg.norm(points[0] - points[1]))
img_crop_height = int(np.linalg.norm(points[0] - points[3]))
pts_std = np.float32([[0, 0], [img_crop_width, 0],\
[img_crop_width, img_crop_height], [0, img_crop_height]])
M = cv2.getPerspectiveTransform(points, pts_std)
dst_img = cv2.warpPerspective(
img_crop,
M, (img_crop_width, img_crop_height),
borderMode=cv2.BORDER_REPLICATE)
dst_img_height, dst_img_width = dst_img.shape[0:2]
if dst_img_height * 1.0 / dst_img_width >= 1.5:
dst_img = np.rot90(dst_img)
return dst_img
def det_postprocess(ori_im, ratio_list, results):
outs_dict = {}
outs_dict['maps'] = results[0]
postprocess_params = {}
postprocess_params["thresh"] = 0.3
postprocess_params["box_thresh"] = 0.5
postprocess_params["max_candidates"] = 1000
postprocess_params["unclip_ratio"] = 2.0
postprocess_op = DBPostProcess(postprocess_params)
dt_boxes_list = postprocess_op(outs_dict, [ratio_list])
dt_boxes = dt_boxes_list[0]
dt_boxes = filter_tag_det_res(dt_boxes, ori_im.shape)
img_crop_list = []
dt_boxes = sorted_boxes(dt_boxes)
for bno in range(len(dt_boxes)):
tmp_box = copy.deepcopy(dt_boxes[bno])
img_crop = get_rotate_crop_image(ori_im, tmp_box)
img_crop_list.append(img_crop)
return dt_boxes, img_crop_list
def resize_norm_img(img, max_wh_ratio, rec_image_shape=(3, 48, 320)):
    # Use the shape configured above; PP-OCRv4 rec expects 48-pixel-high input
    imgC, imgH, imgW = rec_image_shape[:3]
    imgW = int(imgH * max_wh_ratio)
    h = img.shape[0]
    w = img.shape[1]
    ratio = w / float(h)
    if math.ceil(imgH * ratio) > imgW:
        resized_w = imgW
    else:
        resized_w = int(math.ceil(imgH * ratio))
    resized_image = cv2.resize(img, (resized_w, imgH))
    resized_image = resized_image.astype('float32')
    resized_image = resized_image.transpose((2, 0, 1)) / 255
    resized_image -= 0.5
    resized_image /= 0.5
    # pad on the right up to the batch-wide width
    padding_im = np.zeros((imgC, imgH, imgW), dtype=np.float32)
    padding_im[:, :, 0:resized_w] = resized_image
    return padding_im
Finally, the custom entry point that ties detection and recognition together:
detm = TextDetognizer()
recm = TextRecognizers()
def ocrbase(cv2_image, method):
    kaishi = time.time()  # start time
    if method == 2:  # full pipeline: detect, then recognize
        # tuilidet / tuilidetxia: see the wrapper sketches below
        future = executor.submit(tuilidet, cv2_image)
        det_img, ratio_list = future.result()
        det_results, _ = detm.run([det_img])
        future = executor.submit(tuilidetxia, cv2_image, ratio_list, det_results)
        dt_boxes, img_crop_list = future.result()
    else:  # treat the whole image as a single text line
        img_crop_list = [cv2_image]
        dt_boxes = None
    ressss, ttim = recm(img_crop_list, dt_boxes)
    jieshu = time.time()  # end time
    # print(f'elapsed {jieshu-kaishi}, start: {kaishi}, end: {jieshu}')
    # print(ressss)
    return [ressss]
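The original post never defines tuilidet and tuilidetxia. From their call sites they appear to be thin, process-pool-friendly wrappers around det_preprocess and det_postprocess; a plausible reconstruction (names kept, bodies assumed):

def tuilidet(cv2_image):
    # CPU-bound detection preprocessing, run in a worker process
    det_img, ratio_list = det_preprocess(cv2_image)
    return det_img, ratio_list

def tuilidetxia(cv2_image, ratio_list, det_results):
    # CPU-bound detection postprocessing, run in a worker process
    dt_boxes, img_crop_list = det_postprocess(cv2_image, ratio_list, det_results)
    return dt_boxes, img_crop_list

The Flask and base64 imports suggest the whole thing sits behind an HTTP endpoint, which the post also omits. A minimal sketch, assuming a /ocr route that accepts a base64-encoded image and an optional method field:

app = Flask(__name__)

@app.route('/ocr', methods=['POST'])
def ocr_route():
    data = request.get_json()
    img_bytes = base64.b64decode(data['image'])
    cv2_image = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_COLOR)
    result = ocrbase(cv2_image, data.get('method', 2))
    return {'result': str(result)}  # str() sidesteps numpy JSON serialization

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, threaded=True)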
Author: cxp199105