import pandas as pd from functools import lru_cache from datetime import timedelta from app.database.database import DatabaseConnection class DataQueryService: def __init__(self): self.db = DatabaseConnection() self.timezone_offset = 8 # 默认东八区(北京时间) def get_sorting_scale_data(self, start_date, end_date): """ 查询分拣磅秤数据 :param start_date: 开始日期 (本地时间) :param end_date: 结束日期 (本地时间) :return: 包含count_under, count_in_range, count_over的数据框 (返回本地时间) """ try: # 将本地时间转换为UTC时间进行查询 start_date_utc = start_date - timedelta(hours=self.timezone_offset) end_date_utc = end_date - timedelta(hours=self.timezone_offset) # 连接数据库 if not self.db.is_connected(): if not self.db.connect(): return None # connection = self.db.get_connection() # SQL查询语句 query = """ SELECT time, count_under, count_in_range, count_over, weight, baseline_value, over_difference, under_difference FROM public.aics_sorting_scale_data WHERE time BETWEEN %s AND %s ORDER BY time ASC """ # 执行查询并转换为DataFrame df = pd.read_sql(query, self.db.get_connection(), params=(start_date_utc, end_date_utc)) # 将查询结果中的UTC时间转换回本地时间 if not df.empty and 'time' in df.columns: df['time'] = pd.to_datetime(df['time']) + timedelta(hours=self.timezone_offset) return df except Exception as e: print(f"查询数据失败: {e}") return None finally: # 注意:这里不关闭连接,以便后续查询复用 pass def close_connection(self): """关闭数据库连接""" self.db.disconnect()