baoshiwei
2026-04-01 81b0ad0124847f083990d574dc8d20961ec6e713
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import pandas as pd
from functools import lru_cache
 
class DataProcessingService:
    """Cleaning, summary statistics and peak detection for count data frames.

    The methods read the counter columns ``count_under``, ``count_in_range``
    and ``count_over`` and, in :meth:`clean_data`, an optional ``time``
    column. Every public method is best-effort: a failure is reported with
    ``print`` and a neutral value (empty frame, empty dict, ``0.0`` or the
    untouched input) is returned instead of raising.
    """

    # Counter columns shared by the cleaning and statistics code.
    _COUNT_COLUMNS = ('count_under', 'count_in_range', 'count_over')

    @staticmethod
    def _empty_analysis():
        """Return the neutral result of :meth:`analyze_extreme_points`."""
        return {
            'extreme_points': pd.DataFrame(),
            'phase_maxima': pd.DataFrame(),
            'overall_pass_rate': 0.0
        }

    def clean_data(self, df):
        """Return a cleaned copy of ``df``.

        Missing values are replaced with 0 in every column except ``time``,
        the counter columns that are present are cast to ``int`` and ``time``
        (when present) is parsed with ``pd.to_datetime``. Missing timestamps
        therefore stay ``NaT`` instead of being turned into a bogus value.

        :param df: raw data frame; ``None`` or an empty frame is returned as is
        :return: the cleaned copy, or the untouched input if cleaning fails
        """
        if df is None or df.empty:
            return df

        try:
            # Work on a copy so the caller's frame is never modified.
            cleaned_df = df.copy()

            # Filling ``time`` with 0 would make the parser below read the gap
            # as the Unix epoch (or fail outright), so it is excluded here.
            fill_values = {col: 0 for col in cleaned_df.columns if col != 'time'}
            if fill_values:
                cleaned_df = cleaned_df.fillna(fill_values)

            # Enforce integer counters, but only for columns that exist.
            for col in self._COUNT_COLUMNS:
                if col in cleaned_df.columns:
                    cleaned_df[col] = cleaned_df[col].astype(int)

            if 'time' in cleaned_df.columns:
                cleaned_df['time'] = pd.to_datetime(cleaned_df['time'])

            return cleaned_df
        except Exception as e:
            print(f"数据清洗失败: {e}")
            return df

    def calculate_statistics(self, df):
        """Compute basic statistics for the counter columns.

        :param df: data frame to summarise
        :return: dict with ``total_records`` plus, for each counter column
            present in ``df``, a nested dict holding ``mean``, ``sum``,
            ``max`` and ``min``; an empty dict for ``None``/empty input or
            on failure
        """
        if df is None or df.empty:
            return {}

        try:
            stats = {'total_records': len(df)}

            for col in self._COUNT_COLUMNS:
                if col not in df.columns:
                    continue
                series = df[col]
                stats[col] = {
                    'mean': series.mean(),
                    'sum': series.sum(),
                    'max': series.max(),
                    'min': series.min()
                }
            return stats
        except Exception as e:
            print(f"计算统计信息失败: {e}")
            return {}

    def identify_local_maxima(self, df, column='count_in_range'):
        """Select the rows where ``column`` reaches a local maximum.

        A row qualifies when its value is not below the previous value and is
        strictly above the next one, so a plateau of equal values is
        represented by its last row only. The first row has no predecessor to
        satisfy and the last row has no successor to beat; a single-row frame
        therefore yields that row.

        :param df: data frame to scan
        :param column: name of the column to analyse
        :return: new data frame holding the local-maximum rows; an empty frame
            for ``None``/empty input or on failure (e.g. missing column)
        """
        if df is None or df.empty:
            return pd.DataFrame()

        try:
            values = df[column].values
            last = len(values) - 1

            mask = []
            for i, value in enumerate(values):
                not_below_previous = i == 0 or value >= values[i - 1]
                # Strict comparison keeps only the last row of a plateau.
                above_next = i == last or value > values[i + 1]
                mask.append(bool(not_below_previous and above_next))

            # Copy so callers can add columns without touching ``df``.
            return df[mask].copy()
        except Exception as e:
            print(f"识别局部极大值失败: {e}")
            return pd.DataFrame()

    def identify_phase_maxima(self, df, column='count_in_range'):
        """Select the phase-ending rows that carry the overall maximum.

        A phase is a run of consecutive rows with the same ``column`` value.
        The last row of every phase is collected, and of those rows only the
        ones whose value equals the largest collected value are returned.

        :param df: data frame to scan
        :param column: name of the column to analyse
        :return: new data frame with the matching rows; an empty frame for
            ``None``/empty input or on failure (e.g. missing column)
        """
        if df is None or df.empty:
            return pd.DataFrame()

        try:
            values = df[column].values
            last = len(values) - 1

            # A phase ends where the next value differs or the data stops.
            phase_ends = [
                i for i in range(len(values))
                if i == last or values[i] != values[i + 1]
            ]

            phase_points = df.iloc[phase_ends]
            max_value = phase_points[column].max()
            return phase_points[phase_points[column] == max_value].copy()
        except Exception as e:
            print(f"识别阶段最大值失败: {e}")
            return pd.DataFrame()

    def calculate_pass_rate(self, row):
        """Compute the pass rate of a single data point.

        :param row: mapping-like row providing ``count_under``,
            ``count_in_range`` and ``count_over``
        :return: share of ``count_in_range`` in the three counters as a
            percentage rounded to two decimals; ``0.0`` when the counters sum
            to zero or the computation fails
        """
        try:
            in_range = row['count_in_range']
            total = row['count_under'] + in_range + row['count_over']
            if total == 0:
                return 0.0
            return round((in_range / total) * 100, 2)
        except Exception as e:
            print(f"计算合格率失败: {e}")
            return 0.0

    def calculate_overall_pass_rate(self, df):
        """Compute the pass rate over all rows of ``df``.

        :param df: data frame providing the three counter columns
        :return: summed ``count_in_range`` divided by the sum of all three
            counters, as a percentage rounded to two decimals; ``0.0`` for
            ``None``/empty input, a zero total, or on failure
        """
        if df is None or df.empty:
            return 0.0

        try:
            total_in_range = df['count_in_range'].sum()
            total = df['count_under'].sum() + total_in_range + df['count_over'].sum()

            if total == 0:
                return 0.0

            return round((total_in_range / total) * 100, 2)
        except Exception as e:
            print(f"计算整体合格率失败: {e}")
            return 0.0

    def analyze_extreme_points(self, df):
        """Detect local maxima of ``count_in_range`` and rate them.

        :param df: data frame to analyse
        :return: dict with ``extreme_points`` (the local-maximum rows with an
            added ``pass_rate`` column), ``phase_maxima`` (always an empty
            frame; phase detection is not run here) and ``overall_pass_rate``
            (computed over the extreme points only, not over all of ``df``)
        """
        if df is None or df.empty:
            return self._empty_analysis()

        try:
            extreme_points = self.identify_local_maxima(df)

            # Per-point pass rate; skipped when nothing was detected.
            if not extreme_points.empty:
                extreme_points['pass_rate'] = extreme_points.apply(
                    self.calculate_pass_rate, axis=1
                )

            overall_pass_rate = self.calculate_overall_pass_rate(extreme_points)

            return {
                'extreme_points': extreme_points,
                'phase_maxima': pd.DataFrame(),
                'overall_pass_rate': overall_pass_rate
            }
        except Exception as e:
            print(f"分析极值点失败: {e}")
            return self._empty_analysis()