import streamlit as st
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import numpy as np
import joblib
import os
from datetime import datetime, timedelta
from app.services.extruder_service import ExtruderService
from app.services.main_process_service import MainProcessService
# Try to import torch; if the import fails, TORCH_AVAILABLE is set to False so
# the page can refuse deep-learning models instead of crashing at import time.
try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
# 稳态识别类
class SteadyStateDetector:
    """Flag steady-state stretches in a metered-weight time series."""

    def __init__(self):
        pass

    def detect_steady_state(self, df, weight_col='米重', window_size=20, std_threshold=0.5, duration_threshold=60):
        """Mark steady-state rows and summarise each sufficiently long steady run.

        A row is steady when the rolling coefficient of variation
        (rolling std / rolling mean * 100) is below ``std_threshold`` and the
        weight itself is at least 0.1. Consecutive steady rows form a run; a
        run is reported as a segment only when the time between its first and
        last row is at least ``duration_threshold`` seconds.

        The frame is modified in place: ``time`` is converted to datetime and
        the columns ``rolling_std``, ``rolling_mean``, ``fluctuation_range``
        and ``is_steady`` are added. Rows belonging to runs that are too short
        to be reported keep ``is_steady == 1``.

        :param df: data frame with a ``time`` column and the weight column
        :param weight_col: name of the weight column
        :param window_size: rolling window size in rows (this equals seconds
            only when the data is sampled once per second)
        :param std_threshold: upper bound for the fluctuation range, in percent
        :param duration_threshold: minimum duration of a reported segment, in seconds
        :return: ``(df, steady_segments)``; each segment is a dict with the keys
            ``start_time``, ``start_idx``, ``weights``, ``end_time``, ``end_idx``,
            ``duration``, ``mean_weight``, ``std_weight``, ``min_weight``,
            ``max_weight``, ``fluctuation_range`` and ``confidence``.
            ``start_idx`` / ``end_idx`` are the index labels of the first and
            last row of the run.
        """
        if df is None or df.empty:
            return df, []

        # Durations are computed from timestamps, so the column must be datetime.
        df['time'] = pd.to_datetime(df['time'])

        # Rolling statistics; the first four rows yield NaN (min_periods=5).
        rolling = df[weight_col].rolling(window=window_size, min_periods=5)
        df['rolling_std'] = rolling.std()
        df['rolling_mean'] = rolling.mean()

        # Fluctuation range in percent. NaN (warm-up rows, 0/0) is treated as 0,
        # i.e. as "not fluctuating".
        df['fluctuation_range'] = (df['rolling_std'] / df['rolling_mean']) * 100
        df['fluctuation_range'] = df['fluctuation_range'].fillna(0)

        df['is_steady'] = 0
        steady_condition = (
            (df['fluctuation_range'] < std_threshold) &
            (df[weight_col] >= 0.1)
        )
        df.loc[steady_condition, 'is_steady'] = 1

        # Locate runs of consecutive steady rows by position, so the result does
        # not depend on the frame having a gap-free 0..n-1 index.
        flags = (df['is_steady'] == 1).to_numpy()
        padded = np.concatenate(([False], flags, [False]))
        boundaries = np.flatnonzero(padded[1:] != padded[:-1])
        run_starts = boundaries[0::2]
        run_stops = boundaries[1::2]  # exclusive end positions

        times = df['time']
        weights = df[weight_col]
        steady_segments = []
        for first, stop in zip(run_starts, run_stops):
            last = stop - 1
            start_time = times.iloc[first]
            end_time = times.iloc[last]
            duration = (end_time - start_time).total_seconds()
            # Written as "not >=" so a NaN duration (NaT timestamps) is rejected.
            if not duration >= duration_threshold:
                continue

            weights_array = weights.iloc[first:stop].to_numpy()
            mean_weight = np.mean(weights_array)
            std_weight = np.std(weights_array)
            fluctuation_range = (std_weight / mean_weight) * 100
            # Confidence falls linearly from 100 (no fluctuation) and is
            # clamped to the range [50, 100].
            confidence = 100 - (fluctuation_range / std_threshold) * 50
            confidence = max(50, min(100, confidence))

            steady_segments.append({
                'start_time': start_time,
                'start_idx': df.index[first],
                'weights': weights_array.tolist(),
                'end_time': end_time,
                'end_idx': df.index[last],
                'duration': duration,
                'mean_weight': mean_weight,
                'std_weight': std_weight,
                'min_weight': np.min(weights_array),
                'max_weight': np.max(weights_array),
                'fluctuation_range': fluctuation_range,
                'confidence': confidence,
            })

        return df, steady_segments
