言: MQTT廣泛應用于工業物聯網、智能家居、各類智能制造或各類自動化場景等。MQTT是一個基于客戶端-服務器的消息發布/訂閱傳輸協議,在很多受限的環境下,比如說機器與機器通信、機器與物聯網通信等。好了,科普的廢話不多說,下面直接通過.NET環境來實現一套MQTT通信demo,實現服務端與客戶端的雙邊消息發布與訂閱的功能和演示。
開發環境:
VS2022 + .NET 6 + Webapi / 控制臺
1、新建一個webapi項目,用來后面做測試使用
2、新建一個繼承自IHostedService的服務,用于隨著webapi程序的啟動而自動執行。(最終代碼在文末)
3、引入 MQTTNet 包,該項目提供了.net環境下的MQTT通信協議支持,這款框架很優秀,此處直接引用它來進行使用。
4、在上面的MqttHostService類里面,開始方法里面新增初始化MQTT服務端的一些功能,例如 IP、端口號、事件等等。
5、mqtt服務端支持的一系列功能很多,大佬們可以自行去嘗試一些新發現,此處只使用若干個簡單功能。
6、添加客戶端連接事件、連接關閉事件
7、由于事件要用的可能有點多,此處就不一一例舉了,可以直接看以下的代碼,以及有關注釋來理解。
8、事件觸發時候,打印輸出
9、輸出之前,記錄一個當前事件名稱標記一下,用于可以更加清楚看出是哪個事件輸出的。
10、對MqttHostService類進行注冊,用于程序啟動時候跟隨啟動。
11、上面貌似設計的不是很友好,所以把mqtt服務實例單獨弄出來,寫入到單獨的類里面做成屬性,供方便調用。
12、把先前的一些東西改一下,換成使用上面步驟的屬性來直接調用使用。
13、運行一下,看看是否可以成功,顯示服務已啟動,說明服務啟動時OK的了.
14、新增一個控制臺程序 MqttClient,用于模擬客戶端。
15、創建客戶端啟動以及有關配置信息和有關事件,如圖。具體使用可以看代碼注釋,就不過多解釋了。
16、在program類里面,調用客戶端啟動方法,用于測試使用。
17、上面客戶端對應的三個事件的實現如圖,同時進行有關信息的打印輸出。
18、啟動服務端,然后啟動客戶端,可以看到服務端有一個連接失敗的消息,這個是因為上面配置的客戶端用戶名是admin,密碼是1234567,而服務端配置的規則是,用戶名是admin 密碼是123456
19、密碼改回正常匹配項以后,再重新運行試試看,可以看到客戶端與服務端連接上了。
20、如果關閉客戶端,也可以看到服務端會進入客戶端關閉事件內。
21、把上面主題訂閱的內容寫到連接成功以后的事件里面,不然客戶端連接期間,可能就執行了主題訂閱,會存在訂閱失敗的情況。改為寫入到連接成功以后的事件里面,可以保證主題訂閱肯定是在客戶端連接成功以后才執行的。
22、接下來測試服務端消息推送,在MqttService服務里面,新增一個方法,用來執行mqtt服務端發布主題消息使用。有關配置信息和消息格式,如圖所示。
23、新增一個API控制器,用來測試使用。API參數直接拿來進行消息的推送使用。
24、運行服務端和客戶端,并訪問剛剛新增的api接口,手動隨意輸入一條消息,可以看到客戶端訂閱的主題消息已經被實時接收到了。
25、接下來對客戶端新增一個消息推送的方法,用來測試客戶端消息發布的功能。有關消息格式和調用,如圖所示,以及注釋部分的說明。
26、客戶端program類里面,客戶端連接以后,通過手動回車,來執行客戶端發布消息。
27、再次啟動服務端和客戶端
28、然后客戶端內按一下回車,執行消息發布功能。可以看到,服務端成功接收到了客戶端發過來的主題消息。
29、接下來測試客戶端與客戶端之間的消息發布與訂閱,為了模擬多客戶端效果,把上面客戶端已經編譯好的文件拷貝一份出來。
30、然后本地的代碼進行一些修改,用來當做第二個客戶端程序。所以客戶端id也進行變更為 testclient02
31、對客戶端訂閱的主題,也改成 topic_02
32、啟動服務端,以及拷貝出來的客戶端1,和上面修改了部分代碼的客戶端2,保證都已經連接上服務端。
33、調用服務端的api接口,由于服務端發布的消息是發布給topic_01的,所以只有客戶端1可以接收到消息。
34、客戶端1執行回車,用于發布一段消息給主題 topic_02,可以看到客戶端01發布的消息,同時被服務端和客戶端02接收到了。因為服務端是總指揮,所以客戶端發布的消息都會經過服務端,從而服務端都可以接收到連接的客戶端發布的所有消息。
35、測試數據保持,下面先對客戶端1進行斷開,然后再重新連接客戶端1,可以看到客戶端1直接接收到了它訂閱的主題的上一次最新的消息內容,這個就是消息里面,Retain屬性設為True的結果,用于讓服務端記憶該主題消息使用的。如果設為false,就沒有這個效果了,大佬們也可以自己嘗試。
36、最終的服務端代碼:
MqttHostService:
public class MqttHostService : IHostedService, IDisposable
{
public void Dispose()
{
}
const string ServerClientId="SERVER";
public Task StartAsync(CancellationToken cancellationToken)
{
MqttServerOptionsBuilder optionsBuilder=new MqttServerOptionsBuilder();
optionsBuilder.WithDefaultEndpoint();
optionsBuilder.WithDefaultEndpointPort(10086); // 設置 服務端 端口號
optionsBuilder.WithConnectionBacklog(1000); // 最大連接數
MqttServerOptions options=optionsBuilder.Build();
MqttService._mqttServer=new MqttFactory().CreateMqttServer(options);
MqttService._mqttServer.ClientConnectedAsync +=_mqttServer_ClientConnectedAsync; //客戶端連接事件
MqttService._mqttServer.ClientDisconnectedAsync +=_mqttServer_ClientDisconnectedAsync; // 客戶端關閉事件
MqttService._mqttServer.ApplicationMessageNotConsumedAsync +=_mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件
MqttService._mqttServer.ClientSubscribedTopicAsync +=_mqttServer_ClientSubscribedTopicAsync; // 客戶端訂閱主題事件
MqttService._mqttServer.ClientUnsubscribedTopicAsync +=_mqttServer_ClientUnsubscribedTopicAsync; // 客戶端取消訂閱事件
MqttService._mqttServer.StartedAsync +=_mqttServer_StartedAsync; // 啟動后事件
MqttService._mqttServer.StoppedAsync +=_mqttServer_StoppedAsync; // 關閉后事件
MqttService._mqttServer.InterceptingPublishAsync +=_mqttServer_InterceptingPublishAsync; // 消息接收事件
MqttService._mqttServer.ValidatingConnectionAsync +=_mqttServer_ValidatingConnectionAsync; // 用戶名和密碼驗證有關
MqttService._mqttServer.StartAsync();
return Task.CompletedTask;
}
/// <summary>
/// 客戶端訂閱主題事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientSubscribedTopicAsync:客戶端ID=【{arg.ClientId}】訂閱的主題=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 關閉后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StoppedAsync(EventArgs arg)
{
Console.WriteLine($"StoppedAsync:MQTT服務已關閉……");
return Task.CompletedTask;
}
/// <summary>
/// 用戶名和密碼驗證有關
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
arg.ReasonCode=MqttConnectReasonCode.Success;
if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
{
arg.ReasonCode=MqttConnectReasonCode.Banned;
Console.WriteLine($"ValidatingConnectionAsync:客戶端ID=【{arg.ClientId}】用戶名或密碼驗證錯誤 ");
}
return Task.CompletedTask;
}
/// <summary>
/// 消息接收事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
if (string.Equals(arg.ClientId, ServerClientId))
{
return Task.CompletedTask;
}
Console.WriteLine($"InterceptingPublishAsync:客戶端ID=【{arg.ClientId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 啟動后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StartedAsync(EventArgs arg)
{
Console.WriteLine($"StartedAsync:MQTT服務已啟動……");
return Task.CompletedTask;
}
/// <summary>
/// 客戶端取消訂閱事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientUnsubscribedTopicAsync:客戶端ID=【{arg.ClientId}】已取消訂閱的主題=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageNotConsumedAsync:發送端ID=【{arg.SenderId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 客戶端斷開時候觸發
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.WriteLine($"ClientDisconnectedAsync:客戶端ID=【{arg.ClientId}】已斷開, 地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 客戶端連接時候觸發
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.WriteLine($"ClientConnectedAsync:客戶端ID=【{arg.ClientId}】已連接, 用戶名=【{arg.UserName}】地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
MqttService:
public class MqttService
{
public static MqttServer _mqttServer { get; set; }
public static void PublishData(string data)
{
var message=new MqttApplicationMessage
{
Topic="topic_01",
Payload=Encoding.Default.GetBytes(data),
QualityOfServiceLevel=MqttQualityOfServiceLevel.AtLeastOnce,
Retain=true // 服務端是否保留消息。true為保留,如果有新的訂閱者連接,就會立馬收到該消息。
};
_mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 發送消息給有訂閱 topic_01的客戶端
{
SenderClientId="Server_01"
}).GetAwaiter().GetResult();
}
}
37、最終的客戶端代碼:
MqttClientService:
public class MqttClientService
{
public static IMqttClient _mqttClient;
public void MqttClientStart()
{
var optionsBuilder=new MqttClientOptionsBuilder()
.WithTcpServer("127.0.0.1", 10086) // 要訪問的mqtt服務端的 ip 和 端口號
.WithCredentials("admin", "123456") // 要訪問的mqtt服務端的用戶名和密碼
.WithClientId("testclient02") // 設置客戶端id
.WithCleanSession()
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls=false // 是否使用 tls加密
});
var clientOptions=optionsBuilder.Build();
_mqttClient=new MqttFactory().CreateMqttClient();
_mqttClient.ConnectedAsync +=_mqttClient_ConnectedAsync; // 客戶端連接成功事件
_mqttClient.DisconnectedAsync +=_mqttClient_DisconnectedAsync; // 客戶端連接關閉事件
_mqttClient.ApplicationMessageReceivedAsync +=_mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
_mqttClient.ConnectAsync(clientOptions);
}
/// <summary>
/// 客戶端連接關閉事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
Console.WriteLine($"客戶端已斷開與服務端的連接……");
return Task.CompletedTask;
}
/// <summary>
/// 客戶端連接成功事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine($"客戶端已連接服務端……");
// 訂閱消息主題
// MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不確認收到消息,并且消息不被發送者存儲和重新發送提供與底層 TCP 協議相同的保證。
// 1: 保證一條消息至少有一次會傳遞給接收方。發送方存儲消息,直到它從接收方收到確認收到消息的數據包。一條消息可以多次發送或傳遞。
// 2: 保證每條消息僅由預期的收件人接收一次。級別2是最安全和最慢的服務質量級別,保證由發送方和接收方之間的至少兩個請求/響應(四次握手)。
_mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);
return Task.CompletedTask;
}
/// <summary>
/// 收到消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageReceivedAsync:客戶端ID=【{arg.ClientId}】接收到消息。 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
public void Publish(string data)
{
var message=new MqttApplicationMessage
{
Topic="topic_02",
Payload=Encoding.Default.GetBytes(data),
QualityOfServiceLevel=MqttQualityOfServiceLevel.AtLeastOnce,
Retain=true // 服務端是否保留消息。true為保留,如果有新的訂閱者連接,就會立馬收到該消息。
};
_mqttClient.PublishAsync(message);
}
}
38、后記:MQTT以上演示已經完畢,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加靈活。其實,實際上MQTT的客戶端在現實生產環境場景下,并不需要咱們開發者進行開發,很多硬件設備都支持提供MQTT協議的通信客戶端,所以只需要自己搭建一個服務端,就可以實現實時監控各種設備推送過來的各種信號數據。同時客戶端支持發布消息給其他客戶端,所以就實現了設備與設備之間的一對一信號通信的效果了。如果需要下發信號給硬件設備,MQTT服務端也可以直接下發給某個指定設備來進行實現即可。上面案例只提供入門方案,如果有感興趣的大佬,可以自己去拓展一下,來達到更好的效果。
原文:https://www.cnblogs.com/weskynet/p/16441219.html
發部署在云端的設備接入網關服務就不得不提到MQTT,使用MQTT不論是從設備到設備,還是設備到云端服務的雙向通訊,都可以獲得較好的支持。
MQTT的起源和我的理解
這里基于mosquitto,以一組實際的訂閱、發布,使用tcpdump來觀察MQTT的通訊。
# 使用tcpdump打印指定MQTT服務器所在端口的報文
tcpdump -n -XX -i eth0 port 1883
# 訂閱指定主題的消息
mosquitto_sub -h 172.16.0.12 -t "topic/#" -u user -P password -i "client1"
# 發布消息到指定主題
mosquitto_pub -h 172.16.0.12 -t "topic/" -u user -P password -i "client3" -m "1234567890123456789012345678901234567890"
tcpdump抓取的報文如下:
啟動mosquitto_sub命令獲得如下報文:==================================16:33:28.668926 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [S], seq 1177792969, win 29200, options [mss 1424,sackOK,TS val 530570732 ecr 0,nop,wscale 7], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 003c 5872 4000 3806 556b 5147 9713 ac10 .<Xr@.8.UkQG....
0x0020: 000c a05a 075b 4633 b1c9 0000 0000 a002 ...Z.[F3........
0x0030: 7210 a45e 0000 0204 0590 0402 080a 1f9f r..^............
0x0040: ddec 0000 0000 0103 0307 ..........
16:33:28.669001 IP MQTT.ibm-mqisdp >SUB.41050: Flags [S.], seq 1053629863, ack 1177792970, win 28960, options [mss 1460,sackOK,TS val 3885354501 ecr 530570732,nop,wscale 7], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 003c 0000 4000 4006 a645 ac10 000c 5147 .<..@.@..E....QG
0x0020: 9713 075b a05a 3ecd 1da7 4633 b1ca a012 ...[.Z>...F3....
0x0030: 7120 9309 0000 0204 05b4 0402 080a e795 q...............
0x0040: ce05 1f9f ddec 0103 0307 ..........
16:33:28.672229 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 1, win 229, options [nop,nop,TS val 530570735 ecr 3885354501], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5873 4000 3806 5572 5147 9713 ac10 .4Xs@.8.UrQG....
0x0020: 000c a05a 075b 4633 b1ca 3ecd 1da8 8010 ...Z.[F3..>.....
0x0030: 00e5 320e 0000 0101 080a 1f9f ddef e795 ..2.............
0x0040: ce05 ..
16:33:28.672280 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [P.], seq 1:42, ack 1, win 229, options [nop,nop,TS val 530570735 ecr 3885354501], length 41
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 005d 5874 4000 3806 5548 5147 9713 ac10 .]Xt@.8.UHQG....
0x0020: 000c a05a 075b 4633 b1ca 3ecd 1da8 8018 ...Z.[F3..>.....
0x0030: 00e5 37b7 0000 0101 080a 1f9f ddef e795 ..7.............
0x0040: ce05 1027 0006 4d51 4973 6470 03c2 003c ...'..MQIsdp...<
0x0050: 0007 636c 6965 6e74 3100 0563 636e 6574 ..client1..ccnet
0x0060: 0009 6363 6e65 7430 3930 31 ..ccnet0901
16:33:28.672313 IP MQTT.ibm-mqisdp > SUB.41050: Flags [.], ack 42, win 227, options [nop,nop,TS val 3885354505 ecr 530570735], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 d3a1 4000 4006 d2ab ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a05a 3ecd 1da8 4633 b1f3 8010 ...[.Z>...F3....
0x0030: 00e3 31e3 0000 0101 080a e795 ce09 1f9f ..1.............
0x0040: ddef ..
16:33:28.672562 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 1:5, ack 42, win 227, options [nop,nop,TS val 3885354505 ecr 530570735], length 4
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0038 d3a2 4000 4006 d2a6 ac10 000c 5147 .8..@.@.......QG
0x0020: 9713 075b a05a 3ecd 1da8 4633 b1f3 8018 ...[.Z>...F3....
0x0030: 00e3 11d5 0000 0101 080a e795 ce09 1f9f ................
0x0040: ddef 2002 0000 ......
16:33:28.675753 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 5, win 229, options [nop,nop,TS val 530570739 ecr 3885354505], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5875 4000 3806 5570 5147 9713 ac10 .4Xu@.8.UpQG....
0x0020: 000c a05a 075b 4633 b1f3 3ecd 1dac 8010 ...Z.[F3..>.....
0x0030: 00e5 31d9 0000 0101 080a 1f9f ddf3 e795 ..1.............
0x0040: ce09 ..
16:33:28.675772 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [P.], seq 42:56, ack 5, win 229, options [nop,nop,TS val 530570739 ecr 3885354505], length 14
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0042 5876 4000 3806 5561 5147 9713 ac10 .BXv@.8.UaQG....
0x0020: 000c a05a 075b 4633 b1f3 3ecd 1dac 8018 ...Z.[F3..>.....
0x0030: 00e5 46b6 0000 0101 080a 1f9f ddf3 e795 ..F.............
0x0040: ce09 820c 0001 0007 6363 6e65 742f 2300 ........ccnet/#.
16:33:28.675823 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 5:10, ack 56, win 227, options [nop,nop,TS val 3885354508 ecr 530570739], length 5
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0039 d3a3 4000 4006 d2a4 ac10 000c 5147 .9..@.@.......QG
0x0020: 9713 075b a05a 3ecd 1dac 4633 b201 8018 ...[.Z>...F3....
0x0030: 00e3 a1b8 0000 0101 080a e795 ce0c 1f9f ................
0x0040: ddf3 9003 0001 00 .......
16:33:28.719501 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 10, win 229, options [nop,nop,TS val 530570783 ecr 3885354508], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5877 4000 3806 556e 5147 9713 ac10 .4Xw@.8.UnQG....
0x0020: 000c a05a 075b 4633 b201 3ecd 1db1 8010 ...Z.[F3..>.....
0x0030: 00e5 3197 0000 0101 080a 1f9f de1f e795 ..1.............
0x0040: ce0c ..
終止mosquitto_sub命令獲得如下報文:=================================16:29:54.745664 IP 81.71.151.19.40510 > 172.16.0.12.ibm-mqisdp: Flags [F.], seq 58, ack 12, win 229, options [nop,nop,TS val 530356809 ecr 3885119146], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 e1fc 4000 3806 cbe8 5147 9713 ac10 .4..@.8...QG....
0x0020: 000c 9e3e 075b 0e6c ea6f 989b b7f8 8011 ...>.[.l.o......
0x0030: 00e5 1a34 0000 0101 080a 1f9c 9a49 e792 ...4.........I..
0x0040: 36aa 6.
16:29:54.745761 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.40510: Flags [F.], seq 12, ack 59, win 227, options [nop,nop,TS val 3885140578 ecr 530356809], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 fbe4 4000 4006 aa68 ac10 000c 5147 .4..@.@..h....QG
0x0020: 9713 075b 9e3e 989b b7f8 0e6c ea70 8011 ...[.>.....l.p..
0x0030: 00e3 c67c 0000 0101 080a e792 8a62 1f9c ...|.........b..
0x0040: 9a49 .I
16:29:54.751445 IP 81.71.151.19.40510 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 13, win 229, options [nop,nop,TS val 530356812 ecr 3885140578], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 e1fd 4000 3806 cbe7 5147 9713 ac10 .4..@.8...QG....
0x0020: 000c 9e3e 075b 0e6c ea70 989b b7f9 8010 ...>.[.l.p......
0x0030: 00e5 c677 0000 0101 080a 1f9c 9a4c e792 ...w.........L..
0x0040: 8a62 .b
# 執行mosquitto_pub后得到的報文如下:
16:36:40.721513 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [S], seq 2807988728, win 29200, options [mss 1424,sackOK,TS val 530762785 ecr 0,nop,wscale 7], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 003c 5dd0 4000 3806 500d 5147 9713 ac10 .<].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 81f8 0000 0000 a002 .....[.^........
0x0030: 7210 8378 0000 0204 0590 0402 080a 1fa2 r..x............
0x0040: cc21 0000 0000 0103 0307 .!........
16:36:40.721585 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [S.], seq 4236199326, ack 2807988729, win 28960, options [mss 1460,sackOK,TS val 3885546554 ecr 530762785,nop,wscale 7], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 003c 0000 4000 4006 a645 ac10 000c 5147 .<..@.@..E....QG
0x0020: 9713 075b a1ae fc7f 459e a75e 81f9 a012 ...[....E..^....
0x0030: 7120 9e41 0000 0204 05b4 0402 080a e798 q..A............
0x0040: bc3a 1fa2 cc21 0103 0307 .:...!....
16:36:40.724732 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 1, win 229, options [nop,nop,TS val 530762788 ecr 3885546554], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5dd1 4000 3806 5014 5147 9713 ac10 .4].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 81f9 fc7f 459f 8010 .....[.^....E...
0x0030: 00e5 3d46 0000 0101 080a 1fa2 cc24 e798 ..=F.........$..
0x0040: bc3a .:
16:36:40.724750 IP PUB.41390 > MQTT.ibm-mqisdp: Flags [P.], seq 1:42, ack 1, win 229, options [nop,nop,TS val 530762788 ecr 3885546554], length 41
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 005d 5dd2 4000 3806 4fea 5147 9713 ac10 .]].@.8.O.QG....
0x0020: 000c a1ae 075b a75e 81f9 fc7f 459f 8018 .....[.^....E...
0x0030: 00e5 40ef 0000 0101 080a 1fa2 cc24 e798 ..@..........$..
0x0040: bc3a 1027 0006 4d51 4973 6470 03c2 003c .:.'..MQIsdp...<
0x0050: 0007 636c 6965 6e74 3300 0563 636e 6574 ..client3..ccnet
0x0060: 0009 6363 6e65 7430 3930 31 ..ccnet0901
16:36:40.724787 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [.], ack 42, win 227, options [nop,nop,TS val 3885546557 ecr 530762788], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 f9a2 4000 4006 acaa ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a1ae fc7f 459f a75e 8222 8010 ...[....E..^."..
0x0030: 00e3 3d1c 0000 0101 080a e798 bc3d 1fa2 ..=..........=..
0x0040: cc24 .$
16:36:40.724978 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [P.], seq 1:5, ack 42, win 227, options [nop,nop,TS val 3885546557 ecr 530762788], length 4
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0038 f9a3 4000 4006 aca5 ac10 000c 5147 .8..@.@.......QG
0x0020: 9713 075b a1ae fc7f 459f a75e 8222 8018 ...[....E..^."..
0x0030: 00e3 1d0e 0000 0101 080a e798 bc3d 1fa2 .............=..
0x0040: cc24 2002 0000 .$....
16:36:40.728114 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5dd3 4000 3806 5012 5147 9713 ac10 .4].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 8222 fc7f 45a3 8010 .....[.^."..E...
0x0030: 00e5 3d13 0000 0101 080a 1fa2 cc27 e798 ..=..........'..
0x0040: bc3d .=16:36:40.728144 IP PUB.41390 > MQTT.ibm-mqisdp: Flags [P.], seq 42:82, ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 40
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 005c 5dd4 4000 3806 4fe9 5147 9713 ac10 .\].@.8.O.QG....
0x0020: 000c a1ae 075b a75e 8222 fc7f 45a3 8018 .....[.^."..E...
0x0030: 00e5 a8af 0000 0101 080a 1fa2 cc27 e798 .............'..
0x0040: bc3d 3026 0006 6363 6e65 742f 3132 3334 .=0&..ccnet/1234
0x0050: 3536 3738 3930 3132 3334 3536 3738 3930 5678901234567890
0x0060: 3132 3334 3536 3738 3930 1234567890
16:36:40.728146 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [FP.], seq 82:84, ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 2
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0036 5dd5 4000 3806 500e 5147 9713 ac10 .6].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 824a fc7f 45a3 8019 .....[.^.J..E...
0x0030: 00e5 5cdf 0000 0101 080a 1fa2 cc27 e798 ..\..........'..
0x0040: bc3d e000 .=..
16:36:40.728230 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 96:136, ack 62, win 227, options [nop,nop,TS val 3885546560 ecr 530753903], length 40
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 005c d3a9 4000 4006 d27b ac10 000c 5147 .\..@.@..{....QG
0x0020: 9713 075b a05a 3ecd 1e07 4633 b207 8018 ...[.Z>...F3....
0x0030: 00e3 e34f 0000 0101 080a e798 bc40 1fa2 ...O.........@..
0x0040: a96f 3026 0006 6363 6e65 742f 3132 3334 .o0&..ccnet/1234
0x0050: 3536 3738 3930 3132 3334 3536 3738 3930 5678901234567890
0x0060: 3132 3334 3536 3738 3930 1234567890
16:36:40.728288 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [.], ack 85, win 227, options [nop,nop,TS val 3885546561 ecr 530762791], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 f9a4 4000 4006 aca8 ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a1ae fc7f 45a3 a75e 824d 8010 ...[....E..^.M..
0x0030: 00e3 3ce6 0000 0101 080a e798 bc41 1fa2 ..<..........A..
0x0040: cc27 .'
16:36:40.728367 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [F.], seq 5, ack 85, win 227, options [nop,nop,TS val 3885546561 ecr 530762791], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 f9a5 4000 4006 aca7 ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a1ae fc7f 45a3 a75e 824d 8011 ...[....E..^.M..
0x0030: 00e3 3ce5 0000 0101 080a e798 bc41 1fa2 ..<..........A..
0x0040: cc27 .'
16:36:40.731431 IP 81.71.151.19.41050 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 136, win 229, options [nop,nop,TS val 530762794 ecr 3885546560], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5880 4000 3806 5565 5147 9713 ac10 .4X.@.8.UeQG....
0x0020: 000c a05a 075b 4633 b207 3ecd 1e2f 8010 ...Z.[F3..>../..
0x0030: 00e5 54ce 0000 0101 080a 1fa2 cc2a e798 ..T..........*..
0x0040: bc40 .@
16:36:40.731575 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 6, win 229, options [nop,nop,TS val 530762795 ecr 3885546561], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 0000 4000 3806 ade5 5147 9713 ac10 .4..@.8...QG....
0x0020: 000c a1ae 075b a75e 824d fc7f 45a4 8010 .....[.^.M..E...
0x0030: 00e5 3cdf 0000 0101 080a 1fa2 cc2b e798 ..<..........+..
0x0040: bc41 .A
去掉TCP握手和收報報文的常規交互,可知道MQTT訂閱發布的完整時序如下:
MQTT發布訂閱的實際交互時序圖
可以看到AUHT、SUB、PUB、SEND_MESSAGE的報文非常精簡,基本只是對必須傳送的字段增加了兩個字節的長度,因此其對帶寬是非常節省的。
# 啟動對9001端口的監視
tcpdump -n -XX -i eth0 port 9001
# 訂閱指定主題的消息
mosquitto_sub -h 172.16.0.12 -t "topic/#" -u user -P password -i "client1"
#編寫一個基于websockets訪問mqtt的頁面,如下親測通過; 點擊init按鈕
<html>
<head>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.2/mqttws31.js"></script>
</head>
<body>
<script>
var mqtt;
var host='81.71.87.37';
var port=9001;
// onConnect 事件
function onConnect() {
console.log('connected.');
/*
var raw_message='Hello World!';
message=new Paho.MQTT.Message(raw_message);
message.destinationName='ccnet/up/abc';
console.log('sending message: ' + raw_message );
mqtt.send(message);
*/
// 訂閱 download topic
var subOptions={
qos: 1,
onSuccess: onSubscribe
};
mqtt.subscribe('ccnet/#', subOptions);
}
function onSend() {
console.log('onSend.');
var raw_message='Hello World!';
message=new Paho.MQTT.Message(raw_message);
message.destinationName='ccnet/up/abc';
console.log('sending message: ' + raw_message );
mqtt.send(message);
}
// 訂閱主題成功事件
function onSubscribe(context) {
console.log('subscribe success');
console.log(context);
}
// 連接失敗事件
function onFailure(message) {
console.log('connect failed.');
}
// onMessageArrived 事件
function onMessageArrived(message) {
console.log('new message arrived...');
console.log(message.payloadString);
}
// 建立 MQTT websocket 連接
function MQTTconnect() {
console.log('connecting to ' + host + ':' + port);
mqtt=new Paho.MQTT.Client(host, port, 'clientid');
var options={
timeout: 3,
onSuccess: onConnect,
onFailure: onFailure,
userName: 'ccnet',
password: 'ccnet0901',
mqttVersion: 4
};
mqtt.onMessageArrived=onMessageArrived;
mqtt.connect(options);
}
</script>
<input type="button" value="Init" onclick="javascript:MQTTconnect();"/>
<input type="button" value="Send" onclick="javascript:onSend();"/>
</body>
</html>
# 發布消息到指定主題
mosquitto_pub -h 172.16.0.12 -t "topic/" -u user -P password -i "client3" -m "1234567890123456789012345678901234567890"
TCPDUMP抓取的報文如下:
# 點擊WEB頁面上的Init按鈕之后
18:11:49.183098 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [S], seq 3659468570, win 64240, options [mss 1360,nop,wscale 8,nop,nop,sackOK], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 7b4e 4000 7206 65a0 0e96 6cbb ac10 .4{N@.r.e...l...
0x0020: 000c c737 2329 da1f 0f1a 0000 0000 8002 ...7#)..........
0x0030: faf0 797b 0000 0204 0550 0103 0308 0101 ..y{.....P......
0x0040: 0402 ..
18:11:49.183188 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [S.], seq 3730879023, ack 3659468571, win 29200, options [mss 1460,nop,nop,sackOK,nop,wscale 7], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 0000 4000 4006 1357 ac10 000c 0e96 .4..@.@..W......
0x0020: 6cbb 2329 c737 de60 b22f da1f 0f1b 8012 l.#).7.`./......
0x0030: 7210 7157 0000 0204 05b4 0101 0402 0103 r.qW............
0x0040: 0307 ..
18:11:49.223085 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 1, win 515, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7b4f 4000 7206 65ab 0e96 6cbb ac10 .({O@.r.e...l...
0x0020: 000c c737 2329 da1f 0f1b de60 b230 5010 ...7#).....`.0P.
0x0030: 0203 2237 0000 .."7..
18:11:49.247331 IP Browser.50999 > MQTT.etlservicemgr: Flags [P.], seq 1:511, ack 1, win 515, length 510
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0226 7b50 4000 7206 63ac 0e96 6cbb ac10 .&{P@.r.c...l...
0x0020: 000c c737 2329 da1f 0f1b de60 b230 5018 ...7#).....`.0P.
0x0030: 0203 d122 0000 4745 5420 2f6d 7174 7420 ..."..GET./mqtt.
0x0040: 4854 5450 2f31 2e31 0d0a 486f 7374 3a20 HTTP/1.1..Host:.
0x0050: 3831 2e37 312e 3837 2e33 373a 3930 3031 81.71.87.37:9001
0x0060: 0d0a 436f 6e6e 6563 7469 6f6e 3a20 5570 ..Connection:.Up
0x0070: 6772 6164 650d 0a50 7261 676d 613a 206e grade..Pragma:.n
0x0080: 6f2d 6361 6368 650d 0a43 6163 6865 2d43 o-cache..Cache-C
0x0090: 6f6e 7472 6f6c 3a20 6e6f 2d63 6163 6865 ontrol:.no-cache
0x00a0: 0d0a 5573 6572 2d41 6765 6e74 3a20 4d6f ..User-Agent:.Mo
0x00b0: 7a69 6c6c 612f 352e 3020 2857 696e 646f zilla/5.0.(Windo
0x00c0: 7773 204e 5420 3130 2e30 3b20 5769 6e36 ws.NT.10.0;.Win6
0x00d0: 343b 2078 3634 2920 4170 706c 6557 6562 4;.x64).AppleWeb
0x00e0: 4b69 742f 3533 372e 3336 2028 4b48 544d Kit/537.36.(KHTM
0x00f0: 4c2c 206c 696b 6520 4765 636b 6f29 2043 L,.like.Gecko).C
0x0100: 6872 6f6d 652f 3130 312e 302e 3439 3531 hrome/101.0.4951
0x0110: 2e36 3720 5361 6661 7269 2f35 3337 2e33 .67.Safari/537.3
0x0120: 360d 0a55 7067 7261 6465 3a20 7765 6273 6..Upgrade:.webs
0x0130: 6f63 6b65 740d 0a4f 7269 6769 6e3a 206e ocket..Origin:.n
0x0140: 756c 6c0d 0a53 6563 2d57 6562 536f 636b ull..Sec-WebSock
0x0150: 6574 2d56 6572 7369 6f6e 3a20 3133 0d0a et-Version:.13..
0x0160: 4163 6365 7074 2d45 6e63 6f64 696e 673a Accept-Encoding:
0x0170: 2067 7a69 702c 2064 6566 6c61 7465 0d0a .gzip,.deflate..
0x0180: 4163 6365 7074 2d4c 616e 6775 6167 653a Accept-Language:
0x0190: 207a 682d 434e 2c7a 683b 713d 302e 390d .zh-CN,zh;q=0.9.
0x01a0: 0a53 6563 2d57 6562 536f 636b 6574 2d4b .Sec-WebSocket-K
0x01b0: 6579 3a20 7938 796a 3756 7858 624b 576f ey:.y8yj7VxXbKWo
0x01c0: 4f45 6962 3248 7176 4b51 3d3d 0d0a 5365 OEib2HqvKQ==..Se
0x01d0: 632d 5765 6253 6f63 6b65 742d 4578 7465 c-WebSocket-Exte
0x01e0: 6e73 696f 6e73 3a20 7065 726d 6573 7361 nsions:.permessa
0x01f0: 6765 2d64 6566 6c61 7465 3b20 636c 6965 ge-deflate;.clie
0x0200: 6e74 5f6d 6178 5f77 696e 646f 775f 6269 nt_max_window_bi
0x0210: 7473 0d0a 5365 632d 5765 6253 6f63 6b65 ts..Sec-WebSocke
0x0220: 742d 5072 6f74 6f63 6f6c 3a20 6d71 7474 t-Protocol:.mqtt
0x0230: 0d0a 0d0a ....
18:11:49.247360 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [.], ack 511, win 237, length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0028 653f 4000 4006 ae23 ac10 000c 0e96 .(e?@.@..#......
0x0020: 6cbb 2329 c737 de60 b230 da1f 1119 5010 l.#).7.`.0....P.
0x0030: 00ed 214f 0000 ..!O..
18:11:49.323395 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 1:160, ack 511, win 237, length 159
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 00c7 6540 4000 4006 ad83 ac10 000c 0e96 ..e@@.@.........
0x0020: 6cbb 2329 c737 de60 b230 da1f 1119 5018 l.#).7.`.0....P.
0x0030: 00ed df2c 0000 4854 5450 2f31 2e31 2031 ...,..HTTP/1.1.1
0x0040: 3031 2053 7769 7463 6869 6e67 2050 726f 01.Switching.Pro
0x0050: 746f 636f 6c73 0d0a 5570 6772 6164 653a tocols..Upgrade:
0x0060: 2057 6562 536f 636b 6574 0d0a 436f 6e6e .WebSocket..Conn
0x0070: 6563 7469 6f6e 3a20 5570 6772 6164 650d ection:.Upgrade.
0x0080: 0a53 6563 2d57 6562 536f 636b 6574 2d41 .Sec-WebSocket-A
0x0090: 6363 6570 743a 204a 6845 6544 4534 6468 ccept:.JhEeDE4dh
0x00a0: 6462 2b4b 6a47 4f52 452b 595a 7176 424b db+KjGORE+YZqvBK
0x00b0: 2f34 3d0d 0a53 6563 2d57 6562 536f 636b /4=..Sec-WebSock
0x00c0: 6574 2d50 726f 746f 636f 6c3a 206d 7174 et-Protocol:.mqt
0x00d0: 740d 0a0d 0a t....
18:11:49.392070 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [P.], seq 511:557, ack 160, win 514, length 46
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0056 7b54 4000 7206 6578 0e96 6cbb ac10 .V{T@.r.ex..l...
0x0020: 000c c737 2329 da1f 1119 de60 b2cf 5018 ...7#).....`..P.
0x0030: 0202 7640 0000 82a8 a399 adbd b3bf adb9 ..v@............
0x0040: eec8 f9e9 a75b ad81 a391 ced1 cafc c3c9 .....[..........
0x0050: cafd adb8 c0fa c3d8 d799 a4de c0f7 c8c9 ................
0x0060: 93a0 9d8c ....
18:11:49.392100 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [.], ack 557, win 237, length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0028 6541 4000 4006 ae21 ac10 000c 0e96 .(eA@.@..!......
0x0020: 6cbb 2329 c737 de60 b2cf da1f 1147 5010 l.#).7.`.....GP.
0x0030: 00ed 2082 0000 ......
18:11:49.392352 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 160:166, ack 557, win 237, length 6
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 002e 6542 4000 4006 ae1a ac10 000c 0e96 ..eB@.@.........
0x0020: 6cbb 2329 c737 de60 b2cf da1f 1147 5018 l.#).7.`.....GP.
0x0030: 00ed 7e6d 0000 8204 2002 0000 ..~m........
18:11:49.443110 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 166, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7b56 4000 7206 65a4 0e96 6cbb ac10 .({V@.r.e...l...
0x0020: 000c c737 2329 da1f 1147 de60 b2d5 5010 ...7#)...G.`..P.
0x0030: 0202 1f67 0000 ...g..
18:11:49.451571 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [P.], seq 557:577, ack 166, win 514, length 20
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 003c 7b57 4000 7206 658f 0e96 6cbb ac10 .<{W@.r.e...l...
0x0020: 000c c737 2329 da1f 1147 de60 b2d5 5018 ...7#)...G.`..P.
0x0030: 0202 5568 0000 828e 2670 37fd a47c 37fc ..Uh....&p7..|7.
0x0040: 2677 549e 4815 43d2 0571 &wT.H.C..q
18:11:49.451667 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 166:173, ack 577, win 237, length 7
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 002f 6543 4000 4006 ae18 ac10 000c 0e96 ./eC@.@.........
0x0020: 6cbb 2329 c737 de60 b2d5 da1f 115b 5018 l.#).7.`.....[P.
0x0030: 00ed 0d4f 0000 8205 9003 0001 01 ...O.........
18:11:49.543376 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 173, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7b59 4000 7206 65a1 0e96 6cbb ac10 .({Y@.r.e...l...
0x0020: 000c c737 2329 da1f 115b de60 b2dc 5010 ...7#)...[.`..P.
0x0030: 0202 1f4c 0000 ...L..
# 點擊Send按鈕之后
18:16:59.642995 IP Browser.50999 > MQTT.etlservicemgr: Flags [P.], seq 677:711, ack 249, win 514, length 34
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 004a 7c2e 4000 7206 64aa 0e96 6cbb ac10 .J|.@.r.d...l...
0x0020: 000c c737 2329 da1f 11bf de60 b328 5018 ...7#).....`.(P.
0x0030: 0202 736b 0000 829c 69c7 926a 59dd 9266 ..sk....i..jY..f
0x0040: 0aa4 fc0f 1de8 e71a 46a6 f009 21a2 fe06 ........F...!...
0x0050: 06e7 c505 1bab f64b .......K
18:16:59.643148 IP MQTT.etlservicemgr > Browser.50999: Flags [P.], seq 249:279, ack 711, win 237, length 30
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0046 654b 4000 4006 adf9 ac10 000c 0e96 .FeK@.@.........
0x0020: 6cbb 2329 c737 de60 b328 da1f 11e1 5018 l.#).7.`.(....P.
0x0030: 00ed ce09 0000 821c 301a 000c 6363 6e65 ........0...ccne
0x0040: 742f 7570 2f61 6263 4865 6c6c 6f20 576f t/up/abcHello.Wo
0x0050: 726c 6421 rld!
18:16:59.743220 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 279, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7c30 4000 7206 64ca 0e96 6cbb ac10 .(|0@.r.d...l...
0x0020: 000c c737 2329 da1f 11e1 de60 b346 5010 ...7#).....`.FP.
0x0030: 0202 1e5c 0000 ...\..
# 執行發布消息命令之后
17:56:50.992944 IP MQTT.etlservicemgr > Browser.50939: Flags [P.], seq 205:247, ack 675, win 237, length 42
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0052 e2f9 4000 4006 303f ac10 000c 0e96 .R..@.@.0?......
0x0020: 6cbb 2329 c6fb c3b7 103a 2e40 d9e6 5018 l.#).....:.@..P.
0x0030: 00ed aaae 0000 8228 3026 0006 6363 6e65 .......(0&..ccne
0x0040: 742f 3132 3334 3536 3738 3930 3132 3334 t/12345678901234
0x0050: 3536 3738 3930 3132 3334 3536 3738 3930 5678901234567890
17:56:51.100524 IP 14.150.108.187.50939 > 172.16.0.12.etlservicemgr: Flags [.], ack 247, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 77db 4000 7206 691f 0e96 6cbb ac10 .(w.@.r.i...l...
0x0020: 000c c6fb 2329 2e40 d9e6 c3b7 1064 5010 ....#).@.....dP.
0x0030: 0202 bffd 0000 ......
可看到基于websockets的通訊整體協作圖如下:
MQTT在websockets下的工作模
可以看到除了在建立websockets時,有較大的http頭信息,其余仍然很精簡。而且websocket已經對Browser上送的AUTH、SUB、PUB進行了加密處理,但是對于下發到Browser的信息并沒有進行加密處理
這里使用了mosquitto-2.0.11,下載源碼后,修改下config.mk文件,將WITH_WEBSOCKETS打開,如下
# Build with websockets support on the broker.
WITH_WEBSOCKETS:=yes
這里是在centos下工作的,如果沒有websockets庫,需要安裝
yum install libwebsockets* -y
然后就是一路的make,make install,之后配置下LD_LIBRARY_PATH的環境變量,就可以開始工作了。
LD_LIBRARY_PATH=:/root/setup/mosquitto-2.0.11/lib:/root/setup/cJSON-1.7.14
配置mosquitto環境,可建立一個專門的目錄,將mosquitto.conf文件拷貝過來修改,目錄結構如下:
<root@VM-0-12-centos:/mqtt>tree
.
|-- aclfile # 訪問控制文件
|-- log # 日志文件目錄
| `-- mosquitto.log
|-- mosquitto # mqtt運行目錄
| |-- mosquitto.db
| `-- pid.pid
|-- mosquitto.conf #配置文件!!!IMPORTANT
`-- pwfile # 用戶的密碼文件
#cat aclfile===========================================================# This affects access control for clients with no username.
topic read $SYS/#
# This only affects clients with username "roger".
user roger
topic foo/bar
# This affects all clients.
pattern write $SYS/broker/connection/%c/state
# Add 20210722
user ccnet
topic read ccnet/#
topic write ccnet/#=========================================================# 創建用戶密碼文件pwfile
touch pwfile
mosquitto_passwd pwfile user
#cat mosquitto.conf |grep -v "#"=========================================================pid_file /home/release/soft/mqtt/mosquitto/pid.pid
user release
port 1883
listener 9001
protocol websockets
bind_interface eth0
persistence true
persistence_location /home/release/soft/mqtt/mosquitto
allow_anonymous false
password_file /home/release/soft/mqtt/pwfile
acl_file /home/release/soft/mqtt/aclfile
運行Mosquitto
mosquitto -c mosquitto.conf -d
MQTT有很多實現,Mosquitto的魅力在于小而美。有很多開源的實現,基于nodejs、java等,也有商業版本的,更有一些云服務商直接將之PaaS化了。
例如,阿里云將MQTT定義為這樣子。一些公司直接基于mqtt搭建其上的物模型,超贊!
阿里云的MQTT服務
要提示:第二節內容中所有操作使用時需要把對應的產品、設備證書等替換為個人對應的數據。為了方便讀者更方便直觀的了解協議報文形成過程,數據沒有隱藏和加密處理,但請勿直接使用文中數據,造成損失將進行法律追究。
MQTT最初是由IBM的Andy Stanford-Clark以及Arcom公司ArlenNipper 為解決石油和天然氣管道傳感器數據傳輸問題而創造,他們通過衛星網絡傳輸輸油管道傳感器數據,當時衛星通信存在費用昂貴、網絡中斷、低帶寬、高延遲等特點,而且傳感器數量很多,MQTT就由此而誕生。近年來,物聯網發展迅猛,傳感器數量爆發式增長,mqtt也被重新拾起。
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基于發布/訂閱(Publish/Subscribe)模式的輕量級通訊協議,該協議構建于TCP/IP協議上,由IBM在1999年發布,目前最新版本為v3.1.1。MQTT最大的優點在于可以以極少的代碼和有限的帶寬,為遠程設備提供實時可靠的消息服務。做為一種低開銷、低帶寬占用的即時通訊協議,MQTT在物聯網、小型設備、移動應用等方面有廣泛的應用。
MQTT是一種基于訂閱和發布模式的通信方式,通過以主題為核心的方式進行數據或信息的傳輸。MQTT協議幾個重要的關鍵詞:
mqtt報文結構
mqtt有發布、訂閱、連接、心跳、請求等報文,但基本結構都類似,報文結構如下:
固定報頭 +可變報頭+有效載荷
報文類型值有以下幾種:
本節中使用阿里云的物聯網平臺,結合一下調試工具來實現通過MQTT的通信等操作。需要具備以下環境:
1、申請注冊阿里云物聯網平臺,并創建產品、設備,參考阿里云操作手冊即可:https://help.aliyun.com/document_detail/30528.html
2、(可選)linux系統環境
3、工具軟件:mqtt.fx、網絡調試助手
阿里云提供全面的物聯網連接服務,可以在其物聯網平臺做一些簡單的入門測試,官方有詳細的操作文檔,不再贅述,參考操作指導:https://help.aliyun.com/document_detail/73708.html
以下為部分測試效果圖:
阿里云物聯網平臺在線調試
客戶端SDK demo程序運行
結合MQTT.fx測試
結合MQTT.fx測試
為接近實際應用,本小節通過網絡調試助手和阿里云平臺來更加詳細的分析MQTT的幾個重要報文。報文內容使用16進制方式展示,需要特別注意一下報文內容有些采用UTF-8編碼,以及剩余長度的編碼格式。
UTF-8 編碼字符對 ASCII 字符的編碼做了優化。每一個字符串都有兩字節的長度字段作為前綴,它標識了這個字符串 UTF-8 編碼的字節數。
剩余長度(Remaining Length)表示當前報文剩余部分的字節數,包括可變報頭和負載的數據。剩余長度 不包括用于編碼剩余長度字段本身的字節數。剩余長度字段使用一個變長度編碼方案,對小于 128 的值它使用單字節編碼。更大的值按下面的方式處理。低 7 位有效位用于編碼數據,最高有效位用于指示是否有更多的字節。因此每個字節可以編碼 128 個數值和一個 延續位( continuation bit ) 。剩余長度字段最大 4 個字節。
得到連接報文的 有效載荷 過程如下: ? 1、代替 ? 客戶端ID:IOT_F1-01|securemode=3,signmethod=hmacsha1| ? 用戶名:IOT_F1-01&gnuyAXcWtth ? 密碼:clientIdIOT_F1-01deviceNameIOT_F1-01productKeygnuyAXcWtth
2、轉16進制
客戶端ID:49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C
用戶名:49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68
密碼(哈希加密):a73628d4a216e570bdd3997767e827ef4ae836e7
密碼:61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37
3、utf-8編碼(加兩個字節的前綴表示長度) ? 客戶端ID:00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C ? 用戶名:00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 ? 密碼:00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37 ? 最終有效載荷: 00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C 00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37
計算得到可變報頭加有效載荷的長度為120(0x78),把固定報頭的剩余長度補充得到最終 連接報文: 10 78 00 04 4D 51 54 54 04 C2 00 64 00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C 00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37
打開軟件調試助手,以TCP CLIENT的模式連接阿里云平臺,其目的IP、端口為阿里云的物聯網平臺連接參數,可以在平臺設備的mqtt連接參數中找到地址和端口參數。把這個地址填到網絡調試助手連接即可。 ? 連接后以hex方式發送上面得到的連接報文內容,可以看到已經收到了服務器返回的信息。
連接確認報文格式:固定報頭+可變報頭 上述測試中,服務器返回20 02 00 00,查詢MQTT協議的連接確認報文可知道服務器接受了連接。 報文格式如下:
1、發布消息 發布消息的報文格式為:固定報頭+可變報頭+有效載荷+響應+動作
主題在這里使用阿里云產品中的物模型topic,也可以使用自定義主題等。
(1)把deviceName替換為設備名稱,得到可變報頭內容為: /sys/gnuyAXcWtth/IOT_F1-01/thing/event/property/post 轉為16進制并左utf-8編碼得到可變報頭為: 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74
轉16進制: 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D
(2)這里使用QoS為0,所以就不包含響應和動作兩個字段,得到 固定報頭+可變報頭+有效載荷 內容: 31 ?? 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D 求得剩余長度為:190(0xBE) 注意剩余長度的表示規則,按規則處理后表示為:0xBE 0x01。
(3)最終得到的發布報文為: 31 BE 01 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D
(4)用網絡助手向服務器發送報文后,可以看到服務器有回復,在阿里云平臺看到了刷新的設置數據。
2、發布確認 報文格式:固定報頭+可變報頭 只有QoS等級為1時才會有發布確認報文,其構成如下,本次測試中暫時不用。
訂閱報文格式:固定報頭+可變報頭+有效載荷+響應
轉為16進制:2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 Mqtt協議要求主題過濾器要采用 UTF-8編碼,所以得到最終內容為: 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 服務質量要求設置為:QoS0 ,因此取值為 00 得到最終的有效載荷為: 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 00
最后把得到的報文通過網絡調試助手發送給服務器,得到服務器的返回信息如下:
訂閱確認報文格式:固定報頭+可變報頭+有效載荷 上述測試中,發送訂閱報文后服務器返回 90 03 00 0A 01 根據確認報文格式可知,已經訂閱成功。
提前在該產品下創建好第兩個設備。
1、自定義設備的可訂閱和發布主題
首先看到設備1發布主題,但設備2是接收不到設備1的消息的。
2、設備2訂閱自定義的主題:/gnuyAXcWtth/IOT_F1-02/user/post/info,訂閱成功后可在設備2的界面看到已訂閱主題列表中多了該topic。
3、設備之間需要通信,則需要通過云平臺對消息進行轉發,所以需要在云平臺做流轉配置,根據文檔建立流轉解析器,然后啟動運行解析器即可。參考文檔: https://help.aliyun.com/document_detail/68677.html
4、設備1用MQTT.fx連接平臺,而設備2則用網絡調試助手與平臺建立好連接。用MQTT.fx發布設備1的主題”/gnuyAXcWtth/IOT_F1-02/user/post/info”消息,可以看到設備2已經收到了設備1發布的”/gnuyAXcWtth/IOT_F1-02/user/post/info”消息。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。