智慧农业大棚环境监测与自动调节系统解决方案

A. 整体架构图与设计图

1. 系统整体架构图

graph TB
    %% 感知层
    A1[温湿度传感器] -->|有线连接| B1[数据采集器1]
    A2[光照传感器] -->|ZigBee无线| B2[协调器]
    
    %% 传输层
    B1 -->|RS485/Modbus| C[网关]
    B2 -->|ZigBee| C
    C -->|以太网/WiFi| D[电脑监控中心]
    
    %% 控制层
    C -->|控制信号| E1[卷膜电机]
    C -->|控制信号| E2[补光灯]
    C -->|控制信号| E3[声光报警器]
    
    %% 应用层
    D --> F[Web可视化界面]
    D --> G[移动APP]
    D --> H[数据存储与分析]
    
    %% 样式定义
    class A1,A2 sensor;
    class B1,B2 collector;
    class C gateway;
    class D computer;
    class E1,E2,E3 actuator;
    
    classDef sensor fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
    classDef collector fill:#f3e5f5,stroke:#4a148c,stroke-width:2px;
    classDef gateway fill:#fff3e0,stroke:#e65100,stroke-width:2px;
    classDef computer fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px;
    classDef actuator fill:#ffebee,stroke:#b71c1c,stroke-width:2px;