def show_metered_weight_forecast():
    """Render the Streamlit page that predicts metered weight with a saved model.

    The page has three parts: a date-range selector, a model selector with
    the steady-state detection settings, and the forecast itself. The forecast
    runs only when the query button was pressed and a model is held in
    ``st.session_state['selected_model']``.
    """
    extruder_service = ExtruderService()
    main_process_service = MainProcessService()

    st.title("米重预测分析")
    _init_forecast_session_state()

    with st.expander("🔍 数据选择", expanded=True):
        start_date, end_date, query_button = _render_date_selection()

    # Query the full days: 00:00:00 of the start date to 23:59:59.999999 of the end date.
    start_dt = datetime.combine(start_date, datetime.min.time())
    end_dt = datetime.combine(end_date, datetime.max.time())

    with st.expander("📁 模型选择", expanded=True):
        _render_model_selection()
        _render_steady_state_config()

    st.subheader("🔮 米重预测")
    if query_button and st.session_state['selected_model']:
        with st.spinner("正在获取数据并进行预测..."):
            _run_forecast(extruder_service, main_process_service, start_dt, end_dt)
    elif query_button:
        st.warning("请先选择一个模型。")
    else:
        st.info("请选择时间范围和模型,然后点击'查询数据'按钮开始预测分析。")


# Quick-select label -> number of days to look back from today.
# "自定义" (custom) is deliberately absent: it leaves the dates untouched.
_QUICK_SELECT_DAYS = {"今天": 0, "最近3天": 3, "最近7天": 7, "最近30天": 30}

# Model types that need sequence input and run through PyTorch.
_SEQUENCE_MODEL_TYPES = ('LSTM', 'GRU', 'BiLSTM')
# Model types trained on scaled features and a scaled target.
_SCALED_MODEL_TYPES = ('SVR', 'MLP')

_DEFAULT_STEADY_WINDOW = 20
_DEFAULT_STEADY_THRESHOLD = 1.5

# Line colours for the feature curves, reused cyclically.
_FEATURE_COLORS = ['green', 'orange', 'purple', 'brown', 'pink', 'gray',
                   'olive', 'cyan', 'magenta', 'yellow', 'lime', 'teal']


