import pandas as pd from functools import lru_cache from app.database.database import DatabaseConnection class ExtruderService: def __init__(self): self.db = DatabaseConnection() def get_extruder_data(self, start_date, end_date): """ 查询挤出机数据 :param start_date: 开始日期 :param end_date: 结束日期 :return: 包含挤出机数据的数据框 """ try: # 连接数据库 if not self.db.is_connected(): if not self.db.connect(): return None connection = self.db.get_connection() # SQL查询语句 query = """ SELECT time, compound_code, screw_speed_set, screw_speed_actual, extruder_current, head_pressure, machine_head_speed, compound_code_ref, compound_ratio, die_ratio, color_line, buffer_compound, spec_name, metered_weight FROM public.aics_extruder_data WHERE time BETWEEN %s AND %s ORDER BY time ASC """ # 执行查询并转换为DataFrame df = pd.read_sql(query, connection, params=(start_date, end_date)) return df except Exception as e: print(f"查询数据失败: {e}") return None finally: # 注意:这里不关闭连接,以便后续查询复用 pass def detect_batch_changes(self, df): """ 基于胶料号变更检测换批事件 :param df: 挤出机数据框 :return: 包含换批事件的数据框 """ if df is None or df.empty: return pd.DataFrame() try: # 复制数据框 batch_df = df.copy() # 检测compound_code变更 # 将 compound_code 列整体向下偏移一行,用于比较当前行与前一行是否相同 batch_df['compound_code_shift'] = batch_df['compound_code'].shift(1) # 若当前行 compound_code 与前一行不同,则标记为换批(1),否则为 0 ,第一行特殊处理为 0 batch_df['is_batch_change'] = (batch_df['compound_code'] != batch_df['compound_code_shift']).astype(int) batch_df['is_batch_change'].iloc[0] = 0 # 打印batch_df print(batch_df) # 提取所有换批事件的索引 change_indices = batch_df[batch_df['is_batch_change'] == 1].index.tolist() # 计算批次信息 batch_events = [] # 处理第一个批次(从数据开始到第一次换批) if not change_indices: # 没有换批事件,整个数据集是一个批次 if not batch_df.empty: start_time = batch_df['time'].iloc[0] end_time = batch_df['time'].iloc[-1] compound_code = batch_df['compound_code'].iloc[0] duration = (end_time - start_time).total_seconds() / 60 # 转换为分钟 production = batch_df['metered_weight'].sum() batch_events.append({ 'batch_id': start_time.strftime('%Y%m%d%H%M%S'), 'compound_code': compound_code, 'start_time': start_time, 'end_time': end_time, 'duration_minutes': round(duration, 2) }) else: # 处理第一个批次 first_batch_data = batch_df.iloc[:change_indices[0] + 1] if not first_batch_data.empty: start_time = first_batch_data['time'].iloc[0] end_time = first_batch_data['time'].iloc[-1] compound_code = first_batch_data['compound_code'].iloc[0] duration = (end_time - start_time).total_seconds() / 60 # 转换为分钟 production = first_batch_data['metered_weight'].sum() batch_events.append({ 'batch_id': start_time.strftime('%Y%m%d%H%M%S'), 'compound_code': compound_code, 'start_time': start_time, 'end_time': end_time, 'duration_minutes': round(duration, 2) }) # 处理中间批次 for i in range(len(change_indices) - 1): batch_data = batch_df.iloc[change_indices[i]:change_indices[i + 1] + 1] if not batch_data.empty: start_time = batch_data['time'].iloc[0] end_time = batch_data['time'].iloc[-1] compound_code = batch_data['compound_code'].iloc[0] duration = (end_time - start_time).total_seconds() / 60 # 转换为分钟 production = batch_data['metered_weight'].sum() batch_events.append({ 'batch_id': start_time.strftime('%Y%m%d%H%M%S'), 'compound_code': compound_code, 'start_time': start_time, 'end_time': end_time, 'duration_minutes': round(duration, 2), 'production_kg': round(production, 2) }) # 处理最后一个批次 last_batch_data = batch_df.iloc[change_indices[-1]:] if not last_batch_data.empty: start_time = last_batch_data['time'].iloc[0] end_time = last_batch_data['time'].iloc[-1] compound_code = last_batch_data['compound_code'].iloc[0] duration = (end_time - start_time).total_seconds() / 60 # 转换为分钟 production = last_batch_data['metered_weight'].sum() batch_events.append({ 'batch_id': start_time.strftime('%Y%m%d%H%M%S'), 'compound_code': compound_code, 'start_time': start_time, 'end_time': end_time, 'duration_minutes': round(duration, 2), 'production_kg': round(production, 2) }) return pd.DataFrame(batch_events) except Exception as e: print(f"检测换批事件失败: {e}") return pd.DataFrame() def analyze_parameter_trends(self, df): """ 分析参数变化趋势 :param df: 挤出机数据框 :return: 包含趋势分析结果的字典 """ if df is None or df.empty: return {} try: trends = { 'screw_speed': { 'set': df['screw_speed_set'].describe(), 'actual': df['screw_speed_actual'].describe() }, 'extruder_current': df['extruder_current'].describe(), 'head_pressure': df['head_pressure'].describe(), 'metered_weight': df['metered_weight'].describe() } return trends except Exception as e: print(f"分析参数趋势失败: {e}") return {} def close_connection(self): """关闭数据库连接""" self.db.disconnect()