解决方案
智慧农业大棚环境监测与自动调节系统解决方案
A. 整体架构图与设计图
1. 系统整体架构图
graph TB
%% 感知层
A1[温湿度传感器] -->|有线连接| B1[数据采集器1]
A2[光照传感器] -->|ZigBee无线| B2[协调器]
%% 传输层
B1 -->|RS485/Modbus| C[网关]
B2 -->|ZigBee| C
C -->|以太网/WiFi| D[电脑监控中心]
%% 控制层
C -->|控制信号| E1[卷膜电机]
C -->|控制信号| E2[补光灯]
C -->|控制信号| E3[声光报警器]
%% 应用层
D --> F[Web可视化界面]
D --> G[移动APP]
D --> H[数据存储与分析]
%% 样式定义
class A1,A2 sensor;
class B1,B2 collector;
class C gateway;
class D computer;
class E1,E2,E3 actuator;
classDef sensor fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
classDef collector fill:#f3e5f5,stroke:#4a148c,stroke-width:2px;
classDef gateway fill:#fff3e0,stroke:#e65100,stroke-width:2px;
classDef computer fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px;
classDef actuator fill:#ffebee,stroke:#b71c1c,stroke-width:2px;
```**架构说明:**
- **感知层**:温湿度传感器(有线)+ 光照传感器(无线)
- **传输层**:数据采集器 + 协调器 + 网关组成的数据传输网络
- **控制层**:网关作为边缘计算节点,执行自动控制逻辑
- **应用层**:电脑端实现数据可视化、分析和远程监控
### 2. 温湿度超标自动报警业务流程图
graph TD
Start[系统启动] --> Init[设备初始化]
Init --> Net[组建传感器网络]
Net --> LoopStart[开始监控循环]
LoopStart --> ReadTemp[读取温湿度数据]
ReadTemp --> Validate[数据有效性验证]
Validate --> CheckTemp{温度>28°C?}
CheckTemp -->|是| HighTemp[高温报警处理]
CheckTemp -->|否| CheckHumi{湿度>85%?}
CheckHumi -->|是| HighHumi[高湿报警处理]
CheckHumi -->|否| Normal[正常数据记录]
HighTemp --> AlertProc[报警处理流程]
HighHumi --> AlertProc
AlertProc --> Display[网关屏幕显示报警]
Display --> Log[记录报警事件]
Log --> Notify[通知电脑监控端]
Normal --> UpdateDisplay[更新正常显示]
UpdateDisplay --> Wait[等待5秒]
Notify --> Wait
Wait --> LoopStart
%% 子流程定义
subgraph AlertProc [报警处理流程]
A1[确定报警级别] --> A2[触发声光报警]
A2 --> A3[执行自动控制]
A3 --> A4[生成报警记录]
end- **数据验证**:检查传感器读数是否在合理范围内(-10°C~60°C,0%~100%)
- **多级判断**:支持温度、湿度的独立阈值判断
- **报警分级**:可根据超标程度设置不同报警级别
- **闭环控制**:报警后自动执行相应的控制动作
## B. 最小单元的设计验证和数据采集验证
### 1. 硬件连接与配置步骤
#### 物理连接实施
步骤1:温湿度传感器连接
├─ 传感器电源线(红) → 数据采集器1的VCC(5V)
├─ 传感器地线(黑) → 数据采集器1的GND
├─ 传感器数据线(绿) → 数据采集器1的DATA1端口
└─ 检查连接牢固性
步骤2:数据采集器网络连接
├─ 数据采集器1的RS485-A → 网关RS485接口A
├─ 数据采集器1的RS485-B → 网关RS485接口B
└─ 接通12V直流电源
步骤3:网关与电脑连接
├─ 网线连接网关ETH口和电脑网口
├─ 设置网关IP:192.168.1.100
├─ 设置电脑IP:192.168.1.101(同网段)
└─ 测试网络连通性
gateway_config.py - 网关和设备配置脚本
import serial
import time
import json
class DeviceConfigurator:
def init(self):
self.serial_port = '/dev/ttyUSB0'
self.baud_rate = 9600
def configure_data_collector(self):
"""配置数据采集器参数"""
config_commands = [
'AT+ADDR=01', # 设置设备地址01
'AT+INTERVAL=5000', # 采样间隔5秒
'AT+FORMAT=JSON', # 数据格式JSON
'AT+SAVE' # 保存配置
]
try:
with serial.Serial(self.serial_port, self.baud_rate, timeout=1) as ser:
for cmd in config_commands:
ser.write(f"{cmd}\r\n".encode())
time.sleep(0.5)
response = ser.readline().decode().strip()
print(f"发送: {cmd} -> 响应: {response}")
if 'OK' not in response:
raise Exception(f"配置失败: {response}")
print("数据采集器配置成功")
return True
except Exception as e:
print(f"配置错误: {e}")
return False
def test_sensor_communication(self):
"""测试传感器通信"""
test_commands = ['AT+READ=01', 'AT+STATUS']
try:
with serial.Serial(self.serial_port, self.baud_rate, timeout=2) as ser:
for cmd in test_commands:
ser.write(f"{cmd}\r\n".encode())
response = ser.readline().decode().strip()
if 'TEMP' in response or 'HUMI' in response:
# 解析传感器数据
data = self.parse_sensor_data(response)
print(f"传感器测试成功: 温度={data['temp']}°C, 湿度={data['humi']}%")
return True
elif 'OK' in response:
print("通信正常")
continue
return False
except Exception as e:
print(f"通信测试失败: {e}")
return False
def parse_sensor_data(self, response):
"""解析传感器数据响应"""
# 示例响应: "TEMP=25.6,HUMI=65.2"
try:
parts = response.split(',')
temp = float(parts[0].split('=')[1])
humi = float(parts[1].split('=')[1])
return {'temp': temp, 'humi': humi}
except:
return {'temp': 0, 'humi': 0}执行配置
if name == "main":
configurator = DeviceConfigurator()
if configurator.configure_data_collector():
print("开始传感器通信测试...")
configurator.test_sensor_communication()
gateway_monitor.py - 网关端监控主程序
import time
import json
import serial
from datetime import datetime
import requests
class GreenhouseMonitor:
def init(self):
硬件配置
self.serial_port = '/dev/ttyUSB0'
self.baud_rate = 9600
# 报警阈值
self.temp_threshold = 28.0 # 高温阈值
self.humi_threshold = 85.0 # 高湿阈值
# 监控参数
self.read_interval = 5 # 读取间隔5秒
self.alert_count = 0
self.data_history = []
def read_sensor_data(self):
"""从数据采集器读取温湿度数据"""
try:
with serial.Serial(self.serial_port, self.baud_rate, timeout=2) as ser:
ser.write(b"AT+READ=01\r\n")
response = ser.readline().decode().strip()
if response:
return self.parse_sensor_response(response)
else:
return None, "传感器无响应"
except Exception as e:
return None, f"读取错误: {str(e)}"
def parse_sensor_response(self, response):
"""解析传感器响应数据"""
try:
if 'TEMP' in response and 'HUMI' in response:
# 格式: TEMP=25.6,HUMI=65.2
parts = response.split(',')
temp = float(parts[0].split('=')[1])
humi = float(parts[1].split('=')[1])
# 数据合理性验证
if self.validate_sensor_data(temp, humi):
return {'temp': temp, 'humi': humi, 'timestamp': datetime.now()}, None
else:
return None, "传感器数据异常"
else:
return None, "数据格式错误"
except Exception as e:
return None, f"数据解析错误: {str(e)}"
def validate_sensor_data(self, temp, humi):
"""验证传感器数据的合理性"""
# 温度范围检查
if not (-10 <= temp <= 60):
return False
# 湿度范围检查
if not (0 <= humi <= 100):
return False
# 变化率检查(防止突变)
if self.data_history:
last_temp = self.data_history[-1]['temp']
last_humi = self.data_history[-1]['humi']
if abs(temp - last_temp) > 5: # 5°C/min变化过大
return False
if abs(humi - last_humi) > 10: # 10%/min变化过大
return False
return True
def check_alert_conditions(self, temp, humi):
"""检查报警条件"""
alerts = []
if temp > self.temp_threshold:
alerts.append({
'type': 'high_temperature',
'message': f'高温预警: {temp}°C > {self.temp_threshold}°C',
'level': 'warning',
'value': temp
})
if humi > self.humi_threshold:
alerts.append({
'type': 'high_humidity',
'message': f'高湿预警: {humi}% > {self.humi_threshold}%',
'level': 'warning',
'value': humi
})
return alerts
def display_on_gateway(self, temp, humi, alerts):
"""在网关屏幕上显示信息"""
display_lines = [
"大棚环境监控系统",
f"温度: {temp}°C",
f"湿度: {humi}%",
f"时间: {datetime.now().strftime('%H:%M:%S')}"
]
if alerts:
# 报警显示模式
display_lines[0] = "!!! 环境报警 !!!"
for i, alert in enumerate(alerts[:2]): # 最多显示2条报警
if i == 0:
display_lines[1] = alert['message'][:16] # 截取前16字符
elif i == 1:
display_lines[2] = alert['message'][:16]
# 模拟在网关屏幕显示(实际调用网关显示SDK)
print("=== 网关显示屏 ===")
for line in display_lines:
print(line)
print("==================")
def send_to_computer(self, data, alerts):
"""发送数据到电脑监控端"""
payload = {
'timestamp': datetime.now().isoformat(),
'temperature': data['temp'],
'humidity': data['humi'],
'alerts': alerts,
'gateway_id': 'GW001'
}
try:
response = requests.post(
'http://192.168.1.101:5000/api/data',
json=payload,
timeout=3
)
return response.status_code == 200
except Exception as e:
print(f"网络发送失败: {e}")
# 本地存储,网络恢复后重发
self.store_locally(payload)
return False
def store_locally(self, data):
"""本地存储数据(网络异常时)"""
if len(self.data_history) > 1000: # 限制存储量
self.data_history = self.data_history[-500:]
self.data_history.append(data)
def main_loop(self):
"""主监控循环"""
print("启动智慧农业大棚监控系统...")
print(f"报警阈值 - 温度: {self.temp_threshold}°C, 湿度: {self.humi_threshold}%")
while True:
try:
# 1. 读取传感器数据
data, error = self.read_sensor_data()
if data:
temp = data['temp']
humi = data['humi']
# 2. 检查报警条件
alerts = self.check_alert_conditions(temp, humi)
# 3. 网关屏幕显示
self.display_on_gateway(temp, humi, alerts)
# 4. 发送到电脑端
self.send_to_computer(data, alerts)
# 5. 记录报警
if alerts:
self.alert_count += len(alerts)
print(f"检测到报警,总数: {self.alert_count}")
else:
print(f"数据读取失败: {error}")
except Exception as e:
print(f"监控循环异常: {e}")
# 6. 等待5秒
time.sleep(self.read_interval)启动监控系统
if name == "main":
monitor = GreenhouseMonitor()
monitor.main_loop()
### 1. 应用流程图
graph TD
A[传感器数据采集] --> B[网关边缘计算]
B --> C[数据预处理]
C --> D[阈值判断]
D --> E{是否报警?}
E -->|是| F[报警处理]
E -->|否| G[正常记录]
F --> H[本地报警显示]
H --> I[网络传输]
G --> I
I --> J[电脑端接收]
J --> K[数据解析]
K --> L[实时显示]
L --> M[历史存储]
M --> N[趋势分析]
N --> O[报警记录]
O --> P[报表生成]
subgraph 数据流
A --> B --> C --> D --> E
I --> J --> K --> L --> M --> N --> O --> P
end
subgraph 报警处理
F --> H --> I
end
subgraph 用户交互
L --> Q[用户操作]
Q --> R[参数设置]
R --> S[控制指令]
S --> T[设备控制]
T --> A
end
computer_dashboard.py - 电脑端监控界面
import tkinter as tk
from tkinter import ttk, messagebox
import requests
import json
from datetime import datetime, timedelta
import threading
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.dates as mdates
class GreenhouseDashboard:
def init(self, root):
self.root = root
self.root.title("智慧农业大棚监控系统 - 黔东南州")
self.root.geometry("1200x800")
# 数据存储
self.temperature_data = []
self.humidity_data = []
self.timestamps = []
self.alert_log = []
# 服务器配置
self.server_url = "http://192.168.1.100:5000"
self.running = True
self.setup_ui()
self.start_data_thread()
def setup_ui(self):
"""创建监控界面"""
# 创建主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 顶部标题栏
self.create_title_bar(main_frame)
# 实时数据面板
self.create_realtime_panel(main_frame)
# 图表区域
self.create_charts_area(main_frame)
# 报警日志
self.create_alert_log(main_frame)
# 控制面板
self.create_control_panel(main_frame)
def create_title_bar(self, parent):
"""创建标题栏"""
title_frame = ttk.Frame(parent)
title_frame.pack(fill=tk.X, pady=(0, 10))
# 标题
title_label = ttk.Label(
title_frame,
text="🌱 智慧农业大棚环境监控系统",
font=("微软雅黑", 16, "bold"),
foreground="darkgreen"
)
title_label.pack(side=tk.LEFT)
# 状态指示
self.status_label = ttk.Label(
title_frame,
text="● 系统运行中",
foreground="green",
font=("微软雅黑", 10)
)
self.status_label.pack(side=tk.RIGHT)
# 最后更新时间
self.update_label = ttk.Label(
title_frame,
text="最后更新: --:--:--",
font=("微软雅黑", 9)
)
self.update_label.pack(side=tk.RIGHT, padx=20)
def create_realtime_panel(self, parent):
"""创建实时数据面板"""
realtime_frame = ttk.LabelFrame(parent, text="实时环境数据", padding="15")
realtime_frame.pack(fill=tk.X, pady=5)
# 温度显示
temp_frame = ttk.Frame(realtime_frame)
temp_frame.pack(fill=tk.X, pady=8)
ttk.Label(temp_frame, text="温度:", font=("微软雅黑", 12)).pack(side=tk.LEFT)
self.temp_value = ttk.Label(
temp_frame,
text="-- °C",
font=("微软雅黑", 20, "bold"),
foreground="red"
)
self.temp_value.pack(side=tk.LEFT, padx=10)
self.temp_status = ttk.Label(
temp_frame,
text="正常",
foreground="green",
font=("微软雅黑", 10)
)
self.temp_status.pack(side=tk.LEFT)
# 湿度显示
humi_frame = ttk.Frame(realtime_frame)
humi_frame.pack(fill=tk.X, pady=8)
ttk.Label(humi_frame, text="湿度:", font=("微软雅黑", 12)).pack(side=tk.LEFT)
self.humi_value = ttk.Label(
humi_frame,
text="-- %",
font=("微软雅黑", 20, "bold"),
foreground="blue"
)
self.humi_value.pack(side=tk.LEFT, padx=10)
self.humi_status = ttk.Label(
humi_frame,
text="正常",
foreground="green",
font=("微软雅黑", 10)
)
self.humi_status.pack(side=tk.LEFT)
# 报警指示器
alert_frame = ttk.Frame(realtime_frame)
alert_frame.pack(fill=tk.X, pady=10)
self.alert_indicator = ttk.Label(
alert_frame,
text="● 系统正常",
foreground="green",
font=("微软雅黑", 12, "bold")
)
self.alert_indicator.pack()
def create_charts_area(self, parent):
"""创建图表区域"""
chart_frame = ttk.LabelFrame(parent, text="环境趋势图", padding="15")
chart_frame.pack(fill=tk.BOTH, expand=True, pady=5)
# 创建Matplotlib图形
self.fig = Figure(figsize=(10, 6), dpi=100)
self.ax1 = self.fig.add_subplot(211) # 温度图表
self.ax2 = self.fig.add_subplot(212) # 湿度图表
# 设置图表样式
self.setup_chart_style()
# 创建Canvas
self.canvas = FigureCanvasTkAgg(self.fig, chart_frame)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def setup_chart_style(self):
"""设置图表样式"""
# 温度图表
self.ax1.set_title('温度变化趋势', fontsize=12, fontweight='bold')
self.ax1.set_ylabel('温度 (°C)', fontsize=10)
self.ax1.grid(True, linestyle='--', alpha=0.7)
self.ax1.axhline(y=28, color='red', linestyle='--', alpha=0.8, label='报警阈值')
# 湿度图表
self.ax2.set_title('湿度变化趋势', fontsize=12, fontweight='bold')
self.ax2.set_ylabel('湿度 (%)', fontsize=10)
self.ax2.set_xlabel('时间', fontsize=10)
self.ax2.grid(True, linestyle='--', alpha=0.7)
self.ax2.axhline(y=85, color='red', linestyle='--', alpha=0.8, label='报警阈值')
# 初始化数据线
self.temp_line, = self.ax1.plot([], [], 'r-', linewidth=2, label='温度')
self.humi_line, = self.ax2.plot([], [], 'b-', linewidth=2, label='湿度')
self.ax1.legend()
self.ax2.legend()
def create_alert_log(self, parent):
"""创建报警日志区域"""
log_frame = ttk.LabelFrame(parent, text="报警日志", padding="15")
log_frame.pack(fill=tk.X, pady=5)
# 创建树形视图
columns = ("时间", "类型", "信息", "级别")
self.alert_tree = ttk.Treeview(log_frame, columns=columns, show="headings", height=6)
# 设置列属性
column_widths = {"时间": 150, "类型": 100, "信息": 300, "级别": 80}
for col in columns:
self.alert_tree.heading(col, text=col)
self.alert_tree.column(col, width=column_widths[col])
# 添加滚动条
scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.alert_tree.yview)
self.alert_tree.configure(yscrollcommand=scrollbar.set)
self.alert_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
def create_control_panel(self, parent):
"""创建控制面板"""
control_frame = ttk.Frame(parent)
control_frame.pack(fill=tk.X, pady=10)
# 控制按钮
ttk.Button(control_frame, text="📊 历史数据",
command=self.show_history_data).pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="📋 导出报告",
command=self.export_report).pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="⚙️ 系统设置",
command=self.system_settings).pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="🔄 手动刷新",
command=self.manual_refresh).pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="⏹️ 停止监控",
command=self.stop_monitoring).pack(side=tk.RIGHT, padx=5)
def update_charts(self):
"""更新图表数据"""
if len(self.timestamps) > 0:
# 只显示最近20个数据点
display_count = min(20, len(self.timestamps))
recent_timestamps = self.timestamps[-display_count:]
recent_temps = self.temperature_data[-display_count:]
recent_humis = self.h```
最后编辑:yang 更新时间:2025-12-06 19:00