def _init_forecast_session_state():
    """Populate session-state defaults on the first run of the page."""
    today = datetime.now().date()
    defaults = {
        'forecast_start_date': today - timedelta(days=7),
        'forecast_end_date': today,
        'forecast_quick_select': "最近7天",
        'selected_model': None,
        'selected_model_file': None,
        'forecast_use_steady_only': True,
        'forecast_steady_window': _DEFAULT_STEADY_WINDOW,
        'forecast_steady_threshold': _DEFAULT_STEADY_THRESHOLD,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value


def _apply_quick_select(qs):
    """Store the chosen quick-select label and set the matching date range."""
    st.session_state['forecast_quick_select'] = qs
    days_back = _QUICK_SELECT_DAYS.get(qs)
    if days_back is None:
        return
    today = datetime.now().date()
    st.session_state['forecast_start_date'] = today - timedelta(days=days_back)
    st.session_state['forecast_end_date'] = today


def _mark_custom_range():
    """Date-input callback: a manual date edit switches quick-select to custom."""
    st.session_state['forecast_quick_select'] = "自定义"


def _render_date_selection():
    """Render quick-select buttons, date inputs and the query button.

    :return: ``(start_date, end_date, query_button)``
    """
    # NOTE(review): this call injects only a newline. The custom CSS it was
    # presumably meant to carry is not present in the source — confirm and restore.
    st.markdown("\n", unsafe_allow_html=True)

    cols = st.columns([1, 1, 1, 1, 1, 1.5, 1.5, 1])
    options = ["今天", "最近3天", "最近7天", "最近30天", "自定义"]
    for col, option in zip(cols, options):
        with col:
            # Highlight the currently active quick-select option.
            button_type = "primary" if st.session_state['forecast_quick_select'] == option else "secondary"
            if st.button(option, key=f"btn_forecast_{option}", width='stretch', type=button_type):
                # The date inputs below have not been created yet in this run,
                # so their session-state values may still be overwritten.
                _apply_quick_select(option)
                st.rerun()
    with cols[5]:
        start_date = st.date_input(
            "开始日期",
            label_visibility="collapsed",
            key="forecast_start_date",
            on_change=_mark_custom_range
        )
    with cols[6]:
        end_date = st.date_input(
            "结束日期",
            label_visibility="collapsed",
            key="forecast_end_date",
            on_change=_mark_custom_range
        )
    with cols[7]:
        query_button = st.button("🚀 查询数据", key="forecast_query", width='stretch')
    return start_date, end_date, query_button


def _render_model_selection():
    """Let the user pick a saved model, show its metadata and keep it in session state."""
    model_dir = "saved_models"
    os.makedirs(model_dir, exist_ok=True)

    # Reverse lexical order; presumably file names carry a timestamp so the
    # newest model comes first — verify against the code that saves models.
    model_files = sorted(
        (f for f in os.listdir(model_dir) if f.endswith('.joblib')),
        reverse=True,
    )
    if not model_files:
        st.warning("尚未保存任何模型,请先训练模型并保存。")
        return

    selected_model_file = st.selectbox(
        "选择已保存的模型",
        options=model_files,
        help="选择要用于预测的模型文件",
        key="forecast_selected_model"
    )
    if not selected_model_file:
        return

    model_path = os.path.join(model_dir, selected_model_file)
    try:
        # SECURITY: joblib.load unpickles arbitrary objects. Only files written
        # by this application should ever be placed in the model directory.
        model_info = joblib.load(model_path)
    except Exception as exc:  # unpickling can fail in many different ways
        st.error(f"模型文件加载失败: {selected_model_file} ({exc})")
        # Do not keep predicting with a model other than the one displayed.
        st.session_state['selected_model'] = None
        st.session_state['selected_model_file'] = None
        return

    st.subheader("📊 模型信息")
    info_cols = st.columns(2)
    with info_cols[0]:
        st.metric("模型类型", model_info['model_type'])
        st.metric("创建时间", model_info['created_at'].strftime('%Y-%m-%d %H:%M:%S'))
        st.metric("使用稳态数据", "是" if model_info.get('use_steady_data', False) else "否")
    with info_cols[1]:
        st.metric("R² 得分", f"{model_info['r2_score']:.4f}")
        st.metric("均方误差 (MSE)", f"{model_info['mse']:.6f}")
        st.metric("均方根误差 (RMSE)", f"{model_info['rmse']:.6f}")

    st.write("🔑 模型使用的特征:")
    st.code(", ".join(model_info['features']))

    # Only sequence models store a sequence length.
    if 'sequence_length' in model_info:
        st.metric("序列长度", model_info['sequence_length'])

    st.session_state['selected_model'] = model_info
    st.session_state['selected_model_file'] = selected_model_file


def _render_steady_state_config():
    """Render the steady-state detection controls.

    The widgets take their current value from session state through ``key``.
    No ``value=`` is passed, because combining a default value with a key that
    was already set through the session-state API makes Streamlit emit a warning.
    """
    st.markdown("---")
    st.write("⚖️ **稳态识别配置**")
    steady_cols = st.columns(3)
    with steady_cols[0]:
        st.checkbox(
            "仅预测稳态数据",
            key="forecast_use_steady_only",
            help="启用后,只对处于稳态时段的数据进行米重预测"
        )
    with steady_cols[1]:
        st.slider(
            "滑动窗口大小 (秒)",
            min_value=5,
            max_value=60,
            step=5,
            key="forecast_steady_window",
            help="用于稳态识别的滑动窗口大小"
        )
    with steady_cols[2]:
        st.slider(
            "波动阈值 (%)",
            min_value=0.1,
            max_value=2.0,
            step=0.1,
            key="forecast_steady_threshold",
            help="稳态识别的波动范围阈值"
        )


def _integrate_data(df_extruder_full, df_main_speed, df_temp):
    """Join main-speed and temperature readings onto the extruder rows.

    Each extruder row receives the nearest reading within one minute from the
    other two sources. Rows without a metered weight are dropped and the index
    is reset so that it is gap-free.

    :return: the merged frame with Chinese feature column names, or ``None``
        when there is no extruder data
    """
    if df_extruder_full is None or df_extruder_full.empty:
        return None

    df_merged = df_extruder_full[['time', 'metered_weight', 'screw_speed_actual', 'head_pressure']].copy()

    if df_main_speed is not None and not df_main_speed.empty:
        df_merged = pd.merge_asof(
            df_merged.sort_values('time'),
            df_main_speed[['time', 'process_main_speed']].sort_values('time'),
            on='time',
            direction='nearest',
            tolerance=pd.Timedelta('1min')
        )

    if df_temp is not None and not df_temp.empty:
        temp_cols = ['time', 'nakata_extruder_screw_display_temp',
                     'nakata_extruder_rear_barrel_display_temp',
                     'nakata_extruder_front_barrel_display_temp',
                     'nakata_extruder_head_display_temp']
        df_merged = pd.merge_asof(
            df_merged.sort_values('time'),
            df_temp[temp_cols].sort_values('time'),
            on='time',
            direction='nearest',
            tolerance=pd.Timedelta('1min')
        )

    # Rename to the feature names shown in the UI and looked up in the model.
    df_merged.rename(columns={
        'screw_speed_actual': '螺杆转速',
        'head_pressure': '机头压力',
        'process_main_speed': '流程主速',
        'nakata_extruder_screw_display_temp': '螺杆温度',
        'nakata_extruder_rear_barrel_display_temp': '后机筒温度',
        'nakata_extruder_front_barrel_display_temp': '前机筒温度',
        'nakata_extruder_head_display_temp': '机头温度'
    }, inplace=True)

    # Dropping rows leaves gaps in the index; reset it so downstream code that
    # assumes consecutive integer labels keeps working.
    return df_merged.dropna(subset=['metered_weight']).reset_index(drop=True)


def _create_sequences(data, seq_length):
    """Return every window of ``seq_length`` consecutive rows of ``data`` as one array."""
    return np.array([data[i:i + seq_length] for i in range(len(data) - seq_length + 1)])


def _predict_weights(model_info, X_pred):
    """Run the stored model on ``X_pred``.

    :return: one prediction per row of ``X_pred`` (sequence models yield NaN
        for the first ``sequence_length - 1`` rows), or ``None`` when there
        are too few rows for a sequence model
    """
    model = model_info['model']
    model_type = model_info['model_type']

    if model_type in _SEQUENCE_MODEL_TYPES:
        if not TORCH_AVAILABLE:
            st.error("PyTorch 未安装,无法使用深度学习模型进行预测。")
            st.stop()
        import torch

        sequence_length = model_info['sequence_length']
        if len(X_pred) < sequence_length:
            # No complete window can be built; the padded result would not
            # match the number of rows either.
            st.warning(f"有效数据仅 {len(X_pred)} 条,少于模型所需的序列长度 {sequence_length},无法进行预测。")
            return None

        scaler_X = model_info['scaler_X']
        scaler_y = model_info['scaler_y']
        X_sequences = _create_sequences(scaler_X.transform(X_pred), sequence_length)

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # The model and its input must live on the same device.
        model.to(device)
        X_tensor = torch.tensor(X_sequences, dtype=torch.float32).to(device)

        model.eval()
        with torch.no_grad():
            y_pred_scaled = model(X_tensor).cpu().numpy().ravel()

        predicted = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).ravel()
        # The first complete window ends at row sequence_length - 1; pad the
        # rows before it so predictions line up with the input rows.
        return [np.nan] * (sequence_length - 1) + list(predicted)

    if model_type in _SCALED_MODEL_TYPES:
        scaler_X = model_info['scaler_X']
        scaler_y = model_info['scaler_y']
        y_pred_scaled = model.predict(scaler_X.transform(X_pred))
        return scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).ravel()

    # Every other model type predicts directly from the unscaled features.
    return model.predict(X_pred)


