import pandas as pd
|
import numpy as np
|
|
# 尝试导入torch,如果失败则标记为不可用
|
try:
|
import torch
|
except ImportError:
|
torch = None
|
|
class ParameterAdjustmentAdvisor:
|
"""
|
挤出机参数调节建议器
|
根据实时米重与标准米重的偏差,给出螺杆转速和流程主速的调整建议
|
"""
|
|
def __init__(self):
|
# 初始化默认参数关系系数
|
# 这些系数表示米重偏差1%时,参数需要调整的百分比
|
# 可以根据实际生产数据进行优化
|
self.default_coefficients = {
|
'screw_speed': 0.1, # 米重偏差1%,螺杆转速调整0.3%(降低调整幅度)
|
'process_main_speed': -0.1 # 米重偏差1%,流程主速调整-0.2%(降低调整幅度)
|
}
|
|
# 默认参数上下限(可根据实际设备情况调整)
|
self.default_limits = {
|
'screw_speed': {'min': 30, 'max': 500},
|
'process_main_speed': {'min': 0, 'max': 200}
|
}
|
|
# 最大调整幅度限制(百分比)
|
self.max_adjustment_percentage = {
|
'screw_speed': 15.0, # 螺杆转速单次最大调整15%
|
'process_main_speed': 10.0 # 流程主速单次最大调整10%
|
}
|
|
def calculate_adjustment(self, real_time_weight, standard_weight, upper_limit, lower_limit,
|
current_screw_speed, current_process_speed,
|
current_screw_temperature=None, current_rear_barrel_temperature=None,
|
current_front_barrel_temperature=None, current_head_temperature=None,
|
coefficients=None, limits=None):
|
"""
|
计算参数调整建议
|
|
:param real_time_weight: 实时米重 (Kg/m)
|
:param standard_weight: 标准米重 (Kg/m)
|
:param upper_limit: 米重上限 (Kg/m)
|
:param lower_limit: 米重下限 (Kg/m)
|
:param current_screw_speed: 当前螺杆转速 (rpm)
|
:param current_process_speed: 当前流程主速 (m/min)
|
:param current_screw_temperature: 当前螺杆温度 (°C)
|
:param current_rear_barrel_temperature: 当前后机筒温度 (°C)
|
:param current_front_barrel_temperature: 当前前机筒温度 (°C)
|
:param current_head_temperature: 当前机头温度 (°C)
|
:param coefficients: 自定义参数关系系数 (可选)
|
:param limits: 自定义参数上下限 (可选)
|
:return: 调整建议字典
|
"""
|
# 使用默认系数或自定义系数
|
coeffs = coefficients if coefficients else self.default_coefficients
|
|
# 使用默认上下限或自定义上下限
|
param_limits = limits if limits else self.default_limits
|
|
# 计算米重偏差
|
weight_deviation = real_time_weight - standard_weight
|
deviation_percentage = (weight_deviation / standard_weight) * 100 if standard_weight != 0 else 0
|
|
# 确定米重状态
|
if real_time_weight > upper_limit:
|
status = "超上限"
|
elif real_time_weight < lower_limit:
|
status = "超下限"
|
else:
|
status = "正常范围"
|
|
# 计算温度影响因子 - 考虑温度对挤出过程的影响
|
# 温度越高,物料流动性越好,相同螺杆转速下挤出量越大
|
temperature_factor = 1.0
|
|
# 如果提供了温度参数,计算温度影响因子
|
if current_screw_temperature is not None and current_head_temperature is not None:
|
# 计算平均温度与参考温度的偏差
|
reference_temperature = 80.0 # 参考温度,可根据实际工艺调整
|
avg_temperature = (current_screw_temperature + current_head_temperature) / 2
|
temperature_deviation = avg_temperature - reference_temperature
|
|
# 温度偏差每变化10°C,影响因子变化5%
|
temperature_factor = 1.0 + (temperature_deviation / 10.0) * 0.05
|
|
# 限制温度影响因子的范围
|
temperature_factor = max(0.8, min(1.2, temperature_factor))
|
|
# 计算调整量 - 考虑温度影响因子
|
# 米重偏差 * 系数 * 温度因子 = 调整百分比
|
screw_speed_adjustment_percent = -deviation_percentage * coeffs['screw_speed'] * temperature_factor
|
process_speed_adjustment_percent = -deviation_percentage * coeffs['process_main_speed'] * temperature_factor
|
|
# 限制调整幅度,避免调整量过大
|
screw_speed_adjustment_percent = max(
|
-self.max_adjustment_percentage['screw_speed'],
|
min(self.max_adjustment_percentage['screw_speed'],
|
screw_speed_adjustment_percent)
|
)
|
|
process_speed_adjustment_percent = max(
|
-self.max_adjustment_percentage['process_main_speed'],
|
min(self.max_adjustment_percentage['process_main_speed'],
|
process_speed_adjustment_percent)
|
)
|
|
# 计算实际调整量
|
screw_speed_adjustment = (screw_speed_adjustment_percent / 100) * current_screw_speed
|
process_speed_adjustment = (process_speed_adjustment_percent / 100) * current_process_speed
|
|
# 计算调整后的参数值
|
new_screw_speed = current_screw_speed + screw_speed_adjustment
|
new_process_speed = current_process_speed + process_speed_adjustment
|
|
# 确保参数在合理范围内
|
new_screw_speed = max(param_limits['screw_speed']['min'],
|
min(param_limits['screw_speed']['max'], new_screw_speed))
|
|
new_process_speed = max(param_limits['process_main_speed']['min'],
|
min(param_limits['process_main_speed']['max'], new_process_speed))
|
|
# 重新计算实际调整百分比(考虑了最大调整幅度和参数上下限)
|
screw_speed_adjust_percent = ((new_screw_speed - current_screw_speed) / current_screw_speed) * 100 if current_screw_speed != 0 else 0
|
process_speed_adjust_percent = ((new_process_speed - current_process_speed) / current_process_speed) * 100 if current_process_speed != 0 else 0
|
|
# 生成调整建议 - 无论米重是否在范围内,都提供调整建议,目标是让实时米重尽可能接近标准米重
|
recommendation = self._generate_recommendation(
|
deviation_percentage,
|
screw_speed_adjust_percent,
|
new_screw_speed,
|
process_speed_adjust_percent,
|
new_process_speed
|
)
|
|
return {
|
'status': status,
|
'real_time_weight': real_time_weight,
|
'standard_weight': standard_weight,
|
'upper_limit': upper_limit,
|
'lower_limit': lower_limit,
|
'deviation': weight_deviation,
|
'deviation_percentage': deviation_percentage,
|
'current_screw_speed': current_screw_speed,
|
'current_process_speed': current_process_speed,
|
'new_screw_speed': new_screw_speed,
|
'new_process_speed': new_process_speed,
|
'screw_speed_adjustment': screw_speed_adjustment,
|
'process_speed_adjustment': process_speed_adjustment,
|
'screw_speed_adjust_percent': screw_speed_adjust_percent,
|
'process_speed_adjust_percent': process_speed_adjust_percent,
|
'recommendation': recommendation
|
}
|
|
def _generate_recommendation(self, deviation_percentage, screw_speed_adjust_percent,
|
new_screw_speed, process_speed_adjust_percent, new_process_speed):
|
"""
|
生成调整建议文本
|
|
:param deviation_percentage: 米重偏差百分比
|
:param screw_speed_adjust_percent: 螺杆转速调整百分比
|
:param new_screw_speed: 调整后的螺杆转速
|
:param process_speed_adjust_percent: 流程主速调整百分比
|
:param new_process_speed: 调整后的流程主速
|
:return: 调整建议文本
|
"""
|
# 根据偏差方向和大小生成建议
|
abs_deviation = abs(deviation_percentage)
|
|
if abs_deviation < 0.5:
|
# 偏差很小,接近标准值
|
return f"米重偏差很小 ({abs_deviation:.2f}%),接近标准值。\n" \
|
f"建议微调以进一步接近标准值:\n" \
|
f"1. 将螺杆转速调整至 {new_screw_speed:.1f} rpm " \
|
f"( {'降低' if screw_speed_adjust_percent < 0 else '提高'} {abs(screw_speed_adjust_percent):.2f}% )\n" \
|
f"2. 将流程主速调整至 {new_process_speed:.1f} m/min " \
|
f"( {'降低' if process_speed_adjust_percent < 0 else '提高'} {abs(process_speed_adjust_percent):.2f}% )"
|
elif deviation_percentage > 0:
|
# 米重偏高,需要降低米重
|
return f"米重偏差 {abs_deviation:.2f}%(偏高),建议调整:\n" \
|
f"1. 将螺杆转速从当前值调整至 {new_screw_speed:.1f} rpm " \
|
f"( {'降低' if screw_speed_adjust_percent < 0 else '提高'} {abs(screw_speed_adjust_percent):.2f}% )\n" \
|
f"2. 将流程主速从当前值调整至 {new_process_speed:.1f} m/min " \
|
f"( {'降低' if process_speed_adjust_percent < 0 else '提高'} {abs(process_speed_adjust_percent):.2f}% )"
|
else:
|
# 米重偏低,需要提高米重
|
return f"米重偏差 {abs_deviation:.2f}%(偏低),建议调整:\n" \
|
f"1. 将螺杆转速从当前值调整至 {new_screw_speed:.1f} rpm " \
|
f"( {'降低' if screw_speed_adjust_percent < 0 else '提高'} {abs(screw_speed_adjust_percent):.2f}% )\n" \
|
f"2. 将流程主速从当前值调整至 {new_process_speed:.1f} m/min " \
|
f"( {'降低' if process_speed_adjust_percent < 0 else '提高'} {abs(process_speed_adjust_percent):.2f}% )"
|
|
def predict_weight(self, model_info, screw_speed, head_pressure, process_speed,
|
screw_temperature, rear_barrel_temperature,
|
front_barrel_temperature, head_temperature):
|
"""
|
使用模型预测米重
|
|
:param model_info: 包含模型和缩放器的模型信息字典
|
:param screw_speed: 螺杆转速 (rpm)
|
:param head_pressure: 机头压力 (bar)
|
:param process_speed: 流程主速 (m/min)
|
:param screw_temperature: 螺杆温度 (°C)
|
:param rear_barrel_temperature: 后机筒温度 (°C)
|
:param front_barrel_temperature: 前机筒温度 (°C)
|
:param head_temperature: 机头温度 (°C)
|
:return: 预测的米重值 (Kg/m)
|
"""
|
try:
|
# 获取模型需要的特征
|
required_features = model_info['features']
|
|
# 准备输入数据,保持与模型特征顺序一致
|
input_features = {
|
'螺杆转速': screw_speed,
|
'机头压力': head_pressure,
|
'流程主速': process_speed,
|
'螺杆温度': screw_temperature,
|
'后机筒温度': rear_barrel_temperature,
|
'前机筒温度': front_barrel_temperature,
|
'机头温度': head_temperature
|
}
|
|
# 根据模型特征创建输入DataFrame
|
input_df = pd.DataFrame([input_features])[required_features]
|
|
# 初始化预测结果
|
predicted_weight = None
|
|
# 获取模型
|
model = model_info['model']
|
|
# 根据模型类型执行不同的预测逻辑
|
if model_info['model_type'] in ['LSTM', 'GRU', 'BiLSTM']:
|
# 深度学习模型预测
|
if torch is None:
|
print("PyTorch not available, cannot predict with deep learning models")
|
return None
|
|
# 数据标准化
|
scaler_X = model_info['scaler_X']
|
scaler_y = model_info['scaler_y']
|
input_scaled = scaler_X.transform(input_df)
|
|
# 获取序列长度
|
sequence_length = model_info['sequence_length']
|
|
# 为深度学习模型创建序列
|
input_seq = np.tile(input_scaled, (sequence_length, 1)).reshape(1, sequence_length, -1)
|
|
# 转换为PyTorch张量
|
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
input_tensor = torch.tensor(input_seq, dtype=torch.float32).to(device)
|
|
# 预测
|
model.eval()
|
with torch.no_grad():
|
y_pred_scaled_tensor = model(input_tensor)
|
y_pred_scaled = y_pred_scaled_tensor.cpu().numpy().ravel()[0]
|
|
# 反归一化
|
predicted_weight = scaler_y.inverse_transform(np.array([[y_pred_scaled]]))[0][0]
|
|
elif model_info['model_type'] in ['SVR', 'MLP', 'GradientBoosting']:
|
# 需要特征缩放的模型
|
scaler_X = model_info['scaler_X']
|
scaler_y = model_info['scaler_y']
|
input_scaled = scaler_X.transform(input_df)
|
|
# 预测
|
y_pred_scaled = model.predict(input_scaled)[0]
|
|
# 反归一化
|
predicted_weight = scaler_y.inverse_transform(np.array([[y_pred_scaled]]))[0][0]
|
|
else:
|
# 其他模型(如随机森林、线性回归等)
|
predicted_weight = model.predict(input_df)[0]
|
|
return predicted_weight
|
except Exception as e:
|
print(f"模型预测失败: {e}")
|
import traceback
|
traceback.print_exc()
|
return None
|
def iterative_adjustment(self, initial_params, model_info, max_iterations=5, tolerance=0.5):
|
"""
|
迭代调整参数,直到预测米重满足偏差要求
|
|
:param initial_params: 初始参数字典,包含:
|
- real_time_weight: 实时米重
|
- standard_weight: 标准米重
|
- upper_limit: 米重上限
|
- lower_limit: 米重下限
|
- current_screw_speed: 当前螺杆转速
|
- current_process_speed: 当前流程主速
|
- current_screw_temperature: 螺杆温度
|
- current_rear_barrel_temperature: 后机筒温度
|
- current_front_barrel_temperature: 前机筒温度
|
- current_head_temperature: 机头温度
|
- current_head_pressure: 机头压力
|
:param model_info: 模型信息字典
|
:param max_iterations: 最大迭代次数
|
:param tolerance: 允许的米重偏差百分比阈值
|
:return: 迭代调整结果字典,包含:
|
- final_result: 最终调整结果
|
- iteration_history: 每次迭代的历史记录
|
- converged: 是否收敛
|
"""
|
iteration_history = []
|
converged = False
|
|
# 保存初始参数,用于最终结果计算
|
original_params = initial_params.copy()
|
|
# 初始化当前参数为初始参数
|
current_params = initial_params.copy()
|
|
for i in range(max_iterations):
|
# 计算当前迭代的调整建议
|
adjustment_result = self.calculate_adjustment(
|
real_time_weight=current_params['real_time_weight'],
|
standard_weight=current_params['standard_weight'],
|
upper_limit=current_params['upper_limit'],
|
lower_limit=current_params['lower_limit'],
|
current_screw_speed=current_params['current_screw_speed'],
|
current_process_speed=current_params['current_process_speed'],
|
current_screw_temperature=current_params['current_screw_temperature'],
|
current_rear_barrel_temperature=current_params['current_rear_barrel_temperature'],
|
current_front_barrel_temperature=current_params['current_front_barrel_temperature'],
|
current_head_temperature=current_params['current_head_temperature']
|
)
|
|
# 使用模型预测调整后的米重
|
predicted_weight = self.predict_weight(
|
model_info=model_info,
|
screw_speed=adjustment_result['new_screw_speed'],
|
head_pressure=current_params['current_head_pressure'],
|
process_speed=adjustment_result['new_process_speed'],
|
screw_temperature=current_params['current_screw_temperature'],
|
rear_barrel_temperature=current_params['current_rear_barrel_temperature'],
|
front_barrel_temperature=current_params['current_front_barrel_temperature'],
|
head_temperature=current_params['current_head_temperature']
|
)
|
|
# 检查模型预测是否成功
|
if predicted_weight is None:
|
print(f"模型预测失败,终止迭代调整")
|
iteration_history.append({
|
'iteration': i + 1,
|
'current_screw_speed': current_params['current_screw_speed'],
|
'current_process_speed': current_params['current_process_speed'],
|
'adjusted_screw_speed': adjustment_result['new_screw_speed'],
|
'adjusted_process_speed': adjustment_result['new_process_speed'],
|
'predicted_weight': None,
|
'predicted_deviation': None,
|
'predicted_deviation_percent': None,
|
'screw_speed_adjustment': adjustment_result['screw_speed_adjustment'],
|
'process_speed_adjustment': adjustment_result['process_speed_adjustment']
|
})
|
break
|
|
# 计算预测偏差
|
predicted_deviation = predicted_weight - current_params['standard_weight']
|
predicted_deviation_percent = (predicted_deviation / current_params['standard_weight']) * 100
|
|
# 保存迭代历史
|
iteration_history.append({
|
'iteration': i + 1,
|
'current_screw_speed': current_params['current_screw_speed'],
|
'current_process_speed': current_params['current_process_speed'],
|
'adjusted_screw_speed': adjustment_result['new_screw_speed'],
|
'adjusted_process_speed': adjustment_result['new_process_speed'],
|
'predicted_weight': predicted_weight,
|
'predicted_deviation': predicted_deviation,
|
'predicted_deviation_percent': predicted_deviation_percent,
|
'screw_speed_adjustment': adjustment_result['screw_speed_adjustment'],
|
'process_speed_adjustment': adjustment_result['process_speed_adjustment']
|
})
|
|
# 检查是否收敛
|
if abs(predicted_deviation_percent) <= tolerance:
|
converged = True
|
break
|
|
# 更新当前参数,准备下一次迭代
|
current_params.update({
|
'real_time_weight': predicted_weight,
|
'current_screw_speed': adjustment_result['new_screw_speed'],
|
'current_process_speed': adjustment_result['new_process_speed']
|
})
|
|
# 获取最终调整后的参数
|
final_screw_speed = iteration_history[-1]['adjusted_screw_speed']
|
final_process_speed = iteration_history[-1]['adjusted_process_speed']
|
final_predicted_weight = iteration_history[-1]['predicted_weight']
|
|
# 计算从初始参数到最终参数的总调整
|
# 创建一个包含初始参数的调整结果
|
final_result = {
|
'status': '正常范围' if abs((final_predicted_weight - original_params['standard_weight']) / original_params['standard_weight'] * 100) <= tolerance else '超上限' if final_predicted_weight > original_params['upper_limit'] else '超下限',
|
'real_time_weight': original_params['real_time_weight'], # 初始实时米重
|
'standard_weight': original_params['standard_weight'],
|
'upper_limit': original_params['upper_limit'],
|
'lower_limit': original_params['lower_limit'],
|
'deviation': original_params['real_time_weight'] - original_params['standard_weight'], # 初始偏差
|
'deviation_percentage': (original_params['real_time_weight'] - original_params['standard_weight']) / original_params['standard_weight'] * 100, # 初始偏差百分比
|
'current_screw_speed': original_params['current_screw_speed'], # 初始螺杆转速
|
'current_process_speed': original_params['current_process_speed'], # 初始流程主速
|
'new_screw_speed': final_screw_speed, # 最终调整后的螺杆转速
|
'new_process_speed': final_process_speed, # 最终调整后的流程主速
|
'screw_speed_adjustment': final_screw_speed - original_params['current_screw_speed'], # 总调整量
|
'process_speed_adjustment': final_process_speed - original_params['current_process_speed'], # 总调整量
|
'screw_speed_adjust_percent': ((final_screw_speed - original_params['current_screw_speed']) / original_params['current_screw_speed']) * 100 if original_params['current_screw_speed'] != 0 else 0, # 总调整百分比
|
'process_speed_adjust_percent': ((final_process_speed - original_params['current_process_speed']) / original_params['current_process_speed']) * 100 if original_params['current_process_speed'] != 0 else 0, # 总调整百分比
|
'predicted_weight': final_predicted_weight # 最终预测米重
|
}
|
|
# 生成调整建议文本
|
final_deviation_percent = (final_predicted_weight - original_params['standard_weight']) / original_params['standard_weight'] * 100
|
final_result['recommendation'] = self._generate_recommendation(
|
final_deviation_percent,
|
final_result['screw_speed_adjust_percent'],
|
final_screw_speed,
|
final_result['process_speed_adjust_percent'],
|
final_process_speed
|
)
|
|
# 添加温度相关参数(用于预测)
|
final_result['current_screw_temperature'] = original_params['current_screw_temperature']
|
final_result['current_rear_barrel_temperature'] = original_params['current_rear_barrel_temperature']
|
final_result['current_front_barrel_temperature'] = original_params['current_front_barrel_temperature']
|
final_result['current_head_temperature'] = original_params['current_head_temperature']
|
|
# 添加最终预测相关信息
|
final_predicted_deviation = final_predicted_weight - original_params['standard_weight']
|
final_predicted_deviation_percent = (final_predicted_deviation / original_params['standard_weight']) * 100
|
final_result['final_predicted_deviation'] = final_predicted_deviation
|
final_result['final_predicted_deviation_percent'] = final_predicted_deviation_percent
|
|
return {
|
'final_result': final_result,
|
'iteration_history': iteration_history,
|
'converged': converged,
|
'total_iterations': len(iteration_history),
|
'initial_params': original_params # 保存初始参数,用于结果展示
|
}
|
def analyze_historical_adjustments(self, df):
|
"""
|
分析历史调整数据,优化调整系数
|
|
:param df: 包含历史调整数据的DataFrame,需要包含以下列:
|
- real_time_weight: 实时米重
|
- standard_weight: 标准米重
|
- current_screw_speed: 当前螺杆转速
|
- current_process_speed: 当前流程主速
|
- adjusted_screw_speed: 调整后的螺杆转速
|
- adjusted_process_speed: 调整后的流程主速
|
- result_weight: 调整后的米重
|
:return: 优化后的系数
|
"""
|
if df is None or df.empty:
|
return self.default_coefficients
|
|
try:
|
# 计算米重偏差
|
df['weight_deviation'] = df['real_time_weight'] - df['standard_weight']
|
df['deviation_percentage'] = (df['weight_deviation'] / df['standard_weight']) * 100
|
|
# 计算参数调整百分比
|
df['screw_speed_adjust_percent'] = ((df['adjusted_screw_speed'] - df['current_screw_speed']) / df['current_screw_speed']) * 100
|
df['process_speed_adjust_percent'] = ((df['adjusted_process_speed'] - df['current_process_speed']) / df['current_process_speed']) * 100
|
|
# 计算调整效果
|
df['adjustment_effect'] = df['result_weight'] - df['real_time_weight']
|
|
# 使用线性回归计算优化系数
|
# 系数 = 参数调整百分比 / 米重偏差百分比
|
screw_speed_coeff = df['screw_speed_adjust_percent'].sum() / df['deviation_percentage'].sum() if df['deviation_percentage'].sum() != 0 else self.default_coefficients['screw_speed']
|
process_speed_coeff = df['process_speed_adjust_percent'].sum() / df['deviation_percentage'].sum() if df['deviation_percentage'].sum() != 0 else self.default_coefficients['process_main_speed']
|
|
# 取绝对值,确保方向正确
|
optimized_coeffs = {
|
'screw_speed': abs(screw_speed_coeff),
|
'process_main_speed': -abs(process_speed_coeff) # 流程主速与米重负相关
|
}
|
|
return optimized_coeffs
|
except Exception as e:
|
print(f"分析历史调整数据失败: {e}")
|
return self.default_coefficients
|