#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"
要想彻底了解这个库的使用方法,请看博主下面的源码分析。
/**
 * Constructor 1: create an MQTT client with an explicit client id.
 *
 * @param client  underlying network client (e.g. a WiFi or Ethernet client)
 * @param server  MQTT broker address
 * @param port    MQTT broker port
 * @param cid     client id; should be unique per device (on an ESP8266 the
 *                chip id is a reasonable choice)
 * @param user    MQTT account name
 * @param pass    MQTT account password
 */
Adafruit_MQTT_Client(Client *client,
                     const char *server,
                     uint16_t port,
                     const char *cid,
                     const char *user,
                     const char *pass)
    : Adafruit_MQTT(server, port, cid, user, pass), client(client) {}
/**
 * Constructor 2: create an MQTT client without specifying a client id.
 *
 * @param client  underlying network client (e.g. a WiFi or Ethernet client)
 * @param server  MQTT broker address
 * @param port    MQTT broker port
 * @param user    MQTT account name (defaults to an empty string)
 * @param pass    MQTT account password (defaults to an empty string)
 */
Adafruit_MQTT_Client(Client *client,
                     const char *server,
                     uint16_t port,
                     const char *user = "",
                     const char *pass = "")
    : Adafruit_MQTT(server, port, user, pass), client(client) {}
这里用到了 Adafruit_MQTT类的初始化构造方法:
/**
 * Constructor 1: store the MQTT connection settings (explicit client id).
 *
 * @param server  MQTT broker address
 * @param port    MQTT broker port
 * @param cid     client id; should be unique per device
 * @param user    MQTT account name
 * @param pass    MQTT account password
 */
Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port,
                             const char *cid, const char *user,
                             const char *pass) {
  // No last-will message and no packet sent yet.
  will_topic = 0;
  will_payload = 0;
  will_qos = 0;
  will_retain = 0;
  packet_id_counter = 0;

  // Broker endpoint and credentials (pointers are stored, not copied).
  servername = server;
  portnum = port;
  clientid = cid;
  username = user;
  password = pass;

  // Start with an empty subscription table.
  uint8_t slot = 0;
  while (slot < MAXSUBSCRIPTIONS) {
    subscriptions[slot] = 0;
    ++slot;
  }
}
/**
 * Constructor 2: store the MQTT connection settings (no client id given).
 *
 * @param server  MQTT broker address
 * @param port    MQTT broker port
 * @param user    MQTT account name
 * @param pass    MQTT account password
 */
Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port,
                             const char *user, const char *pass) {
  // No last-will message and no packet sent yet.
  will_topic = 0;
  will_payload = 0;
  will_qos = 0;
  will_retain = 0;
  packet_id_counter = 0;

  // Broker endpoint and credentials (pointers are stored, not copied).
  servername = server;
  portnum = port;
  clientid = ""; // the client id defaults to an empty string
  username = user;
  password = pass;

  // Start with an empty subscription table.
  uint8_t slot = 0;
  while (slot < MAXSUBSCRIPTIONS) {
    subscriptions[slot] = 0;
    ++slot;
  }
}
3.1.2 will —— 配置遗嘱消息
/**
 * Configure the last-will message. Must be called before connecting.
 *
 * @param topic    will topic
 * @param payload  will payload
 * @param qos      will quality-of-service level
 * @param retain   will retain flag
 * @return true when the settings were stored, false if already connected
 */
bool Adafruit_MQTT::will(const char *topic, const char *payload, uint8_t qos, uint8_t retain) {
  const bool alreadyConnected = connected();
  if (!alreadyConnected) {
    will_topic = topic;
    will_payload = payload;
    will_qos = qos;
    will_retain = retain;
    return true;
  }

  // Too late: the will travels inside the CONNECT packet.
  DEBUG_PRINT(F("Will defined after connect"));
  return false;
}
/**
* 功能描述: 连接服务器
* @param user mqtt服务器账号
* @param pass mqtt服务器密码
*/
int8_t Adafruit_MQTT::connect(const char *user, const char *pass)
{
username = user;
password = pass;
return connect();
}
/**
 * Connect to the broker and (re)subscribe every registered subscription.
 *
 * @return 0 on success; -1 when the transport, the CONNECT send, or the
 *         CONNACK check fails; -2 when a subscription is not acknowledged;
 *         otherwise the non-zero return code taken from the CONNACK packet.
 */
int8_t Adafruit_MQTT::connect() {
  // Open the transport to the server.
  if (!connectServer())
    return -1;

  // Build and send the CONNECT packet.
  uint8_t len = connectPacket(buffer);
  if (!sendPacket(buffer, len))
    return -1;

  // Read and validate the CONNACK. readFullPacket() returns a uint16_t, so
  // the length is kept at full width: storing it in a uint8_t would let a
  // 260-byte reply look like a 4-byte one.
  uint16_t acklen = readFullPacket(buffer, MAXBUFFERSIZE, CONNECT_TIMEOUT_MS);
  if (acklen != 4)
    return -1;
  if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2))
    return -1;
  if (buffer[3] != 0)
    return buffer[3];

  // Send a SUBSCRIBE for every registered subscription (the entries are
  // registered through subscribe()).
  for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) {
    // Ignore subscriptions that aren't defined.
    if (subscriptions[i] == 0) continue;

    boolean success = false;
    for (uint8_t retry = 0; (retry < 3) && !success; retry++) { // retry until we get a suback
      // Build and send the SUBSCRIBE packet.
      uint8_t sublen = subscribePacket(buffer, subscriptions[i]->topic, subscriptions[i]->qos);
      if (!sendPacket(buffer, sublen))
        return -1;

      if (MQTT_PROTOCOL_LEVEL < 3) // older versions didn't suback
        break;

      // Check for SUBACK if using MQTT 3.1.1 or higher
      // TODO: The Server is permitted to start sending PUBLISH packets matching the
      // Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada)
      if (processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS)) {
        success = true;
        break;
      }
    }
    if (!success) return -2; // failed to sub for some reason
  }
  return 0;
}
/**
 * Open the network connection to the configured server (servername, portnum).
 *
 * @return true when client->connect() reports a non-zero result, else false
 */
bool Adafruit_MQTT_Client::connectServer() {
  // Copy the server name into the work buffer for name resolution. The copy
  // is bounded so an over-long name cannot overflow the buffer; the memset
  // plus the "- 1" keeps the result NUL-terminated.
  memset(buffer, 0, sizeof(buffer));
  strncpy((char *)buffer, servername, sizeof(buffer) - 1);
  DEBUG_PRINT(F("Connecting to: ")); DEBUG_PRINTLN((char *)buffer);

  // Connect; a non-zero result is treated as success.
  int r = client->connect((char *)buffer, portnum);
  DEBUG_PRINT(F("Connect result: ")); DEBUG_PRINTLN(r);
  return r != 0;
}
/**
 * Build the MQTT CONNECT packet into `packet`.
 *
 * Layout written below: fixed header (type byte + one remaining-length
 * byte), protocol name, protocol level, connect flags, keep-alive, client
 * id, then the optional will topic/payload, user name and password.
 *
 * @param packet  destination buffer
 * @return total number of bytes written (narrowed to uint8_t)
 *
 * NOTE(review): the remaining length is stored in a single byte
 * (packet[1]) and the return type is uint8_t, so this presumably relies on
 * the packet staying short -- confirm against MAXBUFFERSIZE.
 */
uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
uint8_t *p = packet;
uint16_t len;
// fixed header, connection message, no flags
p[0] = (MQTT_CTRL_CONNECT << 4) | 0x0;
p+=2;
// packet[1] (remaining length) is filled in last
#if MQTT_PROTOCOL_LEVEL == 3
p = stringprint(p, "MQIsdp");
#elif MQTT_PROTOCOL_LEVEL == 4
p = stringprint(p, "MQTT");
#else
#error "MQTT level not supported"
#endif
p[0] = MQTT_PROTOCOL_LEVEL; // MQTT protocol level byte
p++;
// always clean the session
p[0] = MQTT_CONN_CLEANSESSION;
// set the will flags if a non-empty will topic was configured
if (will_topic && pgm_read_byte(will_topic) != 0) {
p[0] |= MQTT_CONN_WILLFLAG;
if(will_qos == 1)
p[0] |= MQTT_CONN_WILLQOS_1;
else if(will_qos == 2)
p[0] |= MQTT_CONN_WILLQOS_2;
if(will_retain == 1)
p[0] |= MQTT_CONN_WILLRETAIN;
}
// user name / password flags are set only for non-empty strings
if (pgm_read_byte(username) != 0)
p[0] |= MQTT_CONN_USERNAMEFLAG;
if (pgm_read_byte(password) != 0)
p[0] |= MQTT_CONN_PASSWORDFLAG;
p++;
// keep-alive interval, high byte first
p[0] = MQTT_CONN_KEEPALIVE >> 8;
p++;
p[0] = MQTT_CONN_KEEPALIVE & 0xFF;
p++;
if(MQTT_PROTOCOL_LEVEL == 3) {
p = stringprint(p, clientid, 23); // Limit client ID to first 23 characters.
} else {
if (pgm_read_byte(clientid) != 0) {
p = stringprint(p, clientid);
} else {
// empty client id: write a zero-length string (two zero bytes)
p[0] = 0x0;
p++;
p[0] = 0x0;
p++;
DEBUG_PRINTLN(F("SERVER GENERATING CLIENT ID"));
}
}
// payload fields follow in the same order as their flags above
if (will_topic && pgm_read_byte(will_topic) != 0) {
p = stringprint(p, will_topic);
p = stringprint(p, will_payload);
}
if (pgm_read_byte(username) != 0) {
p = stringprint(p, username);
}
if (pgm_read_byte(password) != 0) {
p = stringprint(p, password);
}
len = p - packet;
packet[1] = len-2; // don't include the 2 bytes of fixed header data
DEBUG_PRINTLN(F("MQTT connect packet:"));
// NOTE(review): this dumps the member `buffer`, not the `packet` argument
DEBUG_PRINTBUFFER(buffer, len);
return len;
}
/**
 * Send `len` bytes of `buffer` through the underlying client, in chunks of
 * at most 250 bytes.
 *
 * @param buffer  data to send
 * @param len     number of bytes to send
 * @return true when every byte was written; false when the client is not
 *         connected or a write was short
 */
bool Adafruit_MQTT_Client::sendPacket(uint8_t *buffer, uint16_t len) {
  uint16_t ret = 0;
  uint16_t offset = 0; // bytes already written; each chunk starts here

  while (len > 0) {
    if (!client->connected()) {
      DEBUG_PRINTLN(F("Connection failed!"));
      return false;
    }

    // send 250 bytes at most at a time, can adjust this later based on Client
    uint16_t sendlen = len > 250 ? 250 : len;
    // Write from the current offset: without it, every chunk after the
    // first would resend the start of the packet.
    ret = client->write(buffer + offset, sendlen);
    DEBUG_PRINT(F("Client sendPacket returned: ")); DEBUG_PRINTLN(ret);
    if (ret != sendlen) {
      DEBUG_PRINTLN("Failed to send packet.");
      return false;
    }
    len -= ret;
    offset += ret;
  }
  return true;
}
/**
 * Read one complete MQTT packet: the type byte, the variable-length
 * "remaining length" field, then the rest of the packet.
 *
 * @param buffer   destination buffer
 * @param maxsize  capacity of `buffer`
 * @param timeout  read timeout passed to readPacket()
 * @return total number of bytes stored in `buffer`, or 0 when a header byte
 *         could not be read or the length field is malformed
 */
uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t maxsize, uint16_t timeout) {
  uint8_t *pbuff = buffer;
  // 16 bits wide: an 8-bit counter would truncate the byte count of any
  // packet body longer than 255 bytes and corrupt the returned length.
  uint16_t rlen;

  // read the packet type:
  rlen = readPacket(pbuff, 1, timeout);
  if (rlen != 1) return 0;
  DEBUG_PRINT(F("Packet Type:\t")); DEBUG_PRINTBUFFER(pbuff, rlen);
  pbuff++;

  // Decode the remaining length: 7 data bits per byte, top bit set means
  // another length byte follows.
  uint32_t value = 0;
  uint32_t multiplier = 1;
  uint8_t encodedByte;
  do {
    rlen = readPacket(pbuff, 1, timeout);
    if (rlen != 1) return 0;
    encodedByte = pbuff[0]; // save the last read val
    pbuff++;                // get ready for reading the next byte
    uint32_t intermediate = encodedByte & 0x7F;
    intermediate *= multiplier;
    value += intermediate;
    multiplier *= 128;
    if (multiplier > (128UL*128UL*128UL)) {
      DEBUG_PRINT(F("Malformed packet len\n"));
      return 0;
    }
  } while (encodedByte & 0x80);
  DEBUG_PRINT(F("Packet Length:\t")); DEBUG_PRINTLN(value);

  if (value > (maxsize - (pbuff-buffer) - 1)) {
    // Only read what fits, leaving one spare byte in the buffer.
    DEBUG_PRINTLN(F("Packet too big for buffer"));
    rlen = readPacket(pbuff, (maxsize - (pbuff-buffer) - 1), timeout);
  } else {
    rlen = readPacket(pbuff, value, timeout);
  }
  //DEBUG_PRINT(F("Remaining packet:\t")); DEBUG_PRINTBUFFER(pbuff, rlen);
  return ((pbuff - buffer)+rlen);
}
/**
 * Build an MQTT SUBSCRIBE packet for a single topic.
 *
 * @param packet  destination buffer
 * @param topic   topic filter to subscribe to
 * @param qos     requested quality-of-service level for the topic
 * @return total number of bytes written (narrowed to uint8_t)
 */
uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic,
                                       uint8_t qos) {
  // Byte 0: SUBSCRIBE control type plus the QoS 1 flag bits.
  packet[0] = MQTT_CTRL_SUBSCRIBE << 4 | MQTT_QOS_1 << 1;

  // Byte 1 (remaining length) is filled in once the size is known.
  uint8_t *cursor = packet + 2;

  // Packet identifier, high byte first; used for checking SUBACK.
  *cursor++ = (packet_id_counter >> 8) & 0xFF;
  *cursor++ = packet_id_counter & 0xFF;
  packet_id_counter++;

  // Topic filter followed by its requested QoS.
  cursor = stringprint(cursor, topic);
  *cursor++ = qos;

  uint16_t total = cursor - packet;
  packet[1] = total - 2; // don't include the 2 bytes of fixed header data
  DEBUG_PRINTLN(F("MQTT subscription packet:"));
  DEBUG_PRINTBUFFER(buffer, total);
  return total;
}
/** Report whether the underlying network client is connected. */
bool Adafruit_MQTT_Client::connected() {
  const bool linkUp = client->connected();
  return linkUp;
}
/**
 * Send a DISCONNECT packet (best effort) and close the transport.
 *
 * @return the result of disconnectServer()
 */
bool Adafruit_MQTT::disconnect() {
  const uint8_t packetLen = disconnectPacket(buffer);
  const bool sent = sendPacket(buffer, packetLen);
  if (!sent) {
    // A failed send is only logged; the transport is closed regardless.
    DEBUG_PRINTLN(F("Unable to send disconnect packet"));
  }
  return disconnectServer();
}
/**
 * Stop the underlying client if it is connected.
 *
 * @return always true (stop() gives no indication of failure)
 */
bool Adafruit_MQTT_Client::disconnectServer() {
  if (!client->connected()) {
    return true;
  }
  client->stop();
  return true;
}
// Class declaration: one subscribed topic, its last received payload, and
// the optional callbacks invoked when a message arrives.
class Adafruit_MQTT_Subscribe {
public:
// Create a subscription for `feedname` on `mqttserver` with QoS `q` (default 0).
Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0);
// Callback setters, one overload per callback signature.
void setCallback(SubscribeCallbackUInt32Type callb);
void setCallback(SubscribeCallbackDoubleType callb);
void setCallback(SubscribeCallbackBufferType callb);
void setCallback(AdafruitIO_MQTT *io, SubscribeCallbackIOType callb);
// Clear every callback.
void removeCallback(void);
const char *topic; // topic name (pointer is stored, not copied)
uint8_t qos; // requested quality-of-service level
uint8_t lastread[SUBSCRIPTIONDATALEN]; // payload of the most recent message
// Number valid bytes in lastread. Limited to SUBSCRIPTIONDATALEN-1 to
// ensure nul terminating lastread.
uint16_t datalen;
// Optional callbacks; a value of 0 means "not set".
SubscribeCallbackUInt32Type callback_uint32t;
SubscribeCallbackDoubleType callback_double;
SubscribeCallbackBufferType callback_buffer;
SubscribeCallbackIOType callback_io;
AdafruitIO_MQTT *io_mqtt; // object on which callback_io is invoked
private:
Adafruit_MQTT *mqtt; // owning MQTT client
};
/**
 * Create a subscription object.
 *
 * @param mqttserver  MQTT client this subscription belongs to
 * @param feed        topic name
 * @param q           requested quality-of-service level
 *
 * All callbacks start out unset and datalen starts at 0.
 */
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
                                                 const char *feed, uint8_t q)
    : topic(feed),
      qos(q),
      datalen(0),
      callback_uint32t(0),
      callback_double(0),
      callback_buffer(0),
      callback_io(0),
      io_mqtt(0),
      mqtt(mqttserver) {}
/**
 * Register the callback that receives the payload as a uint32_t.
 * Setting a callback is optional.
 */
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) {
  this->callback_uint32t = cb;
}
/** Register the callback that receives the payload as a double. */
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackDoubleType cb) {
  this->callback_double = cb;
}
/** Register the callback that receives the raw payload buffer and its length. */
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackBufferType cb) {
  this->callback_buffer = cb;
}
/**
 * Register a member-function callback together with the AdafruitIO_MQTT
 * object it is invoked on.
 */
void Adafruit_MQTT_Subscribe::setCallback(AdafruitIO_MQTT *io, SubscribeCallbackIOType cb) {
  this->io_mqtt = io;
  this->callback_io = cb;
}
void Adafruit_MQTT_Subscribe::removeCallback(void) {
callback_uint32t = 0;
callback_buffer = 0;
callback_double = 0;
callback_io = 0;
io_mqtt = 0;
}
/**
 * Register a subscription in the local table (sent to the broker by connect()).
 *
 * @param sub  subscription object to register
 * @return true when `sub` is already registered or was stored in a free
 *         slot; false when the table is full
 */
bool Adafruit_MQTT::subscribe(Adafruit_MQTT_Subscribe *sub) {
  // Pass 1: nothing to do if this object is already in the table.
  for (uint8_t slot = 0; slot < MAXSUBSCRIPTIONS; ++slot) {
    if (subscriptions[slot] == sub) {
      DEBUG_PRINTLN(F("Already subscribed"));
      return true;
    }
  }

  // Pass 2: claim the first free slot.
  for (uint8_t slot = 0; slot < MAXSUBSCRIPTIONS; ++slot) {
    if (subscriptions[slot] != 0) {
      continue;
    }
    DEBUG_PRINT(F("Added sub ")); DEBUG_PRINTLN(slot);
    subscriptions[slot] = sub;
    return true;
  }

  DEBUG_PRINTLN(F("no more subscription space :("));
  return false;
}
/**
 * Unsubscribe from a topic and remove it from the local table.
 *
 * @param sub  subscription to remove
 * @return true when `sub` was removed or was not registered at all; false
 *         when the UNSUBSCRIBE could not be sent or was not acknowledged
 */
bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) {
  // A null pointer would match an empty table slot and then be dereferenced.
  if (sub == 0) {
    return true;
  }

  for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) {
    if (subscriptions[i] != sub) {
      continue;
    }
    DEBUG_PRINTLN(F("Found matching subscription and attempting to unsubscribe."));

    // Construct and send unsubscribe packet.
    uint8_t len = unsubscribePacket(buffer, subscriptions[i]->topic);
    if (!sendPacket(buffer, len))
      return false; // sending unsubscribe failed

    // if QoS for this subscription is 1 or 2, we need
    // to wait for the unsuback to confirm unsubscription
    if (subscriptions[i]->qos > 0 && MQTT_PROTOCOL_LEVEL > 3) {
      // readFullPacket() returns uint16_t; keep the full width.
      uint16_t acklen = readFullPacket(buffer, MAXBUFFERSIZE, CONNECT_TIMEOUT_MS);
      DEBUG_PRINT(F("UNSUBACK:\t"));
      DEBUG_PRINTBUFFER(buffer, acklen);

      // An UNSUBACK is 4 bytes (2-byte fixed header + 2-byte packet id),
      // the same size connect() and publish() expect for CONNACK/PUBACK.
      if ((acklen != 4) || (buffer[0] != (MQTT_CTRL_UNSUBACK << 4))) {
        return false; // failure to unsubscribe
      }
    }

    subscriptions[i] = 0;
    return true;
  }

  // subscription not found, so we are unsubscribed
  return true;
}
// Class declaration: publishes values to one topic through an Adafruit_MQTT client.
class Adafruit_MQTT_Publish {
public:
// Bind a publisher to `feed` on `mqttserver` with the given QoS (default 0).
Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver, const char *feed, uint8_t qos = 0);
// publish() overloads: each returns the result of the underlying
// Adafruit_MQTT::publish() call.
bool publish(const char *s);
bool publish(double f, uint8_t precision=2); // Precision controls the minimum number of digits after decimal.
// This might be ignored and a higher precision value sent.
bool publish(int32_t i);
bool publish(uint32_t i);
bool publish(uint8_t *b, uint16_t bLen);
private:
Adafruit_MQTT *mqtt; // client used to send
const char *topic; // topic name (pointer is stored, not copied)
uint8_t qos; // quality-of-service level used for every publish
};
// Class implementation
/**
 * @param mqttserver  Adafruit_MQTT instance used to send
 * @param feed        topic name
 * @param q           quality-of-service level
 */
Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver,
                                             const char *feed, uint8_t q)
    : mqtt(mqttserver), topic(feed), qos(q) {}
// The publish() overloads below all end up calling Adafruit_MQTT::publish().

/** Publish a signed 32-bit integer as decimal text. */
bool Adafruit_MQTT_Publish::publish(int32_t i) {
  char text[12]; // 11 characters for "-2147483648" plus the terminator
  ltoa(i, text, 10);
  const bool ok = mqtt->publish(topic, text, qos);
  return ok;
}
/** Publish an unsigned 32-bit integer as decimal text. */
bool Adafruit_MQTT_Publish::publish(uint32_t i) {
  char text[11]; // 10 digits for 4294967295 plus the terminator
  ultoa(i, text, 10);
  const bool ok = mqtt->publish(topic, text, qos);
  return ok;
}
/**
 * Publish a floating-point value as text.
 *
 * @param f          value to publish
 * @param precision  digits after the decimal point handed to dtostrf()
 *
 * NOTE(review): the 41-byte buffer is sized for a float's integer part
 * (39 digits and a minus sign); a very large value combined with a
 * non-zero precision presumably needs more room -- confirm.
 */
bool Adafruit_MQTT_Publish::publish(double f, uint8_t precision) {
  char text[41]; // Need to technically hold float max, 39 digits and minus sign.
  dtostrf(f, 0, precision, text);
  const bool ok = mqtt->publish(topic, text, qos);
  return ok;
}
/** Publish a NUL-terminated string. */
bool Adafruit_MQTT_Publish::publish(const char *payload) {
  const bool ok = mqtt->publish(topic, payload, qos);
  return ok;
}
/** Publish a binary buffer of arbitrary length (`bLen` bytes). */
bool Adafruit_MQTT_Publish::publish(uint8_t *payload, uint16_t bLen) {
  const bool ok = mqtt->publish(topic, payload, bLen, qos);
  return ok;
}
/**
 * Publish a NUL-terminated string; forwards to the buffer overload using
 * strlen(data) as the payload length.
 */
bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) {
  uint8_t *bytes = (uint8_t*)(data);
  return publish(topic, bytes, strlen(data), qos);
}
/**
 * Publish a message.
 *
 * @param topic  topic name
 * @param data   payload bytes
 * @param bLen   payload length in bytes
 * @param qos    quality-of-service level
 * @return true when the packet was sent and, for qos > 0, a matching
 *         PUBACK was received
 */
bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen, uint8_t qos) {
  // Build and send the PUBLISH packet.
  uint16_t len = publishPacket(buffer, topic, data, bLen, qos);
  if (!sendPacket(buffer, len)) {
    return false;
  }

  // QoS 0 is fire-and-forget: no reply to verify.
  if (qos == 0) {
    return true;
  }

  // Higher QoS: read the reply and check that it is a PUBACK.
  len = readFullPacket(buffer, MAXBUFFERSIZE, PUBLISH_TIMEOUT_MS);
  DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
  DEBUG_PRINTBUFFER(buffer, len);
  if (len != 4 || (buffer[0] >> 4) != MQTT_CTRL_PUBACK) {
    return false;
  }

  // Packet id from the reply, high byte first.
  uint16_t packnum = ((uint16_t)buffer[2] << 8) | buffer[3];
  // we increment the packet_id_counter right after publishing so inc here too to match
  packnum++;
  return packnum == packet_id_counter;
}
注意:
/**
 * Keep the connection alive by sending PINGREQ.
 *
 * @param num  maximum number of ping attempts
 * @return true as soon as a PINGRESP is seen in the buffer, false when
 *         every attempt failed
 */