def _build_trend_figure(df_all, df_pred, required_features):
    """Build the chart of actual weight, predicted weight and one curve per feature."""
    fig = go.Figure()
    df_all['time'] = pd.to_datetime(df_all['time'])

    # Actual weight uses every valid row, not only the rows that were predicted.
    fig.add_trace(go.Scatter(
        x=df_all['time'],
        y=df_all['米重'],
        name='实时米重',
        mode='lines',
        line=dict(color='blue', width=1.5),
    ))
    # Predicted weight exists only for the rows passed to the model.
    fig.add_trace(go.Scatter(
        x=df_pred['time'],
        y=df_pred['预测米重'],
        name='预测米重',
        mode='lines',
        line=dict(color='red', width=2, dash='dash'),
        marker=dict(size=3),
    ))

    layout = {
        'title': '米重预测与实时数据对比',
        'xaxis': {
            'title': '时间',
            'rangeslider': {'visible': True},
            'type': 'date',
            'tickformat': '%Y-%m-%d %H:%M'
        },
        'yaxis': {
            'title': '米重 (Kg/m)',
            'title_font': {'color': 'blue'},
            'tickfont': {'color': 'blue'},
            'side': 'left',
            'fixedrange': False  # allow zooming on the y axis
        },
        'legend': {
            'orientation': 'h',
            'yanchor': 'bottom',
            'y': 1.02,
            'xanchor': 'right',
            'x': 1
        },
        'height': 600,
        'margin': {'l': 100, 'r': 200, 't': 100, 'b': 100},
        'hovermode': 'x unified'
    }

    # Each feature gets its own right-hand y axis (y2, y3, ...), because the
    # features are drawn on their own scales.
    for i, feature in enumerate(required_features):
        color = _FEATURE_COLORS[i % len(_FEATURE_COLORS)]
        if feature in df_all.columns:
            fig.add_trace(go.Scatter(
                x=df_all['time'],
                y=df_all[feature],
                name=feature,
                mode='lines',
                line=dict(color=color, width=1.5),
                yaxis=f'y{i+2}',
            ))
        layout[f'yaxis{i+2}'] = {
            'title': feature,
            'title_font': {'color': color},
            'tickfont': {'color': color},
            'overlaying': 'y',
            'side': 'right',
            'anchor': 'free',
            'position': 1 - (i+1)*0.08,
            'fixedrange': False  # allow zooming on the y axis
        }

    fig.update_layout(layout)
    return fig


