import sys
|
import time
|
import cv2
|
import numpy as np
|
import ctypes
|
import ctypes.wintypes
|
import yaml
|
import win32api
|
import win32con
|
import win32gui
|
import multiprocessing
|
from safety_detect import SAFETY_DETECT
|
from identifier import IDENTIFIER
|
|
import os
|
from logger_config import logger
|
import threading
|
|
multiprocessing.freeze_support()
|
|
|
def variance_of_laplacian(image):
|
# 计算输入图像的拉普拉斯响应的方差
|
return cv2.Laplacian(image, cv2.CV_64F).var()
|
|
|
def clean_directory(path, days_threshold, max_files):
|
"""清理超过时间或数量的文件"""
|
now = time.time()
|
threshold = now - days_threshold * 24 * 3600
|
files = []
|
for f in os.listdir(path):
|
file_path = os.path.join(path, f)
|
if os.path.isfile(file_path):
|
files.append((file_path, os.path.getmtime(file_path)))
|
# 按修改时间降序排序,保留最新文件
|
files.sort(key=lambda x: x[1], reverse=True)
|
# 删除过期文件
|
for file_info in files:
|
if file_info[1] < threshold:
|
os.remove(file_info[0])
|
# 保留最新文件,删除多余文件
|
for file_info in files[max_files:]:
|
os.remove(file_info[0])
|
|
|
# 调用另一个长焦镜头,拍摄清晰的局部药材图片
|
def get_image():
|
herb_identifier = IDENTIFIER("./model/herb_id")
|
logger.info("识别线程启动")
|
global is_loaded, class_count, class_count_max, class_sum
|
camera2_index = config['cam']['cam2']
|
print("第二个摄像头索引:" + str(camera2_index))
|
# 打开摄像头
|
capture = cv2.VideoCapture(camera2_index, cv2.CAP_DSHOW)
|
# 设置分辨率y
|
capture.set(cv2.CAP_PROP_FRAME_WIDTH, 2048) # 宽度
|
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 1540) # 高度
|
# 检查摄像头是否成功打开
|
if not capture.isOpened():
|
print("无法打开摄像头2")
|
logger.error("无法打开摄像头2")
|
exit()
|
width2 = capture.get(cv2.CAP_PROP_FRAME_WIDTH)
|
height2 = capture.get(cv2.CAP_PROP_FRAME_HEIGHT)
|
print("摄像头2分辨率:", width2, "x", height2)
|
logger.info(f"摄像头2分辨率:, {width2}, x, {height2}")
|
# 循环读取摄像头画面
|
# Shadows name 'width' from outer scope
|
count = 0
|
while True:
|
ret2, frame2 = capture.read()
|
if not ret2:
|
print("无法读取摄像头画面")
|
logger.error("无法读取摄像头画面")
|
break
|
count += 1
|
|
if count == config['cam']['frames']:
|
herb_probabilities = herb_identifier(frame2)
|
top_five_classes = np.argsort(herb_probabilities, axis=1)[0][-5:][::-1]
|
name = ""
|
for i, class_id in enumerate(top_five_classes):
|
# 保留两位小数
|
probability = round(herb_probabilities[0][class_id], 2)
|
herb_class = herb_identifier.class_names[class_id]
|
name = name + herb_class + "=" + str(probability)
|
# 显示画面
|
# cv2.imshow('Output2', resized_frame2)
|
# 计算拉普拉斯响应的方差
|
laplacian = variance_of_laplacian(frame2)
|
# 生成保存文件名,以当前时间命名
|
save_name2 = time.strftime("%Y%m%d%H%M%S", time.localtime()) + "_" +name +"_["+ str(round(laplacian, 2)) +"]"+ ".jpg"
|
logger.info(f"识别结果转换为保存图片名称:, {save_name2}")
|
# 判断图像的清晰度
|
# 保存调整尺寸后的图片
|
if laplacian > 150:
|
c_ = save_path + "2/c/"
|
if not os.path.exists(c_):
|
os.makedirs(c_)
|
cv2.imwrite(c_ + save_name2, frame2)
|
# 新增清理调用
|
clean_directory(c_, config['cam']['days_threshold'], config['cam']['max_files'])
|
# 清晰条件下累计识别结果中药材名称出现的次数
|
# 累计每种药材不论名次出现的次数,累计每种药材置信度最高的次数,累计每种药材的置信度总和
|
# class_count = {}
|
# class_count_max = {}
|
# class_sum = {}
|
for i in range(len(top_five_classes)):
|
class_id = top_five_classes[i]
|
herb_class = herb_identifier.class_names[class_id]
|
# 累计每种药材不论名次出现的次数
|
if herb_class in class_count:
|
class_count[herb_class] += 1
|
else:
|
class_count[herb_class] = 1
|
# 累计每种药材置信度最高的次数
|
if i == 0 and herb_class in class_count_max:
|
class_count_max[herb_class] += 1
|
elif i == 0:
|
class_count_max[herb_class] = 1
|
# 累计每种药材的置信度总和
|
if herb_class in class_sum:
|
class_sum[herb_class] += herb_probabilities[0][class_id]
|
else:
|
class_sum[herb_class] = herb_probabilities[0][class_id]
|
is_loaded = True
|
else:
|
n_ = save_path + "2/n/"
|
if not os.path.exists(n_):
|
os.makedirs(n_)
|
cv2.imwrite(n_ + save_name2, frame2)
|
# 新增清理调用
|
clean_directory(n_, config['cam']['days_threshold'], config['cam']['max_files'])
|
# cv2.imshow("Camera", resized_frame2)
|
print("保存图片:", save_name2)
|
break
|
# 结束线程
|
capture.release()
|
def send_result():
|
global is_loaded,class_count, class_count_max, class_sum
|
# 对class_count进行排序,按照值从大到小排序,返回值最大的前五个
|
sorted_class_count = dict(sorted(class_count.items(), key=lambda x: x[1], reverse=True)[:5])
|
# 对class_count_max进行排序,按照值从大到小排序,返回值最大的前五个
|
sorted_class_count_max = dict(sorted(class_count_max.items(), key=lambda x: x[1], reverse=True)[:5])
|
# 对 class_sum进行排序,按照值从大到小排序,返回值最大的前五个
|
sorted_class_sum = dict(sorted(class_sum.items(), key=lambda x: x[1], reverse=True)[:5])
|
# 将三种统计结果输出到日志中
|
logger.info("class_count:"+str(class_count))
|
logger.info("sorted_class_count:"+str(sorted_class_count))
|
logger.info("class_count_max:"+str(class_count_max))
|
logger.info("sorted_class_count_max:"+str(sorted_class_count_max))
|
logger.info("class_sum:"+str(class_sum))
|
logger.info("sorted_class_sum:"+str(sorted_class_sum))
|
is_loaded = False
|
count_msg = "airecognize," + f"{sorted_class_count}"
|
logger.info("发送药材识别结果:"+str(count_msg))
|
l.send_msg(count_msg)
|
pass
|
|
|
def load_identify():
|
global is_loaded
|
# 摄像头索引号,通常为0表示第一个摄像头
|
camera_index = config['cam']['cam1']
|
print("第一个摄像头索引:" + str(camera_index))
|
# 打开摄像头
|
cap = cv2.VideoCapture(camera_index, cv2.CAP_DSHOW)
|
# 设置分辨率
|
# cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3840) # 宽度
|
# cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2160) # 高度
|
# 检查摄像头是否成功打开
|
if not cap.isOpened():
|
print("无法打开摄像头")
|
logger.error("无法打开摄像头")
|
exit()
|
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
|
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
|
print("摄像头分辨率:", width, "x", height)
|
logger.info(f"摄像头分辨率:, {width}, x, {height}")
|
# 目标图像尺寸
|
|
stime = time.time()
|
if not os.path.exists(save_path):
|
os.makedirs(save_path)
|
# 上次识别结果
|
class_old = "1"
|
# 累计次数
|
count = 0
|
# 上料状态
|
status = "没有上料"
|
|
# 创建窗口并设置为可调整大小
|
cv2.namedWindow("AICamera", cv2.WINDOW_NORMAL)
|
|
# 循环读取摄像头画面
|
while True:
|
start_time = time.time()
|
# 睡眠100毫秒
|
time.sleep(config['cam']['sleep'])
|
ret, frame = cap.read()
|
if not ret:
|
print("无法读取摄像头画面")
|
logger.error("无法读取摄像头画面")
|
break
|
# 获取当前时间
|
current_time = time.time()
|
# 每隔3秒取一帧图像
|
|
# 安全检测
|
boxes, scores, class_ids = safety_detect(frame)
|
draw_img = safety_detect.draw_detections(frame, class_ids, scores,boxes )
|
|
det_res = {}
|
if class_ids is not None:
|
# 遍历class_ids 转换成类别名称
|
for i in range(len(class_ids)):
|
class_id = class_ids[i]
|
class_name = safety_detect.class_names[class_id]
|
# 存入到det_res中
|
if class_name in det_res:
|
det_res[class_name] = det_res[class_name] if det_res[class_name] > scores[i] else scores[i]
|
else:
|
det_res[class_name] = scores[i]
|
# 如果det_res不为空,则打印det_res
|
if det_res != {}:
|
print(det_res)
|
logger.info(f"安全检测识别结果, {det_res}")
|
# 如果cass_ids中包含0,则表示有安全检测到人体
|
if 0 in class_ids:
|
res_ = "aidetect," + f"{det_res}"
|
logger.info("发送安全检测结果:"+str(res_))
|
l.send_msg(res_)
|
if 81 in class_ids:
|
logger.info("识别到药材,正在上料")
|
print("识别到药材,正在上料")
|
|
# 上料识别
|
probabilities = load_identifier(frame)
|
# 找到最大概率的类别
|
predicted_class = np.argmax(probabilities, axis=1)[0]
|
max_probability = np.max(probabilities, axis=1)[0]
|
class_ = load_identifier.class_names[predicted_class]
|
# 计算类型重复的次数,类别更换之后重新计数
|
if class_ != class_old:
|
count = 0
|
else:
|
count += 1
|
class_old = class_
|
if class_ == "shangliao":
|
print(f"{class_}:{count}: {max_probability}")
|
logger.info(f"{class_}:{count}: {max_probability}")
|
|
# 判断是否上料并且上料次数大于10次
|
if class_ == "shangliao" and count > 10:
|
status = "正在上料"
|
# 每隔3秒取一帧图像
|
# 如果距离上一次保存已经过去1秒,则保存当前画面
|
if current_time - stime >= 10.0:
|
save_name = time.strftime("%Y%m%d%H%M%S", time.localtime()) + ".jpg"
|
# 保存调整尺寸后的图片
|
path_ = save_path + "1/"
|
if not os.path.exists(path_):
|
os.makedirs(path_)
|
cv2.imwrite(path_ + save_name, frame)
|
# 新增清理调用
|
clean_directory(path_, config['cam']['days_threshold'], config['cam']['max_files'])
|
# 重置计时器
|
stime = time.time()
|
|
thread = threading.Thread(target=get_image)
|
thread.start()
|
|
else:
|
status = "没有上料"
|
if class_ == "meishangliao" and count == 3 and is_loaded:
|
logger.info("停止上料后发送识别结果")
|
send_result()
|
if class_ == "meishangliao" and count == 1000:
|
is_loaded = False
|
logger.info("长时间未上料,重置正在上料状态")
|
# print(status)
|
|
# 上料机位置识别
|
probabilities2 = hoister_position(frame)
|
predicted_class2 = np.argmax(probabilities2, axis=1)[0]
|
max_probability2 = np.max(probabilities2, axis=1)[0]
|
class_2 = hoister_position.class_names[predicted_class2]
|
if class_2 == "high":
|
print(f"-----------{class_2}:{predicted_class2}: {max_probability2}")
|
logger.info(f"-----------{class_2}:{predicted_class2}: {max_probability2}")
|
if predicted_class2 == 0:
|
feeder_res = {class_2: max_probability2}
|
class_feeder = "aifeeder," + f"{feeder_res}"
|
print("send_msg", class_feeder)
|
logger.info("发送上料机位置识别结果:"+str(class_feeder))
|
l.send_msg(class_feeder)
|
# 计算帧速率
|
end_time = time.time()
|
fps = (1 / (end_time - start_time))
|
# print(f"FPS: {fps:.2f}")
|
# 将FPS绘制在图像上
|
cv2.putText(draw_img, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2,
|
cv2.LINE_AA)
|
# 显示画面
|
# 获取当前窗口大小
|
width = cv2.getWindowImageRect("AICamera")[2]
|
height = cv2.getWindowImageRect("AICamera")[3]
|
# print("width", width, "height", height)
|
|
# 如果height小于1则赋值100
|
if height < 1:
|
height = 100
|
|
# 调整图像大小以适应窗口
|
resized_frame = cv2.resize(draw_img, (width, height))
|
|
cv2.imshow("AICamera", resized_frame)
|
# 检测按键,如果按下q键则退出循环
|
if cv2.waitKey(1) & 0xFF == ord('q'):
|
break
|
# 关闭摄像头
|
cap.release()
|
# 关闭所有窗口
|
cv2.destroyAllWindows()
|
|
|
|
# 读取配置文件
|
def read_config(file_path='./config/herb_ai.yaml'):
|
with open(file_path, 'r') as file:
|
config = yaml.safe_load(file)
|
return config
|
|
|
|
|
|
# 消息结构
|
class COPYDATASTRUCT(ctypes.Structure):
|
_fields_ = [
|
('dwData', ctypes.wintypes.LPARAM),
|
('cbData', ctypes.wintypes.DWORD),
|
('lpData', ctypes.c_char_p)
|
]
|
|
class Listener:
|
def __init__(self):
|
WindowName = config['win']['windowName']
|
ClassName = config['win']['className']
|
message_map = {
|
win32con.WM_COPYDATA: self.OnCopyData,
|
win32con.WM_CLOSE: self.HandleClose,
|
}
|
wc = win32gui.WNDCLASS()
|
wc.lpfnWndProc = message_map
|
wc.lpszClassName = ClassName
|
hinst = wc.hInstance = win32api.GetModuleHandle(None)
|
classAtom = win32gui.RegisterClass(wc)
|
self.hwnd = win32gui.CreateWindow(
|
classAtom,
|
WindowName,
|
win32con.WS_OVERLAPPEDWINDOW, # 窗口样式
|
0,
|
0,
|
win32con.CW_USEDEFAULT,
|
win32con.CW_USEDEFAULT,
|
0,
|
0,
|
hinst,
|
None
|
)
|
logger.info(f"启动成功.当前句柄{self.hwnd}")
|
# # 隐藏窗口
|
# win32gui.ShowWindow(self.hwnd, win32con.SW_HIDE)
|
print("启动成功.当前句柄", self.hwnd)
|
|
|
def OnCopyData(self, hwnd, msg, wparam, lparam):
|
try:
|
# 记录开始时间
|
start_time = time.time()
|
pCDS = ctypes.cast(lparam, PCOPYDATASTRUCT)
|
s = ctypes.string_at(pCDS.contents.lpData).decode()
|
strArr = s.split(",")
|
logger.info(f"收到来自句柄{hwnd}的消息:{s}")
|
print(f"收到来自句柄{hwnd}的消息:{s}")
|
res = {}
|
msg = ""
|
# 发送指令:
|
# AI药材识别:100
|
# AI上料区安全检测:101
|
# AI下料满料识别:102
|
# AI下料区安全检测:103
|
|
# 接收指令: 药材识别接收指令:airecognize, {json}
|
# AI上料区安全检测:aidetect, {json}
|
if strArr[0] == '100':
|
pass
|
# names, confs = cls_process(strArr[1])
|
# for idx, name in enumerate(names):
|
# if name in res:
|
# res[name] = res[name] if res[name] > confs[idx] else confs[idx]
|
# else:
|
# res[name] = confs[idx]
|
# msg = "airecognize," + f"{res}"
|
elif strArr[0] == '101':
|
pass
|
# names, confs = safe_process(strArr[1])
|
# for idx, name in enumerate(names):
|
# if name in res:
|
# res[name] = res[name] if res[name] > confs[idx] else confs[idx]
|
# else:
|
# res[name] = confs[idx]
|
# msg = "aidetect," + f"{res}"
|
|
|
|
logger.info(f"识别结果:{res}")
|
self.send_msg(msg)
|
# 记录结束时间
|
end_time = time.time()
|
# 计算执行时间
|
execution_time = end_time - start_time
|
# 打印执行时间
|
print(f"程序执行时间为:{execution_time}秒")
|
logger.info(f"程序执行时间为:{execution_time,}秒")
|
logger.info("-" * 20)
|
|
except Exception as e:
|
print(e)
|
pass
|
return 1
|
|
def send_msg(self, msg):
|
# 通过寻找窗口名获取句柄
|
hwnd = FindWindow(config['tag_win']['className'], config['tag_win']['windowName'])
|
print(f"返回结果,发现窗口:{hwnd}")
|
logger.info(f"返回结果,发现窗口:{hwnd}")
|
# 或者直接根据句柄来发送
|
# hwnd = 1707560
|
cds = COPYDATASTRUCT()
|
cds.dwData = 0
|
msg_bytes = msg.encode('utf-8')
|
cds.cbData = ctypes.sizeof(ctypes.create_string_buffer(msg_bytes))
|
cds.lpData = ctypes.c_char_p(msg_bytes)
|
SendMessage(hwnd, win32con.WM_COPYDATA, 0, ctypes.byref(cds))
|
|
def HandleClose(self, hwnd, msg, wparam, lparam):
|
logger.info("关闭窗口")
|
print("handle close")
|
# 关闭当前窗口
|
win32gui.DestroyWindow(hwnd)
|
sys.exit()
|
return 0
|
|
if __name__ == '__main__':
|
|
|
# 累计每种药材不论名次出现的次数
|
class_count = {}
|
# 累计每种药材置信度最高的次数
|
class_count_max = {}
|
# 累计每种药材的置信度总和
|
class_sum = {}
|
# cam1 = "USB Camera"
|
# cam2 = "USB ZOOM Camera"
|
# camUtil = CAM_UTIL(cam1, cam2)
|
# webcams = camUtil.webcam_list
|
# print("摄像头", webcams)
|
save_path = "data/images/"
|
# 是否上过料
|
is_loaded = False
|
# 加载ONNX模型
|
|
print("加载模型===============")
|
load_identifier = IDENTIFIER("./model/load_id")
|
hoister_position = IDENTIFIER("./model/feeder_id")
|
safety_detect = SAFETY_DETECT("./model/safe_det")
|
config = read_config()
|
PCOPYDATASTRUCT = ctypes.POINTER(COPYDATASTRUCT)
|
|
# 查询窗口方法
|
FindWindow = ctypes.windll.user32.FindWindowW
|
# 发送消息方法
|
SendMessage = ctypes.windll.user32.SendMessageW
|
l = Listener()
|
load_identify()
|
win32gui.PumpMessages()
|