bool Adafruit_MQTT::ping(uint8_t num = 1) {
  //flushIncoming(100);
  for (uint8_t attemptsLeft = num; attemptsLeft != 0; --attemptsLeft) {
    // Build and send the ping packet; a failed send uses up one attempt.
    uint8_t len = pingPacket(buffer);
    if (sendPacket(buffer, len)) {
      // Process ping reply.
      len = processPacketsUntil(buffer, MQTT_CTRL_PINGRESP, PING_TIMEOUT_MS);
      if (buffer[0] == (MQTT_CTRL_PINGRESP << 4)) {
        return true;
      }
    }
  }
  return false;
}
/**
 * Read one incoming packet and, if it is a PUBLISH for a registered
 * subscription, copy its payload into that subscription's lastread buffer.
 *
 * @param timeout  read timeout passed to readFullPacket()
 * @return the matching subscription, or NULL when nothing was read, the
 *         packet is not a PUBLISH, the packet is too short for the lengths
 *         it declares, or no subscription matches the topic
 *
 * NOTE(review): the topic length is taken from buffer[3] only, which
 * presumably assumes a one-byte remaining-length field and topics shorter
 * than 256 bytes -- confirm.
 */
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
  uint16_t i, topiclen, datalen;

  // Check if data is available to read.
  uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet
  if (!len)
    return NULL; // No data available, just quit.
  DEBUG_PRINT("Packet len: "); DEBUG_PRINTLN(len);
  DEBUG_PRINTBUFFER(buffer, len);

  // buffer[3] is read below, so at least four bytes must have arrived.
  if (len < 4) return NULL;
  if ((buffer[0] & 0xF0) != (MQTT_CTRL_PUBLISH) << 4) return NULL;

  // Parse out length of packet.
  topiclen = buffer[3];
  DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen);

  // QoS 1 packets carry a two-byte packet id after the topic.
  // TODO: we dont support QoS 2
  const bool isQos1 = (buffer[0] & 0x6) == 0x2;
  uint8_t packet_id_len = isQos1 ? 2 : 0;

  // Reject packets shorter than the lengths they declare; otherwise the
  // topic comparison and payload copy would read past the received data
  // and the payload length would underflow.
  if ((uint32_t)topiclen + packet_id_len + 4 > len) return NULL;

  // Find subscription associated with this packet.
  for (i = 0; i < MAXSUBSCRIPTIONS; i++) {
    if (subscriptions[i]) {
      // Skip this subscription if its name length isn't the same as the
      // received topic name.
      if (strlen(subscriptions[i]->topic) != topiclen)
        continue;
      // Stop if the subscription topic matches the received topic. Be careful
      // to make comparison case insensitive.
      if (strncasecmp((char*)buffer+4, subscriptions[i]->topic, topiclen) == 0) {
        DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i);
        break;
      }
    }
  }
  if (i == MAXSUBSCRIPTIONS) return NULL; // matching sub not found ???

  uint16_t packetid = 0;
  if (isQos1) {
    packetid = buffer[topiclen+4];
    packetid <<= 8;
    packetid |= buffer[topiclen+5];
  }

  // zero out the old data
  memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);

  datalen = len - topiclen - packet_id_len - 4;
  // ">=" rather than ">": a payload of exactly SUBSCRIPTIONDATALEN bytes
  // would otherwise fill the buffer and leave no room for the terminator.
  if (datalen >= SUBSCRIPTIONDATALEN) {
    datalen = SUBSCRIPTIONDATALEN-1; // cut it off
  }
  // extract out just the data, into the subscription object itself
  memmove(subscriptions[i]->lastread, buffer+4+topiclen+packet_id_len, datalen);
  subscriptions[i]->datalen = datalen;
  DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen);
  DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread);

  if ((MQTT_PROTOCOL_LEVEL > 3) && isQos1) {
    uint8_t ackpacket[4];
    // Construct and send puback packet.
    uint8_t acklen = pubackPacket(ackpacket, packetid);
    if (!sendPacket(ackpacket, acklen))
      DEBUG_PRINT(F("Failed"));
  }

  // return the valid matching subscription
  return subscriptions[i];
}
/**
 * Read incoming subscription messages for up to `timeout` milliseconds and
 * dispatch each one to the first callback set on its subscription, checked
 * in the order uint32, double, buffer, io.
 *
 * @param timeout  total time budget in milliseconds
 */
void Adafruit_MQTT::processPackets(int16_t timeout) {
  const uint32_t starttime = millis();
  uint32_t elapsed = 0;

  while (elapsed < (uint32_t)timeout) {
    // Wait for the next message, using only the time that is left.
    Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed);

    if (sub) {
      if (sub->callback_uint32t != NULL) {
        // Integer mode. strtoul covers the whole uint32_t range, which
        // atoi's int result cannot represent.
        uint32_t data = strtoul((char *)sub->lastread, NULL, 10);
        sub->callback_uint32t(data);
      }
      else if (sub->callback_double != NULL) {
        // Floating-point mode.
        double data = atof((char *)sub->lastread);
        sub->callback_double(data);
      }
      else if (sub->callback_buffer != NULL) {
        // Raw buffer mode.
        sub->callback_buffer((char *)sub->lastread, sub->datalen);
      }
      else if (sub->callback_io != NULL) {
        // Member-function callback on the registered io object.
        ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, sub->datalen);
      }
    }

    // Recompute the elapsed time from the fixed start instead of adding the
    // time-since-start on every pass, which counted earlier iterations
    // again and ended the loop early. Unsigned subtraction also stays
    // correct when millis() wraps around.
    elapsed = (uint32_t)(millis() - starttime);
  }
}
/**
 * Map a connect() return code to a human-readable message.
 *
 * @param code  value returned by connect()
 * @return flash-resident message string; "Unknown error" for unlisted codes
 */
