想要多进程处理达到低延迟多摄像头实时目标识别,在尝试单摄像头时结果显示出问题
读取图像放入队列
def image_put(q_put, ip):
capture = cv2.VideoCapture(int(ip))
ref2, frame2 = capture.read()
if not ref2:
raise ValueError("未能正确读取摄像头(视频),请注意是否正确安装摄像头(是否正确填写视频路径)。")
while True:
ref2, frame2 = capture.read()
if not ref2:
break
frame2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2RGB)
q_put.put(frame2) if ref2 else None
q_put.get() if q_put.qsize() > 1 else None
获取队列中的图像
def image_get(q_get, window_name):
cv2.namedWindow(window_name, flags=cv2.WINDOW_FREERATIO) if window_name else None
while True:
frame2 = q_get.get()
frame2 = np.copy(frame2)
frame2 = cv2.cvtColor(frame2, cv2.COLOR_RGB2BGR)
frame2 = cv2.putText(frame2, (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
(cv2.imshow(window_name, frame2), cv2.waitKey(1)) if window_name else None
cv2.waitKey(1)
if __name__ == "__main__":
retinaface = Retinaface()
camera_ip_l = [
# '0',
'1',
# "172.18.18.101",
# "172.18.18.102",
# "[fe80::3aaf:29ff:fed3:d260]", # ipv6
]
mp.set_start_method(method='spawn')
origin_img_q = mp.Queue(maxsize=2)
result_img_q = mp.Queue(maxsize=4)
processes = [
mp.Process(target=image_put, args=(origin_img_q, camera_ip_l[0])),
mp.Process(target=retinaface.detect_image, args=(origin_img_q, result_img_q)),
mp.Process(target=image_get, args=(result_img_q, camera_ip_l[0])),
]
[setattr(process, "demo", True) for process in processes]
[process.start() for process in processes]
[process.join() for process in processes]
检测代码
def detect_image(self, origin_img_q, result_img_q):
while origin_img_q.qsize() == 0:
time.sleep(0.1)
origin_img = origin_img_q.get()
old_image = origin_img.copy()
image = np.array(origin_img, np.float32)
im_height, im_width, _ = np.shape(image)
scale = [
np.shape(image)[1], np.shape(image)[0], np.shape(image)[1], np.shape(image)[0]
]
scale_for_landmarks = [
np.shape(image)[1], np.shape(image)[0], np.shape(image)[1], np.shape(image)[0],
np.shape(image)[1], np.shape(image)[0], np.shape(image)[1], np.shape(image)[0],
np.shape(image)[1], np.shape(image)[0]
]
if self.letterbox_image:
image = letterbox_image(image, [self.retinaface_input_shape[1], self.retinaface_input_shape[0]])
anchors = self.anchors
else:
anchors = Anchors(self.cfg, image_size=(im_height, im_width)).get_anchors()
with torch.no_grad():
image = torch.from_numpy(preprocess_input(image).transpose(2, 0, 1)).unsqueeze(0).type(torch.FloatTensor)
if self.cuda:
anchors = anchors.cuda()
image = image.cuda()
loc, conf, landms = self.net(image)
boxes = decode(loc.data.squeeze(0), anchors, self.cfg['variance'])
conf = conf.data.squeeze(0)[:, 1:2]
landms = decode_landm(landms.data.squeeze(0), anchors, self.cfg['variance'])
boxes_conf_landms = torch.cat([boxes, conf, landms], -1)
boxes_conf_landms = non_max_suppression(boxes_conf_landms, self.confidence)
if len(boxes_conf_landms) <= 0:
return old_image
if self.letterbox_image:
boxes_conf_landms = retinaface_correct_boxes(boxes_conf_landms, \
np.array([self.retinaface_input_shape[0],
self.retinaface_input_shape[1]]),
np.array([im_height, im_width]))
boxes_conf_landms[:, :4] = boxes_conf_landms[:, :4] * scale
boxes_conf_landms[:, 5:] = boxes_conf_landms[:, 5:] * scale_for_landmarks
face_encodings = []
for boxes_conf_landm in boxes_conf_landms:
boxes_conf_landm = np.maximum(boxes_conf_landm, 0)
crop_img = np.array(old_image)[int(boxes_conf_landm[1]):int(boxes_conf_landm[3]),
int(boxes_conf_landm[0]):int(boxes_conf_landm[2])]
landmark = np.reshape(boxes_conf_landm[5:], (5, 2)) - np.array(
[int(boxes_conf_landm[0]), int(boxes_conf_landm[1])])
crop_img, _ = Alignment_1(crop_img, landmark)
crop_img = np.array(
letterbox_image(np.uint8(crop_img), (self.facenet_input_shape[1], self.facenet_input_shape[0]))) / 255
crop_img = np.expand_dims(crop_img.transpose(2, 0, 1), 0)
with torch.no_grad():
crop_img = torch.from_numpy(crop_img).type(torch.FloatTensor)
if self.cuda:
crop_img = crop_img.cuda()
face_encoding = self.facenet(crop_img)[0].cpu().numpy()
face_encodings.append(face_encoding)
face_names = []
for face_encoding in face_encodings:
matches, face_distances = compare_faces(self.known_face_encodings, face_encoding,
tolerance=self.facenet_threhold)
name = "Unknown"
best_match_index = np.argmin(face_distances)
best_score = face_distances[best_match_index]
if matches[best_match_index]:
name = self.known_face_names[best_match_index]
face_names.append(name)
for i, b in enumerate(boxes_conf_landms):
b = list(map(int, b))
cv2.rectangle(old_image, (b[0], b[1]), (b[2], b[3]), (0, 0, 255), 2)
cx = b[0]
cy = b[1] - 10
im0 = old_image
cv2.putText(old_image, str(best_score), (cx, cy),
cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
name = face_names[i]
old_image = cv2ImgAddText(old_image, name, b[0] + 5, b[3] - 25)
result_img_q.put(old_image)
结果显示无响应
求大手子帮忙看看
你要先解决下面报错的问题,目前看来是有空图片了,检查一下图片的数据看下那一帧不对,或者在调用的时候判断下图片或者相机读取的时候返回值是否正确再进行下面的操作