Some implementation notes for MQTT
Last updated at 5:01 pm UTC on 26 September 2023
Herewith some not especially coherent notes from the implementation phase of the MQTT client.

Socket listenOn: aPortNumber backlog: somenumberwithdefault8 interface: anIPnumberOr0forAny seems to be the key low-level call. See Webserver>>listenOn:interface:backlogSize:
MQTT defaults to port 1883, so we would probably use
listenerSocket := Socket newTCP.
listenerSocket listenOn: 1883 backlog: 8
and allow the interface to default to any IP.
Listening needs a process to spawn new sockets for connections. See Webserver>>startListener. We use listenerSocket waitForAccept: sometimetowait to hang around and wait for a connection. Each connection makes a new socket and uses asyncHandleConnectionFrom: to run a process that deals with input from that socket.
#handleConnectionFrom: actually reads the incoming stuff and makes a WebRequest to parse it. We’d need a bit more cleverness here I think, hopefully making a proper subclass of request to match the MQTT packet types. #dispatchRequest then sorts out which type of HTTP request it was and what to do.

Looking at the C implementation (which is completely insane) for how the message-ID stuff is handled; the spec seems to require keeping track of the used IDs and making sure not to use one currently in use. The C code does really wild linked-list stuff, which might be an OK way to keep the ‘active’ packets and thereby the IDs. It also tracks the last used value as a starting place for scanning, a bit like a hash in message lookups. Interestingly, the Python version simply uses a counter and wraps at 65535. Is that allowable? It appears to be: the spec only requires a nonzero 16-bit identifier that is not currently in use.
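For comparison, here is a minimal Python sketch (all names invented for illustration, not from any real client) combining the two ideas: a wrapping counter as the scan starting point, plus a set of in-use IDs so an active one is never reissued.

```python
class PacketIdAllocator:
    """Allocate MQTT packet IDs: nonzero 16-bit values not currently in use."""
    MAX_ID = 65535

    def __init__(self):
        self.in_use = set()
        self.last = 0  # last ID handed out; the next scan starts after it

    def acquire(self):
        # Scan at most MAX_ID candidates, wrapping at 65535 and never using 0.
        for _ in range(self.MAX_ID):
            self.last = self.last % self.MAX_ID + 1   # cycles 1..65535
            if self.last not in self.in_use:
                self.in_use.add(self.last)
                return self.last
        raise RuntimeError("all 65535 packet IDs are in flight")

    def release(self, packet_id):
        # Called once the exchange using this ID completes.
        self.in_use.discard(packet_id)
```

The counter makes the common case O(1), and the set check keeps it correct even when IDs stay pending for a long time.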

Packet taxonomy

Some packets are fixed header only
Some are fixed header + variable header
Some are fixed header + variable header + payload (e.g. CONNECT)
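Whatever the category, every packet starts with the same fixed header: a type/flags byte followed by a variable-length "remaining length". A small Python sketch of just that framing (function names are made up here):

```python
def encode_remaining_length(n):
    """MQTT variable-length encoding: 7 bits per byte, bit 7 = continuation."""
    out = bytearray()
    while True:
        n, byte = divmod(n, 128)
        out.append((byte | 0x80) if n else byte)
        if not n:
            return bytes(out)

def fixed_header(packet_type, flags, remaining_length):
    """First byte is type in the high nibble, flags in the low nibble."""
    return bytes([(packet_type << 4) | flags]) + encode_remaining_length(remaining_length)

# PINGREQ (type 12) is a fixed-header-only packet: no variable header, no payload.
PINGREQ = fixed_header(12, 0, 0)
```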

All the basic packets are easy enough to implement and work at least well enough in some UnitTests. The harder part is listening for Publish packets after a subscribe. Even getting the acknowledgement packet for a subscribe can be tricky: if there is a retained message queued up, it may well arrive before the suback. So some asynchronous listening to the socket is needed, with queueing of packets received, along with a way to track pending acknowledgements and multi-part sequences like QOS > 0 publishing.

KeepAlive requires an asynch loop that can send a PingReq every now and then (maybe a bit less than the set keepalive time?) which in turn means we need to have a way to prevent concurrent (ab)use of the outbound socket. We don’t need this if keepalive = 0, since that should make the server assume the connection is reliable enough.

Connection

Open a SocketStream, send a connect packet, read back a connack packet and potentially take action on the sessionPresent flag or any error code.
Currently able to
set the will topic & message, whether to retain it and the relevant QOS.
set username and/or password
set cleanSession flag
set clientID - this is meant to be a unique identifier but it doesn’t seem to be used in any interesting way outside a possible re-connect following a crash of some sort. If you want to be able to continue from a prior state then the same clientID needs to be used and the cleanSession flag set false.

After sending a connect packet we wait for a connack (not strictly required {3.1.4-5}, but skipping it seems pointless); we should check the error codes and make sure the session-present flag is acceptable to the client (3.2.2.2)
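The connect options listed above all land in a single flags byte of the CONNECT variable header. A Python sketch of building just that byte (the bit layout follows MQTT 3.1.1 §3.1.2; the function itself is illustrative):

```python
def connect_flags(clean_session=True, will_qos=None, will_retain=False,
                  username=False, password=False):
    """Build the CONNECT flags byte. will_qos=None means no will is set."""
    flags = 0
    if clean_session:
        flags |= 0x02                      # bit 1: clean session
    if will_qos is not None:
        flags |= 0x04 | (will_qos << 3)    # bit 2: will flag, bits 3-4: will QOS
        if will_retain:
            flags |= 0x20                  # bit 5: retain the will message
    if password:
        flags |= 0x40                      # bit 6
    if username:
        flags |= 0x80                      # bit 7
    return flags
```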

Looping

Once the connection is established we should run two background loops:

Keep alive loop

If the keepAliveTime is 0 we do not run this loop and assume that the server will just keep going. A reconnect attempt would be used should the network disconnect, with this being a likely case for using a cleanSession=false connect packet.
In general the loop will wait for keepAliveTime seconds (NOT milliseconds), send a PINGREQ and repeat. We should set a timer to see if a PINGRESP is returned within a ‘sensible time’ - say 25% of keepAliveTime. If no such response arrives we are supposed to close the connection and possibly retry.
Since the timer may fire while other code is part-way through sending a packet, we will need a semaphore of some form to hold off the ‘intruding’ packet write.
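In Python terms the shape of this might look like the sketch below, with a lock standing in for the semaphore (all names are hypothetical; in Squeak it would be a Semaphore and a forked process, and with keepalive = 0 the loop is simply never started):

```python
import threading

class KeepAlive:
    """Periodically send PINGREQ; serialize all socket writes with one lock
    so a ping cannot interleave with another packet being written."""

    def __init__(self, send_fn, keep_alive_secs):
        self.send_fn = send_fn                 # writes bytes to the socket
        self.keep_alive = keep_alive_secs
        self.write_lock = threading.Lock()     # shared with every other sender
        self.pong_deadline = None

    def send_packet(self, packet):
        with self.write_lock:                  # one writer at a time
            self.send_fn(packet)

    def loop_once(self, now):
        # Called from the timer loop: send a ping and arm the response
        # deadline - here 25% of the keep-alive interval, as suggested above.
        self.send_packet(b"\xc0\x00")          # PINGREQ
        self.pong_deadline = now + self.keep_alive * 0.25

    def pingresp_received(self):
        self.pong_deadline = None

    def overdue(self, now):
        # True means: close the connection and possibly retry.
        return self.pong_deadline is not None and now > self.pong_deadline
```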

Read loop

This will simply loop forever, reading packets from the incoming socket and adding them to a SharedQueue.

Subscribing

The interesting bit here is that subscribing is not synchronous - the SUBACK packet is not at all guaranteed to be the first packet received after a SUBSCRIBE is sent. We need to keep a record of each subscribe sent and, for any received suback, check which one it matches. We need to handle failed subscribes too.
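One way to track this, sketched in Python (the structure is invented here, not taken from the Squeak code): remember each SUBSCRIBE by packet ID and pair it up when the SUBACK arrives. A return code of 0x80 in a SUBACK marks a failed topic.

```python
class SubscribeTracker:
    """Match asynchronously arriving SUBACKs to earlier SUBSCRIBEs."""

    def __init__(self):
        self.pending = {}   # packet_id -> list of topic filters sent

    def subscribed(self, packet_id, topics):
        # Record an outgoing SUBSCRIBE and the topics it asked for.
        self.pending[packet_id] = topics

    def suback(self, packet_id, return_codes):
        # Return codes per topic: 0/1/2 = granted QOS, 0x80 = failure.
        topics = self.pending.pop(packet_id, None)
        if topics is None:
            return None     # unsolicited or duplicate SUBACK
        return list(zip(topics, return_codes))
```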

Pending actions

It seems we need to be able to record incomplete actions as pending jobs of some sort.
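As a sketch of what such a record might hold (the fields are invented for illustration): the original packet, what acknowledgement is awaited, and a timestamp to drive a retry scan.

```python
import time
from dataclasses import dataclass, field

@dataclass
class PendingJob:
    """One incomplete protocol exchange."""
    packet_id: int
    packet: bytes        # the encoded packet as originally sent
    expecting: str       # e.g. "PUBACK", "PUBREC", "PUBCOMP", "SUBACK"
    sent_at: float = field(default_factory=time.monotonic)
    completed: bool = False

    def due_for_retry(self, now, retry_secs):
        # A job is due when it is still open and its retry time has passed.
        return not self.completed and now - self.sent_at >= retry_secs
```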

Publishing

A bit more complex than most actions. We can both publish data to the broker and receive data published by the broker.
The multiple QOS levels add some fun.

Sending

Both QOS 1 and 2 imply we need to keep the original packet somewhere until the publish is completed. The best place is the pending job object(s).

Receiving

The PUBREL pending job sends a PUBCOMP and frees the packetID & pending job. There is a small complication in that a repeat PUBREL may be received, so we cannot expect to always find a pending job for a PUBREL of a certain msgID. Similarly we must not try to complete a job that has already been completed.
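Sketched in Python (helper names invented): answer every PUBREL with a PUBCOMP, but complete and free the job only the first time.

```python
def handle_pubrel(packet_id, pending_jobs, send_fn, release_id):
    """QOS 2 receive side. A resent PUBREL may arrive after the job (and its
    packet ID) has already been freed, so the lookup must tolerate a miss."""
    job = pending_jobs.pop(packet_id, None)
    if job is not None:
        release_id(packet_id)      # the packet ID can be reused now
    # Always reply: PUBCOMP is type 7, remaining length 2, then the
    # packet ID big-endian - even for a duplicate PUBREL.
    send_fn(bytes([0x70, 0x02]) + packet_id.to_bytes(2, "big"))
    return job is not None         # False means it was a duplicate
```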

Resending unacknowledged packets

We need a roughly timed process to resend packets that have not been acknowledged promptly. The Python paho client uses a default of 20 secs for this but seems to let the user set it too.
To resend a PUBLISH packet we must set DUP=1 and add it to the out-queue, but not add a new pending job. Since the pending jobs have most of the details, perhaps it is good to make use of them: scan the list for jobs past their retry time and retry any hits. This could also replace the original keepalive process and unify things a little.
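A Python sketch of that scan (the job shape here is a plain dict invented for the example; in practice it would be whatever the pending-job objects hold):

```python
import time

def resend_overdue(pending_jobs, out_queue, retry_secs=20, now=None):
    """Resend any job past its retry time. PUBLISH packets get DUP=1
    (bit 3 of the first byte); no new pending job is created - the
    existing one keeps tracking the exchange."""
    now = time.monotonic() if now is None else now
    for job in pending_jobs.values():
        if now - job["sent_at"] < retry_secs:
            continue
        packet = job["packet"]
        if packet[0] >> 4 == 3:                             # a PUBLISH
            packet = bytes([packet[0] | 0x08]) + packet[1:] # set DUP
            job["packet"] = packet
        out_queue.append(packet)
        job["sent_at"] = now      # restart the retry clock
```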

Passing data to user

Now that we can subscribe, we need to pass the received data back to the user. In general I think we want to subscribe to a topic and have a block evaluated with any data that comes in for that topic.
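In Smalltalk the "block" would be a BlockClosure; the Python sketch below uses plain callables and exact topic matching only, leaving MQTT's '+' and '#' wildcard filters out (names invented for illustration):

```python
class Subscriptions:
    """Map topics to user callbacks; evaluate each matching callback with
    the topic and payload of an incoming PUBLISH."""

    def __init__(self):
        self.handlers = {}   # topic -> list of callbacks

    def subscribe(self, topic, callback):
        self.handlers.setdefault(topic, []).append(callback)

    def dispatch(self, topic, payload):
        # Called by the read loop for each PUBLISH taken off the queue.
        for cb in self.handlers.get(topic, []):
            cb(topic, payload)
```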


Reading data

#initializePacketReading makes a process that needs to wait on incoming data. Currently it relies upon SocketStream>>peek to deal with that. Is this proper? It looks like it should work as long as the socket is connected, which can go wrong at times.