baoshiwei
2025-04-22 88fc0f9f9b7fd3eb81c958ca41ed822cf3657c47
onnx/herb_ai_.py
copy from herb_ai.py copy to onnx/herb_ai_.py
Îļþ´Ó herb_ai.py ¸´ÖÆ
@@ -10,6 +10,7 @@
import win32gui
import multiprocessing
from safety_detect import SAFETY_DETECT
from cam_util import CAM_UTIL
from identifier import IDENTIFIER
import os
from logger_config import logger
@@ -22,34 +23,11 @@
def variance_of_laplacian(image):
    # è®¡ç®—输入图像的拉普拉斯响应的方差
    return cv2.Laplacian(image, cv2.CV_64F).var()
def clean_directory(path, days_threshold, max_files):
    """清理超过时间或数量的文件"""
    now = time.time()
    threshold = now - days_threshold * 24 * 3600
    files = []
    for f in os.listdir(path):
        file_path = os.path.join(path, f)
        if os.path.isfile(file_path):
            files.append((file_path, os.path.getmtime(file_path)))
    # æŒ‰ä¿®æ”¹æ—¶é—´é™åºæŽ’序,保留最新文件
    files.sort(key=lambda x: x[1], reverse=True)
    # åˆ é™¤è¿‡æœŸæ–‡ä»¶
    for file_info in files:
        if file_info[1] < threshold:
            os.remove(file_info[0])
    # ä¿ç•™æœ€æ–°æ–‡ä»¶ï¼Œåˆ é™¤å¤šä½™æ–‡ä»¶
    for file_info in files[max_files:]:
        os.remove(file_info[0])
# è°ƒç”¨å¦ä¸€ä¸ªé•¿ç„¦é•œå¤´ï¼Œæ‹æ‘„清晰的局部药材图片
def get_image():
    herb_identifier = IDENTIFIER("model/herb_identify.onnx")
    logger.info("识别线程启动")
    global is_loaded, class_count, class_count_max, class_sum
    camera2_index = config['cam']['cam2']
    camera2_index = 1
    print("第二个摄像头索引:" + str(camera2_index))
    # æ‰“开摄像头
    capture = cv2.VideoCapture(camera2_index, cv2.CAP_DSHOW)
@@ -76,7 +54,7 @@
            break
        count += 1
        if count == config['cam']['frames']:
        if count == 10:
            herb_probabilities = herb_identifier(frame2)
            top_five_classes = np.argsort(herb_probabilities, axis=1)[0][-5:][::-1]
            name = ""
@@ -90,17 +68,16 @@
            # è®¡ç®—拉普拉斯响应的方差
            laplacian = variance_of_laplacian(frame2)
            # ç”Ÿæˆä¿å­˜æ–‡ä»¶åï¼Œä»¥å½“前时间命名
            save_name2 = time.strftime("%Y%m%d%H%M%S", time.localtime()) + "_" +name +"_["+ str(round(laplacian, 2)) +"]"+  ".jpg"
            save_name2 = name +"_["+ str(round(laplacian, 2)) +"]_"+ time.strftime("%Y%m%d%H%M%S", time.localtime()) + ".jpg"
            logger.info(f"识别结果转换为保存图片名称:, {save_name2}")
            # åˆ¤æ–­å›¾åƒçš„æ¸…晰度
            # ä¿å­˜è°ƒæ•´å°ºå¯¸åŽçš„图片
            if laplacian > 200:
                c_ = save_path + "2/c/"
                # åˆ¤æ–­æ–‡ä»¶æ˜¯å¦å­˜åœ¨ï¼Œä¸å­˜åœ¨åˆ™åˆ›å»º
                if not os.path.exists(c_):
                    os.makedirs(c_)
                cv2.imwrite(c_ + save_name2, frame2)
                # æ–°å¢žæ¸…理调用
                clean_directory(c_, config['cam']['days_threshold'], config['cam']['max_files'])
                # æ¸…晰条件下累计识别结果中药材名称出现的次数
                # ç´¯è®¡æ¯ç§è¯æä¸è®ºåæ¬¡å‡ºçŽ°çš„æ¬¡æ•°,累计每种药材置信度最高的次数,累计每种药材的置信度总和
                # class_count = {}
