import sys import time import cv2 import numpy as np import ctypes import ctypes.wintypes import yaml import win32api import win32con import win32gui import multiprocessing from safety_detect import SAFETY_DETECT from identifier import IDENTIFIER import os from logger_config import logger import threading multiprocessing.freeze_support() def variance_of_laplacian(image): # 计算输入图像的拉普拉斯响应的方差 return cv2.Laplacian(image, cv2.CV_64F).var() def clean_directory(path, days_threshold, max_files): """清理超过时间或数量的文件""" now = time.time() threshold = now - days_threshold * 24 * 3600 files = [] for f in os.listdir(path): file_path = os.path.join(path, f) if os.path.isfile(file_path): files.append((file_path, os.path.getmtime(file_path))) # 按修改时间降序排序,保留最新文件 files.sort(key=lambda x: x[1], reverse=True) # 删除过期文件 for file_info in files: if file_info[1] < threshold: os.remove(file_info[0]) # 保留最新文件,删除多余文件 for file_info in files[max_files:]: os.remove(file_info[0]) # 调用另一个长焦镜头,拍摄清晰的局部药材图片 def get_image(): herb_identifier = IDENTIFIER("model/herb_identify.onnx") logger.info("识别线程启动") global is_loaded, class_count, class_count_max, class_sum camera2_index = config['cam']['cam2'] print("第二个摄像头索引:" + str(camera2_index)) # 打开摄像头 capture = cv2.VideoCapture(camera2_index, cv2.CAP_DSHOW) # 设置分辨率y capture.set(cv2.CAP_PROP_FRAME_WIDTH, 2048) # 宽度 capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 1540) # 高度 # 检查摄像头是否成功打开 if not capture.isOpened(): print("无法打开摄像头2") logger.error("无法打开摄像头2") exit() width2 = capture.get(cv2.CAP_PROP_FRAME_WIDTH) height2 = capture.get(cv2.CAP_PROP_FRAME_HEIGHT) print("摄像头2分辨率:", width2, "x", height2) logger.info(f"摄像头2分辨率:, {width2}, x, {height2}") # 循环读取摄像头画面 # Shadows name 'width' from outer scope count = 0 while True: ret2, frame2 = capture.read() if not ret2: print("无法读取摄像头画面") logger.error("无法读取摄像头画面") break count += 1 if count == config['cam']['frames']: herb_probabilities = herb_identifier(frame2) top_five_classes = np.argsort(herb_probabilities, axis=1)[0][-5:][::-1] name = "" for i, class_id in enumerate(top_five_classes): # 保留两位小数 probability = round(herb_probabilities[0][class_id], 2) herb_class = herb_identifier.class_names[class_id] name = name + herb_class + "=" + str(probability) # 显示画面 # cv2.imshow('Output2', resized_frame2) # 计算拉普拉斯响应的方差 laplacian = variance_of_laplacian(frame2) # 生成保存文件名,以当前时间命名 save_name2 = time.strftime("%Y%m%d%H%M%S", time.localtime()) + "_" +name +"_["+ str(round(laplacian, 2)) +"]"+ ".jpg" logger.info(f"识别结果转换为保存图片名称:, {save_name2}") # 判断图像的清晰度 # 保存调整尺寸后的图片 if laplacian > 200: c_ = save_path + "2/c/" if not os.path.exists(c_): os.makedirs(c_) cv2.imwrite(c_ + save_name2, frame2) # 新增清理调用 clean_directory(c_, config['cam']['days_threshold'], config['cam']['max_files']) # 清晰条件下累计识别结果中药材名称出现的次数 # 累计每种药材不论名次出现的次数,累计每种药材置信度最高的次数,累计每种药材的置信度总和 # class_count = {} # class_count_max = {} # class_sum = {} for i in range(len(top_five_classes)): class_id = top_five_classes[i] herb_class = herb_identifier.class_names[class_id] # 累计每种药材不论名次出现的次数 if herb_class in class_count: class_count[herb_class] += 1 else: class_count[herb_class] = 1 # 累计每种药材置信度最高的次数 if i == 0 and herb_class in class_count_max: class_count_max[herb_class] += 1 elif i == 0: class_count_max[herb_class] = 1 # 累计每种药材的置信度总和 if herb_class in class_sum: class_sum[herb_class] += herb_probabilities[0][class_id] else: class_sum[herb_class] = herb_probabilities[0][class_id] is_loaded = True else: n_ = save_path + "2/n/" if not os.path.exists(n_): os.makedirs(n_) cv2.imwrite(n_ + save_name2, frame2) # 新增清理调用 clean_directory(n_, config['cam']['days_threshold'], config['cam']['max_files']) # cv2.imshow("Camera", resized_frame2) print("保存图片:", save_name2) break # 结束线程 capture.release() def send_result(): global is_loaded,class_count, class_count_max, class_sum # 对class_count进行排序,按照值从大到小排序,返回值最大的前五个 sorted_class_count = dict(sorted(class_count.items(), key=lambda x: x[1], reverse=True)[:5]) # 对class_count_max进行排序,按照值从大到小排序,返回值最大的前五个 sorted_class_count_max = dict(sorted(class_count_max.items(), key=lambda x: x[1], reverse=True)[:5]) # 对 class_sum进行排序,按照值从大到小排序,返回值最大的前五个 sorted_class_sum = dict(sorted(class_sum.items(), key=lambda x: x[1], reverse=True)[:5]) # 将三种统计结果输出到日志中 logger.info("class_count:"+str(class_count)) logger.info("sorted_class_count:"+str(sorted_class_count)) logger.info("class_count_max:"+str(class_count_max)) logger.info("sorted_class_count_max:"+str(sorted_class_count_max)) logger.info("class_sum:"+str(class_sum)) logger.info("sorted_class_sum:"+str(sorted_class_sum)) is_loaded = False count_msg = "airecognize," + f"{sorted_class_count}" logger.info("发送药材识别结果:"+str(count_msg)) l.send_msg(count_msg) pass def load_identify(): global is_loaded # 摄像头索引号,通常为0表示第一个摄像头 camera_index = config['cam']['cam1'] print("第一个摄像头索引:" + str(camera_index)) # 打开摄像头 cap = cv2.VideoCapture(camera_index, cv2.CAP_DSHOW) # 设置分辨率 # cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3840) # 宽度 # cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2160) # 高度 # 检查摄像头是否成功打开 if not cap.isOpened(): print("无法打开摄像头") logger.error("无法打开摄像头") exit() width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) print("摄像头分辨率:", width, "x", height) logger.info(f"摄像头分辨率:, {width}, x, {height}") # 目标图像尺寸 # 计时器 frame_count = 0 start_time = time.time() stime = time.time() if not os.path.exists(save_path): os.makedirs(save_path) # 上次识别结果 class_old = "1" # 累计次数 count = 0 # 上料状态 status = "没有上料" # 创建窗口并设置为可调整大小 cv2.namedWindow("AICamera", cv2.WINDOW_NORMAL) # 循环读取摄像头画面 while True: logger.info("循环读取摄像头画面") # 睡眠100毫秒 time.sleep(config['cam']['sleep']) ret, frame = cap.read() if not ret: print("无法读取摄像头画面") logger.error("无法读取摄像头画面") break # 获取当前时间 current_time = time.time() # 每隔3秒取一帧图像 # 安全检测 boxes, scores, class_ids = safety_detect(frame) draw_img = safety_detect.draw_detections(frame, boxes, scores, class_ids) det_res = {} if class_ids is not None: # 遍历class_ids 转换成类别名称 for i in range(len(class_ids)): class_id = class_ids[i] class_name = safety_detect.class_names[class_id] # 存入到det_res中 if class_name in det_res: det_res[class_name] = det_res[class_name] if det_res[class_name] > scores[i] else scores[i] else: det_res[class_name] = scores[i] print(det_res) logger.info(f"安全检测识别结果, {det_res}") # 如果cass_ids中包含0,则表示有安全检测到人体 if 0 in class_ids: res_ = "aidetect," + f"{det_res}" logger.info("发送安全检测结果:"+str(res_)) l.send_msg(res_) # 上料识别 probabilities = load_identifier(frame) # 找到最大概率的类别 predicted_class = np.argmax(probabilities, axis=1)[0] max_probability = np.max(probabilities, axis=1)[0] class_ = load_identifier.class_names[predicted_class] # 计算类型重复的次数,类别更换之后重新计数 if class_ != class_old: count = 0 else: count += 1 class_old = class_ print(f"{class_}:{count}: {max_probability}") logger.info(f"{class_}:{count}: {max_probability}") # 判断是否上料并且上料次数大于10次 if class_ == "shangliao" and count > 10: status = "正在上料" # 每隔3秒取一帧图像 # 如果距离上一次保存已经过去1秒,则保存当前画面 if current_time - stime >= 10.0: save_name = time.strftime("%Y%m%d%H%M%S", time.localtime()) + ".jpg" # 保存调整尺寸后的图片 path_ = save_path + "1/" if not os.path.exists(path_): os.makedirs(path_) cv2.imwrite(path_ + save_name, frame) # 新增清理调用 clean_directory(path_, config['cam']['days_threshold'], config['cam']['max_files']) # 重置计时器 stime = time.time() thread = threading.Thread(target=get_image) thread.start() else: status = "没有上料" if class_ == "meishangliao" and count == 3 and is_loaded: logger.info("停止上料后发送识别结果") send_result() if class_ == "meishangliao" and count == 1000: is_loaded = False logger.info("长时间未上料,重置正在上料状态") # print(status) # 上料机位置识别 probabilities2 = hoister_position(frame); predicted_class2 = np.argmax(probabilities2, axis=1)[0] max_probability2 = np.max(probabilities2, axis=1)[0] class_2 = hoister_position.class_names[predicted_class2] print(f"-----------{class_2}:{predicted_class2}: {max_probability2}") logger.info(f"-----------{class_2}:{predicted_class2}: {max_probability2}") if predicted_class2 == 0: feeder_res = {class_2: max_probability2} class_feeder = "aifeeder," + f"{feeder_res}" print("send_msg", class_feeder) logger.info("发送上料机位置识别结果:"+str(class_feeder)) l.send_msg(class_feeder) # 计算帧速率 frame_count += 1 end_time = time.time() elapsed_time = end_time - start_time fps = frame_count / elapsed_time # print(f"FPS: {fps:.2f}") # 将FPS绘制在图像上 cv2.putText(draw_img, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA) # 显示画面 # 获取当前窗口大小 width = cv2.getWindowImageRect("AICamera")[2] height = cv2.getWindowImageRect("AICamera")[3] print("width", width, "height", height) # 如果height小于1则赋值100 if height < 1: height = 100 # 调整图像大小以适应窗口 resized_frame = cv2.resize(draw_img, (width, height)) cv2.imshow("AICamera", resized_frame) # 检测按键,如果按下q键则退出循环 if cv2.waitKey(1) & 0xFF == ord('q'): break # 关闭摄像头 cap.release() # 关闭所有窗口 cv2.destroyAllWindows() # 读取配置文件 def read_config(file_path='./config/herb_ai.yaml'): with open(file_path, 'r') as file: config = yaml.safe_load(file) return config # 消息结构 class COPYDATASTRUCT(ctypes.Structure): _fields_ = [ ('dwData', ctypes.wintypes.LPARAM), ('cbData', ctypes.wintypes.DWORD), ('lpData', ctypes.c_char_p) ] # logging.info("准备加载安全检测模型..") # print("准备加载安全检测模型..") # model_safe = SAFETY_DETECT(config['model']['safe']) # # logging.info("安全检测模型加载成功。") # print("安全检测模型加载成功。") # logging.info("准备加载药材识别模型..") # print("准备加载药材识别模型..") # model_cls = HERB_IDENTIFY(config['model']['cls']) # logging.info("药材识别模型加载成功。") # print("药材识别模型加载成功。") class Listener: def __init__(self): WindowName = config['win']['windowName'] ClassName = config['win']['className'] message_map = { win32con.WM_COPYDATA: self.OnCopyData, win32con.WM_CLOSE: self.HandleClose, } wc = win32gui.WNDCLASS() wc.lpfnWndProc = message_map wc.lpszClassName = ClassName hinst = wc.hInstance = win32api.GetModuleHandle(None) classAtom = win32gui.RegisterClass(wc) self.hwnd = win32gui.CreateWindow( classAtom, WindowName, win32con.WS_OVERLAPPEDWINDOW, # 窗口样式 0, 0, win32con.CW_USEDEFAULT, win32con.CW_USEDEFAULT, 0, 0, hinst, None ) logger.info(f"启动成功.当前句柄{self.hwnd}") # # 隐藏窗口 # win32gui.ShowWindow(self.hwnd, win32con.SW_HIDE) print("启动成功.当前句柄", self.hwnd) def OnCopyData(self, hwnd, msg, wparam, lparam): try: # 记录开始时间 start_time = time.time() pCDS = ctypes.cast(lparam, PCOPYDATASTRUCT) s = ctypes.string_at(pCDS.contents.lpData).decode() strArr = s.split(",") logger.info(f"收到来自句柄{hwnd}的消息:{s}") print(f"收到来自句柄{hwnd}的消息:{s}") res = {} msg = "" # 发送指令: # AI药材识别:100 # AI上料区安全检测:101 # AI下料满料识别:102 # AI下料区安全检测:103 # 接收指令: 药材识别接收指令:airecognize, {json} # AI上料区安全检测:aidetect, {json} if strArr[0] == '100': pass # names, confs = cls_process(strArr[1]) # for idx, name in enumerate(names): # if name in res: # res[name] = res[name] if res[name] > confs[idx] else confs[idx] # else: # res[name] = confs[idx] # msg = "airecognize," + f"{res}" elif strArr[0] == '101': pass # names, confs = safe_process(strArr[1]) # for idx, name in enumerate(names): # if name in res: # res[name] = res[name] if res[name] > confs[idx] else confs[idx] # else: # res[name] = confs[idx] # msg = "aidetect," + f"{res}" logger.info(f"识别结果:{res}") self.send_msg(msg) # 记录结束时间 end_time = time.time() # 计算执行时间 execution_time = end_time - start_time # 打印执行时间 print(f"程序执行时间为:{execution_time}秒") logger.info(f"程序执行时间为:{execution_time,}秒") logger.info("-" * 20) except Exception as e: print(e) pass return 1 def send_msg(self, msg): # 通过寻找窗口名获取句柄 hwnd = FindWindow(config['tag_win']['className'], config['tag_win']['windowName']) print(f"返回结果,发现窗口:{hwnd}") logger.info(f"返回结果,发现窗口:{hwnd}") # 或者直接根据句柄来发送 # hwnd = 1707560 cds = COPYDATASTRUCT() cds.dwData = 0 msg_bytes = msg.encode('utf-8') cds.cbData = ctypes.sizeof(ctypes.create_string_buffer(msg_bytes)) cds.lpData = ctypes.c_char_p(msg_bytes) SendMessage(hwnd, win32con.WM_COPYDATA, 0, ctypes.byref(cds)) def HandleClose(self, hwnd, msg, wparam, lparam): logger.info("关闭窗口") print("handle close") # 关闭当前窗口 win32gui.DestroyWindow(hwnd) sys.exit() return 0 if __name__ == '__main__': # 累计每种药材不论名次出现的次数 class_count = {} # 累计每种药材置信度最高的次数 class_count_max = {} # 累计每种药材的置信度总和 class_sum = {} # cam1 = "USB Camera" # cam2 = "USB ZOOM Camera" # camUtil = CAM_UTIL(cam1, cam2) # webcams = camUtil.webcam_list # print("摄像头", webcams) save_path = "data/images/" # 是否上过料 is_loaded = False # 加载ONNX模型 load_identifier = IDENTIFIER("model/loading.onnx") hoister_position = IDENTIFIER("model/hl.onnx") safety_detect = SAFETY_DETECT("model/safety_det_.onnx") config = read_config() PCOPYDATASTRUCT = ctypes.POINTER(COPYDATASTRUCT) # 查询窗口方法 FindWindow = ctypes.windll.user32.FindWindowW # 发送消息方法 SendMessage = ctypes.windll.user32.SendMessageW l = Listener() load_identify() win32gui.PumpMessages()