from functools import lru_cache

import pandas as pd


class DataProcessingService:
    """Cleaning, summary statistics and extreme-point analysis for count data.

    The methods work on data frames that may contain the count columns
    ``count_under``, ``count_in_range`` and ``count_over`` plus an optional
    ``time`` column. Every method is best-effort: on failure it prints a
    message and returns a neutral fallback value instead of raising.
    """

    # Count columns shared by the cleaning and statistics steps.
    _COUNT_COLUMNS = ('count_under', 'count_in_range', 'count_over')

    def clean_data(self, df):
        """Return a cleaned copy of ``df``.

        Missing values are replaced with 0 in every column except ``time``,
        the count columns that are present are cast to ``int``, and ``time``
        (if present) is converted with ``pd.to_datetime``.

        :param df: raw data frame (may be ``None`` or empty)
        :return: cleaned data frame; ``df`` itself when it is ``None``/empty
                 or when cleaning fails
        """
        if df is None or df.empty:
            return df

        try:
            # Work on a copy so the caller's frame is never modified.
            cleaned_df = df.copy()

            # 'time' is deliberately excluded from the fill: a 0 placeholder
            # would be parsed as the 1970 epoch (or break parsing of a mixed
            # string/int column). Missing timestamps become NaT instead.
            fill_values = {
                col: 0 for col in cleaned_df.columns if col != 'time'
            }
            if fill_values:
                cleaned_df = cleaned_df.fillna(fill_values)

            for col in self._COUNT_COLUMNS:
                if col in cleaned_df.columns:
                    cleaned_df[col] = cleaned_df[col].astype(int)

            if 'time' in cleaned_df.columns:
                cleaned_df['time'] = pd.to_datetime(cleaned_df['time'])

            return cleaned_df
        except Exception as e:
            print(f"数据清洗失败: {e}")
            return df

    def calculate_statistics(self, df):
        """Compute basic statistics for the count columns.

        :param df: data frame (may be ``None`` or empty)
        :return: dict with ``total_records`` and, for each count column that
                 is present, a dict holding ``mean``, ``sum``, ``max`` and
                 ``min``; an empty dict for ``None``/empty input or on failure
        """
        if df is None or df.empty:
            return {}

        try:
            stats = {'total_records': len(df)}
            for col in self._COUNT_COLUMNS:
                if col not in df.columns:
                    continue
                series = df[col]
                stats[col] = {
                    'mean': series.mean(),
                    'sum': series.sum(),
                    'max': series.max(),
                    'min': series.min(),
                }
            return stats
        except Exception as e:
            print(f"计算统计信息失败: {e}")
            return {}

    def identify_local_maxima(self, df, column='count_in_range'):
        """Return the rows of ``df`` that are local maxima of ``column``.

        A row is a local maximum when its value is greater than or equal to
        the previous value and strictly greater than the next one, so only
        the last point of a plateau of equal values is reported. The first
        row has no "previous" constraint and the last row has no "next"
        constraint; a single-row frame therefore yields that row.

        :param df: data frame (may be ``None`` or empty)
        :param column: name of the column to analyse
        :return: new data frame with the local-maximum rows; an empty data
                 frame for ``None``/empty input or on failure (for example
                 when ``column`` does not exist)
        """
        if df is None or df.empty:
            return pd.DataFrame()

        try:
            values = df[column].values
            last = len(values) - 1

            mask = []
            for i, current in enumerate(values):
                not_below_prev = i == 0 or current >= values[i - 1]
                # Strict comparison keeps only the last point of a plateau.
                above_next = i == last or current > values[i + 1]
                mask.append(bool(not_below_prev and above_next))

            # Explicit copy so callers can add columns to the result without
            # pandas warning about writing to a slice.
            return df[mask].copy()
        except Exception as e:
            print(f"识别局部极大值失败: {e}")
            return pd.DataFrame()

    def identify_phase_maxima(self, df, column='count_in_range'):
        """Return the phase end points that carry the largest value.

        A phase is a run of consecutive rows with the same value in
        ``column``. The last row of every phase is collected, and among those
        rows the ones whose value equals the maximum are returned.

        :param df: data frame (may be ``None`` or empty)
        :param column: name of the column to analyse
        :return: new data frame with the phase-maximum rows; an empty data
                 frame for ``None``/empty input or on failure
        """
        if df is None or df.empty:
            return pd.DataFrame()

        try:
            values = df[column].values

            # A phase ends wherever the value differs from the next one; the
            # final row always closes the last phase.
            phase_ends = [
                i
                for i, (current, following) in enumerate(
                    zip(values, values[1:])
                )
                if current != following
            ]
            phase_ends.append(len(values) - 1)

            phase_points = df.iloc[phase_ends]
            max_value = phase_points[column].max()
            return phase_points[phase_points[column] == max_value].copy()
        except Exception as e:
            print(f"识别阶段最大值失败: {e}")
            return pd.DataFrame()

    def calculate_pass_rate(self, row):
        """Compute the pass rate of a single data point.

        The pass rate is ``count_in_range`` divided by the sum of the three
        count fields, expressed as a percentage rounded to two decimals.

        :param row: mapping-like row with the three count fields
        :return: pass rate in percent; 0.0 when the total is zero or when
                 the computation fails (for example a missing field)
        """
        try:
            in_range = row['count_in_range']
            total = row['count_under'] + in_range + row['count_over']
            if total == 0:
                return 0.0
            return round((in_range / total) * 100, 2)
        except Exception as e:
            print(f"计算合格率失败: {e}")
            return 0.0

    def calculate_overall_pass_rate(self, df):
        """Compute the pass rate over all rows of ``df``.

        The column sums are used, i.e. the sum of ``count_in_range`` divided
        by the sum of all three count columns, as a percentage rounded to
        two decimals.

        :param df: data frame (may be ``None`` or empty)
        :return: overall pass rate in percent; 0.0 for ``None``/empty input,
                 a zero total, or on failure
        """
        if df is None or df.empty:
            return 0.0

        try:
            total_in_range = df['count_in_range'].sum()
            total = (
                df['count_under'].sum()
                + total_in_range
                + df['count_over'].sum()
            )
            if total == 0:
                return 0.0
            return round((total_in_range / total) * 100, 2)
        except Exception as e:
            print(f"计算整体合格率失败: {e}")
            return 0.0

    @staticmethod
    def _empty_extreme_analysis():
        """Build the neutral result returned when no analysis is possible."""
        return {
            'extreme_points': pd.DataFrame(),
            'phase_maxima': pd.DataFrame(),
            'overall_pass_rate': 0.0,
        }

    def analyze_extreme_points(self, df):
        """Find the local maxima of ``df`` and compute their pass rates.

        :param df: data frame (may be ``None`` or empty)
        :return: dict with
                 ``extreme_points`` - local maxima of ``count_in_range``,
                 with an added ``pass_rate`` column when any were found;
                 ``phase_maxima`` - always an empty data frame (phase-maxima
                 detection is currently disabled; the key is kept so the
                 result shape stays stable);
                 ``overall_pass_rate`` - pass rate computed over the extreme
                 points only, not over the whole of ``df``
        """
        if df is None or df.empty:
            return self._empty_extreme_analysis()

        try:
            extreme_points = self.identify_local_maxima(df)

            if not extreme_points.empty:
                extreme_points['pass_rate'] = extreme_points.apply(
                    self.calculate_pass_rate, axis=1
                )

            overall_pass_rate = self.calculate_overall_pass_rate(
                extreme_points
            )

            return {
                'extreme_points': extreme_points,
                'phase_maxima': pd.DataFrame(),
                'overall_pass_rate': overall_pass_rate,
            }
        except Exception as e:
            print(f"分析极值点失败: {e}")
            return self._empty_extreme_analysis()