Squeak
  links to this page:    
View this PageEdit this PageUploads to this PageHistory of this PageTop of the SwikiRecent ChangesSearch the SwikiHelp Guide
MQTT client
Last updated at 12:02 am UTC on 17 March 2017
I've written an MQTT (http://mqtt.org) client for Squeak 5.1 that appears to be reasonably robust and usable. It can be found at http://www.squeaksource.com/MQTTClient.html with the MCZ at

 location: 'http://www.squeaksource.com/MQTTClient'
    user: 'tpr'
    password: ''

It's also listed on SqueakMap.

Assuming you have some idea of what MQTT is and what it does (from reading the above link etc) it is fairly simple to test out. Best idea is to inspect
MQTTClient new
and then doit with some of the following -
self connectTo: {some known mqtt broker name or IP as a String} port: {nil or possibly a specific port}.
self onTopic: {an mqtt topic string} do: [:t : m | Transpcript show: 'got topic ', t, ' message ', m; cr].
self unsubscribeFrom: {that topic string}.
self disconnect.


Basics

The basic functionality is to connect to a broker and set up several threads to handle the packets that need to be sent and handled.
The main read thread waits on data coming into a SocketStream connected to the broker; as data arrives the initial bytes tell us what sort of packet it is (take a look at the MQTTPacket hierarchy, starting with MQTTPacket class>>readFrom:) and the following bytes get read to make the packet's structure. Once a packet is complete it gets handled in MQTTClient>handleIncomingPacket:.
The main write thread waits for a shared queue to have a packet in need of transmission and tells each one to #encodeOn: the socketStream.
In order to keep the communications running over a potentially unreliable link we also have to be able retry sending packets if we do not get a suitable confirmation packet in a timely manner, and to send simple ping packets if no important traffic is in progress. This is handled by a single thread that occasionally looks for any pending jobs that represent matters such as an expected reply to a publish packet, and also the 'fake' pending job for the occasional ping check. Since all these threads spend most of their time waiting on sockets, shared queues or timers, they collaboratively multitask quite nicely with the main Squeak UI threads and the client can run without making much impact on the rest of the system - depending somewhat on the work triggers by any subscription, of course.

We can handle any of the three specified levels of QOS for all expected users of QOS. When a packet is published with QOS 2, for example, we also create a pending job to handle the expected PUBREC packet. That in turn would handle the incoming PUBREC by send out a PUBREL and adding a pending job for the final PUBCOMP packet.


Connecting

The connection can handle the will message - requiring the topic, message, retain flag and QOS data - as well as the username/password pair, the clean session flag, the keep alive time and a client ID that can be left to default or specific to your application. The simplest message to make a connection is the above demonstrated #connectTo: {broker name} port: {nil or port number}. For most of the useful API, see there MQTTClient 'public api' protocol.

A connection can be cleanly closed with #disconnect.

Subscribing

Topic names are parsed to make sure they meet the requirements of the MQTT spec (http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106 before the subscription is requested. The #onTopic:do: and #onTopic:qos:do: methods require a block that takes two arguments, the incoming topic string and the incoming message data, a byte array. Note that the incoming data may be any kind of binary data of pretty much any size up to around 260Mb, though that should be an extreme case and would likely crush your system. Any incoming strings would be byte encode UTF-8 and need converting with 'data asString utf8ToSqueak'; numbers can generally be handled with 'Number readFrom: data asString utf8ToSqueak ifFail:[explode]'. Note that the blocks provided for each subscription are evaluated in the context of the packet reading process. Note also that whilst wildcard topics are permitted, we do not handle multiple topics within a single subscribe.
A topic can be unsubscribed with MQTTClient>>unsubscribeFrom: which will handle any matching current subscribed topics. It is entirely possible to misuse multiple subscriptions with assorted wildcards mixed with unsubscribes to get the system into a real tangle. My advice is not to do that.

Publishing

Publishing is very simple - MQTTClient>>publishTopic: a topic string message: message data qos: 0|1|2. The message can be a String or ByteArray with encoded data, or any object that can be converted to a ByteArray by sending it #asByteArray. The topic string will be converted to utf8 as it gets transmitted.

Keep Alive

One of the parameters that can be set at connect time is the 'keep alive' interval. This is a way of checking if the socket link is still ok at regular intervals for hardware setups that may be subjected to wifi or wire failures. The default value of 0 tells the broker to not worry and be happy, and assume all is always well. When there is reason to want to be more careful we can set the interval with #keepAliveTime: and pass in a number of seconds. The expectation is that the broker will close the connection as cleanly as possible if it does not see some traffic at least every ~1.5x that time. We implement this by sending a PINGREQ packet and awaiting the PINGRESP. There is no current clean handling of a failure.

Packet IDs

The requirement is that packet IDs are unique during the time they are in use, so a simple counter is not adequate. Each time a new ID is requested we check the potential next value of a simple counter against all currently queued pending jobs, each of which has a previously allocated ID. Unless we end up with more than 65534 active packets we will find a valid free value. It should be noted that at least one of the example 'paho mqtt' implementations fails this test.

Currently known implementation limits

Error handling is still inadequate. A problem with anything socket like is the astonishing number of things that can go wrong and the subsequent decision about handling them. For example , merely attempting to make the basic SocketStream connection may cause SocketPrimitieFailed, NoNetworkError, NameLookupFailure, ConnectionTimedOut or ConnectionRefused errors. At any point the socket may close, during a read or write it may time out. We try to make a new connection if the broker drops the link but as yet it can hardly be claimed to be bullet proof. Advice and assistance is welcomed.
No testing is done to check the SUBACK packets and the return codes beyond simple pass/fail.