const __FlashStringHelper* Adafruit_MQTT::connectErrorString(int8_t code) {
  const __FlashStringHelper *message;
  switch (code) {
    case 1:
      message = F("The Server does not support the level of the MQTT protocol requested");
      break;
    case 2:
      message = F("The Client identifier is correct UTF-8 but not allowed by the Server");
      break;
    case 3:
      message = F("The MQTT service is unavailable");
      break;
    case 4:
      message = F("The data in the user name or password is malformed");
      break;
    case 5:
      message = F("Not authorized to connect");
      break;
    case 6:
      message = F("Exceeded reconnect rate limit. Please try again later.");
      break;
    case 7:
      message = F("You have been banned from connecting. Please contact the MQTT server administrator for more details.");
      break;
    case -1:
      message = F("Connection failed");
      break;
    case -2:
      message = F("Failed to subscribe");
      break;
    default:
      message = F("Unknown error");
      break;
  }
  return message;
}
// Uncomment/comment to turn on/off debug output messages.
//#define MQTT_DEBUG
// Uncomment/comment to turn on/off error output messages.
#define MQTT_ERROR
// Set where debug messages will be printed.
#define DEBUG_PRINTER Serial
// If using something like Zero or Due, change the above to SerialUSB
// Define actual debug output functions when necessary.
// With MQTT_DEBUG defined the DEBUG_* macros forward to DEBUG_PRINTER /
// printBuffer; otherwise they expand to an empty block.
// NOTE(review): each macro expands to a `{ ... }` block, so
// `if (x) DEBUG_PRINT(...); else ...` would presumably not compile -- confirm
// before using them in an if/else without braces.
#ifdef MQTT_DEBUG
#define DEBUG_PRINT(...) { DEBUG_PRINTER.print(__VA_ARGS__); }
#define DEBUG_PRINTLN(...) { DEBUG_PRINTER.println(__VA_ARGS__); }
#define DEBUG_PRINTBUFFER(buffer, len) { printBuffer(buffer, len); }
#else
#define DEBUG_PRINT(...) {}
#define DEBUG_PRINTLN(...) {}
#define DEBUG_PRINTBUFFER(buffer, len) {}
#endif
// The ERROR_* macros follow the same pattern, controlled by MQTT_ERROR.
#ifdef MQTT_ERROR
#define ERROR_PRINT(...) { DEBUG_PRINTER.print(__VA_ARGS__); }
#define ERROR_PRINTLN(...) { DEBUG_PRINTER.println(__VA_ARGS__); }
#define ERROR_PRINTBUFFER(buffer, len) { printBuffer(buffer, len); }
#else
#define ERROR_PRINT(...) {}
#define ERROR_PRINTLN(...) {}
#define ERROR_PRINTBUFFER(buffer, len) {}
#endif
正常情况下,debug不打开,如果开发者需要显示详细内容,可以手动修改库文件或者自己手动加方法处理。
/***************************************************
Adafruit MQTT Library ESP8266 Example
Must use ESP8266 Arduino from:
https://github.com/esp8266/Arduino
Works great with Adafruit's Huzzah ESP board & Feather
----> https://www.adafruit.com/product/2471
----> https://www.adafruit.com/products/2821
Adafruit invests time and resources providing this open source code,
please support Adafruit and open-source hardware by purchasing
products from Adafruit!
Written by Tony DiCola for Adafruit Industries.
MIT license, all text above must be included in any redistribution
****************************************************/
#include <ESP8266WiFi.h>
#include "Adafruit_MQTT.h" // https://github.com/adafruit/Adafruit_MQTT_Library
#include "Adafruit_MQTT_Client.h" //https://github.com/adafruit/Adafruit_MQTT_Library
/************************* WiFi Access Point *********************************/
#define WLAN_SSID "...your SSID..." // WiFi network name
#define WLAN_PASS "...your password..." // WiFi password
/************************* Adafruit.io Setup *********************************/
// MQTT broker settings (placeholders: fill in your own account details).
#define AIO_SERVER "io.adafruit.com"
#define AIO_SERVERPORT 1883 // use 8883 for SSL
#define AIO_USERNAME "...your AIO username (see https://accounts.adafruit.com)..."
#define AIO_KEY "...your AIO key..."
/************ Global State (you don't need to change this!) ******************/
// Create an ESP8266 WiFiClient class to connect to the MQTT server.
WiFiClient client;
// or... use WiFiClientSecure for SSL
//WiFiClientSecure client;
// MQTT client object.
// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
Adafruit_MQTT_Client mqtt(&client, AIO_SERVER, AIO_SERVERPORT, AIO_USERNAME, AIO_KEY);
/****************************** Feeds ***************************************/
// Publisher object.
// Setup a feed called 'photocell' for publishing.
// Notice MQTT paths for AIO follow the form: <username>/feeds/<feedname>
Adafruit_MQTT_Publish photocell = Adafruit_MQTT_Publish(&mqtt, AIO_USERNAME "/feeds/photocell");
// Subscriber object.
// Setup a feed called 'onoff' for subscribing to changes.
Adafruit_MQTT_Subscribe onoffbutton = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/onoff");
/*************************** Sketch Code ************************************/
// Bug workaround for Arduino 1.6.6, it seems to need a function declaration
// for some reason (only affects ESP8266, likely an arduino-builder bug).
void MQTT_connect();
/** Sketch 1 setup: start serial output, join WiFi, register the onoff subscription. */
void setup() {
  Serial.begin(115200);
  delay(10);
  Serial.println(F("Adafruit MQTT demo"));

  // Connect to WiFi access point.
  Serial.println();
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(WLAN_SSID);
  WiFi.begin(WLAN_SSID, WLAN_PASS);

  // Block until the station is connected, printing a dot every 500 ms.
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }
  Serial.println();
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());

  // Setup MQTT subscription for onoff feed.
  mqtt.subscribe(&onoffbutton);
}
uint32_t x=0;
void loop() {
// Ensure the connection to the MQTT server is alive (this will make the first
// connection and automatically reconnect when disconnected). See the MQTT_connect
// function definition further below.
MQTT_connect();
// this is our 'wait for incoming subscription packets' busy subloop
// try to spend your time here
Adafruit_MQTT_Subscribe *subscription;
// 在5s内判断是否有订阅消息进来
while ((subscription = mqtt.readSubscription(5000))) {
// 判断是否是我们对应的主题
if (subscription == &onoffbutton) {
Serial.print(F("Got: "));
// 打印主题信息内容
Serial.println((char *)onoffbutton.lastread);
}
}
// Now we can publish stuff!
Serial.print(F("\nSending photocell val "));
Serial.print(x);
Serial.print("...");
// 发布消息
if (! photocell.publish(x++)) {
Serial.println(F("Failed"));
} else {
Serial.println(F("OK!"));
}
// ping the server to keep the mqtt connection alive
// NOT required if you are publishing once every KEEPALIVE seconds
// 心跳请求
if(! mqtt.ping()) {
mqtt.disconnect();
}
}
// Connect (or reconnect) to the MQTT server as necessary.
// Call it from loop(); it returns immediately when already connected.
void MQTT_connect() {
  // Stop if already connected.
  if (mqtt.connected()) {
    return;
  }

  Serial.print("Connecting to MQTT... ");

  uint8_t retries = 3;
  int8_t ret = mqtt.connect(); // connect will return 0 for connected
  while (ret != 0) {
    // Show why the connection failed.
    Serial.println(mqtt.connectErrorString(ret));
    Serial.println("Retrying MQTT connection in 5 seconds...");
    // Drop the connection and wait before the next attempt.
    mqtt.disconnect();
    delay(5000); // wait 5 seconds
    retries--;
    if (retries == 0) {
      // basically die and wait for WDT to reset me
      while (1);
    }
    ret = mqtt.connect();
  }
  Serial.println("MQTT Connected!");
}
/***************************************************
Adafruit MQTT Library ESP8266 Example
Must use ESP8266 Arduino from:
https://github.com/esp8266/Arduino
Works great with Adafruit's Huzzah ESP board:
----> https://www.adafruit.com/product/2471
Adafruit invests time and resources providing this open source code,
please support Adafruit and open-source hardware by purchasing
products from Adafruit!
Written by Tony DiCola for Adafruit Industries.
MIT license, all text above must be included in any redistribution
****************************************************/
#include <ESP8266WiFi.h>
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"
/************************* WiFi Access Point *********************************/
// WiFi credentials (placeholders).
#define WLAN_SSID "network"
#define WLAN_PASS "password"
/************************* Adafruit.io Setup *********************************/
// MQTT broker settings (placeholders).
#define AIO_SERVER "io.adafruit.com"
#define AIO_SERVERPORT 1883
#define AIO_USERNAME "user"
#define AIO_KEY "key"
/************ Global State (you don't need to change this!) ******************/
// Create an ESP8266 WiFiClient class to connect to the MQTT server.
WiFiClient client;
// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
// The user name is passed twice: once as the client id and once as the account name.
Adafruit_MQTT_Client mqtt(&client, AIO_SERVER, AIO_SERVERPORT, AIO_USERNAME, AIO_USERNAME, AIO_KEY);
/****************************** Feeds ***************************************/
// Setup a feed called 'time' for subscribing to current time
Adafruit_MQTT_Subscribe timefeed = Adafruit_MQTT_Subscribe(&mqtt, "time/seconds");
// Setup a feed called 'slider' for subscribing to changes on the slider
Adafruit_MQTT_Subscribe slider = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/slider", MQTT_QOS_1);
// Setup a feed called 'onoff' for subscribing to changes to the button
Adafruit_MQTT_Subscribe onoffbutton = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/onoff", MQTT_QOS_1);
/*************************** Sketch Code ************************************/
// Time of day written by timecallback().
int sec;
int min;
int hour;
int timeZone = -4; // utc-4 eastern daylight time (nyc)
/**
 * Callback for the time feed: convert a seconds count into local
 * hour/minute/second (stored in the globals) and print it as a 12-hour
 * clock time with an am/pm suffix.
 *
 * @param current  seconds count received from the feed
 */
