| | |
| | | def variance_of_laplacian(image): |
| | | # 计算输入图像的拉普拉斯响应的方差 |
| | | return cv2.Laplacian(image, cv2.CV_64F).var() |
| | | |
| | | |
| | | def clean_directory(path, days_threshold, max_files): |
| | | """清理超过时间或数量的文件""" |
| | | now = time.time() |
| | | threshold = now - days_threshold * 24 * 3600 |
| | | files = [] |
| | | for f in os.listdir(path): |
| | | file_path = os.path.join(path, f) |
| | | if os.path.isfile(file_path): |
| | | files.append((file_path, os.path.getmtime(file_path))) |
| | | # 按修改时间降序排序,保留最新文件 |
| | | files.sort(key=lambda x: x[1], reverse=True) |
| | | # 删除过期文件 |
| | | for file_info in files: |
| | | if file_info[1] < threshold: |
| | | os.remove(file_info[0]) |
| | | # 保留最新文件,删除多余文件 |
| | | for file_info in files[max_files:]: |
| | | os.remove(file_info[0]) |
| | | |
| | | |
| | | # 调用另一个长焦镜头,拍摄清晰的局部药材图片 |
| | | def get_image(): |
| | | logger.info("识别线程启动") |
| | | global is_loaded, class_count, class_count_max, class_sum |
| | | camera2_index = webcams.get(cam2) |
| | | camera2_index = config['cam']['cam2'] |
| | | print("第二个摄像头索引:" + str(camera2_index)) |
| | | # 打开摄像头 |
| | | capture = cv2.VideoCapture(camera2_index, cv2.CAP_DSHOW) |
| | |
| | | break |
| | | count += 1 |
| | | |
| | | if count == 2: |
| | | if count == config['cam']['frames']: |
| | | herb_probabilities = herb_identifier(frame2) |
| | | top_five_classes = np.argsort(herb_probabilities, axis=1)[0][-5:][::-1] |
| | | name = "" |
| | |
| | | # 计算拉普拉斯响应的方差 |
| | | laplacian = variance_of_laplacian(frame2) |
| | | # 生成保存文件名,以当前时间命名 |
| | | save_name2 = name +"_["+ str(round(laplacian, 2)) +"]_"+ time.strftime("%Y%m%d%H%M%S", time.localtime()) + ".jpg" |
| | | save_name2 = time.strftime("%Y%m%d%H%M%S", time.localtime()) + "_" +name +"_["+ str(round(laplacian, 2)) +"]"+ ".jpg" |
| | | logger.info(f"识别结果转换为保存图片名称:, {save_name2}") |
| | | # 判断图像的清晰度 |
| | | # 保存调整尺寸后的图片 |
| | | if laplacian > 200: |
| | | c_ = save_path + "2/c/" |
| | | # 判断文件是否存在,不存在则创建 |
| | | if not os.path.exists(c_): |
| | | os.makedirs(c_) |
| | | cv2.imwrite(c_ + save_name2, frame2) |
| | | # 新增清理调用 |
| | | clean_directory(c_, config['cam']['days_threshold'], config['cam']['max_files']) |
| | | # 清晰条件下累计识别结果中药材名称出现的次数 |
| | | # 累计每种药材不论名次出现的次数,累计每种药材置信度最高的次数,累计每种药材的置信度总和 |
| | | # class_count = {} |
| | |
| | | is_loaded = True |
| | | else: |
| | | n_ = save_path + "2/n/" |
| | | # 判断文件是否存在,不存在则创建 |
| | | if not os.path.exists(n_): |
| | | os.makedirs(n_) |
| | | cv2.imwrite(n_ + save_name2, frame2) |
| | | # 新增清理调用 |
| | | clean_directory(n_, config['cam']['days_threshold'], config['cam']['max_files']) |
| | | # cv2.imshow("Camera", resized_frame2) |
| | | print("保存图片:", save_name2) |
| | | break |
| | | # 结束线程 |
| | | capture.release() |
| | | def send_result(class_count, class_count_max, class_sum): |
| | | global is_loaded |
| | | def send_result(): |
| | | global is_loaded,class_count, class_count_max, class_sum |
| | | # 将三种统计结果输出到日志中 |
| | | logger.info("class_count:"+str(class_count)) |
| | | logger.info("class_count_max:"+str(class_count_max)) |
| | |
| | | def load_identify(): |
| | | global is_loaded |
| | | # 摄像头索引号,通常为0表示第一个摄像头 |
| | | camera_index = webcams.get(cam1) |
| | | camera_index = config['cam']['cam1'] |
| | | print("第一个摄像头索引:" + str(camera_index)) |
| | | # 打开摄像头 |
| | | cap = cv2.VideoCapture(camera_index, cv2.CAP_DSHOW) |
| | |
| | | # 上料状态 |
| | | status = "没有上料" |
| | | |
| | | # 累计每种药材不论名次出现的次数 |
| | | class_count = {} |
| | | # 累计每种药材置信度最高的次数 |
| | | class_count_max = {} |
| | | # 累计每种药材的置信度总和 |
| | | class_sum = {} |
| | | |
| | | # 循环读取摄像头画面 |
| | | while True: |
| | | logger.info("循环读取摄像头画面") |
| | | # 睡眠100毫秒 |
| | | time.sleep(0.1) |
| | | time.sleep(config['cam']['sleep']) |
| | | ret, frame = cap.read() |
| | | if not ret: |
| | | print("无法读取摄像头画面") |
| | |
| | | break |
| | | # 获取当前时间 |
| | | current_time = time.time() |
| | | # 每隔3秒取一帧图像 |
| | | |
| | | # 安全检测 |
| | | boxes, scores, class_ids = safety_detect(frame) |
| | | draw_img = safety_detect.draw_detections(frame, boxes, scores, class_ids) |
| | |
| | | if not os.path.exists(path_): |
| | | os.makedirs(path_) |
| | | cv2.imwrite(path_ + save_name, frame) |
| | | # 新增清理调用 |
| | | clean_directory(path_, config['cam']['days_threshold'], config['cam']['max_files']) |
| | | # 重置计时器 |
| | | stime = time.time() |
| | | |
| | |
| | | status = "没有上料" |
| | | if class_ == "meishangliao" and count == 3 and is_loaded: |
| | | logger.info("停止上料后发送识别结果") |
| | | send_result(class_count, class_count_max, class_sum) |
| | | send_result() |
| | | if class_ == "meishangliao" and count == 1000: |
| | | is_loaded = False |
| | | logger.info("长时间未上料,重置正在上料状态") |
| | |
| | | fps = frame_count / elapsed_time |
| | | # print(f"FPS: {fps:.2f}") |
| | | # 将FPS绘制在图像上 |
| | | cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, |
| | | cv2.putText(draw_img, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, |
| | | cv2.LINE_AA) |
| | | # 显示画面 |
| | | cv2.imshow("Camera", draw_img) |
| | | cv2.imshow("AICamera", draw_img) |
| | | # 检测按键,如果按下q键则退出循环 |
| | | if cv2.waitKey(1) & 0xFF == ord('q'): |
| | | break |
| | |
| | | return 0 |
| | | |
| | | if __name__ == '__main__': |
| | | cam1 = "USB Camera" |
| | | cam2 = "PC Camera" |
| | | camUtil = CAM_UTIL(cam1, cam2) |
| | | webcams = camUtil.list_webcams() |
| | | print("摄像头", webcams) |
| | | # 累计每种药材不论名次出现的次数 |
| | | class_count = {} |
| | | # 累计每种药材置信度最高的次数 |
| | | class_count_max = {} |
| | | # 累计每种药材的置信度总和 |
| | | class_sum = {} |
| | | # cam1 = "USB Camera" |
| | | # cam2 = "USB ZOOM Camera" |
| | | # camUtil = CAM_UTIL(cam1, cam2) |
| | | # webcams = camUtil.webcam_list |
| | | # print("摄像头", webcams) |
| | | save_path = "data/images/" |
| | | # 是否上过料 |
| | | is_loaded = False |