@@ -127,11 +104,10 @@
                is_loaded = True
            else:
                n_ = save_path + "2/n/"
                # åˆ¤æ–­æ–‡ä»¶æ˜¯å¦å­˜åœ¨ï¼Œä¸å­˜åœ¨åˆ™åˆ›å»º
                if not os.path.exists(n_):
                    os.makedirs(n_)
                cv2.imwrite(n_ + save_name2, frame2)
                # æ–°å¢žæ¸…理调用
                clean_directory(n_, config['cam']['days_threshold'], config['cam']['max_files'])
            # cv2.imshow("Camera", resized_frame2)
            print("保存图片:", save_name2)
            break
@@ -139,30 +115,19 @@
    capture.release()
def send_result():
    global is_loaded,class_count, class_count_max, class_sum
    # å¯¹class_count进行排序,按照值从大到小排序,返回值最大的前五个
    sorted_class_count = dict(sorted(class_count.items(), key=lambda x: x[1], reverse=True)[:5])
    # å¯¹class_count_max进行排序,按照值从大到小排序,返回值最大的前五个
    sorted_class_count_max = dict(sorted(class_count_max.items(), key=lambda x: x[1], reverse=True)[:5])
    # å¯¹ class_sum进行排序,按照值从大到小排序,返回值最大的前五个
    sorted_class_sum = dict(sorted(class_sum.items(), key=lambda x: x[1], reverse=True)[:5])
    # å°†ä¸‰ç§ç»Ÿè®¡ç»“果输出到日志中
    logger.info("class_count:"+str(class_count))
    logger.info("sorted_class_count:"+str(sorted_class_count))
    logger.info("class_count_max:"+str(class_count_max))
    logger.info("sorted_class_count_max:"+str(sorted_class_count_max))
    logger.info("class_sum:"+str(class_sum))
    logger.info("sorted_class_sum:"+str(sorted_class_sum))
    is_loaded = False
    count_msg = "airecognize," + f"{sorted_class_count}"
    logger.info("发送药材识别结果:"+str(count_msg))
    l.send_msg(count_msg)
    l.send_msg("airecognize," + f"{class_count}")
    pass
def load_identify():
    global is_loaded
    global is_loaded, frame
    # æ‘„像头索引号,通常为0表示第一个摄像头
    camera_index = config['cam']['cam1']
    camera_index = 0
    print("第一个摄像头索引:" + str(camera_index))
    # æ‰“开摄像头
    cap = cv2.VideoCapture(camera_index, cv2.CAP_DSHOW)
@@ -182,24 +147,21 @@
    # è®¡æ—¶å™¨
    frame_count = 0
    start_time = time.time()
    stime = time.time()
    sstime = time.time()
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    # ä¸Šæ¬¡è¯†åˆ«ç»“æžœ
    class_old = "1"
    # ç´¯è®¡æ¬¡æ•°
    count = 0
    # ä¸Šæ–™çŠ¶æ€
    status = "没有上料"
    # åˆ›å»ºçª—口并设置为可调整大小
    cv2.namedWindow("AICamera", cv2.WINDOW_NORMAL)
    # å¾ªçŽ¯è¯»å–æ‘„åƒå¤´ç”»é¢
    while True:
        logger.info("循环读取摄像头画面")
        # logger.info("循环读取摄像头画面")
        # ç¡çœ 100毫秒
        time.sleep(config['cam']['sleep'])
        time.sleep(0.1)
        ret, frame = cap.read()
        if not ret:
            print("无法读取摄像头画面")
