import streamlit as st import plotly.express as px import plotly.graph_objects as go import pandas as pd from datetime import datetime, timedelta from app.services.data_query_service import DataQueryService from app.services.data_processing_service import DataProcessingService def show_sorting_dashboard(): # 初始化服务 query_service = DataQueryService() processing_service = DataProcessingService() # 页面标题 st.title("分拣磅秤数据分析") # 初始化会话状态用于日期同步 if 'sorting_start_date' not in st.session_state: st.session_state['sorting_start_date'] = datetime.now().date() - timedelta(days=7) if 'sorting_end_date' not in st.session_state: st.session_state['sorting_end_date'] = datetime.now().date() if 'sorting_quick_select' not in st.session_state: st.session_state['sorting_quick_select'] = "最近7天" # 定义回调函数 def update_dates(qs): st.session_state['sorting_quick_select'] = qs today = datetime.now().date() if qs == "今天": st.session_state['sorting_start_date'] = today st.session_state['sorting_end_date'] = today elif qs == "最近3天": st.session_state['sorting_start_date'] = today - timedelta(days=3) st.session_state['sorting_end_date'] = today elif qs == "最近7天": st.session_state['sorting_start_date'] = today - timedelta(days=7) st.session_state['sorting_end_date'] = today elif qs == "最近30天": st.session_state['sorting_start_date'] = today - timedelta(days=30) st.session_state['sorting_end_date'] = today def on_date_change(): st.session_state['sorting_quick_select'] = "自定义" # 查询条件区域 with st.expander("🔍 查询配置", expanded=True): # 添加自定义 CSS 实现响应式换行 st.markdown(""" """, unsafe_allow_html=True) # 创建布局 cols = st.columns([1, 1, 1, 1, 1, 1.5, 1.5, 1]) options = ["今天", "最近3天", "最近7天", "最近30天", "自定义"] for i, option in enumerate(options): with cols[i]: # 根据当前选择状态决定按钮类型 button_type = "primary" if st.session_state['sorting_quick_select'] == option else "secondary" if st.button(option, key=f"btn_{option}", width='stretch', type=button_type): update_dates(option) st.rerun() with cols[5]: start_date = st.date_input( "开始日期", label_visibility="collapsed", key="sorting_start_date", on_change=on_date_change ) with cols[6]: end_date = st.date_input( "结束日期", label_visibility="collapsed", key="sorting_end_date", on_change=on_date_change ) with cols[7]: query_button = st.button("🚀 查询", key="sorting_query", width='stretch') # 转换为datetime对象(包含时间) start_datetime = datetime.combine(start_date, datetime.min.time()) end_datetime = datetime.combine(end_date, datetime.max.time()) # 查询按钮处理 if query_button: # 验证日期范围 if start_datetime > end_datetime: st.error("开始日期不能晚于结束日期!") else: # 显示加载状态 with st.spinner("正在查询数据..."): # 查询数据 raw_data = query_service.get_sorting_scale_data(start_datetime, end_datetime) if raw_data is None or raw_data.empty: st.warning("未查询到数据,请检查日期范围或数据库连接!") st.session_state['query_results'] = None else: # 清洗数据 cleaned_data = processing_service.clean_data(raw_data) # 计算统计信息 stats = processing_service.calculate_statistics(cleaned_data) # 分析极值点 extreme_analysis = processing_service.analyze_extreme_points(cleaned_data) extreme_points = extreme_analysis['extreme_points'] phase_maxima = extreme_analysis['phase_maxima'] overall_pass_rate = extreme_analysis['overall_pass_rate'] # 缓存结果 st.session_state['query_results'] = { 'cleaned_data': cleaned_data, 'stats': stats, 'extreme_points': extreme_points, 'phase_maxima': phase_maxima, 'overall_pass_rate': overall_pass_rate } # 显示数据概览 st.subheader("数据概览") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("总记录数", stats.get('total_records', 0)) with col2: st.metric("平均合格数", round(stats.get('count_in_range', {}).get('mean', 0), 2)) with col3: st.metric("数据时间范围", f"{cleaned_data['time'].min()} 至 {cleaned_data['time'].max()}") with col4: st.metric("整体合格率", f"{overall_pass_rate}%") # 显示趋势图 st.subheader("数据趋势图") fig = px.line( cleaned_data, x='time', y=['count_under', 'count_in_range', 'count_over'], labels={ 'time': '时间', 'value': '数量', 'variable': '数据类型' }, title='分拣磅秤数据趋势', color_discrete_map={ 'count_under': 'red', 'count_in_range': 'green', 'count_over': 'blue' } ) # 自定义图例 fig.for_each_trace(lambda t: t.update(name={ 'count_under': '低于标准', 'count_in_range': '在标准范围内', 'count_over': '高于标准' }[t.name])) # 配置图表缩放功能 fig.update_layout( xaxis=dict( fixedrange=False, rangeslider=dict(visible=True) ), yaxis=dict(fixedrange=False), hovermode='x unified', dragmode='zoom' ) # 配置图表参数 config = {'scrollZoom': True} # 显示图表 st.plotly_chart(fig, width='stretch', config=config) # 显示极值点分析 st.subheader("极值点分析") if not extreme_points.empty: # 准备展示数据 display_df = extreme_points[['time', 'count_under', 'count_in_range', 'count_over', 'pass_rate']].copy() # 格式化时间戳 display_df['time'] = display_df['time'].dt.strftime('%Y-%m-%d %H:%M:%S') # 修改列名 display_df.columns = ['时间戳', '超下限数值', '范围内数值', '超上限数值', '合格率(%)'] # 显示数据表格 st.dataframe(display_df, use_container_width=True) # 显示极值点数量 st.info(f"共识别到 {len(extreme_points)} 个极值点") # 添加导出功能 import io # 创建CSV数据 csv_buffer = io.StringIO() display_df.to_csv(csv_buffer, index=False, encoding='utf-8-sig') csv_data = csv_buffer.getvalue() # 添加下载按钮 st.download_button( label="下载极值点分析结果", data=csv_data, file_name=f"极值点分析_{start_date}_{end_date}.csv", mime="text/csv" ) else: st.warning("未识别到极值点") # 显示重量趋势图 st.subheader("重量趋势图") if 'weight' in cleaned_data.columns: # 创建图表 weight_fig = go.Figure() # 检查是否包含阈值相关字段 has_thresholds = all(col in cleaned_data.columns for col in ['baseline_value', 'over_difference', 'under_difference']) # 计算动态阈值 if has_thresholds: # 复制数据以避免修改原始数据 threshold_data = cleaned_data.copy() # 处理零值 for col in ['baseline_value', 'over_difference', 'under_difference']: threshold_data[col] = threshold_data[col].replace(0, pd.NA) # 向前填充缺失值 threshold_data = threshold_data.ffill() # 计算上下限阈值 threshold_data['upper_threshold'] = threshold_data['over_difference'] threshold_data['lower_threshold'] = threshold_data['under_difference'] # 标记超出阈值的点 threshold_data['is_out_of_range'] = (threshold_data['weight'] > threshold_data['upper_threshold']) | (threshold_data['weight'] < threshold_data['lower_threshold']) # 添加基准值线(动态) weight_fig.add_trace(go.Scatter( x=threshold_data['time'], y=threshold_data['baseline_value'], name='基准值', line=dict(color='green', width=2), opacity=0.7 )) # 添加上限阈值线(动态) weight_fig.add_trace(go.Scatter( x=threshold_data['time'], y=threshold_data['upper_threshold'], name='上限阈值', line=dict(color='red', width=2), opacity=0.7 )) # 添加下限阈值线(动态) weight_fig.add_trace(go.Scatter( x=threshold_data['time'], y=threshold_data['lower_threshold'], name='下限阈值', line=dict(color='orange', width=2), opacity=0.7 )) # 分离正常和异常数据点 normal_data = threshold_data[~threshold_data['is_out_of_range']] out_of_range_data = threshold_data[threshold_data['is_out_of_range']] # 添加正常重量点 weight_fig.add_trace(go.Scatter( x=normal_data['time'], y=normal_data['weight'], name='重量 (正常)', opacity=0.8, mode='markers', marker=dict( size=4, color='blue', symbol='circle', line=dict(width=0, color='blue') ) )) # 添加异常重量点 if not out_of_range_data.empty: weight_fig.add_trace(go.Scatter( x=out_of_range_data['time'], y=out_of_range_data['weight'], name='重量 (异常)', opacity=0.8, mode='markers', marker=dict( size=4, color='red', symbol='triangle-up', line=dict(width=2, color='darkred') ) )) else: # 没有阈值数据,只显示重量趋势 weight_fig.add_trace(go.Scatter( x=cleaned_data['time'], y=cleaned_data['weight'], name='重量', line=dict(color='blue', width=2), opacity=0.8 )) st.warning("数据中不包含阈值相关字段,无法显示阈值线和异常警示!") # 配置图表布局 weight_fig.update_layout( title='重量随时间变化趋势', xaxis_title='时间', yaxis_title='重量', xaxis=dict( rangeslider=dict(visible=True), type='date', fixedrange=False ), yaxis=dict(fixedrange=False), legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), hovermode='x unified', height=600, dragmode='zoom', updatemenus=[ dict( type="buttons", direction="left", buttons=list([ dict(args=["visible", [True, True, True, True, True]], label="显示全部", method="restyle"), dict(args=["visible", [False, False, False, True, True]], label="仅显示重量", method="restyle"), dict(args=["visible", [True, True, True, False, False]], label="仅显示阈值", method="restyle"), dict(args=["visible", [True, True, True, True, False]], label="显示正常重量", method="restyle"), dict(args=["visible", [True, True, True, False, True]], label="显示异常重量", method="restyle") ]), pad={"r": 10, "t": 10}, showactive=True, x=0.1, xanchor="left", y=1.1, yanchor="top" ), ] ) # 显示图表 st.plotly_chart(weight_fig, width='stretch', config={'scrollZoom': True}) # 显示数据表格 st.subheader("原始数据") st.dataframe(cleaned_data, use_container_width=True) # 显示详细统计信息 if stats: st.subheader("详细统计信息") with st.expander("查看详细统计"): col_stats1, col_stats2, col_stats3 = st.columns(3) with col_stats1: st.write("**低于标准**") st.write(f"平均值: {round(stats['count_under']['mean'], 2)}") st.write(f"总和: {stats['count_under']['sum']}") st.write(f"最大值: {stats['count_under']['max']}") st.write(f"最小值: {stats['count_under']['min']}") with col_stats2: st.write("**在标准范围内**") st.write(f"平均值: {round(stats['count_in_range']['mean'], 2)}") st.write(f"总和: {stats['count_in_range']['sum']}") st.write(f"最大值: {stats['count_in_range']['max']}") st.write(f"最小值: {stats['count_in_range']['min']}") with col_stats3: st.write("**高于标准**") st.write(f"平均值: {round(stats['count_over']['mean'], 2)}") st.write(f"总和: {stats['count_over']['sum']}") st.write(f"最大值: {stats['count_over']['max']}") st.write(f"最小值: {stats['count_over']['min']}") # 数据库连接状态 st.sidebar.subheader("数据库状态") if query_service.db.is_connected(): st.sidebar.success("数据库连接正常") else: st.sidebar.warning("数据库未连接") if __name__ == "__main__": show_sorting_dashboard()