void timecallback(uint32_t current) {
  // adjust to local time zone
  current += (timeZone * 60 * 60);

  // calculate current time
  sec = current % 60;
  current /= 60;
  min = current % 60;
  current /= 60;
  hour = current % 24;

  // Print the hour on a 12-hour clock: hours 0 and 12 both display as 12.
  // Exactly one value is printed; printing "12" and then the hour as well
  // produced "120" at midnight and noon.
  int displayHour = hour % 12;
  if (displayHour == 0) {
    displayHour = 12;
  }
  Serial.print(displayHour);

  // print mins
  Serial.print(":");
  if (min < 10) Serial.print("0");
  Serial.print(min);

  // print seconds
  Serial.print(":");
  if (sec < 10) Serial.print("0");
  Serial.print(sec);

  if (hour < 12)
    Serial.println(" am");
  else
    Serial.println(" pm");
}
/** Callback for the slider feed: print the received value. */
void slidercallback(double x) {
  Serial.print("Hey we're in a slider callback, the slider value is: ");
  const double sliderValue = x;
  Serial.println(sliderValue);
}
/** Callback for the onoff feed: print the received text (`len` is not used). */
void onoffcallback(char *data, uint16_t len) {
  Serial.print("Hey we're in a onoff callback, the button value is: ");
  char *buttonText = data;
  Serial.println(buttonText);
}
/** Sketch 2 setup: start serial output, join WiFi, attach callbacks, register subscriptions. */
void setup() {
  Serial.begin(115200);
  delay(10);
  Serial.println(F("Adafruit MQTT demo"));

  // Connect to WiFi access point.
  Serial.println();
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(WLAN_SSID);
  WiFi.begin(WLAN_SSID, WLAN_PASS);

  // Block until the station is connected, printing a dot every 500 ms.
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }
  Serial.println();
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());

  // One callback per feed, each with a different payload representation.
  timefeed.setCallback(timecallback);
  slider.setCallback(slidercallback);
  onoffbutton.setCallback(onoffcallback);

  // Register the three subscriptions with the MQTT client.
  mqtt.subscribe(&timefeed);
  mqtt.subscribe(&slider);
  mqtt.subscribe(&onoffbutton);
}
// Counter variable; not read by loop() below.
uint32_t x=0;

/** Sketch 2 main loop: keep the connection up, dispatch callbacks, ping. */
void loop() {
  // Ensure the connection to the MQTT server is alive (this makes the first
  // connection and automatically reconnects when disconnected).
  MQTT_connect();

  // Wait for incoming subscription packets and run their callbacks.
  mqtt.processPackets(10000);

  // ping the server to keep the mqtt connection alive
  // NOT required if you are publishing once every KEEPALIVE seconds
  const bool alive = mqtt.ping();
  if (!alive) {
    mqtt.disconnect();
  }
}
// Connect (or reconnect) to the MQTT server as necessary.
// Call it from loop(); it returns immediately when already connected.
void MQTT_connect() {
  // Stop if already connected.
  if (mqtt.connected()) {
    return;
  }

  Serial.print("Connecting to MQTT... ");

  uint8_t retries = 3;
  int8_t ret = mqtt.connect(); // connect will return 0 for connected
  while (ret != 0) {
    Serial.println(mqtt.connectErrorString(ret));
    Serial.println("Retrying MQTT connection in 10 seconds...");
    mqtt.disconnect();
    delay(10000); // wait 10 seconds
    retries--;
    if (retries == 0) {
      // basically die and wait for WDT to reset me
      while (1);
    }
    ret = mqtt.connect();
  }
  Serial.println("MQTT Connected!");
}
#include <ESP8266WiFi.h>
#include <Adafruit_MQTT.h>
#include <Adafruit_MQTT_Client.h>
#include <ESP8266HTTPClient.h>
#include <ArduinoJson.h>
#include <EEPROM.h>
#include <Ticker.h>
#include "H_project.h"
// Marker value compared against config.magic in loadConfig() to decide
// whether the stored configuration is valid.
#define MAGIC_NUMBER 0xAA
int state;
WiFiClient espClient;
// MQTT client and subscription objects, created in initOneNetMqtt().
Adafruit_MQTT_Client *mqtt = NULL;
Adafruit_MQTT_Subscribe *onoffLED = NULL;
// Forward declarations
void initSystem();
void initOneNetMqtt();
void onoffcallback(char *payload, uint16_t length);
void saveConfig();
void loadConfig();
bool parseRegisterResponse();
void parseOneNetMqttResponse(char* payload);
/**
 * Initialization: bring up the system first, then create the MQTT objects.
 */
void setup()
{
  initSystem();
  initOneNetMqtt();
}
/** Main loop: feed the watchdog, keep MQTT connected, dispatch messages, ping. */
void loop() {
  ESP.wdtFeed();
  connectToOneNetMqtt();

  // Handle incoming subscription messages for up to 5 seconds.
  mqtt->processPackets(5000);

  // Drop the connection when the ping fails.
  const bool alive = mqtt->ping();
  if (!alive) {
    mqtt->disconnect();
  }
}
/**
 * System bring-up: serial banner, WiFi connection, LED pin, stored
 * configuration, and device registration when no device id is stored yet.
 */
void initSystem(){
  int cnt = 0;

  Serial.begin (115200);
  Serial.println("\r\n\r\nStart ESP8266 MQTT");
  Serial.print("Firmware Version:");
  Serial.println(VER);
  Serial.print("SDK Version:");
  Serial.println(ESP.getSdkVersion());

  wifi_station_set_auto_connect(0); // turn off automatic connection
  ESP.wdtEnable(5000);
  WiFi.disconnect();
  delay(100);
  WiFi.begin(ssid, password);

  // Poll every 500 ms; after 40 polls ask for a restart.
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
    if (++cnt >= 40) {
      cnt = 0;
      // restart the system
      delayRestart(1);
    }
  }

  pinMode(LED_BUILTIN, OUTPUT);
  loadConfig();

  // Nothing more to do when a device id is already stored.
  const bool registered = (strcmp(config.deviceid, DEFAULT_ID) != 0);
  if (registered) {
    return;
  }

  // Not registered yet: keep trying, asking for a restart after every 5 failures.
  int tryAgain = 0;
  while (!registerDeviceToOneNet()) {
    Serial.print(".");
    delay(500);
    if (++tryAgain == 5) {
      tryAgain = 0;
      // restart the system
      delayRestart(1);
    }
  }

  if (!parseRegisterResponse()) {
    // restart the system
    delayRestart(1);
    while (1);
  }
}
/** (Re)create the MQTT client and the LED subscription, then register the subscription. */
void initOneNetMqtt(){
  // Release any objects from a previous call (deleting a null pointer is a no-op).
  delete mqtt;
  delete onoffLED;

  /**
   * OneNet MQTT CONNECT fields:
   *   client identifier: the device id obtained when the device was created
   *   user name:         the product id assigned when the product was registered
   *   password:          the device's auth info (its unique serial number), or the apiKey
   */
  mqtt = new Adafruit_MQTT_Client(&espClient, mqttServer, mqttPort, config.deviceid, PRODUCT_ID, API_KEY);

  onoffLED = new Adafruit_MQTT_Subscribe(mqtt, TOPIC);
  onoffLED->setCallback(onoffcallback);
  mqtt->subscribe(onoffLED);
}
/** Subscription callback: echo the payload to serial, then parse it. */
void onoffcallback(char *payload, uint16_t length) {
  Serial.print("Message arrived =");
  // Print exactly `length` characters, one at a time.
  for (const char *cursor = payload; cursor != payload + length; ++cursor) {
    Serial.print((char)*cursor);
  }
  Serial.println();
  parseOneNetMqttResponse((char *)payload);
}
/*
 * Save the configuration to EEPROM: `config` is written byte by byte
 * starting at address 0, then committed.
 */