@@ -207,107 +169,26 @@
            break
        # èŽ·å–å½“å‰æ—¶é—´
        current_time = time.time()
        # æ¯éš”3秒取一帧图像
        # æ¯éš”n秒取一帧图像
        if current_time - sstime >= config['cam']['sleep']:
            sstime = current_time
        # å®‰å…¨æ£€æµ‹
        boxes, scores, class_ids = safety_detect(frame)
        draw_img = safety_detect.draw_detections(frame, boxes, scores, class_ids)
        det_res = {}
        if class_ids is not None:
            # éåކclass_ids è½¬æ¢æˆç±»åˆ«åç§°
            for i in range(len(class_ids)):
                class_id = class_ids[i]
                class_name = safety_detect.class_names[class_id]
                # å­˜å…¥åˆ°det_res中
                if class_name in det_res:
                    det_res[class_name] = det_res[class_name] if det_res[class_name] > scores[i] else scores[i]
                else:
                    det_res[class_name] = scores[i]
        print(det_res)
        logger.info(f"安全检测识别结果, {det_res}")
        # å¦‚æžœcass_ids中包含0,则表示有安全检测到人体
        if 0 in class_ids:
            res_ = "aidetect," + f"{det_res}"
            logger.info("发送安全检测结果:"+str(res_))
            l.send_msg(res_)
        # ä¸Šæ–™è¯†åˆ«
        probabilities = load_identifier(frame)
        # æ‰¾åˆ°æœ€å¤§æ¦‚率的类别
        predicted_class = np.argmax(probabilities, axis=1)[0]
        max_probability = np.max(probabilities, axis=1)[0]
        class_ = load_identifier.class_names[predicted_class]
        # è®¡ç®—类型重复的次数,类别更换之后重新计数
        if class_ != class_old:
            count = 0
        else:
            count += 1
        class_old = class_
        print(f"{class_}:{count}: {max_probability}")
        logger.info(f"{class_}:{count}: {max_probability}")
        # åˆ¤æ–­æ˜¯å¦ä¸Šæ–™å¹¶ä¸”上料次数大于10次
        if class_ == "shangliao" and count > 10:
            status = "正在上料"
            # æ¯éš”3秒取一帧图像
            # å¦‚果距离上一次保存已经过去1秒,则保存当前画面
            if current_time - stime >= 10.0:
                save_name = time.strftime("%Y%m%d%H%M%S", time.localtime()) + ".jpg"
                # ä¿å­˜è°ƒæ•´å°ºå¯¸åŽçš„图片
                path_ = save_path + "1/"
                if not os.path.exists(path_):
                    os.makedirs(path_)
                cv2.imwrite(path_ + save_name, frame)
                # æ–°å¢žæ¸…理调用
                clean_directory(path_, config['cam']['days_threshold'], config['cam']['max_files'])
                # é‡ç½®è®¡æ—¶å™¨
                stime = time.time()
                thread = threading.Thread(target=get_image)
                thread.start()
        else:
            status = "没有上料"
            if class_ == "meishangliao" and count == 3 and is_loaded:
                logger.info("停止上料后发送识别结果")
                send_result()
            if class_ == "meishangliao" and count == 1000:
                is_loaded = False
                logger.info("长时间未上料,重置正在上料状态")
            thread1 = threading.Thread(target=method_name)
            thread1.start()
            # frame = draw_img
        # print(status)
        # ä¸Šæ–™æœºä½ç½®è¯†åˆ«
        probabilities2 = hoister_position(frame);
        predicted_class2 = np.argmax(probabilities2, axis=1)[0]
        max_probability2 = np.max(probabilities2, axis=1)[0]
        class_2 = hoister_position.class_names[predicted_class2]
        print(f"-----------{class_2}:{predicted_class2}: {max_probability2}")
        logger.info(f"-----------{class_2}:{predicted_class2}: {max_probability2}")
        if predicted_class2 == 0:
            feeder_res = {class_2: max_probability2}
            class_feeder = "aifeeder," + f"{feeder_res}"
            print("send_msg", class_feeder)
            logger.info("发送上料机位置识别结果:"+str(class_feeder))
            l.send_msg(class_feeder)
        # è®¡ç®—帧速率
        frame_count += 1
        end_time = time.time()
        elapsed_time = end_time - start_time
        fps = frame_count / elapsed_time
        # print(f"FPS: {fps:.2f}")
        # å°†FPS绘制在图像上
        cv2.putText(draw_img, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2,
                    cv2.LINE_AA)
        # frame_count += 1
        # end_time = time.time()
        # elapsed_time = end_time - start_time
        # fps = frame_count / elapsed_time
        # # print(f"FPS: {fps:.2f}")
        # # å°†FPS绘制在图像上
        # cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2,
        #             cv2.LINE_AA)
        # æ˜¾ç¤ºç”»é¢
        # èŽ·å–å½“å‰çª—å£å¤§å°
        width = cv2.getWindowImageRect("AICamera")[2]
        height = cv2.getWindowImageRect("AICamera")[3]
        # è°ƒæ•´å›¾åƒå¤§å°ä»¥é€‚应窗口
        resized_frame = cv2.resize(draw_img, (width, height))
        cv2.imshow("AICamera", resized_frame)
        cv2.imshow("AICamera", frame)
        # æ£€æµ‹æŒ‰é”®ï¼Œå¦‚果按下q键则退出循环
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
@@ -316,6 +197,69 @@
    # å…³é—­æ‰€æœ‰çª—口
    cv2.destroyAllWindows()
