MQTT的每個消息都有一個可變頭結構,但這個可變頭的結構標准定可以說定得非常糟糕的....在但在某個消息裏擴展自己的可變頭屬性都會導致無法可以標准的協議兼容。5.0版本引入了一個屬性集來解決用戶擴展定義屬性的問題,雖然標准定義的規範表;但由于屬性集是基于K-V結構,因此自己添加一些標准以外的屬性都可以得到兼容。
參考文檔 中文:vitsumoc.github.io/mqtt-v5-0-chinese.html 英文: docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html編程語言: C#完整代碼: //github.com/beetlex-io/mqtt
屬性表規範
Identifier
Name (usage)
Type
Packet / Will Properties
Dec
Hex
1
0x01
Payload Format Indicator
Byte
PUBLISH, Will Properties
2
0x02
Message Expiry Interval
Four Byte Integer
PUBLISH, Will Properties
3
0x03
Content Type
UTF-8 Encoded String
PUBLISH, Will Properties
8
0x08
Response Topic
UTF-8 Encoded String
PUBLISH, Will Properties
9
0x09
Correlation Data
Binary Data
PUBLISH, Will Properties
11
0x0B
Subscription Identifier
Variable Byte Integer
PUBLISH, SUBSCRIBE
17
0x11
Session Expiry Interval
Four Byte Integer
CONNECT, CONNACK, DISCONNECT
18
0x12
Assigned Client Identifier
UTF-8 Encoded String
CONNACK
19
0x13
Server Keep Alive
Two Byte Integer
CONNACK
21
0x15
Authentication Method
UTF-8 Encoded String
CONNECT, CONNACK, AUTH
22
0x16
Authentication Data
Binary Data
CONNECT, CONNACK, AUTH
23
0x17
Request Problem Information
Byte
CONNECT
24
0x18
Will Delay Interval
Four Byte Integer
Will Properties
25
0x19
Request Response Information
Byte
CONNECT
26
0x1A
Response Information
UTF-8 Encoded String
CONNACK
28
0x1C
Server Reference
UTF-8 Encoded String
CONNACK, DISCONNECT
31
0x1F
Reason String
UTF-8 Encoded String
CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH
33
0x21
Receive Maximum
Two Byte Integer
CONNECT, CONNACK
34
0x22
Topic Alias Maximum
Two Byte Integer
CONNECT, CONNACK
35
0x23
Topic Alias
Two Byte Integer
PUBLISH
36
0x24
Maximum QoS
Byte
CONNACK
37
0x25
Retain Available
Byte
CONNACK
38
0x26
User Property
UTF-8 String Pair
CONNECT, CONNACK, PUBLISH, Will Properties, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, AUTH
39
0x27
Maximum Packet Size
Four Byte Integer
CONNECT, CONNACK
40
0x28
Wildcard Subscription Available
Byte
CONNACK
41
0x29
Subscription Identifier Available
Byte
CONNACK
42
0x2A
Shared Subscription Available
Byte
CONNACK
屬性ID占用一個字節,每個屬性都有專門的類型對應;由于一個字節可以存儲255種情況而表中只有42個,因此有足夠的空間可以讓用戶定義自己的屬性類型。在這裏有個屬性是比較特別的,那就是38的UserProperty,它內部是一個K-V結構,所以在消息傳遞時是可以存在多個UserProperty項的。
屬性結構定義 由于每種屬性的都有著不同的值類型,因此只是簡單地定義這樣一個結構顯然是不可能的;爲了規範這些不同類型屬性的數據轉換需要定義以下接口:
public interface IHeaderProperty { HeaderType Type { get; } void Read(MQTTParse mqttParse, System.IO.Stream stream); void Write(MQTTParse mqttParse, System.IO.Stream stream); } public interface IHeaderPropertyExpend<T> : IHeaderProperty { T Value { get; set; } }可以有同學會問爲什麽不只定義一個接口,如果定義一個泛型接口那這種情況就無法更好使用了
protected List<IHeaderProperty> mWriteProperties = new List<IHeaderProperty>();protected List<?> mWriteProperties = new List<?>();如果接口上要明確泛型那有些集合規範就無法定義了!有了接口的規範就可以實現具體的屬性類型。
public ContentType : IHeaderPropertyExpend<string>{ public string Value { get; set; } public HeaderType Type => HeaderType.ContentType; public void Read(MQTTParse mqttParse, Stream stream){ Value = mqttParse.ReadString(stream); } public void Write(MQTTParse mqttParse, Stream stream){ mqttParse.WriteString(stream, Value); } public static implicit operator ContentType(PropertyStream b) => b.To<ContentType>(HeaderType.ContentType);}public MessageExpiryInterval : IHeaderPropertyExpend<int>{ public int Value { get; set; } public HeaderType Type => HeaderType.MessageExpiryInterval; public void Read(MQTTParse mqttParse, Stream stream){ Value = mqttParse.ReadInt(stream); } public void Write(MQTTParse mqttParse, Stream stream){ mqttParse.WriteInt(stream, Value); } public static implicit operator MessageExpiryInterval(PropertyStream b) => b.To<MessageExpiryInterval>(HeaderType.MessageExpiryInterval);}public UserProperty : IHeaderPropertyExpend<string>{ public string Value { get; set; } public string Name { get; set; } public HeaderType Type => HeaderType.UserProperty; public void Read(MQTTParse mqttParse, Stream stream){ Name = mqttParse.ReadString(stream); Value = mqttParse.ReadString(stream); } public void Write(MQTTParse mqttParse, Stream stream){ mqttParse.WriteString(stream, Name); mqttParse.WriteString(stream, Value); } public static implicit operator UserProperty(PropertyStream b) => b.To<UserProperty>(HeaderType.UserProperty);}每個屬性都重載了一個類型轉換,後面在使用的時候就會方便很多。
PropertyStream 每個消息都帶有一個可變數量的屬性集,該結構也是通過一個可變長度整數來定義其內容。文檔描述如下:屬性長度是一個變長整數。長度的值不包括自己所占用的字節數,其長度是所有屬性占用的字節數量。如果沒有屬性,必須通過一個 0 值的屬性長度來明確表示。有了以上規範就可以實現這個Stream了
public PropertyStream : System.IO.MemoryStream{ private Dictionary<HeaderType, object> mReadProperties = new Dictionary<HeaderType, object>(); public List<UserProperty> UserProperties { get; set; } protected List<IHeaderProperty> mWriteProperties = new List<IHeaderProperty>(); public void Refresh() { mReadProperties.Clear(); mWriteProperties.Clear(); UserProperties = ; SetLength(0); } public T To<T>(HeaderType type) where T : IHeaderProperty { mReadProperties.TryGetValue(type, out var property); return (T)property; } public PropertyStream Add(IHeaderProperty property) { if (property != ) mWriteProperties.Add(property); return this; } public void Read(MQTTParse parse, Stream stream, Action<IHeaderProperty> handler = ) { var len = parse.Int7BitHandler.Read(stream); if (len > 0) { SetLength(0); parse.CopyStream(stream, this, (int)len); this.Position = 0; while (this.Position < this.Length) { HeaderType htype = (HeaderType)this.ReadByte(); var property = HeaderFactory.GetHeader(htype); property.Read(parse, this); handler?.Invoke(property); if (property is UserProperty) { if (UserProperties == ) UserProperties = new List<UserProperty>(); UserProperties.Add((UserProperty)property); } else { mReadProperties.Add(property.Type, property); } } } } public void Write(MQTTParse parse, Stream stream) { foreach (var item in mWriteProperties) { WriteByte((byte)item.Type); item.Write(parse, this); } parse.Int7BitHandler.Write(stream, (int)Length); this.Position = 0; CopyTo(stream); } public static PropertyStream operator +(PropertyStream stream, Tuple<IHeaderProperty, IHeaderProperty, IHeaderProperty> properties) { stream.Add(properties.Item1).Add(properties.Item2).Add(properties.Item3); return stream; } public static PropertyStream operator +(PropertyStream stream, Tuple<IHeaderProperty, IHeaderProperty> properties) { stream.Add(properties.Item1).Add(properties.Item2); return stream; } public static PropertyStream operator +(PropertyStream stream, IHeaderProperty property) => stream.Add(property); public static PropertyStream operator +(PropertyStream stream, IEnumerable<IHeaderProperty> properties) { if (properties != ) foreach (var item in properties) { stream.Add(item); } return stream; } public static implicit operator List<UserProperty>(PropertyStream d) => d.UserProperties;}由于是一個內存的讀寫塊,因此從MemoryStream派生下來;考慮到對象複用實現Refresh方法重置應該對象的一些屬性。同樣這個類也重載了運算符方便操作,有了這些重載代碼編寫就相對簡單很多
protected override void OnRead(MQTTParse parse, Stream stream, ISession session){ base.OnRead(parse, stream, session); SessionFlag = (byte)stream.ReadByte(); Status = (ReturnType)stream.ReadByte(); var ps = GetPropertiesStream(); ps.Read(parse, stream); SessionExpiryInterval = ps; ReceiveMaximum = ps; MaximumQoS = ps; RetainAvailable = ps; AssignedClientIdentifier = ps; TopicAliasMaximum = ps; ReasonString = ps; UserProperties = ps; WildcardSubscriptionAvailable = ps; SubscriptionIdentifierAvailable = ps; SharedSubscriptionAvailable = ps; ServerKeepAlive = ps; ResponseInformation = ps; ServerReference = ps; AuthenticationData = ps;}protected override void OnWrite(MQTTParse parse, Stream stream, ISession session){ base.OnWrite(parse, stream, session); stream.WriteByte(SessionFlag); stream.WriteByte((byte)Status); var ps = GetPropertiesStream() + SessionExpiryInterval + ReceiveMaximum + MaximumQoS + RetainAvailable + AssignedClientIdentifier + TopicAliasMaximum + ReasonString + WildcardSubscriptionAvailable + SubscriptionIdentifierAvailable + SharedSubscriptionAvailable + ServerKeepAlive + ResponseInformation + ServerReference + AuthenticationData + UserProperties; ps.Write(parse, stream);}通過運算符重載就可以讓數據流和對象之間做個自動轉換,後續相關代碼編寫起來也方便多了。 到這裏協議解釋的基礎封裝都完成了,後面的章節就使用這些基礎功能封裝具體的消息了。
BeetleX
開源跨平台通訊框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服務網關開源組件
個人微信:henryfan128 QQ:28304340
關注公衆號
https://github.com/beetlex-io/http://beetlex-io.com