void saveConfig()
{
  Serial.println("Save OneNet config!");
  Serial.print("deviceId:");
  Serial.println(config.deviceid);

  EEPROM.begin(150);
  uint8_t *raw = (uint8_t*)(&config);
  int offset = 0;
  while (offset < sizeof(config))
  {
    EEPROM.write(offset, raw[offset]);
    offset++;
  }
  EEPROM.commit();
}
/*
 * Load the configuration from EEPROM. When the stored magic byte does not
 * match MAGIC_NUMBER, the defaults are restored and saved.
 */
void loadConfig()
{
  EEPROM.begin(150);
  uint8_t *raw = (uint8_t*)(&config);
  int offset = 0;
  while (offset < sizeof(config))
  {
    raw[offset] = EEPROM.read(offset);
    offset++;
  }
  EEPROM.commit();

  const bool valid = (config.magic == MAGIC_NUMBER);
  if (!valid)
  {
    // No valid configuration stored: fall back to the default device id.
    strcpy(config.deviceid, DEFAULT_ID);
    config.magic = MAGIC_NUMBER;
    saveConfig();
    Serial.println("Restore config!");
  }

  Serial.println("-----Read config-----");
  Serial.print("deviceId:");
  Serial.println(config.deviceid);
  Serial.println("-------------------");
}
/**
 * Parse an MQTT payload as JSON and drive the built-in LED.
 *
 * The payload is expected to carry "Did" (device id) and "sta" (status).
 * The LED is only touched when "Did" equals the stored device id: status 1
 * writes LOW, anything else writes HIGH.
 */
void parseOneNetMqttResponse(char* payload){
  Serial.println("start parseOneNetMqttResponse");

  // StaticJsonBuffer allocates on the stack; DynamicJsonBuffer (heap) could be used instead.
  StaticJsonBuffer<100> jsonBuffer;
  JsonObject& root = jsonBuffer.parseObject(payload);

  // Give up when parsing fails.
  if (!root.success()) {
    Serial.println("parseObject() failed");
    return ;
  }

  String deviceId = root["Did"];
  int status = root["sta"];

  // Ignore messages addressed to another device.
  if (strcmp(config.deviceid, deviceId.c_str()) != 0) {
    return;
  }
  digitalWrite(LED_BUILTIN, (status == 1) ? LOW : HIGH);
}
/**
 * Parse the device-registration response (JSON held in `response`).
 *
 * On success the returned device id is copied into config.deviceid and the
 * configuration is saved.
 *
 * @return true when registration succeeded; false when the JSON cannot be
 *         parsed, the "errno" field is non-zero, or no device id is present
 *
 * NOTE(review): the copy into config.deviceid is unbounded; presumably the
 * field is large enough for any id the server returns -- confirm against
 * its declaration.
 */
bool parseRegisterResponse(){
  Serial.println("start parseRegisterResponse");

  // StaticJsonBuffer allocates on the stack; DynamicJsonBuffer (heap) could be used instead.
  StaticJsonBuffer<200> jsonBuffer;
  JsonObject& root = jsonBuffer.parseObject(response);

  // Test if parsing succeeds.
  if (!root.success()) {
    Serial.println("parseObject() failed");
    return false;
  }

  // Named errCode rather than errno: `errno` is a macro in the C library
  // and must not be used as a variable name.
  int errCode = root["errno"];
  if (errCode != 0) {
    Serial.println("register failed!");
    return false;
  }

  // A missing (or non-string) device id yields a null pointer, which must
  // not be handed to strcpy.
  const char *deviceId = root["data"]["device_id"];
  if (deviceId == NULL) {
    Serial.println("register failed!");
    return false;
  }

  Serial.println("register sucess!");
  strcpy(config.deviceid, deviceId);
  saveConfig();
  return true;
}
GPIO 是指单片机(微控制器)主板上的一组引脚,这些引脚可以发送或接收电信号,但它们不是为任何特定目的而设计的,这就是为什么它们被称为“通用”IO。
本文对比了几款适合物联网开发的盒子硬件参数,供大家参考。
umqtt 是 MicroPython 的一个轻量级 MQTT 客户端库,使得在微控制器上使用 MQTT 协议变得简单易行。本文将介绍 umqtt 的实用方法,帮助您更好地在项目中应用这一技术。
本方案是一个基于ESP32-CAM + 物联网的图像采集方案。
相信很多人都有把绿植给养死的经历,可能是浇水过多、忘记浇水、较长时间不在家不能浇水等,本文介绍一种可以灵活定制的智能浇花方案。
本文将介绍如何将 ESP-Touch 协议用于基于 ESP32 的物联网项目/设备,使用 ESP-Touch,您将不再需要对 Wi-Fi 凭据进行硬编码,因为您可以随时轻松更改它。
ESP-HaloPanel 是一款基于 ESP32-C2 模组开发的超低成本智能家居面板,面板中央配备一个 1.28 寸圆形屏用于显示信息,屏幕周围均匀分布了 6 个触摸按键,用于控制操作。
Hi3861开发板是一块大小约2cm×5cm的开发板,搭载一款高度集成的2.4GHz WLAN SoC芯片,集成IEEE 802.11b/g/n基带和RF(Radio Frequency)电路。支持OpenHarmony,并配套提供开放、易用的开发和调试运行环境。
ESP-NOW2MQTT库提供了一种轻量级、高效的通信方案,使电池供电的ESP32设备能够快速、稳定地与MQTT服务器通信。
本文将向您展示一些关于2023年机器人和物联网的最有趣和创新的micro:bit项目。
本文档作为UNO R4 WiFi的技术概览,您将找到一系列资源和指南链接,帮助您开始下一个项目。