import streamlit as st import plotly.express as px import pandas as pd from datetime import datetime, timedelta from app.services.data_query_service import DataQueryService from app.services.data_processing_service import DataProcessingService from app.services.extruder_service import ExtruderService # 设置页面配置 st.set_page_config( page_title="数据分析系统", page_icon="📊", layout="wide" ) # 左侧菜单导航 st.sidebar.title("系统导航") menu = st.sidebar.radio( "选择分析模块", ["分拣磅秤", "挤出机"], index=0, key="main_menu" ) # 右侧内容区域 if menu == "分拣磅秤": # 初始化服务 query_service = DataQueryService() processing_service = DataProcessingService() # 页面标题 st.title("分拣磅秤数据分析") # 查询条件区域(迁移到右侧顶部) st.subheader("查询配置") col1, col2, col3 = st.columns([2, 2, 1]) with col1: start_date = st.date_input("开始日期", datetime.now() - timedelta(days=7), key="sorting_start_date") with col2: end_date = st.date_input("结束日期", datetime.now(), key="sorting_end_date") with col3: st.write("") # 占位 query_button = st.button("查询数据", key="sorting_query") # 转换为datetime对象(包含时间) start_datetime = datetime.combine(start_date, datetime.min.time()) end_datetime = datetime.combine(end_date, datetime.max.time()) # 查询按钮处理 if query_button: # 验证日期范围 if start_datetime > end_datetime: st.error("开始日期不能晚于结束日期!") else: # 显示加载状态 with st.spinner("正在查询数据..."): # 查询数据 raw_data = query_service.get_sorting_scale_data(start_datetime, end_datetime) if raw_data is None or raw_data.empty: st.warning("未查询到数据,请检查日期范围或数据库连接!") st.session_state['query_results'] = None else: # 清洗数据 cleaned_data = processing_service.clean_data(raw_data) # 计算统计信息 stats = processing_service.calculate_statistics(cleaned_data) # 分析极值点 extreme_analysis = processing_service.analyze_extreme_points(cleaned_data) extreme_points = extreme_analysis['extreme_points'] phase_maxima = extreme_analysis['phase_maxima'] overall_pass_rate = extreme_analysis['overall_pass_rate'] # 缓存结果 st.session_state['query_results'] = { 'cleaned_data': cleaned_data, 'stats': stats, 'extreme_points': extreme_points, 'phase_maxima': phase_maxima, 'overall_pass_rate': overall_pass_rate } # 显示数据概览 st.subheader("数据概览") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("总记录数", stats.get('total_records', 0)) with col2: st.metric("平均合格数", round(stats.get('count_in_range', {}).get('mean', 0), 2)) with col3: st.metric("数据时间范围", f"{cleaned_data['time'].min()} 至 {cleaned_data['time'].max()}") with col4: st.metric("整体合格率", f"{overall_pass_rate}%") # 显示趋势图 st.subheader("数据趋势图") fig = px.line( cleaned_data, x='time', y=['count_under', 'count_in_range', 'count_over'], labels={ 'time': '时间', 'value': '数量', 'variable': '数据类型' }, title='分拣磅秤数据趋势', color_discrete_map={ 'count_under': 'red', 'count_in_range': 'green', 'count_over': 'blue' } ) # 自定义图例 fig.for_each_trace(lambda t: t.update(name={ 'count_under': '低于标准', 'count_in_range': '在标准范围内', 'count_over': '高于标准' }[t.name])) # 配置图表缩放功能 fig.update_layout( xaxis=dict( fixedrange=False ), yaxis=dict( fixedrange=False ), dragmode='zoom' ) # 配置图表参数 config = { 'scrollZoom': True } # 显示图表 st.plotly_chart(fig, use_container_width=True, config=config) # 显示极值点分析 st.subheader("极值点分析") if not extreme_points.empty: # 准备展示数据 display_df = extreme_points[['time', 'count_under', 'count_in_range', 'count_over', 'pass_rate']].copy() # 格式化时间戳 display_df['time'] = display_df['time'].dt.strftime('%Y-%m-%d %H:%M:%S') # 修改列名 display_df.columns = ['时间戳', '超下限数值', '范围内数值', '超上限数值', '合格率(%)'] # 显示数据表格 st.dataframe(display_df, use_container_width=True) # 显示极值点数量 st.info(f"共识别到 {len(extreme_points)} 个极值点") # 添加导出功能 import csv import io # 创建CSV数据 csv_buffer = io.StringIO() display_df.to_csv(csv_buffer, index=False, encoding='utf-8-sig') csv_data = csv_buffer.getvalue() # 添加下载按钮 st.download_button( label="下载极值点分析结果", data=csv_data, file_name=f"极值点分析_{start_date}_{end_date}.csv", mime="text/csv" ) else: st.warning("未识别到极值点") # 显示重量趋势图 st.subheader("重量趋势图") if 'weight' in cleaned_data.columns: # 创建重量趋势图 import plotly.graph_objects as go # 创建图表 weight_fig = go.Figure() # 检查是否包含阈值相关字段 has_thresholds = all(col in cleaned_data.columns for col in ['baseline_value', 'over_difference', 'under_difference']) # 计算动态阈值 if has_thresholds: # 复制数据以避免修改原始数据 threshold_data = cleaned_data.copy() # 处理零值 for col in ['baseline_value', 'over_difference', 'under_difference']: threshold_data[col] = threshold_data[col].replace(0, pd.NA) # 向前填充缺失值 threshold_data = threshold_data.ffill() # 计算上下限阈值 threshold_data['upper_threshold'] = threshold_data['over_difference'] threshold_data['lower_threshold'] = threshold_data['under_difference'] # 标记超出阈值的点 threshold_data['is_out_of_range'] = (threshold_data['weight'] > threshold_data['upper_threshold']) | (threshold_data['weight'] < threshold_data['lower_threshold']) # 添加基准值线(动态) weight_fig.add_trace(go.Scatter( x=threshold_data['time'], y=threshold_data['baseline_value'], name='基准值', line=dict(color='green', width=2), opacity=0.7 )) # 添加上限阈值线(动态) weight_fig.add_trace(go.Scatter( x=threshold_data['time'], y=threshold_data['upper_threshold'], name='上限阈值', line=dict(color='red', width=2), opacity=0.7 )) # 添加下限阈值线(动态) weight_fig.add_trace(go.Scatter( x=threshold_data['time'], y=threshold_data['lower_threshold'], name='下限阈值', line=dict(color='orange', width=2), opacity=0.7 )) # 分离正常和异常数据点 normal_data = threshold_data[~threshold_data['is_out_of_range']] out_of_range_data = threshold_data[threshold_data['is_out_of_range']] # 添加正常重量点 weight_fig.add_trace(go.Scatter( x=normal_data['time'], y=normal_data['weight'], name='重量 (正常)', opacity=0.8, mode='markers', marker=dict( size=4, color='blue', symbol='circle', line=dict( width=0, color='blue' ) ) )) # 添加异常重量点 if not out_of_range_data.empty: weight_fig.add_trace(go.Scatter( x=out_of_range_data['time'], y=out_of_range_data['weight'], name='重量 (异常)', opacity=0.8, mode='markers', marker=dict( size=4, color='red', symbol='triangle-up', line=dict( width=2, color='darkred' ) ) )) else: # 没有阈值数据,只显示重量趋势 weight_fig.add_trace(go.Scatter( x=cleaned_data['time'], y=cleaned_data['weight'], name='重量', line=dict(color='blue', width=2), opacity=0.8 )) st.warning("数据中不包含阈值相关字段,无法显示阈值线和异常警示!") # 配置图表布局 weight_fig.update_layout( title='重量随时间变化趋势', xaxis_title='时间', yaxis_title='重量', xaxis=dict( rangeslider=dict( visible=True ), type='date', fixedrange=False ), yaxis=dict( fixedrange=False ), legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), hovermode='x unified', height=600, dragmode='zoom', # 添加自定义工具栏按钮 updatemenus=[ dict( type="buttons", direction="left", buttons=list([ dict( args=["visible", [True, True, True, True, True]], label="显示全部", method="restyle" ), dict( args=["visible", [False, False, False, True, True]], label="仅显示重量", method="restyle" ), dict( args=["visible", [True, True, True, False, False]], label="仅显示阈值", method="restyle" ), dict( args=["visible", [True, True, True, True, False]], label="显示正常重量", method="restyle" ), dict( args=["visible", [True, True, True, False, True]], label="显示异常重量", method="restyle" ) ]), pad={"r": 10, "t": 10}, showactive=True, x=0.1, xanchor="left", y=1.1, yanchor="top" ), ] ) # 配置图表参数 config = { 'scrollZoom': True, 'toImageButtonOptions': { 'format': 'png', 'filename': '重量趋势图', 'height': 600, 'width': 1000, 'scale': 1 } } # 显示图表 st.plotly_chart(weight_fig, use_container_width=True, config=config) # 显示数据表格 st.subheader("原始数据") st.dataframe(cleaned_data, use_container_width=True) # 显示详细统计信息 if stats: st.subheader("详细统计信息") with st.expander("查看详细统计"): col_stats1, col_stats2, col_stats3 = st.columns(3) with col_stats1: st.write("**低于标准**") st.write(f"平均值: {round(stats['count_under']['mean'], 2)}") st.write(f"总和: {stats['count_under']['sum']}") st.write(f"最大值: {stats['count_under']['max']}") st.write(f"最小值: {stats['count_under']['min']}") with col_stats2: st.write("**在标准范围内**") st.write(f"平均值: {round(stats['count_in_range']['mean'], 2)}") st.write(f"总和: {stats['count_in_range']['sum']}") st.write(f"最大值: {stats['count_in_range']['max']}") st.write(f"最小值: {stats['count_in_range']['min']}") with col_stats3: st.write("**高于标准**") st.write(f"平均值: {round(stats['count_over']['mean'], 2)}") st.write(f"总和: {stats['count_over']['sum']}") st.write(f"最大值: {stats['count_over']['max']}") st.write(f"最小值: {stats['count_over']['min']}") # 数据库连接状态 st.sidebar.subheader("数据库状态") if 'query_service' in locals() and query_service.db.is_connected(): st.sidebar.success("数据库连接正常") else: st.sidebar.warning("数据库未连接") elif menu == "挤出机": # 初始化服务 extruder_service = ExtruderService() processing_service = DataProcessingService() # 页面标题 st.title("挤出机数据分析") # 查询条件区域 st.subheader("查询配置") col1, col2, col3 = st.columns([2, 2, 1]) with col1: start_date = st.date_input("开始日期", datetime.now() - timedelta(days=7), key="extruder_start_date") with col2: end_date = st.date_input("结束日期", datetime.now(), key="extruder_end_date") with col3: st.write("") # 占位 query_button = st.button("查询数据", key="extruder_query") # 转换为datetime对象(包含时间) start_datetime = datetime.combine(start_date, datetime.min.time()) end_datetime = datetime.combine(end_date, datetime.max.time()) # 查询按钮处理 if query_button: # 验证日期范围 if start_datetime > end_datetime: st.error("开始日期不能晚于结束日期!") else: # 显示加载状态 with st.spinner("正在查询数据..."): # 查询数据 raw_data = extruder_service.get_extruder_data(start_datetime, end_datetime) if raw_data is None or raw_data.empty: st.warning("未查询到数据,请检查日期范围或数据库连接!") st.session_state['extruder_results'] = None else: # 清洗数据 cleaned_data = processing_service.clean_data(raw_data) # 检测换批事件 batch_changes = extruder_service.detect_batch_changes(cleaned_data) # 分析参数趋势 # trends = extruder_service.analyze_parameter_trends(cleaned_data) # 缓存结果 st.session_state['extruder_results'] = { 'cleaned_data': cleaned_data, 'batch_changes': batch_changes, # 'trends': trends, } # 显示数据概览 st.subheader("数据概览") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("总记录数", len(cleaned_data)) with col2: st.metric("换批次数", len(batch_changes)) with col3: st.metric("数据时间范围", f"{cleaned_data['time'].min()} 至 {cleaned_data['time'].max()}") # 显示换批分析 st.subheader("换批分析") if not batch_changes.empty: # 准备展示数据 batch_display = batch_changes[['batch_id', 'compound_code', 'start_time', 'end_time', 'duration_minutes']].copy() # 格式化时间 batch_display['start_time'] = batch_display['start_time'].dt.strftime('%Y-%m-%d %H:%M:%S') batch_display['end_time'] = batch_display['end_time'].dt.strftime('%Y-%m-%d %H:%M:%S') # 修改列名 batch_display.columns = ['批号', '胶料号', '开始时间', '结束时间', '持续时长(分钟)'] # 显示数据表格 st.dataframe(batch_display, use_container_width=True) else: st.warning("未检测到换批事件") # 显示换料操作可视化图表 st.subheader("换料操作可视化") if not batch_changes.empty: # 创建换料操作可视化图表 import plotly.graph_objects as go # 准备图表数据 fig = go.Figure() # 添加关键参数趋势线 fig.add_trace(go.Scatter( x=cleaned_data['time'], y=cleaned_data['screw_speed_actual'], name='实际转速', line=dict(color='blue', width=2), opacity=0.8 )) fig.add_trace(go.Scatter( x=cleaned_data['time'], y=cleaned_data['head_pressure'], name='机头压力', line=dict(color='red', width=2), opacity=0.8, yaxis='y2' )) fig.add_trace(go.Scatter( x=cleaned_data['time'], y=cleaned_data['extruder_current'], name='挤出机电流', line=dict(color='green', width=2), opacity=0.8, yaxis='y3' )) fig.add_trace(go.Scatter( x=cleaned_data['time'], y=cleaned_data['metered_weight'], name='米重', line=dict(color='orange', width=2), opacity=0.8, yaxis='y4' )) # 添加换料事件标记 for i, row in batch_changes.iterrows(): # 添加垂直线 fig.add_shape( type="line", x0=row['start_time'], y0=0, x1=row['start_time'], y1=1, yref="paper", line=dict( color="purple", width=2, dash="dash" ) ) # 添加注释 fig.add_annotation( x=row['start_time'], y=1, yref='paper', text=f'换料: {row["compound_code"]}\n批号: {row["batch_id"]}', showarrow=True, arrowhead=1, ax=0, ay=-60 ) # 配置图表布局 fig.update_layout( title='换料操作关键参数变化趋势', xaxis_title='时间', xaxis=dict( rangeslider=dict( visible=True ), type='date' ), yaxis_title='实际转速 (rpm)', yaxis2=dict( title='机头压力 (MPa)', overlaying='y', side='right', position=0.85 ), yaxis3=dict( title='挤出机电流 (A)', overlaying='y', side='right', position=0.92 ), yaxis4=dict( title='米重 (kg)', overlaying='y', side='right', position=1 ), legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), hovermode='x unified', height=700 ) # 显示图表 st.plotly_chart(fig, use_container_width=True) # 添加数据导出功能 import csv import io import pandas as pd # 准备导出数据 export_data = [] for i, row in batch_changes.iterrows(): # 获取换料前后的数据 before_change = cleaned_data[cleaned_data['time'] < row['start_time']].tail(5) after_change = cleaned_data[cleaned_data['time'] >= row['start_time']].head(5) # 添加换料事件记录 export_data.append({ 'event_type': '换料事件', 'batch_id': row['batch_id'], 'compound_code': row['compound_code'], 'time': row['start_time'], 'screw_speed': '', 'head_pressure': '', 'extruder_current': '', 'metered_weight': '' }) # 添加换料前数据 for _, before_row in before_change.iterrows(): export_data.append({ 'event_type': '换料前', 'batch_id': row['batch_id'], 'compound_code': row['compound_code'], 'time': before_row['time'], 'screw_speed': before_row['screw_speed_actual'], 'head_pressure': before_row['head_pressure'], 'extruder_current': before_row['extruder_current'], 'metered_weight': before_row['metered_weight'] }) # 添加换料后数据 for _, after_row in after_change.iterrows(): export_data.append({ 'event_type': '换料后', 'batch_id': row['batch_id'], 'compound_code': row['compound_code'], 'time': after_row['time'], 'screw_speed': after_row['screw_speed_actual'], 'head_pressure': after_row['head_pressure'], 'extruder_current': after_row['extruder_current'], 'metered_weight': after_row['metered_weight'] }) # 转换为DataFrame export_df = pd.DataFrame(export_data) # 创建CSV数据 csv_buffer = io.StringIO() export_df.to_csv(csv_buffer, index=False, encoding='utf-8-sig') csv_data = csv_buffer.getvalue() # 添加下载按钮 st.download_button( label="下载换料操作分析数据", data=csv_data, file_name=f"换料操作分析_{start_date}_{end_date}.csv", mime="text/csv" ) else: st.warning("未检测到换批事件,无法生成换料操作图表") # 显示原始数据 st.subheader("原始数据") st.dataframe(cleaned_data, use_container_width=True) # 数据库连接状态 st.sidebar.subheader("数据库状态") if 'extruder_service' in locals() and extruder_service.db.is_connected(): st.sidebar.success("数据库连接正常") else: st.sidebar.warning("数据库未连接") # 页脚 st.sidebar.markdown("---") st.sidebar.markdown("© 2026 数据分析系统") # 缓存清理机制 def clear_cache(): """清理会话缓存,当切换菜单或更新查询条件时调用""" if 'query_results' in st.session_state: del st.session_state['query_results'] if 'extruder_results' in st.session_state: del st.session_state['extruder_results'] # 初始化会话状态 if 'query_results' not in st.session_state: st.session_state['query_results'] = None if 'extruder_results' not in st.session_state: st.session_state['extruder_results'] = None # 监听菜单切换,清理缓存 if 'previous_menu' not in st.session_state: st.session_state['previous_menu'] = menu elif st.session_state['previous_menu'] != menu: clear_cache() st.session_state['previous_menu'] = menu # 关闭数据库连接(当应用结束时) def on_app_close(): if 'query_service' in locals(): query_service.close_connection() if 'extruder_service' in locals(): extruder_service.close_connection() # 注册应用关闭回调 st.session_state['app_close'] = on_app_close