import streamlit as st
|
import plotly.express as px
|
import plotly.graph_objects as go
|
import pandas as pd
|
from datetime import datetime, timedelta
|
from app.services.data_query_service import DataQueryService
|
from app.services.data_processing_service import DataProcessingService
|
|
def show_sorting_dashboard():
|
# 初始化服务
|
query_service = DataQueryService()
|
processing_service = DataProcessingService()
|
|
# 页面标题
|
st.title("分拣磅秤数据分析")
|
|
# 初始化会话状态用于日期同步
|
if 'sorting_start_date' not in st.session_state:
|
st.session_state['sorting_start_date'] = datetime.now().date() - timedelta(days=7)
|
if 'sorting_end_date' not in st.session_state:
|
st.session_state['sorting_end_date'] = datetime.now().date()
|
if 'sorting_quick_select' not in st.session_state:
|
st.session_state['sorting_quick_select'] = "最近7天"
|
|
# 定义回调函数
|
def update_dates(qs):
|
st.session_state['sorting_quick_select'] = qs
|
today = datetime.now().date()
|
if qs == "今天":
|
st.session_state['sorting_start_date'] = today
|
st.session_state['sorting_end_date'] = today
|
elif qs == "最近3天":
|
st.session_state['sorting_start_date'] = today - timedelta(days=3)
|
st.session_state['sorting_end_date'] = today
|
elif qs == "最近7天":
|
st.session_state['sorting_start_date'] = today - timedelta(days=7)
|
st.session_state['sorting_end_date'] = today
|
elif qs == "最近30天":
|
st.session_state['sorting_start_date'] = today - timedelta(days=30)
|
st.session_state['sorting_end_date'] = today
|
|
def on_date_change():
|
st.session_state['sorting_quick_select'] = "自定义"
|
|
# 查询条件区域
|
with st.expander("🔍 查询配置", expanded=True):
|
# 添加自定义 CSS 实现响应式换行
|
st.markdown("""
|
<style>
|
/* 强制列容器换行 */
|
[data-testid="stExpander"] [data-testid="column"] {
|
flex: 1 1 120px !important;
|
min-width: 120px !important;
|
}
|
/* 针对日期输入框列稍微加宽一点 */
|
@media (min-width: 768px) {
|
[data-testid="stExpander"] [data-testid="column"]:nth-child(6),
|
[data-testid="stExpander"] [data-testid="column"]:nth-child(7) {
|
flex: 2 1 180px !important;
|
min-width: 180px !important;
|
}
|
}
|
</style>
|
""", unsafe_allow_html=True)
|
|
# 创建布局
|
cols = st.columns([1, 1, 1, 1, 1, 1.5, 1.5, 1])
|
|
options = ["今天", "最近3天", "最近7天", "最近30天", "自定义"]
|
for i, option in enumerate(options):
|
with cols[i]:
|
# 根据当前选择状态决定按钮类型
|
button_type = "primary" if st.session_state['sorting_quick_select'] == option else "secondary"
|
if st.button(option, key=f"btn_{option}", width='stretch', type=button_type):
|
update_dates(option)
|
st.rerun()
|
|
with cols[5]:
|
start_date = st.date_input(
|
"开始日期",
|
label_visibility="collapsed",
|
key="sorting_start_date",
|
on_change=on_date_change
|
)
|
|
with cols[6]:
|
end_date = st.date_input(
|
"结束日期",
|
label_visibility="collapsed",
|
key="sorting_end_date",
|
on_change=on_date_change
|
)
|
|
with cols[7]:
|
query_button = st.button("🚀 查询", key="sorting_query", width='stretch')
|
|
# 转换为datetime对象(包含时间)
|
start_datetime = datetime.combine(start_date, datetime.min.time())
|
end_datetime = datetime.combine(end_date, datetime.max.time())
|
|
# 查询按钮处理
|
if query_button:
|
# 验证日期范围
|
if start_datetime > end_datetime:
|
st.error("开始日期不能晚于结束日期!")
|
else:
|
# 显示加载状态
|
with st.spinner("正在查询数据..."):
|
# 查询数据
|
raw_data = query_service.get_sorting_scale_data(start_datetime, end_datetime)
|
|
if raw_data is None or raw_data.empty:
|
st.warning("未查询到数据,请检查日期范围或数据库连接!")
|
st.session_state['query_results'] = None
|
else:
|
# 清洗数据
|
cleaned_data = processing_service.clean_data(raw_data)
|
|
# 计算统计信息
|
stats = processing_service.calculate_statistics(cleaned_data)
|
|
# 分析极值点
|
extreme_analysis = processing_service.analyze_extreme_points(cleaned_data)
|
extreme_points = extreme_analysis['extreme_points']
|
phase_maxima = extreme_analysis['phase_maxima']
|
overall_pass_rate = extreme_analysis['overall_pass_rate']
|
|
# 缓存结果
|
st.session_state['query_results'] = {
|
'cleaned_data': cleaned_data,
|
'stats': stats,
|
'extreme_points': extreme_points,
|
'phase_maxima': phase_maxima,
|
'overall_pass_rate': overall_pass_rate
|
}
|
|
# 显示数据概览
|
st.subheader("数据概览")
|
col1, col2, col3, col4 = st.columns(4)
|
|
with col1:
|
st.metric("总记录数", stats.get('total_records', 0))
|
|
with col2:
|
st.metric("平均合格数", round(stats.get('count_in_range', {}).get('mean', 0), 2))
|
|
with col3:
|
st.metric("数据时间范围", f"{cleaned_data['time'].min()} 至 {cleaned_data['time'].max()}")
|
|
with col4:
|
st.metric("整体合格率", f"{overall_pass_rate}%")
|
|
# 显示趋势图
|
st.subheader("数据趋势图")
|
fig = px.line(
|
cleaned_data,
|
x='time',
|
y=['count_under', 'count_in_range', 'count_over'],
|
labels={
|
'time': '时间',
|
'value': '数量',
|
'variable': '数据类型'
|
},
|
title='分拣磅秤数据趋势',
|
color_discrete_map={
|
'count_under': 'red',
|
'count_in_range': 'green',
|
'count_over': 'blue'
|
}
|
)
|
|
# 自定义图例
|
fig.for_each_trace(lambda t: t.update(name={
|
'count_under': '低于标准',
|
'count_in_range': '在标准范围内',
|
'count_over': '高于标准'
|
}[t.name]))
|
|
# 配置图表缩放功能
|
fig.update_layout(
|
xaxis=dict(
|
fixedrange=False,
|
rangeslider=dict(visible=True)
|
),
|
yaxis=dict(fixedrange=False),
|
hovermode='x unified',
|
dragmode='zoom'
|
|
)
|
|
# 配置图表参数
|
config = {'scrollZoom': True}
|
|
# 显示图表
|
st.plotly_chart(fig, width='stretch', config=config)
|
|
# 显示极值点分析
|
st.subheader("极值点分析")
|
if not extreme_points.empty:
|
# 准备展示数据
|
display_df = extreme_points[['time', 'count_under', 'count_in_range', 'count_over', 'pass_rate']].copy()
|
|
# 格式化时间戳
|
display_df['time'] = display_df['time'].dt.strftime('%Y-%m-%d %H:%M:%S')
|
|
# 修改列名
|
display_df.columns = ['时间戳', '超下限数值', '范围内数值', '超上限数值', '合格率(%)']
|
|
# 显示数据表格
|
st.dataframe(display_df, use_container_width=True)
|
|
# 显示极值点数量
|
st.info(f"共识别到 {len(extreme_points)} 个极值点")
|
|
# 添加导出功能
|
import io
|
|
# 创建CSV数据
|
csv_buffer = io.StringIO()
|
display_df.to_csv(csv_buffer, index=False, encoding='utf-8-sig')
|
csv_data = csv_buffer.getvalue()
|
|
# 添加下载按钮
|
st.download_button(
|
label="下载极值点分析结果",
|
data=csv_data,
|
file_name=f"极值点分析_{start_date}_{end_date}.csv",
|
mime="text/csv"
|
)
|
else:
|
st.warning("未识别到极值点")
|
|
|
# 显示重量趋势图
|
st.subheader("重量趋势图")
|
if 'weight' in cleaned_data.columns:
|
# 创建图表
|
weight_fig = go.Figure()
|
|
# 检查是否包含阈值相关字段
|
has_thresholds = all(col in cleaned_data.columns for col in ['baseline_value', 'over_difference', 'under_difference'])
|
|
# 计算动态阈值
|
if has_thresholds:
|
# 复制数据以避免修改原始数据
|
threshold_data = cleaned_data.copy()
|
|
# 处理零值
|
for col in ['baseline_value', 'over_difference', 'under_difference']:
|
threshold_data[col] = threshold_data[col].replace(0, pd.NA)
|
|
# 向前填充缺失值
|
threshold_data = threshold_data.ffill()
|
|
# 计算上下限阈值
|
threshold_data['upper_threshold'] = threshold_data['over_difference']
|
threshold_data['lower_threshold'] = threshold_data['under_difference']
|
|
# 标记超出阈值的点
|
threshold_data['is_out_of_range'] = (threshold_data['weight'] > threshold_data['upper_threshold']) | (threshold_data['weight'] < threshold_data['lower_threshold'])
|
|
# 添加基准值线(动态)
|
weight_fig.add_trace(go.Scatter(
|
x=threshold_data['time'],
|
y=threshold_data['baseline_value'],
|
name='基准值',
|
line=dict(color='green', width=2),
|
opacity=0.7
|
))
|
|
# 添加上限阈值线(动态)
|
weight_fig.add_trace(go.Scatter(
|
x=threshold_data['time'],
|
y=threshold_data['upper_threshold'],
|
name='上限阈值',
|
line=dict(color='red', width=2),
|
opacity=0.7
|
))
|
|
# 添加下限阈值线(动态)
|
weight_fig.add_trace(go.Scatter(
|
x=threshold_data['time'],
|
y=threshold_data['lower_threshold'],
|
name='下限阈值',
|
line=dict(color='orange', width=2),
|
opacity=0.7
|
))
|
|
# 分离正常和异常数据点
|
normal_data = threshold_data[~threshold_data['is_out_of_range']]
|
out_of_range_data = threshold_data[threshold_data['is_out_of_range']]
|
|
# 添加正常重量点
|
weight_fig.add_trace(go.Scatter(
|
x=normal_data['time'],
|
y=normal_data['weight'],
|
name='重量 (正常)',
|
opacity=0.8,
|
mode='markers',
|
marker=dict(
|
size=4,
|
color='blue',
|
symbol='circle',
|
line=dict(width=0, color='blue')
|
)
|
))
|
|
# 添加异常重量点
|
if not out_of_range_data.empty:
|
weight_fig.add_trace(go.Scatter(
|
x=out_of_range_data['time'],
|
y=out_of_range_data['weight'],
|
name='重量 (异常)',
|
opacity=0.8,
|
mode='markers',
|
marker=dict(
|
size=4,
|
color='red',
|
symbol='triangle-up',
|
line=dict(width=2, color='darkred')
|
)
|
))
|
else:
|
# 没有阈值数据,只显示重量趋势
|
weight_fig.add_trace(go.Scatter(
|
x=cleaned_data['time'],
|
y=cleaned_data['weight'],
|
name='重量',
|
line=dict(color='blue', width=2),
|
opacity=0.8
|
))
|
st.warning("数据中不包含阈值相关字段,无法显示阈值线和异常警示!")
|
|
# 配置图表布局
|
weight_fig.update_layout(
|
title='重量随时间变化趋势',
|
xaxis_title='时间',
|
yaxis_title='重量',
|
xaxis=dict(
|
rangeslider=dict(visible=True),
|
type='date',
|
fixedrange=False
|
),
|
yaxis=dict(fixedrange=False),
|
legend=dict(
|
orientation="h",
|
yanchor="bottom",
|
y=1.02,
|
xanchor="right",
|
x=1
|
),
|
hovermode='x unified',
|
height=600,
|
dragmode='zoom',
|
updatemenus=[
|
dict(
|
type="buttons",
|
direction="left",
|
buttons=list([
|
dict(args=["visible", [True, True, True, True, True]], label="显示全部", method="restyle"),
|
dict(args=["visible", [False, False, False, True, True]], label="仅显示重量", method="restyle"),
|
dict(args=["visible", [True, True, True, False, False]], label="仅显示阈值", method="restyle"),
|
dict(args=["visible", [True, True, True, True, False]], label="显示正常重量", method="restyle"),
|
dict(args=["visible", [True, True, True, False, True]], label="显示异常重量", method="restyle")
|
]),
|
pad={"r": 10, "t": 10},
|
showactive=True,
|
x=0.1,
|
xanchor="left",
|
y=1.1,
|
yanchor="top"
|
),
|
]
|
)
|
|
# 显示图表
|
st.plotly_chart(weight_fig, width='stretch', config={'scrollZoom': True})
|
|
# 显示数据表格
|
st.subheader("原始数据")
|
st.dataframe(cleaned_data, use_container_width=True)
|
|
# 显示详细统计信息
|
if stats:
|
st.subheader("详细统计信息")
|
with st.expander("查看详细统计"):
|
col_stats1, col_stats2, col_stats3 = st.columns(3)
|
|
with col_stats1:
|
st.write("**低于标准**")
|
st.write(f"平均值: {round(stats['count_under']['mean'], 2)}")
|
st.write(f"总和: {stats['count_under']['sum']}")
|
st.write(f"最大值: {stats['count_under']['max']}")
|
st.write(f"最小值: {stats['count_under']['min']}")
|
|
with col_stats2:
|
st.write("**在标准范围内**")
|
st.write(f"平均值: {round(stats['count_in_range']['mean'], 2)}")
|
st.write(f"总和: {stats['count_in_range']['sum']}")
|
st.write(f"最大值: {stats['count_in_range']['max']}")
|
st.write(f"最小值: {stats['count_in_range']['min']}")
|
|
with col_stats3:
|
st.write("**高于标准**")
|
st.write(f"平均值: {round(stats['count_over']['mean'], 2)}")
|
st.write(f"总和: {stats['count_over']['sum']}")
|
st.write(f"最大值: {stats['count_over']['max']}")
|
st.write(f"最小值: {stats['count_over']['min']}")
|
|
# 数据库连接状态
|
st.sidebar.subheader("数据库状态")
|
if query_service.db.is_connected():
|
st.sidebar.success("数据库连接正常")
|
else:
|
st.sidebar.warning("数据库未连接")
|
|
if __name__ == "__main__":
|
show_sorting_dashboard()
|