import pandas as pd import numpy as np # 尝试导入torch,如果失败则标记为不可用 try: import torch except ImportError: torch = None class ParameterAdjustmentAdvisor: """ 挤出机参数调节建议器 根据实时米重与标准米重的偏差,给出螺杆转速和流程主速的调整建议 """ def __init__(self): # 初始化默认参数关系系数 # 这些系数表示米重偏差1%时,参数需要调整的百分比 # 可以根据实际生产数据进行优化 self.default_coefficients = { 'screw_speed': 0.1, # 米重偏差1%,螺杆转速调整0.3%(降低调整幅度) 'process_main_speed': -0.1 # 米重偏差1%,流程主速调整-0.2%(降低调整幅度) } # 默认参数上下限(可根据实际设备情况调整) self.default_limits = { 'screw_speed': {'min': 30, 'max': 500}, 'process_main_speed': {'min': 0, 'max': 200} } # 最大调整幅度限制(百分比) self.max_adjustment_percentage = { 'screw_speed': 15.0, # 螺杆转速单次最大调整15% 'process_main_speed': 10.0 # 流程主速单次最大调整10% } def calculate_adjustment(self, real_time_weight, standard_weight, upper_limit, lower_limit, current_screw_speed, current_process_speed, current_screw_temperature=None, current_rear_barrel_temperature=None, current_front_barrel_temperature=None, current_head_temperature=None, coefficients=None, limits=None): """ 计算参数调整建议 :param real_time_weight: 实时米重 (Kg/m) :param standard_weight: 标准米重 (Kg/m) :param upper_limit: 米重上限 (Kg/m) :param lower_limit: 米重下限 (Kg/m) :param current_screw_speed: 当前螺杆转速 (rpm) :param current_process_speed: 当前流程主速 (m/min) :param current_screw_temperature: 当前螺杆温度 (°C) :param current_rear_barrel_temperature: 当前后机筒温度 (°C) :param current_front_barrel_temperature: 当前前机筒温度 (°C) :param current_head_temperature: 当前机头温度 (°C) :param coefficients: 自定义参数关系系数 (可选) :param limits: 自定义参数上下限 (可选) :return: 调整建议字典 """ # 使用默认系数或自定义系数 coeffs = coefficients if coefficients else self.default_coefficients # 使用默认上下限或自定义上下限 param_limits = limits if limits else self.default_limits # 计算米重偏差 weight_deviation = real_time_weight - standard_weight deviation_percentage = (weight_deviation / standard_weight) * 100 if standard_weight != 0 else 0 # 确定米重状态 if real_time_weight > upper_limit: status = "超上限" elif real_time_weight < lower_limit: status = "超下限" else: status = "正常范围" # 计算温度影响因子 - 考虑温度对挤出过程的影响 # 温度越高,物料流动性越好,相同螺杆转速下挤出量越大 temperature_factor = 1.0 # 如果提供了温度参数,计算温度影响因子 if current_screw_temperature is not None and current_head_temperature is not None: # 计算平均温度与参考温度的偏差 reference_temperature = 80.0 # 参考温度,可根据实际工艺调整 avg_temperature = (current_screw_temperature + current_head_temperature) / 2 temperature_deviation = avg_temperature - reference_temperature # 温度偏差每变化10°C,影响因子变化5% temperature_factor = 1.0 + (temperature_deviation / 10.0) * 0.05 # 限制温度影响因子的范围 temperature_factor = max(0.8, min(1.2, temperature_factor)) # 计算调整量 - 考虑温度影响因子 # 米重偏差 * 系数 * 温度因子 = 调整百分比 screw_speed_adjustment_percent = -deviation_percentage * coeffs['screw_speed'] * temperature_factor process_speed_adjustment_percent = -deviation_percentage * coeffs['process_main_speed'] * temperature_factor # 限制调整幅度,避免调整量过大 screw_speed_adjustment_percent = max( -self.max_adjustment_percentage['screw_speed'], min(self.max_adjustment_percentage['screw_speed'], screw_speed_adjustment_percent) ) process_speed_adjustment_percent = max( -self.max_adjustment_percentage['process_main_speed'], min(self.max_adjustment_percentage['process_main_speed'], process_speed_adjustment_percent) ) # 计算实际调整量 screw_speed_adjustment = (screw_speed_adjustment_percent / 100) * current_screw_speed process_speed_adjustment = (process_speed_adjustment_percent / 100) * current_process_speed # 计算调整后的参数值 new_screw_speed = current_screw_speed + screw_speed_adjustment new_process_speed = current_process_speed + process_speed_adjustment # 确保参数在合理范围内 new_screw_speed = max(param_limits['screw_speed']['min'], min(param_limits['screw_speed']['max'], new_screw_speed)) new_process_speed = max(param_limits['process_main_speed']['min'], min(param_limits['process_main_speed']['max'], new_process_speed)) # 重新计算实际调整百分比(考虑了最大调整幅度和参数上下限) screw_speed_adjust_percent = ((new_screw_speed - current_screw_speed) / current_screw_speed) * 100 if current_screw_speed != 0 else 0 process_speed_adjust_percent = ((new_process_speed - current_process_speed) / current_process_speed) * 100 if current_process_speed != 0 else 0 # 生成调整建议 - 无论米重是否在范围内,都提供调整建议,目标是让实时米重尽可能接近标准米重 recommendation = self._generate_recommendation( deviation_percentage, screw_speed_adjust_percent, new_screw_speed, process_speed_adjust_percent, new_process_speed ) return { 'status': status, 'real_time_weight': real_time_weight, 'standard_weight': standard_weight, 'upper_limit': upper_limit, 'lower_limit': lower_limit, 'deviation': weight_deviation, 'deviation_percentage': deviation_percentage, 'current_screw_speed': current_screw_speed, 'current_process_speed': current_process_speed, 'new_screw_speed': new_screw_speed, 'new_process_speed': new_process_speed, 'screw_speed_adjustment': screw_speed_adjustment, 'process_speed_adjustment': process_speed_adjustment, 'screw_speed_adjust_percent': screw_speed_adjust_percent, 'process_speed_adjust_percent': process_speed_adjust_percent, 'recommendation': recommendation } def _generate_recommendation(self, deviation_percentage, screw_speed_adjust_percent, new_screw_speed, process_speed_adjust_percent, new_process_speed): """ 生成调整建议文本 :param deviation_percentage: 米重偏差百分比 :param screw_speed_adjust_percent: 螺杆转速调整百分比 :param new_screw_speed: 调整后的螺杆转速 :param process_speed_adjust_percent: 流程主速调整百分比 :param new_process_speed: 调整后的流程主速 :return: 调整建议文本 """ # 根据偏差方向和大小生成建议 abs_deviation = abs(deviation_percentage) if abs_deviation < 0.5: # 偏差很小,接近标准值 return f"米重偏差很小 ({abs_deviation:.2f}%),接近标准值。\n" \ f"建议微调以进一步接近标准值:\n" \ f"1. 将螺杆转速调整至 {new_screw_speed:.1f} rpm " \ f"( {'降低' if screw_speed_adjust_percent < 0 else '提高'} {abs(screw_speed_adjust_percent):.2f}% )\n" \ f"2. 将流程主速调整至 {new_process_speed:.1f} m/min " \ f"( {'降低' if process_speed_adjust_percent < 0 else '提高'} {abs(process_speed_adjust_percent):.2f}% )" elif deviation_percentage > 0: # 米重偏高,需要降低米重 return f"米重偏差 {abs_deviation:.2f}%(偏高),建议调整:\n" \ f"1. 将螺杆转速从当前值调整至 {new_screw_speed:.1f} rpm " \ f"( {'降低' if screw_speed_adjust_percent < 0 else '提高'} {abs(screw_speed_adjust_percent):.2f}% )\n" \ f"2. 将流程主速从当前值调整至 {new_process_speed:.1f} m/min " \ f"( {'降低' if process_speed_adjust_percent < 0 else '提高'} {abs(process_speed_adjust_percent):.2f}% )" else: # 米重偏低,需要提高米重 return f"米重偏差 {abs_deviation:.2f}%(偏低),建议调整:\n" \ f"1. 将螺杆转速从当前值调整至 {new_screw_speed:.1f} rpm " \ f"( {'降低' if screw_speed_adjust_percent < 0 else '提高'} {abs(screw_speed_adjust_percent):.2f}% )\n" \ f"2. 将流程主速从当前值调整至 {new_process_speed:.1f} m/min " \ f"( {'降低' if process_speed_adjust_percent < 0 else '提高'} {abs(process_speed_adjust_percent):.2f}% )" def predict_weight(self, model_info, screw_speed, head_pressure, process_speed, screw_temperature, rear_barrel_temperature, front_barrel_temperature, head_temperature): """ 使用模型预测米重 :param model_info: 包含模型和缩放器的模型信息字典 :param screw_speed: 螺杆转速 (rpm) :param head_pressure: 机头压力 (bar) :param process_speed: 流程主速 (m/min) :param screw_temperature: 螺杆温度 (°C) :param rear_barrel_temperature: 后机筒温度 (°C) :param front_barrel_temperature: 前机筒温度 (°C) :param head_temperature: 机头温度 (°C) :return: 预测的米重值 (Kg/m) """ try: # 获取模型需要的特征 required_features = model_info['features'] # 准备输入数据,保持与模型特征顺序一致 input_features = { '螺杆转速': screw_speed, '机头压力': head_pressure, '流程主速': process_speed, '螺杆温度': screw_temperature, '后机筒温度': rear_barrel_temperature, '前机筒温度': front_barrel_temperature, '机头温度': head_temperature } # 根据模型特征创建输入DataFrame input_df = pd.DataFrame([input_features])[required_features] # 初始化预测结果 predicted_weight = None # 获取模型 model = model_info['model'] # 根据模型类型执行不同的预测逻辑 if model_info['model_type'] in ['LSTM', 'GRU', 'BiLSTM']: # 深度学习模型预测 if torch is None: print("PyTorch not available, cannot predict with deep learning models") return None # 数据标准化 scaler_X = model_info['scaler_X'] scaler_y = model_info['scaler_y'] input_scaled = scaler_X.transform(input_df) # 获取序列长度 sequence_length = model_info['sequence_length'] # 为深度学习模型创建序列 input_seq = np.tile(input_scaled, (sequence_length, 1)).reshape(1, sequence_length, -1) # 转换为PyTorch张量 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') input_tensor = torch.tensor(input_seq, dtype=torch.float32).to(device) # 预测 model.eval() with torch.no_grad(): y_pred_scaled_tensor = model(input_tensor) y_pred_scaled = y_pred_scaled_tensor.cpu().numpy().ravel()[0] # 反归一化 predicted_weight = scaler_y.inverse_transform(np.array([[y_pred_scaled]]))[0][0] elif model_info['model_type'] in ['SVR', 'MLP', 'GradientBoosting']: # 需要特征缩放的模型 scaler_X = model_info['scaler_X'] scaler_y = model_info['scaler_y'] input_scaled = scaler_X.transform(input_df) # 预测 y_pred_scaled = model.predict(input_scaled)[0] # 反归一化 predicted_weight = scaler_y.inverse_transform(np.array([[y_pred_scaled]]))[0][0] else: # 其他模型(如随机森林、线性回归等) predicted_weight = model.predict(input_df)[0] return predicted_weight except Exception as e: print(f"模型预测失败: {e}") import traceback traceback.print_exc() return None def iterative_adjustment(self, initial_params, model_info, max_iterations=5, tolerance=0.5): """ 迭代调整参数,直到预测米重满足偏差要求 :param initial_params: 初始参数字典,包含: - real_time_weight: 实时米重 - standard_weight: 标准米重 - upper_limit: 米重上限 - lower_limit: 米重下限 - current_screw_speed: 当前螺杆转速 - current_process_speed: 当前流程主速 - current_screw_temperature: 螺杆温度 - current_rear_barrel_temperature: 后机筒温度 - current_front_barrel_temperature: 前机筒温度 - current_head_temperature: 机头温度 - current_head_pressure: 机头压力 :param model_info: 模型信息字典 :param max_iterations: 最大迭代次数 :param tolerance: 允许的米重偏差百分比阈值 :return: 迭代调整结果字典,包含: - final_result: 最终调整结果 - iteration_history: 每次迭代的历史记录 - converged: 是否收敛 """ iteration_history = [] converged = False # 保存初始参数,用于最终结果计算 original_params = initial_params.copy() # 初始化当前参数为初始参数 current_params = initial_params.copy() for i in range(max_iterations): # 计算当前迭代的调整建议 adjustment_result = self.calculate_adjustment( real_time_weight=current_params['real_time_weight'], standard_weight=current_params['standard_weight'], upper_limit=current_params['upper_limit'], lower_limit=current_params['lower_limit'], current_screw_speed=current_params['current_screw_speed'], current_process_speed=current_params['current_process_speed'], current_screw_temperature=current_params['current_screw_temperature'], current_rear_barrel_temperature=current_params['current_rear_barrel_temperature'], current_front_barrel_temperature=current_params['current_front_barrel_temperature'], current_head_temperature=current_params['current_head_temperature'] ) # 使用模型预测调整后的米重 predicted_weight = self.predict_weight( model_info=model_info, screw_speed=adjustment_result['new_screw_speed'], head_pressure=current_params['current_head_pressure'], process_speed=adjustment_result['new_process_speed'], screw_temperature=current_params['current_screw_temperature'], rear_barrel_temperature=current_params['current_rear_barrel_temperature'], front_barrel_temperature=current_params['current_front_barrel_temperature'], head_temperature=current_params['current_head_temperature'] ) # 检查模型预测是否成功 if predicted_weight is None: print(f"模型预测失败,终止迭代调整") iteration_history.append({ 'iteration': i + 1, 'current_screw_speed': current_params['current_screw_speed'], 'current_process_speed': current_params['current_process_speed'], 'adjusted_screw_speed': adjustment_result['new_screw_speed'], 'adjusted_process_speed': adjustment_result['new_process_speed'], 'predicted_weight': None, 'predicted_deviation': None, 'predicted_deviation_percent': None, 'screw_speed_adjustment': adjustment_result['screw_speed_adjustment'], 'process_speed_adjustment': adjustment_result['process_speed_adjustment'] }) break # 计算预测偏差 predicted_deviation = predicted_weight - current_params['standard_weight'] predicted_deviation_percent = (predicted_deviation / current_params['standard_weight']) * 100 # 保存迭代历史 iteration_history.append({ 'iteration': i + 1, 'current_screw_speed': current_params['current_screw_speed'], 'current_process_speed': current_params['current_process_speed'], 'adjusted_screw_speed': adjustment_result['new_screw_speed'], 'adjusted_process_speed': adjustment_result['new_process_speed'], 'predicted_weight': predicted_weight, 'predicted_deviation': predicted_deviation, 'predicted_deviation_percent': predicted_deviation_percent, 'screw_speed_adjustment': adjustment_result['screw_speed_adjustment'], 'process_speed_adjustment': adjustment_result['process_speed_adjustment'] }) # 检查是否收敛 if abs(predicted_deviation_percent) <= tolerance: converged = True break # 更新当前参数,准备下一次迭代 current_params.update({ 'real_time_weight': predicted_weight, 'current_screw_speed': adjustment_result['new_screw_speed'], 'current_process_speed': adjustment_result['new_process_speed'] }) # 获取最终调整后的参数 final_screw_speed = iteration_history[-1]['adjusted_screw_speed'] final_process_speed = iteration_history[-1]['adjusted_process_speed'] final_predicted_weight = iteration_history[-1]['predicted_weight'] # 计算从初始参数到最终参数的总调整 # 创建一个包含初始参数的调整结果 final_result = { 'status': '正常范围' if abs((final_predicted_weight - original_params['standard_weight']) / original_params['standard_weight'] * 100) <= tolerance else '超上限' if final_predicted_weight > original_params['upper_limit'] else '超下限', 'real_time_weight': original_params['real_time_weight'], # 初始实时米重 'standard_weight': original_params['standard_weight'], 'upper_limit': original_params['upper_limit'], 'lower_limit': original_params['lower_limit'], 'deviation': original_params['real_time_weight'] - original_params['standard_weight'], # 初始偏差 'deviation_percentage': (original_params['real_time_weight'] - original_params['standard_weight']) / original_params['standard_weight'] * 100, # 初始偏差百分比 'current_screw_speed': original_params['current_screw_speed'], # 初始螺杆转速 'current_process_speed': original_params['current_process_speed'], # 初始流程主速 'new_screw_speed': final_screw_speed, # 最终调整后的螺杆转速 'new_process_speed': final_process_speed, # 最终调整后的流程主速 'screw_speed_adjustment': final_screw_speed - original_params['current_screw_speed'], # 总调整量 'process_speed_adjustment': final_process_speed - original_params['current_process_speed'], # 总调整量 'screw_speed_adjust_percent': ((final_screw_speed - original_params['current_screw_speed']) / original_params['current_screw_speed']) * 100 if original_params['current_screw_speed'] != 0 else 0, # 总调整百分比 'process_speed_adjust_percent': ((final_process_speed - original_params['current_process_speed']) / original_params['current_process_speed']) * 100 if original_params['current_process_speed'] != 0 else 0, # 总调整百分比 'predicted_weight': final_predicted_weight # 最终预测米重 } # 生成调整建议文本 final_deviation_percent = (final_predicted_weight - original_params['standard_weight']) / original_params['standard_weight'] * 100 final_result['recommendation'] = self._generate_recommendation( final_deviation_percent, final_result['screw_speed_adjust_percent'], final_screw_speed, final_result['process_speed_adjust_percent'], final_process_speed ) # 添加温度相关参数(用于预测) final_result['current_screw_temperature'] = original_params['current_screw_temperature'] final_result['current_rear_barrel_temperature'] = original_params['current_rear_barrel_temperature'] final_result['current_front_barrel_temperature'] = original_params['current_front_barrel_temperature'] final_result['current_head_temperature'] = original_params['current_head_temperature'] # 添加最终预测相关信息 final_predicted_deviation = final_predicted_weight - original_params['standard_weight'] final_predicted_deviation_percent = (final_predicted_deviation / original_params['standard_weight']) * 100 final_result['final_predicted_deviation'] = final_predicted_deviation final_result['final_predicted_deviation_percent'] = final_predicted_deviation_percent return { 'final_result': final_result, 'iteration_history': iteration_history, 'converged': converged, 'total_iterations': len(iteration_history), 'initial_params': original_params # 保存初始参数,用于结果展示 } def analyze_historical_adjustments(self, df): """ 分析历史调整数据,优化调整系数 :param df: 包含历史调整数据的DataFrame,需要包含以下列: - real_time_weight: 实时米重 - standard_weight: 标准米重 - current_screw_speed: 当前螺杆转速 - current_process_speed: 当前流程主速 - adjusted_screw_speed: 调整后的螺杆转速 - adjusted_process_speed: 调整后的流程主速 - result_weight: 调整后的米重 :return: 优化后的系数 """ if df is None or df.empty: return self.default_coefficients try: # 计算米重偏差 df['weight_deviation'] = df['real_time_weight'] - df['standard_weight'] df['deviation_percentage'] = (df['weight_deviation'] / df['standard_weight']) * 100 # 计算参数调整百分比 df['screw_speed_adjust_percent'] = ((df['adjusted_screw_speed'] - df['current_screw_speed']) / df['current_screw_speed']) * 100 df['process_speed_adjust_percent'] = ((df['adjusted_process_speed'] - df['current_process_speed']) / df['current_process_speed']) * 100 # 计算调整效果 df['adjustment_effect'] = df['result_weight'] - df['real_time_weight'] # 使用线性回归计算优化系数 # 系数 = 参数调整百分比 / 米重偏差百分比 screw_speed_coeff = df['screw_speed_adjust_percent'].sum() / df['deviation_percentage'].sum() if df['deviation_percentage'].sum() != 0 else self.default_coefficients['screw_speed'] process_speed_coeff = df['process_speed_adjust_percent'].sum() / df['deviation_percentage'].sum() if df['deviation_percentage'].sum() != 0 else self.default_coefficients['process_main_speed'] # 取绝对值,确保方向正确 optimized_coeffs = { 'screw_speed': abs(screw_speed_coeff), 'process_main_speed': -abs(process_speed_coeff) # 流程主速与米重负相关 } return optimized_coeffs except Exception as e: print(f"分析历史调整数据失败: {e}") return self.default_coefficients