import pandas as pd
|
from functools import lru_cache
|
|
class DataProcessingService:
    """Clean count/time data frames and derive statistics, peaks and pass rates.

    The methods read the three counter columns listed in ``COUNT_COLUMNS``
    and, when present, a ``time`` column.  Every public method is
    best-effort: a failure is reported on stdout and a neutral fallback
    value is returned instead of raising.
    """

    # Counter columns shared by the cleaning, statistics and pass-rate methods.
    COUNT_COLUMNS = ('count_under', 'count_in_range', 'count_over')
    # Name of the optional timestamp column handled by ``clean_data``.
    TIME_COLUMN = 'time'
    # Time zone every timestamp is converted to (UTC+8).
    TARGET_TIMEZONE = 'Asia/Shanghai'

    def clean_data(self, df):
        """
        Clean a raw data frame without modifying the caller's object.

        Missing values are replaced by 0 in every column except ``time``,
        the counter columns are cast to ``int`` and, if a ``time`` column
        exists, it is parsed to datetimes, treated as UTC when it carries no
        time zone, and converted to Asia/Shanghai.

        :param df: raw data frame
        :return: cleaned copy; ``df`` itself when it is ``None``/empty or
                 when cleaning fails
        """
        if df is None or df.empty:
            return df

        try:
            # Work on a copy so the original frame is never mutated.
            cleaned_df = df.copy()

            # Fill missing values everywhere except the time column: a
            # missing timestamp must stay missing (NaT) rather than become 0,
            # which would later be parsed as a bogus 1970 epoch time.
            fill_values = {
                col: 0 for col in cleaned_df.columns if col != self.TIME_COLUMN
            }
            cleaned_df = cleaned_df.fillna(fill_values)

            # Make sure the counters are integers.
            for col in self.COUNT_COLUMNS:
                cleaned_df[col] = cleaned_df[col].astype(int)

            if self.TIME_COLUMN in cleaned_df.columns:
                timestamps = pd.to_datetime(cleaned_df[self.TIME_COLUMN])

                # Naive timestamps are assumed to be UTC.
                if timestamps.dt.tz is None:
                    timestamps = timestamps.dt.tz_localize('UTC')

                cleaned_df[self.TIME_COLUMN] = timestamps.dt.tz_convert(
                    self.TARGET_TIMEZONE
                )

            return cleaned_df
        except Exception as e:
            # Best-effort contract: report and hand back the untouched input.
            print(f"数据清洗失败: {e}")
            return df

    def calculate_statistics(self, df):
        """
        Compute basic statistics for the counter columns.

        :param df: data frame
        :return: dict with ``total_records`` plus, per counter column, a dict
                 of ``mean``/``sum``/``max``/``min``; ``{}`` when ``df`` is
                 ``None``/empty or when the computation fails
        """
        if df is None or df.empty:
            return {}

        try:
            stats = {'total_records': len(df)}
            for col in self.COUNT_COLUMNS:
                series = df[col]
                stats[col] = {
                    'mean': series.mean(),
                    'sum': series.sum(),
                    'max': series.max(),
                    'min': series.min(),
                }
            return stats
        except Exception as e:
            print(f"计算统计信息失败: {e}")
            return {}

    def identify_local_maxima(self, df, column='count_in_range'):
        """
        Select the rows that are local maxima of ``column``.

        A row qualifies when its value is greater than or equal to the
        previous value (if there is one) and strictly greater than the next
        value (if there is one).  The strict comparison with the successor
        means that, on a plateau of equal values, only the last row is kept.
        A single-row frame therefore yields that single row.

        :param df: data frame
        :param column: name of the column to analyse
        :return: new data frame holding the local-maximum rows; an empty
                 frame when ``df`` is ``None``/empty or on failure
        """
        if df is None or df.empty:
            return pd.DataFrame()

        try:
            values = df[column].tolist()
            last = len(values) - 1

            # The boundary checks (i == 0 / i == last) skip the neighbour
            # that does not exist instead of indexing out of range.
            mask = [
                (i == 0 or value >= values[i - 1])
                and (i == last or value > values[i + 1])
                for i, value in enumerate(values)
            ]

            # Explicit copy so callers can add columns without pandas
            # chained-assignment warnings.
            return df[mask].copy()
        except Exception as e:
            print(f"识别局部极大值失败: {e}")
            return pd.DataFrame()

    def identify_phase_maxima(self, df, column='count_in_range'):
        """
        Select the "last point of a phase" rows that hold the maximum value.

        A phase is a run of consecutive rows with equal values in
        ``column``.  The last row of every phase is collected, and of those
        rows only the ones whose value equals the largest collected value
        are returned.

        :param df: data frame
        :param column: name of the column to analyse
        :return: new data frame holding the phase-maximum rows; an empty
                 frame when ``df`` is ``None``/empty or on failure
        """
        if df is None or df.empty:
            return pd.DataFrame()

        try:
            values = df[column].tolist()
            last = len(values) - 1

            # A phase ends wherever the next value differs (or the data ends).
            phase_ends = [
                i for i, value in enumerate(values)
                if i == last or value != values[i + 1]
            ]

            phase_points = df.iloc[phase_ends]
            max_value = phase_points[column].max()
            return phase_points[phase_points[column] == max_value].copy()
        except Exception as e:
            print(f"识别阶段最大值失败: {e}")
            return pd.DataFrame()

    def calculate_pass_rate(self, row):
        """
        Compute the pass rate of a single data point.

        :param row: mapping-like row providing the three counter values
        :return: share of ``count_in_range`` in the sum of all counters, as a
                 percentage rounded to two decimals; ``0.0`` when the sum is
                 zero or when the computation fails
        """
        try:
            total = sum(row[col] for col in self.COUNT_COLUMNS)
            if total == 0:
                return 0.0
            return round((row['count_in_range'] / total) * 100, 2)
        except Exception as e:
            print(f"计算合格率失败: {e}")
            return 0.0

    def calculate_overall_pass_rate(self, df):
        """
        Compute the pass rate over the whole data frame.

        :param df: data frame
        :return: share of the summed ``count_in_range`` in the sum of all
                 counters, as a percentage rounded to two decimals; ``0.0``
                 when ``df`` is ``None``/empty, when the total is zero or
                 when the computation fails
        """
        if df is None or df.empty:
            return 0.0

        try:
            total_in_range = df['count_in_range'].sum()
            total = sum(df[col].sum() for col in self.COUNT_COLUMNS)

            if total == 0:
                return 0.0

            return round((total_in_range / total) * 100, 2)
        except Exception as e:
            print(f"计算整体合格率失败: {e}")
            return 0.0

    @staticmethod
    def _empty_analysis():
        """Return the neutral result of ``analyze_extreme_points``."""
        return {
            'extreme_points': pd.DataFrame(),
            'phase_maxima': pd.DataFrame(),
            'overall_pass_rate': 0.0,
        }

    def analyze_extreme_points(self, df):
        """
        Analyse the extreme points and compute the related pass rates.

        :param df: data frame
        :return: dict with ``extreme_points`` (local maxima of
                 ``count_in_range`` with an added ``pass_rate`` column when
                 non-empty), ``phase_maxima`` and ``overall_pass_rate``;
                 empty frames and ``0.0`` when ``df`` is ``None``/empty or
                 when the analysis fails
        """
        if df is None or df.empty:
            return self._empty_analysis()

        try:
            extreme_points = self.identify_local_maxima(df)
            phase_maxima = self.identify_phase_maxima(df)

            # Attach the per-row pass rate to every extreme point.
            if not extreme_points.empty:
                # Copy before adding a column so the assignment can never
                # target a view of ``df``.
                extreme_points = extreme_points.copy()
                extreme_points['pass_rate'] = extreme_points.apply(
                    self.calculate_pass_rate, axis=1
                )

            return {
                'extreme_points': extreme_points,
                'phase_maxima': phase_maxima,
                'overall_pass_rate': self.calculate_overall_pass_rate(df),
            }
        except Exception as e:
            print(f"分析极值点失败: {e}")
            return self._empty_analysis()