用规则引擎实现业务功能


本小节我们用规则引擎来实现一个可真实运行的物联网项目 - 智能浇花

一、了解规则引擎

规则引擎的主要思想是将应用程序中的业务决策部分分离出来,并使用预定义的语义模块编写业务决策(业务规则),由用户在需要时进行配置、管理。
规则引擎实现了将业务决策从应用程序代码中分离出来,接收数据输入,解释业务规则,并根据业务规则做出业务决策。

小鹏AIoT物联网系统实现了一套可视化的规则引擎系统,无需编写代码即可实现各类业务流程。

二、需求梳理

本案例的需求是根据花盆土壤湿度情况来浇水。显然,如果长时间不浇水,植物就旱死了,如果水浇多了,就涝死了。

需求分析:

1. 设备上需要持续收集土壤湿度数据并发送给物联网系统(例如5分钟收集1次数据),土壤湿度可用土壤湿度传感器来获取读数,用ESP32的话,这个值介于1-4095之间;

2. 由物联网系统对采集到的土壤湿度数据做相关规则检查,如一段时间内(时间窗口)连续值是否大于给定阀值(使用连续值来判定可以防止误触发),如果是则向小水泵继电器的订阅主题推送控制数据(包含闭合值,以及保持秒数);

3. 设备收到订阅消息后,控制继电器闭合指定秒数(不要一直闭合哦),完成小水泵浇水动作。

三、电路搭建

用规则引擎实现业务功能
所有部件均无特殊要求,可自行淘宝购买。

四、规则设计与配置

针对本案例需求,可以从数据流的角度来分析和设计规则,首先是数据采集、然后是数据判断、最后是动作执行等。

下面是配置骤和说明:

1.首先创建项目,然后添加土壤湿度传感器和小水泵继电器设备。

2.创建一个名为“智能浇水”的规则,按照规则设计把对应的规则节点拖入编辑器内,进行串联,并设置规则节点参数。

用规则引擎实现业务功能
数据窗口节点
本节点用来收集指定时间(30分钟)内的所有上报数据,用于下一步操作,并且需要有足够量(8)的数据值,防止引发误操作。
值列表比较
本节点用来判断窗口期内所有上报数据是否都大于阀值(3000),这可以规避因某个误报值导致的误操作。
这里可以根据不同绿植的习性来设置阀值。
继电器数据发布
本节点用来发布控制继电器的数据,包括打开与否(由上一节点的输出来定)和保持时间(3秒)。
这里可以根据不同绿植的种类和盆来设置保持时间。
更多规则节点的使用,请参考“规则引擎”小节。

五、编写ESP32 上的程序

本方案使用MicroPython编程,请先在ESP32上写入MicroPython的固件,具体请参考站内文件。

用Tonny连接ESP32开发板,将以下程序复制到开发板环境根目录下。

wifi.py
import network
import time

def connect():
	ssid = '***your_ssid***'
	password = '***your_passwd***'
	wlan = network.WLAN(network.STA_IF)
	wlan.active(True)
	wlan.connect(ssid, password)
	while wlan.isconnected() == False:
		print('Waiting for connection...')
		time.sleep(1)
	print('Connected on {ip}'.format(ip = wlan.ifconfig()[0]))


mqtt.py


from umqtt.simple import MQTTClient

MQTT_SERVER = 'iot.xpstem.com'
MQTT_PORT = 1883
MQTT_USER = 'ss10001'
MQTT_PASSWD = '***passwd***'
MQTT_CLIENT_ID = 'ESP32-S3-001'

def connect():
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_SERVER, MQTT_PORT, MQTT_USER, MQTT_PASSWD, 300)
    client.connect()
    print('Connected to MQTT Broker "{server}"'.format(server = MQTT_SERVER))
    return client


main程序

###################################
# 智能浇花程序
#
# 通过检测土壤温度情况来控制浇水。
# 要结合物联网系统来使用
#
# author: billy_zh@126.com
###################################
import wifi
import mqtt
import json
import random
import time
import _thread
from machine import ADC, Pin

# 土壤温度数据上报主题
PUB_TOPIC = b'user/******/data'
# 水泵继电器数据订阅主题
SUB_TOPIC = 'user/******/ctrl'

# 土壤温度传感器数据端口
adc = ADC(Pin(2))
adc.width(ADC.WIDTH_12BIT)  # 位宽,取值0-4095
adc.atten(ADC.ATTN_11DB)  # 3.3v基准
# 水泵继电器控制端口
pin = Pin(6, Pin.OUT)

# wifi 连接
wifi.connect()
# mqtt 连接
mqtt_client = mqtt.connect()

####################
# 读取土壤温度数据并上报到服务器
####################
def read_and_publish():
    t = time.localtime()
    value = adc.read()
    msg_dict = {
            'id': 't{year}{month}{day}{id}'.format(year=t[0],month=t[1],day=t[2],id=random.randint(100000, 999999)),
            'data': {'temp':value}
        }
    msg = json.dumps(msg_dict)
    mqtt_client.publish(PUB_TOPIC, msg)
    print('消息已发送到{topic}, data:{data}'.format(topic=PUB_TOPIC.decode(), data=value))


####################
# 控制消息回调
####################
def msg_callback(topic, msg):
    try:
        payload = msg.decode();
        print('从{topic}接收到消息, data:{data}'.format(topic=topic.decode(), data=payload))
        
        dict = json.loads(payload)
        state = dict['data']['state']
        keepSeconds = dict['data']['keepSeconds']
        if state==1:
            pin.on()
            print('pin 6 on...')
            time.sleep(keepSeconds)
            
            pin.off()
            print('pin 6 off...')
    except Exception as e:
        print(f"消息接收出现错误:{e}")

        
def send_task():
    while True:
        try:
            read_and_publish()
            time.sleep(60)
        except Exception as e:
            print(f"消息发送出现错误:{e}")
            mqtt_client = mqtt.connect()


# mqtt 订阅
mqtt_client.set_callback(msg_callback)
mqtt_client.subscribe(SUB_TOPIC)
    
# 发送线程        
_thread.start_new_thread(send_task, ())


####################
# 主循环
####################
while True:
    # 检查是否有消息回调
    mqtt_client.check_msg()
    time.sleep(1)

五、运行

用规则引擎实现业务功能

上报数据查看

用规则引擎实现业务功能
下发数据查看

用规则引擎实现业务功能