import logging
from functools import lru_cache
from typing import Optional

import pandas as pd

from app.database.database import DatabaseConnection

logger = logging.getLogger(__name__)


class DataQueryService:
    """Run read-only queries through a single ``DatabaseConnection``.

    The connection is opened lazily on the first query and is left open
    afterwards so later queries can reuse it; call ``close_connection``
    when the service is no longer needed.
    """

    def __init__(self):
        self.db = DatabaseConnection()

    def get_sorting_scale_data(self, start_date, end_date) -> Optional[pd.DataFrame]:
        """Query sorting-scale records whose ``time`` lies in a date range.

        :param start_date: lower bound passed to ``time BETWEEN %s AND %s``
        :param end_date: upper bound passed to ``time BETWEEN %s AND %s``
        :return: DataFrame with the columns ``time``, ``count_under``,
            ``count_in_range``, ``count_over``, ``weight``,
            ``baseline_value``, ``over_difference`` and
            ``under_difference``, ordered by ``time`` ascending; ``None``
            if the connection could not be established or the query failed.
        """
        # The bounds are bound as query parameters, never interpolated
        # into the SQL text.
        query = """
            SELECT
                time,
                count_under,
                count_in_range,
                count_over,
                weight,
                baseline_value,
                over_difference,
                under_difference
            FROM public.aics_sorting_scale_data
            WHERE time BETWEEN %s AND %s
            ORDER BY time ASC
        """

        # Everything that touches the database stays inside the ``try`` so
        # callers keep the existing contract: this method never raises and
        # signals failure by returning ``None``.
        try:
            # Connect on demand; give up if the connection cannot be made.
            if not self.db.is_connected() and not self.db.connect():
                logger.error("查询数据失败: database connection could not be established")
                return None

            connection = self.db.get_connection()

            # Execute the query and load the result set into a DataFrame.
            # The connection is deliberately left open for later queries.
            return pd.read_sql(query, connection, params=(start_date, end_date))
        except Exception as e:
            # ``logger.exception`` records the traceback as well as the
            # message, which a plain ``print`` would lose.
            # NOTE(review): if the underlying driver leaves the reused
            # connection in a failed-transaction state after an error, a
            # rollback may be needed here -- confirm against
            # DatabaseConnection.
            logger.exception("查询数据失败: %s", e)
            return None

    def close_connection(self):
        """Close the database connection."""
        self.db.disconnect()