MQTT協議代碼實現詳解二

程序員有二十年 2024-05-13 19:39:20

MQTT的每個消息都有一個可變頭結構,但這個可變頭的結構標准定可以說定得非常糟糕的....在但在某個消息裏擴展自己的可變頭屬性都會導致無法可以標准的協議兼容。5.0版本引入了一個屬性集來解決用戶擴展定義屬性的問題,雖然標准定義的規範表;但由于屬性集是基于K-V結構,因此自己添加一些標准以外的屬性都可以得到兼容。

參考文檔 中文:vitsumoc.github.io/mqtt-v5-0-chinese.html 英文: docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html編程語言: C#完整代碼: //github.com/beetlex-io/mqtt

屬性表規範

Identifier

Name (usage)

Type

Packet / Will Properties

Dec

Hex

1

0x01

Payload Format Indicator

Byte

PUBLISH, Will Properties

2

0x02

Message Expiry Interval

Four Byte Integer

PUBLISH, Will Properties

3

0x03

Content Type

UTF-8 Encoded String

PUBLISH, Will Properties

8

0x08

Response Topic

UTF-8 Encoded String

PUBLISH, Will Properties

9

0x09

Correlation Data

Binary Data

PUBLISH, Will Properties

11

0x0B

Subscription Identifier

Variable Byte Integer

PUBLISH, SUBSCRIBE

17

0x11

Session Expiry Interval

Four Byte Integer

CONNECT, CONNACK, DISCONNECT

18

0x12

Assigned Client Identifier

UTF-8 Encoded String

CONNACK

19

0x13

Server Keep Alive

Two Byte Integer

CONNACK

21

0x15

Authentication Method

UTF-8 Encoded String

CONNECT, CONNACK, AUTH

22

0x16

Authentication Data

Binary Data

CONNECT, CONNACK, AUTH

23

0x17

Request Problem Information

Byte

CONNECT

24

0x18

Will Delay Interval

Four Byte Integer

Will Properties

25

0x19

Request Response Information

Byte

CONNECT

26

0x1A

Response Information

UTF-8 Encoded String

CONNACK

28

0x1C

Server Reference

UTF-8 Encoded String

CONNACK, DISCONNECT

31

0x1F

Reason String

UTF-8 Encoded String

CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH

33

0x21

Receive Maximum

Two Byte Integer

CONNECT, CONNACK

34

0x22

Topic Alias Maximum

Two Byte Integer

CONNECT, CONNACK

35

0x23

Topic Alias

Two Byte Integer

PUBLISH

36

0x24

Maximum QoS

Byte

CONNACK

37

0x25

Retain Available

Byte

CONNACK

38

0x26

User Property

UTF-8 String Pair

CONNECT, CONNACK, PUBLISH, Will Properties, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, AUTH

39

0x27

Maximum Packet Size

Four Byte Integer

CONNECT, CONNACK

40

0x28

Wildcard Subscription Available

Byte

CONNACK

41

0x29

Subscription Identifier Available

Byte

CONNACK

42

0x2A

Shared Subscription Available

Byte

CONNACK

屬性ID占用一個字節,每個屬性都有專門的類型對應;由于一個字節可以存儲255種情況而表中只有42個,因此有足夠的空間可以讓用戶定義自己的屬性類型。在這裏有個屬性是比較特別的,那就是38的UserProperty,它內部是一個K-V結構,所以在消息傳遞時是可以存在多個UserProperty項的。

屬性結構定義 由于每種屬性的都有著不同的值類型,因此只是簡單地定義這樣一個結構顯然是不可能的;爲了規範這些不同類型屬性的數據轉換需要定義以下接口:

public interface IHeaderProperty { HeaderType Type { get; } void Read(MQTTParse mqttParse, System.IO.Stream stream); void Write(MQTTParse mqttParse, System.IO.Stream stream); } public interface IHeaderPropertyExpend<T> : IHeaderProperty { T Value { get; set; } }

可以有同學會問爲什麽不只定義一個接口,如果定義一個泛型接口那這種情況就無法更好使用了

protected List<IHeaderProperty> mWriteProperties = new List<IHeaderProperty>();protected List<?> mWriteProperties = new List<?>();

如果接口上要明確泛型那有些集合規範就無法定義了!有了接口的規範就可以實現具體的屬性類型。

public ContentType : IHeaderPropertyExpend<string>{ public string Value { get; set; } public HeaderType Type => HeaderType.ContentType; public void Read(MQTTParse mqttParse, Stream stream){ Value = mqttParse.ReadString(stream); } public void Write(MQTTParse mqttParse, Stream stream){ mqttParse.WriteString(stream, Value); } public static implicit operator ContentType(PropertyStream b) => b.To<ContentType>(HeaderType.ContentType);}public MessageExpiryInterval : IHeaderPropertyExpend<int>{ public int Value { get; set; } public HeaderType Type => HeaderType.MessageExpiryInterval; public void Read(MQTTParse mqttParse, Stream stream){ Value = mqttParse.ReadInt(stream); } public void Write(MQTTParse mqttParse, Stream stream){ mqttParse.WriteInt(stream, Value); } public static implicit operator MessageExpiryInterval(PropertyStream b) => b.To<MessageExpiryInterval>(HeaderType.MessageExpiryInterval);}public UserProperty : IHeaderPropertyExpend<string>{ public string Value { get; set; } public string Name { get; set; } public HeaderType Type => HeaderType.UserProperty; public void Read(MQTTParse mqttParse, Stream stream){ Name = mqttParse.ReadString(stream); Value = mqttParse.ReadString(stream); } public void Write(MQTTParse mqttParse, Stream stream){ mqttParse.WriteString(stream, Name); mqttParse.WriteString(stream, Value); } public static implicit operator UserProperty(PropertyStream b) => b.To<UserProperty>(HeaderType.UserProperty);}