def _render_results(df_all, df_pred, required_features):
    """Show error statistics, charts, a data preview and the CSV export."""
    st.subheader("📊 预测结果对比分析")

    df_pred['误差'] = df_pred['预测米重'] - df_pred['米重']
    df_pred['绝对误差'] = df_pred['误差'].abs()
    # A zero actual weight would give an infinite relative error and distort
    # the mean and max; such rows get NaN instead.
    actual_nonzero = df_pred['米重'].replace(0, np.nan)
    df_pred['相对误差'] = (df_pred['绝对误差'] / actual_nonzero) * 100

    error_stats = df_pred.dropna(subset=['预测米重']).describe()
    stats_cols = st.columns(3)
    with stats_cols[0]:
        st.metric("平均实际米重", f"{error_stats['米重']['mean']:.4f} Kg/m")
        st.metric("平均预测米重", f"{error_stats['预测米重']['mean']:.4f} Kg/m")
    with stats_cols[1]:
        st.metric("平均绝对误差", f"{error_stats['绝对误差']['mean']:.4f} Kg/m")
        st.metric("最大绝对误差", f"{error_stats['绝对误差']['max']:.4f} Kg/m")
    with stats_cols[2]:
        st.metric("平均相对误差", f"{error_stats['相对误差']['mean']:.2f}%")
        st.metric("最大相对误差", f"{error_stats['相对误差']['max']:.2f}%")

    st.subheader("📈 米重趋势对比")
    fig = _build_trend_figure(df_all, df_pred, required_features)
    st.plotly_chart(fig, use_container_width=True, config={
        'scrollZoom': True,
        'displayModeBar': True,
        'modeBarButtonsToAdd': ['pan2d', 'select2d', 'lasso2d', 'resetScale2d'],
        'displaylogo': False
    })

    st.subheader("📉 预测误差分析")
    fig_error = px.histogram(df_pred.dropna(subset=['相对误差']), x='相对误差', nbins=50,
                             title='预测相对误差分布',
                             labels={'相对误差': '相对误差 (%)'})
    fig_error.update_layout(
        xaxis_title='相对误差 (%)',
        yaxis_title='频次',
        height=400
    )
    st.plotly_chart(fig_error, use_container_width=True)

    st.subheader("🔍 数据预览")
    preview_columns = ['time', '米重', '预测米重', '误差', '绝对误差', '相对误差']
    if 'is_steady' in df_pred.columns:
        preview_columns.append('is_steady')
    preview_columns.extend(required_features)
    st.dataframe(df_pred[preview_columns].head(20),
                 use_container_width=True)

    st.subheader("💾 导出数据")
    csv = df_pred.to_csv(index=False)
    st.download_button(
        label="导出预测结果数据 (CSV)",
        data=csv,
        file_name=f"metered_weight_forecast_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
        mime="text/csv",
        help="点击按钮导出预测结果数据"
    )