```**架构说明:**
- **感知层**:温湿度传感器(有线)+ 光照传感器(无线) 
- **传输层**:数据采集器 + 协调器 + 网关组成的数据传输网络 
- **控制层**:网关作为边缘计算节点,执行自动控制逻辑 
- **应用层**:电脑端实现数据可视化、分析和远程监控 

### 2. 温湿度超标自动报警业务流程图

graph TD
Start[系统启动] --> Init[设备初始化]
Init --> Net[组建传感器网络]
Net --> LoopStart[开始监控循环]

LoopStart --> ReadTemp[读取温湿度数据]
ReadTemp --> Validate[数据有效性验证]
Validate --> CheckTemp{温度>28°C?}

CheckTemp -->|是| HighTemp[高温报警处理]
CheckTemp -->|否| CheckHumi{湿度>85%?}

CheckHumi -->|是| HighHumi[高湿报警处理]
CheckHumi -->|否| Normal[正常数据记录]

HighTemp --> AlertProc[报警处理流程]
HighHumi --> AlertProc

AlertProc --> Display[网关屏幕显示报警]
Display --> Log[记录报警事件]
Log --> Notify[通知电脑监控端]

Normal --> UpdateDisplay[更新正常显示]
UpdateDisplay --> Wait[等待5秒]

Notify --> Wait
Wait --> LoopStart

%% 子流程定义
subgraph AlertProc [报警处理流程]
    A1[确定报警级别] --> A2[触发声光报警]
    A2 --> A3[执行自动控制]
    A3 --> A4[生成报警记录]
end
- **数据验证**:检查传感器读数是否在合理范围内(-10°C~60°C0%~100%
- **多级判断**:支持温度、湿度的独立阈值判断 
- **报警分级**:可根据超标程度设置不同报警级别 
- **闭环控制**:报警后自动执行相应的控制动作 

## B. 最小单元的设计验证和数据采集验证

### 1. 硬件连接与配置步骤

#### 物理连接实施

步骤1:温湿度传感器连接
├─ 传感器电源线(红) → 数据采集器1的VCC(5V)
├─ 传感器地线(黑) → 数据采集器1的GND
├─ 传感器数据线(绿) → 数据采集器1的DATA1端口
└─ 检查连接牢固性

步骤2:数据采集器网络连接
├─ 数据采集器1的RS485-A → 网关RS485接口A
├─ 数据采集器1的RS485-B → 网关RS485接口B
└─ 接通12V直流电源

步骤3:网关与电脑连接
├─ 网线连接网关ETH口和电脑网口
├─ 设置网关IP:192.168.1.100
├─ 设置电脑IP:192.168.1.101(同网段)
└─ 测试网络连通性

gateway_config.py - 网关和设备配置脚本

import serial
import time
import json

class DeviceConfigurator:
def init(self):
self.serial_port = '/dev/ttyUSB0'
self.baud_rate = 9600

def configure_data_collector(self):
    """配置数据采集器参数"""
    config_commands = [
        'AT+ADDR=01',          # 设置设备地址01
        'AT+INTERVAL=5000',    # 采样间隔5秒
        'AT+FORMAT=JSON',      # 数据格式JSON
        'AT+SAVE'              # 保存配置
    ]
    
    try:
        with serial.Serial(self.serial_port, self.baud_rate, timeout=1) as ser:
            for cmd in config_commands:
                ser.write(f"{cmd}\r\n".encode())
                time.sleep(0.5)
                response = ser.readline().decode().strip()
                print(f"发送: {cmd} -> 响应: {response}")
                
                if 'OK' not in response:
                    raise Exception(f"配置失败: {response}")
        
        print("数据采集器配置成功")
        return True
        
    except Exception as e:
        print(f"配置错误: {e}")
        return False

def test_sensor_communication(self):
    """测试传感器通信"""
    test_commands = ['AT+READ=01', 'AT+STATUS']
    
    try:
        with serial.Serial(self.serial_port, self.baud_rate, timeout=2) as ser:
            for cmd in test_commands:
                ser.write(f"{cmd}\r\n".encode())
                response = ser.readline().decode().strip()
                
                if 'TEMP' in response or 'HUMI' in response:
                    # 解析传感器数据
                    data = self.parse_sensor_data(response)
                    print(f"传感器测试成功: 温度={data['temp']}°C, 湿度={data['humi']}%")
                    return True
                elif 'OK' in response:
                    print("通信正常")
                    continue
        
        return False
        
    except Exception as e:
        print(f"通信测试失败: {e}")
        return False

def parse_sensor_data(self, response):
    """解析传感器数据响应"""
    # 示例响应: "TEMP=25.6,HUMI=65.2"
    try:
        parts = response.split(',')
        temp = float(parts[0].split('=')[1])
        humi = float(parts[1].split('=')[1])
        return {'temp': temp, 'humi': humi}
    except:
        return {'temp': 0, 'humi': 0}

执行配置

if name == "main":
configurator = DeviceConfigurator()

if configurator.configure_data_collector():
    print("开始传感器通信测试...")
    configurator.test_sensor_communication()

gateway_monitor.py - 网关端监控主程序

import time
import json
import serial
from datetime import datetime
import requests

class GreenhouseMonitor:
def init(self):

硬件配置

self.serial_port = '/dev/ttyUSB0'
self.baud_rate = 9600

    # 报警阈值
    self.temp_threshold = 28.0    # 高温阈值
    self.humi_threshold = 85.0    # 高湿阈值
    
    # 监控参数
    self.read_interval = 5         # 读取间隔5秒
    self.alert_count = 0
    self.data_history = []
    
def read_sensor_data(self):
    """从数据采集器读取温湿度数据"""
    try:
        with serial.Serial(self.serial_port, self.baud_rate, timeout=2) as ser:
            ser.write(b"AT+READ=01\r\n")
            response = ser.readline().decode().strip()
            
            if response:
                return self.parse_sensor_response(response)
            else:
                return None, "传感器无响应"
                
    except Exception as e:
        return None, f"读取错误: {str(e)}"

def parse_sensor_response(self, response):
    """解析传感器响应数据"""
    try:
        if 'TEMP' in response and 'HUMI' in response:
            # 格式: TEMP=25.6,HUMI=65.2
            parts = response.split(',')
            temp = float(parts[0].split('=')[1])
            humi = float(parts[1].split('=')[1])
            
            # 数据合理性验证
            if self.validate_sensor_data(temp, humi):
                return {'temp': temp, 'humi': humi, 'timestamp': datetime.now()}, None
            else:
                return None, "传感器数据异常"
        else:
            return None, "数据格式错误"
            
    except Exception as e:
        return None, f"数据解析错误: {str(e)}"

def validate_sensor_data(self, temp, humi):
    """验证传感器数据的合理性"""
    # 温度范围检查
    if not (-10 <= temp <= 60):
        return False
    
    # 湿度范围检查  
    if not (0 <= humi <= 100):
        return False
        
    # 变化率检查(防止突变)
    if self.data_history:
        last_temp = self.data_history[-1]['temp']
        last_humi = self.data_history[-1]['humi']
        
        if abs(temp - last_temp) > 5:  # 5°C/min变化过大
            return False
        if abs(humi - last_humi) > 10:  # 10%/min变化过大
            return False
            
    return True

def check_alert_conditions(self, temp, humi):
    """检查报警条件"""
    alerts = []
    
    if temp > self.temp_threshold:
        alerts.append({
            'type': 'high_temperature',
            'message': f'高温预警: {temp}°C > {self.temp_threshold}°C',
            'level': 'warning',
            'value': temp
        })
    
    if humi > self.humi_threshold:
        alerts.append({
            'type': 'high_humidity',
            'message': f'高湿预警: {humi}% > {self.humi_threshold}%', 
            'level': 'warning',
            'value': humi
        })
        
    return alerts

def display_on_gateway(self, temp, humi, alerts):
    """在网关屏幕上显示信息"""
    display_lines = [
        "大棚环境监控系统",
        f"温度: {temp}°C",
        f"湿度: {humi}%", 
        f"时间: {datetime.now().strftime('%H:%M:%S')}"
    ]
    
    if alerts:
        # 报警显示模式
        display_lines[0] = "!!! 环境报警 !!!"
        for i, alert in enumerate(alerts[:2]):  # 最多显示2条报警
            if i == 0:
                display_lines[1] = alert['message'][:16]  # 截取前16字符
            elif i == 1:
                display_lines[2] = alert['message'][:16]
    
    # 模拟在网关屏幕显示(实际调用网关显示SDK)
    print("=== 网关显示屏 ===")
    for line in display_lines:
        print(line)
    print("==================")

def send_to_computer(self, data, alerts):
    """发送数据到电脑监控端"""
    payload = {
        'timestamp': datetime.now().isoformat(),
        'temperature': data['temp'],
        'humidity': data['humi'],
        'alerts': alerts,
        'gateway_id': 'GW001'
    }
    
    try:
        response = requests.post(
            'http://192.168.1.101:5000/api/data',
            json=payload,
            timeout=3
        )
        return response.status_code == 200
    except Exception as e:
        print(f"网络发送失败: {e}")
        # 本地存储,网络恢复后重发
        self.store_locally(payload)
        return False

def store_locally(self, data):
    """本地存储数据(网络异常时)"""
    if len(self.data_history) > 1000:  # 限制存储量
        self.data_history = self.data_history[-500:]
    
    self.data_history.append(data)

def main_loop(self):
    """主监控循环"""
    print("启动智慧农业大棚监控系统...")
    print(f"报警阈值 - 温度: {self.temp_threshold}°C, 湿度: {self.humi_threshold}%")
    
    while True:
        try:
            # 1. 读取传感器数据
            data, error = self.read_sensor_data()
            
            if data:
                temp = data['temp']
                humi = data['humi']
                
                # 2. 检查报警条件
                alerts = self.check_alert_conditions(temp, humi)
                
                # 3. 网关屏幕显示
                self.display_on_gateway(temp, humi, alerts)
                
                # 4. 发送到电脑端
                self.send_to_computer(data, alerts)
                
                # 5. 记录报警
                if alerts:
                    self.alert_count += len(alerts)
                    print(f"检测到报警,总数: {self.alert_count}")
                    
            else:
                print(f"数据读取失败: {error}")
                
        except Exception as e:
            print(f"监控循环异常: {e}")
        
        # 6. 等待5秒
        time.sleep(self.read_interval)

启动监控系统

if name == "main":
monitor = GreenhouseMonitor()
monitor.main_loop()


### 1. 应用流程图

graph TD
A[传感器数据采集] --> B[网关边缘计算]
B --> C[数据预处理]
C --> D[阈值判断]
D --> E{是否报警?}

E -->|是| F[报警处理]
E -->|否| G[正常记录]

F --> H[本地报警显示]
H --> I[网络传输]
G --> I

I --> J[电脑端接收]
J --> K[数据解析]
K --> L[实时显示]
L --> M[历史存储]

M --> N[趋势分析]
N --> O[报警记录]
O --> P[报表生成]

subgraph 数据流
    A --> B --> C --> D --> E
    I --> J --> K --> L --> M --> N --> O --> P
end

subgraph 报警处理
    F --> H --> I
end

subgraph 用户交互
    L --> Q[用户操作]
    Q --> R[参数设置]
    R --> S[控制指令]
    S --> T[设备控制]
    T --> A
end

computer_dashboard.py - 电脑端监控界面

import tkinter as tk
from tkinter import ttk, messagebox
import requests
import json
from datetime import datetime, timedelta
import threading
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.dates as mdates

class GreenhouseDashboard:
def init(self, root):
self.root = root
self.root.title("智慧农业大棚监控系统 - 黔东南州")
self.root.geometry("1200x800")

    # 数据存储
    self.temperature_data = []
    self.humidity_data = []
    self.timestamps = []
    self.alert_log = []
    
    # 服务器配置
    self.server_url = "http://192.168.1.100:5000"
    self.running = True
    
    self.setup_ui()
    self.start_data_thread()

def setup_ui(self):
    """创建监控界面"""
    # 创建主框架
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.pack(fill=tk.BOTH, expand=True)
    
    # 顶部标题栏
    self.create_title_bar(main_frame)
    
    # 实时数据面板
    self.create_realtime_panel(main_frame)
    
    # 图表区域
    self.create_charts_area(main_frame)
    
    # 报警日志
    self.create_alert_log(main_frame)
    
    # 控制面板
    self.create_control_panel(main_frame)

def create_title_bar(self, parent):
    """创建标题栏"""
    title_frame = ttk.Frame(parent)
    title_frame.pack(fill=tk.X, pady=(0, 10))
    
    # 标题
    title_label = ttk.Label(
        title_frame, 
        text="🌱 智慧农业大棚环境监控系统", 
        font=("微软雅黑", 16, "bold"),
        foreground="darkgreen"
    )
    title_label.pack(side=tk.LEFT)
    
    # 状态指示
    self.status_label = ttk.Label(
        title_frame, 
        text="● 系统运行中",
        foreground="green",
        font=("微软雅黑", 10)
    )
    self.status_label.pack(side=tk.RIGHT)
    
    # 最后更新时间
    self.update_label = ttk.Label(
        title_frame,
        text="最后更新: --:--:--",
        font=("微软雅黑", 9)
    )
    self.update_label.pack(side=tk.RIGHT, padx=20)

def create_realtime_panel(self, parent):
    """创建实时数据面板"""
    realtime_frame = ttk.LabelFrame(parent, text="实时环境数据", padding="15")
    realtime_frame.pack(fill=tk.X, pady=5)
    
    # 温度显示
    temp_frame = ttk.Frame(realtime_frame)
    temp_frame.pack(fill=tk.X, pady=8)
    
    ttk.Label(temp_frame, text="温度:", font=("微软雅黑", 12)).pack(side=tk.LEFT)
    self.temp_value = ttk.Label(
        temp_frame, 
        text="-- °C", 
        font=("微软雅黑", 20, "bold"),
        foreground="red"
    )
    self.temp_value.pack(side=tk.LEFT, padx=10)
    
    self.temp_status = ttk.Label(
        temp_frame, 
        text="正常",
        foreground="green",
        font=("微软雅黑", 10)
    )
    self.temp_status.pack(side=tk.LEFT)
    
    # 湿度显示
    humi_frame = ttk.Frame(realtime_frame)
    humi_frame.pack(fill=tk.X, pady=8)
    
    ttk.Label(humi_frame, text="湿度:", font=("微软雅黑", 12)).pack(side=tk.LEFT)
    self.humi_value = ttk.Label(
        humi_frame, 
        text="-- %", 
        font=("微软雅黑", 20, "bold"),
        foreground="blue"
    )
    self.humi_value.pack(side=tk.LEFT, padx=10)
    
    self.humi_status = ttk.Label(
        humi_frame, 
        text="正常",
        foreground="green",
        font=("微软雅黑", 10)
    )
    self.humi_status.pack(side=tk.LEFT)
    
    # 报警指示器
    alert_frame = ttk.Frame(realtime_frame)
    alert_frame.pack(fill=tk.X, pady=10)
    
    self.alert_indicator = ttk.Label(
        alert_frame, 
        text="● 系统正常",
        foreground="green",
        font=("微软雅黑", 12, "bold")
    )
    self.alert_indicator.pack()

def create_charts_area(self, parent):
    """创建图表区域"""
    chart_frame = ttk.LabelFrame(parent, text="环境趋势图", padding="15")
    chart_frame.pack(fill=tk.BOTH, expand=True, pady=5)
    
    # 创建Matplotlib图形
    self.fig = Figure(figsize=(10, 6), dpi=100)
    self.ax1 = self.fig.add_subplot(211)  # 温度图表
    self.ax2 = self.fig.add_subplot(212)  # 湿度图表
    
    # 设置图表样式
    self.setup_chart_style()
    
    # 创建Canvas
    self.canvas = FigureCanvasTkAgg(self.fig, chart_frame)
    self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

def setup_chart_style(self):
    """设置图表样式"""
    # 温度图表
    self.ax1.set_title('温度变化趋势', fontsize=12, fontweight='bold')
    self.ax1.set_ylabel('温度 (°C)', fontsize=10)
    self.ax1.grid(True, linestyle='--', alpha=0.7)
    self.ax1.axhline(y=28, color='red', linestyle='--', alpha=0.8, label='报警阈值')
    
    # 湿度图表
    self.ax2.set_title('湿度变化趋势', fontsize=12, fontweight='bold')
    self.ax2.set_ylabel('湿度 (%)', fontsize=10)
    self.ax2.set_xlabel('时间', fontsize=10)
    self.ax2.grid(True, linestyle='--', alpha=0.7)
    self.ax2.axhline(y=85, color='red', linestyle='--', alpha=0.8, label='报警阈值')
    
    # 初始化数据线
    self.temp_line, = self.ax1.plot([], [], 'r-', linewidth=2, label='温度')
    self.humi_line, = self.ax2.plot([], [], 'b-', linewidth=2, label='湿度')
    
    self.ax1.legend()
    self.ax2.legend()

def create_alert_log(self, parent):
    """创建报警日志区域"""
    log_frame = ttk.LabelFrame(parent, text="报警日志", padding="15")
    log_frame.pack(fill=tk.X, pady=5)
    
    # 创建树形视图
    columns = ("时间", "类型", "信息", "级别")
    self.alert_tree = ttk.Treeview(log_frame, columns=columns, show="headings", height=6)
    
    # 设置列属性
    column_widths = {"时间": 150, "类型": 100, "信息": 300, "级别": 80}
    for col in columns:
        self.alert_tree.heading(col, text=col)
        self.alert_tree.column(col, width=column_widths[col])
    
    # 添加滚动条
    scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.alert_tree.yview)
    self.alert_tree.configure(yscrollcommand=scrollbar.set)
    
    self.alert_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

def create_control_panel(self, parent):
    """创建控制面板"""
    control_frame = ttk.Frame(parent)
    control_frame.pack(fill=tk.X, pady=10)
    
    # 控制按钮
    ttk.Button(control_frame, text="📊 历史数据", 
              command=self.show_history_data).pack(side=tk.LEFT, padx=5)
    ttk.Button(control_frame, text="📋 导出报告", 
              command=self.export_report).pack(side=tk.LEFT, padx=5)
    ttk.Button(control_frame, text="⚙️ 系统设置", 
              command=self.system_settings).pack(side=tk.LEFT, padx=5)
    ttk.Button(control_frame, text="🔄 手动刷新", 
              command=self.manual_refresh).pack(side=tk.LEFT, padx=5)
    ttk.Button(control_frame, text="⏹️ 停止监控", 
              command=self.stop_monitoring).pack(side=tk.RIGHT, padx=5)

def update_charts(self):
    """更新图表数据"""
    if len(self.timestamps) > 0:
        # 只显示最近20个数据点
        display_count = min(20, len(self.timestamps))
        recent_timestamps = self.timestamps[-display_count:]
        recent_temps = self.temperature_data[-display_count:]
        recent_humis = self.h

```

作者:yang  创建时间:2025-12-06 18:59
最后编辑:yang  更新时间:2025-12-06 19:00