每個屬性都重載了一個類型轉換,後面在使用的時候就會方便很多。

PropertyStream 每個消息都帶有一個可變數量的屬性集,該結構也是通過一個可變長度整數來定義其內容。文檔描述如下:屬性長度是一個變長整數。長度的值不包括自己所占用的字節數,其長度是所有屬性占用的字節數量。如果沒有屬性,必須通過一個 0 值的屬性長度來明確表示。有了以上規範就可以實現這個Stream了

public PropertyStream : System.IO.MemoryStream{ private Dictionary<HeaderType, object> mReadProperties = new Dictionary<HeaderType, object>(); public List<UserProperty> UserProperties { get; set; } protected List<IHeaderProperty> mWriteProperties = new List<IHeaderProperty>(); public void Refresh() { mReadProperties.Clear(); mWriteProperties.Clear(); UserProperties = ; SetLength(0); } public T To<T>(HeaderType type) where T : IHeaderProperty { mReadProperties.TryGetValue(type, out var property); return (T)property; } public PropertyStream Add(IHeaderProperty property) { if (property != ) mWriteProperties.Add(property); return this; } public void Read(MQTTParse parse, Stream stream, Action<IHeaderProperty> handler = ) { var len = parse.Int7BitHandler.Read(stream); if (len > 0) { SetLength(0); parse.CopyStream(stream, this, (int)len); this.Position = 0; while (this.Position < this.Length) { HeaderType htype = (HeaderType)this.ReadByte(); var property = HeaderFactory.GetHeader(htype); property.Read(parse, this); handler?.Invoke(property); if (property is UserProperty) { if (UserProperties == ) UserProperties = new List<UserProperty>(); UserProperties.Add((UserProperty)property); } else { mReadProperties.Add(property.Type, property); } } } } public void Write(MQTTParse parse, Stream stream) { foreach (var item in mWriteProperties) { WriteByte((byte)item.Type); item.Write(parse, this); } parse.Int7BitHandler.Write(stream, (int)Length); this.Position = 0; CopyTo(stream); } public static PropertyStream operator +(PropertyStream stream, Tuple<IHeaderProperty, IHeaderProperty, IHeaderProperty> properties) { stream.Add(properties.Item1).Add(properties.Item2).Add(properties.Item3); return stream; } public static PropertyStream operator +(PropertyStream stream, Tuple<IHeaderProperty, IHeaderProperty> properties) { stream.Add(properties.Item1).Add(properties.Item2); return stream; } public static PropertyStream operator +(PropertyStream stream, IHeaderProperty property) => stream.Add(property); public static PropertyStream operator +(PropertyStream stream, IEnumerable<IHeaderProperty> properties) { if (properties != ) foreach (var item in properties) { stream.Add(item); } return stream; } public static implicit operator List<UserProperty>(PropertyStream d) => d.UserProperties;}

由于是一個內存的讀寫塊,因此從MemoryStream派生下來;考慮到對象複用實現Refresh方法重置應該對象的一些屬性。同樣這個類也重載了運算符方便操作,有了這些重載代碼編寫就相對簡單很多

protected override void OnRead(MQTTParse parse, Stream stream, ISession session){ base.OnRead(parse, stream, session); SessionFlag = (byte)stream.ReadByte(); Status = (ReturnType)stream.ReadByte(); var ps = GetPropertiesStream(); ps.Read(parse, stream); SessionExpiryInterval = ps; ReceiveMaximum = ps; MaximumQoS = ps; RetainAvailable = ps; AssignedClientIdentifier = ps; TopicAliasMaximum = ps; ReasonString = ps; UserProperties = ps; WildcardSubscriptionAvailable = ps; SubscriptionIdentifierAvailable = ps; SharedSubscriptionAvailable = ps; ServerKeepAlive = ps; ResponseInformation = ps; ServerReference = ps; AuthenticationData = ps;}protected override void OnWrite(MQTTParse parse, Stream stream, ISession session){ base.OnWrite(parse, stream, session); stream.WriteByte(SessionFlag); stream.WriteByte((byte)Status); var ps = GetPropertiesStream() + SessionExpiryInterval + ReceiveMaximum + MaximumQoS + RetainAvailable + AssignedClientIdentifier + TopicAliasMaximum + ReasonString + WildcardSubscriptionAvailable + SubscriptionIdentifierAvailable + SharedSubscriptionAvailable + ServerKeepAlive + ResponseInformation + ServerReference + AuthenticationData + UserProperties; ps.Write(parse, stream);}

通過運算符重載就可以讓數據流和對象之間做個自動轉換,後續相關代碼編寫起來也方便多了。 到這裏協議解釋的基礎封裝都完成了,後面的章節就使用這些基礎功能封裝具體的消息了。

BeetleX

開源跨平台通訊框架(支持TLS)

提供HTTP,Websocket,MQTT,Redis,RPC和服務網關開源組件

個人微信:henryfan128 QQ:28304340

關注公衆號

https://github.com/beetlex-io/http://beetlex-io.com

0 阅读:0

程序員有二十年

簡介:感謝大家的關注