def _run_forecast(extruder_service, main_process_service, start_dt, end_dt):
    """Fetch the data for the period, detect steady state, predict and render the results."""
    df_extruder_full = extruder_service.get_extruder_data(start_dt, end_dt)
    df_main_speed = main_process_service.get_cutting_setting_data(start_dt, end_dt)
    df_temp = main_process_service.get_temperature_control_data(start_dt, end_dt)

    frames = (df_extruder_full, df_main_speed, df_temp)
    if not any(frame is not None and not frame.empty for frame in frames):
        st.warning("所选时间段内未找到任何数据,请尝试调整查询条件。")
        return

    df_analysis = _integrate_data(df_extruder_full, df_main_speed, df_temp)
    if df_analysis is None or df_analysis.empty:
        st.warning("数据整合失败,请检查数据质量或调整时间范围。")
        return

    df_analysis.rename(columns={'metered_weight': '米重'}, inplace=True)

    # Steady-state settings as chosen in the model expander.
    use_steady_only = st.session_state.get('forecast_use_steady_only', True)
    steady_window = st.session_state.get('forecast_steady_window', _DEFAULT_STEADY_WINDOW)
    steady_threshold = st.session_state.get('forecast_steady_threshold', _DEFAULT_STEADY_THRESHOLD)

    df_analysis, steady_segments = SteadyStateDetector().detect_steady_state(
        df_analysis,
        weight_col='米重',
        window_size=steady_window,
        std_threshold=steady_threshold
    )

    total_data = len(df_analysis)
    steady_data = len(df_analysis[df_analysis['is_steady'] == 1])
    steady_ratio = (steady_data / total_data * 100) if total_data > 0 else 0
    st.subheader("📊 稳态数据统计")
    stats_cols = st.columns(4)
    stats_cols[0].metric("总数据量", total_data)
    stats_cols[1].metric("稳态数据量", steady_data)
    stats_cols[2].metric("稳态数据比例", f"{steady_ratio:.1f}%")
    stats_cols[3].metric("稳态段数量", len(steady_segments))

    model_info = st.session_state['selected_model']
    required_features = model_info['features']

    missing_features = [f for f in required_features if f not in df_analysis.columns]
    if missing_features:
        st.warning(f"数据中缺少以下特征: {', '.join(missing_features)}")
        return

    # Rows usable for display and prediction: all features and the weight present.
    df_all = df_analysis.dropna(subset=required_features + ['米重']).copy()
    if len(df_all) == 0:
        st.warning("没有足够的有效数据进行预测,请调整时间范围或检查数据质量。")
        return

    df_pred = df_all.copy()
    if use_steady_only:
        df_pred_steady = df_all[df_all['is_steady'] == 1].copy()
        if len(df_pred_steady) > 0:
            df_pred = df_pred_steady
            st.info(f"已启用稳态过滤,使用 {len(df_pred)} 条稳态数据进行预测")
        else:
            # Fall back to all rows rather than predicting nothing.
            st.warning("未找到稳态数据,将使用所有数据进行预测")

    predicted_weights = _predict_weights(model_info, df_pred[required_features])
    if predicted_weights is None:
        return

    df_pred['预测米重'] = predicted_weights
    df_pred['time'] = pd.to_datetime(df_pred['time'])

    _render_results(df_all, df_pred, required_features)
# Page entry point: render the forecast page when this file is executed as a script.
if __name__ == "__main__":
    show_metered_weight_forecast()