def method_name(  ):
    global is_loaded,count,class_old,stime,frame
    # å®‰å…¨æ£€æµ‹
    boxes, scores, class_ids = safety_detect(frame)
    draw_img = safety_detect.draw_detections(frame, boxes, scores, class_ids)
    print(boxes, scores, class_ids)
    det_res = {}
    if class_ids is not None:
        # éåކclass_ids è½¬æ¢æˆç±»åˆ«åç§°
        for i in range(len(class_ids)):
            class_id = class_ids[i]
            class_name = safety_detect.class_names[class_id]
            # å­˜å…¥åˆ°det_res中
            if class_name in det_res:
                det_res[class_name] = det_res[class_name] if det_res[class_name] > scores[i] else scores[i]
            else:
                det_res[class_name] = scores[i]
    logger.info(f"安全检测识别结果, {det_res}")
    # å¦‚æžœcass_ids中包含0,则表示有安全检测到人体
    if 0 in class_ids:
        l.send_msg("aidetect," + f"{det_res}")
    # ä¸Šæ–™è¯†åˆ«
    probabilities = load_identifier(frame)
    # æ‰¾åˆ°æœ€å¤§æ¦‚率的类别
    predicted_class = np.argmax(probabilities, axis=1)[0]
    max_probability = np.max(probabilities, axis=1)[0]
    class_ = load_identifier.class_names[predicted_class]
    # è®¡ç®—类型重复的次数,类别更换之后重新计数
    if class_ != class_old:
        count = 0
    else:
        count += 1
    class_old = class_
    print(f"{class_}:{count}: {max_probability}")
    logger.info(f"{class_}:{count}: {max_probability}")
    # åˆ¤æ–­æ˜¯å¦ä¸Šæ–™å¹¶ä¸”上料次数大于10次
    if class_ == "shangliao" and count > 10:
        status = "正在上料"
        # æ¯éš”3秒取一帧图像
        # å¦‚果距离上一次保存已经过去1秒,则保存当前画面
        current_time = time.time()
        if current_time - stime >= 10.0:
            save_name = time.strftime("%Y%m%d%H%M%S", time.localtime()) + ".jpg"
            # ä¿å­˜è°ƒæ•´å°ºå¯¸åŽçš„图片
            path_ = save_path + "1/"
            if not os.path.exists(path_):
                os.makedirs(path_)
            cv2.imwrite(path_ + save_name, frame)
            # é‡ç½®è®¡æ—¶å™¨
            stime = time.time()
            thread = threading.Thread(target=get_image)
            thread.start()
    else:
        status = "没有上料"
        if class_ == "meishangliao" and count == 3 and is_loaded:
            logger.info("停止上料后发送识别结果")
            send_result()
        if class_ == "meishangliao" and count == 1000:
            is_loaded = False
            logger.info("长时间未上料,重置正在上料状态")
# è¯»å–配置文件
@@ -386,7 +330,7 @@
    def OnCopyData(self, hwnd, msg, wparam, lparam):
        try:
            # è®°å½•开始时间
            start_time = time.time()
            startTime = time.time()
            pCDS = ctypes.cast(lparam, PCOPYDATASTRUCT)
            s = ctypes.string_at(pCDS.contents.lpData).decode()
            strArr = s.split(",")
@@ -426,9 +370,9 @@
            logger.info(f"识别结果:{res}")
            self.send_msg(msg)
            # è®°å½•结束时间
            end_time = time.time()
            endTime = time.time()
            # è®¡ç®—执行时间
            execution_time = end_time - start_time
            execution_time = endTime - startTime
            # æ‰“印执行时间
            print(f"程序执行时间为:{execution_time}秒")
            logger.info(f"程序执行时间为:{execution_time,}秒")
@@ -468,6 +412,12 @@
    class_count_max = {}
    # ç´¯è®¡æ¯ç§è¯æçš„置信度总和
    class_sum = {}
    # ä¸Šæ¬¡è¯†åˆ«ç»“æžœ
    class_old = "1"
    # ç´¯è®¡æ¬¡æ•°
    count = 0
    stime = time.time()
    frame = None
    # cam1 = "USB Camera"
    # cam2 = "USB ZOOM Camera"
    # camUtil = CAM_UTIL(cam1, cam2)
@@ -477,9 +427,8 @@
    # æ˜¯å¦ä¸Šè¿‡æ–™
    is_loaded = False
    # åŠ è½½ONNX模型
    herb_identifier = IDENTIFIER("model/herb_identify.onnx")
    load_identifier = IDENTIFIER("model/loading.onnx")
    hoister_position = IDENTIFIER("model/hl.onnx")
    safety_detect = SAFETY_DETECT("model/safety_det.onnx")
    config = read_config()
    PCOPYDATASTRUCT = ctypes.POINTER(COPYDATASTRUCT)