MQTT-SN Client (emCute)

emCute, the MQTT-SN implementation for RIOT

About

emCute is the implementation of the OASIS MQTT-SN protocol for RIOT. It is designed with a focus on a small memory footprint and usability.

Design Decisions and Restrictions

  • emCute is designed to run on top of UDP only, using the UDP sock API. It is not intended to be used with any other transport.

The implementation is based on a 2-thread model: emCute needs one thread of its own, in which the receiving of packets and the sending of ping messages are handled. All ‘user space functions’ must be called from one or more different (i.e. user) threads. emCute uses thread flags to synchronize between threads.

Further known restrictions are:

  • ASCII topic names only (no support for UTF-8 names, yet)
  • topic length is restricted to fit in a single length byte (248 bytes max)
  • no support for wildcards in topic names. This feature requires more elaborate internal memory management, presumably at the cost of significantly higher ROM and RAM usage
  • no retransmit when receiving a REJ_CONG (reject, reason congestion). In this case the spec says to resend the original message after T_WAIT (default: > 5 min). This is not supported, as it would require blocking the calling thread (or keeping state) for long periods of time, which is (in Hauke’s opinion) not feasible for constrained nodes.
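
The topic name restrictions above can be pre-checked in application code before a name is handed to emCute. The following is a minimal sketch, not part of the library: `topic_name_ok()` is a hypothetical helper, the limit is the default `EMCUTE_TOPIC_MAXLEN` from this page redefined locally, and rejecting empty names is an assumption.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Default value listed on this page; in RIOT it comes from emcute.h. */
#define EMCUTE_TOPIC_MAXLEN (196U)

/* Hypothetical helper (not an emCute function): checks a topic name
 * against the restrictions listed above. */
bool topic_name_ok(const char *name)
{
    if (name == NULL || name[0] == '\0') {
        return false;   /* assumption: empty names are treated as invalid */
    }
    if (strlen(name) > EMCUTE_TOPIC_MAXLEN) {
        return false;   /* emCute would report EMCUTE_OVERFLOW */
    }
    for (const char *c = name; *c != '\0'; c++) {
        if ((unsigned char)*c > 0x7f) {
            return false;   /* non-ASCII byte, e.g. part of a UTF-8 sequence */
        }
        if (*c == '+' || *c == '#') {
            return false;   /* MQTT wildcard characters are not supported */
        }
    }
    return true;
}
```

Running such a check early lets an application reject a bad name without a round trip to the gateway.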

Error Handling

This implementation tries to keep run-time parameter checks to a minimum, checking as many parameters as feasible using assertions. For the sake of run-time stability and usability, typical overflow checks are always done at run-time, and explicit error values are returned in case of errors.

Implementation state

In the current state, emCute supports:

  • connecting to a gateway
  • disconnecting from gateway
  • registering a last will topic and message during connection setup
  • registering topic names with the gateway (obtaining topic IDs)
  • subscribing to topics
  • unsubscribing from topics
  • updating will topic
  • updating will message
  • sending out periodic PINGREQ messages
  • handling re-transmits

The following features are still missing (but planned):

  • gateway discovery (so far there is no support for handling ADVERTISE, GWINFO, and SEARCHGW). Open question: how to encode the IPv4/IPv6 address AND the port of a gateway in the GwAdd field of the GWINFO message
  • QoS level 2
  • putting the node to sleep (sending DISCONNECT with the duration field set)
  • handling DISCONNECT messages initiated by the broker/gateway
  • support for pre-defined and short topic IDs
  • handling (previously) active subscriptions on reconnect/disconnect
  • handling re-connect/disconnect from an unresponsive gateway (in case a number of ping requests are unanswered)
  • reacting only to incoming ping requests that are actually sent by the gateway we are connected to

enum (anonymous): MQTT-SN flags
EMCUTE_DUP = 0x80
duplicate flag
EMCUTE_QOS_MASK = 0x60
QoS level mask.
EMCUTE_QOS_2 = 0x40
QoS level 2.
EMCUTE_QOS_1 = 0x20
QoS level 1.
EMCUTE_QOS_0 = 0x00
QoS level 0.
EMCUTE_RETAIN = 0x10
retain flag
EMCUTE_WILL = 0x08
will flag, used during CONNECT
EMCUTE_CS = 0x04
clean session flag
EMCUTE_TIT_MASK = 0x03
topic ID type mask
EMCUTE_TIT_SHORT = 0x02
topic ID: short
EMCUTE_TIT_PREDEF = 0x01
topic ID: pre-defined
EMCUTE_TIT_NORMAL = 0x00
topic ID: normal
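
Because the flags share a single byte, they are combined with bitwise OR and read back with the masks. The sketch below redefines the values listed above locally so it compiles on its own. The `flags_*()` helpers are hypothetical names used for illustration and are not part of emCute.

```c
#include <stdbool.h>

/* Flag values copied from the list above; in RIOT they come from emcute.h. */
enum {
    EMCUTE_DUP        = 0x80,
    EMCUTE_QOS_MASK   = 0x60,
    EMCUTE_QOS_2      = 0x40,
    EMCUTE_QOS_1      = 0x20,
    EMCUTE_QOS_0      = 0x00,
    EMCUTE_RETAIN     = 0x10,
    EMCUTE_WILL       = 0x08,
    EMCUTE_CS         = 0x04,
    EMCUTE_TIT_MASK   = 0x03,
    EMCUTE_TIT_SHORT  = 0x02,
    EMCUTE_TIT_PREDEF = 0x01,
    EMCUTE_TIT_NORMAL = 0x00
};

/* Hypothetical helper: QoS level (0..2) encoded in bits 5 and 6. */
unsigned flags_qos_level(unsigned flags)
{
    return (flags & EMCUTE_QOS_MASK) >> 5;
}

/* Hypothetical helper: true if the retain flag is set. */
bool flags_retain(unsigned flags)
{
    return (flags & EMCUTE_RETAIN) != 0;
}

/* Hypothetical helper: topic ID type encoded in the two lowest bits. */
unsigned flags_topic_id_type(unsigned flags)
{
    return flags & EMCUTE_TIT_MASK;
}
```

For example, `EMCUTE_QOS_1 | EMCUTE_RETAIN` yields 0x30, from which `flags_qos_level()` recovers 1.
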
enum (anonymous): emCute return values
EMCUTE_OK =  0
everything went as expected
EMCUTE_NOGW = -1
error: not connected to a gateway
EMCUTE_REJECT = -2
error: operation was rejected by broker
EMCUTE_OVERFLOW = -3
error: ran out of buffer space
EMCUTE_TIMEOUT = -4
error: timeout
EMCUTE_NOTSUP = -5
error: feature not supported
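
For logging, it can help to map these return values to readable strings. The sketch below is one possible way to do that in application code. `app_emcute_strerror()` is a hypothetical helper, not an emCute function, and the enum is redefined locally from the values above.

```c
/* Return values copied from the list above; in RIOT they come from emcute.h. */
enum {
    EMCUTE_OK       =  0,
    EMCUTE_NOGW     = -1,
    EMCUTE_REJECT   = -2,
    EMCUTE_OVERFLOW = -3,
    EMCUTE_TIMEOUT  = -4,
    EMCUTE_NOTSUP   = -5
};

/* Hypothetical helper: human-readable text for an emCute return value. */
const char *app_emcute_strerror(int res)
{
    switch (res) {
        case EMCUTE_OK:       return "ok";
        case EMCUTE_NOGW:     return "not connected to a gateway";
        case EMCUTE_REJECT:   return "operation was rejected by broker";
        case EMCUTE_OVERFLOW: return "ran out of buffer space";
        case EMCUTE_TIMEOUT:  return "timeout";
        case EMCUTE_NOTSUP:   return "feature not supported";
        default:              return "unknown return value";
    }
}
```
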
void(* emcute_cb_t)(const emcute_topic_t * topic, void * data, size_t len)

Signature for callbacks fired when publish messages are received.

Parameters

topic:topic the received data was published on
data:published data, can be NULL
len:length of data in bytes
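
A callback matching this signature might look like the sketch below. The types are local stand-ins that mirror the ones documented on this page, so the block compiles without RIOT. `on_publish()` and the `last_*` variables are hypothetical names. Copying the payload assumes that `data` is only valid while the callback runs, which this page does not state explicitly.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Local stand-ins mirroring the types documented on this page. */
typedef struct {
    const char *name;
    uint16_t id;
} emcute_topic_t;

typedef void (*emcute_cb_t)(const emcute_topic_t *topic, void *data, size_t len);

/* Application state filled by the callback (hypothetical). */
static char last_payload[64];
static size_t last_len;
static uint16_t last_topic_id;

/* Hypothetical callback: remembers the topic ID and a (possibly truncated)
 * copy of the payload. data may be NULL, as noted in the parameter list. */
void on_publish(const emcute_topic_t *topic, void *data, size_t len)
{
    last_topic_id = topic->id;
    if (data == NULL) {
        len = 0;
    }
    if (len > sizeof(last_payload)) {
        len = sizeof(last_payload);   /* truncate instead of overflowing */
    }
    if (len > 0) {
        memcpy(last_payload, data, len);
    }
    last_len = len;
}
```

Keeping the callback short matters here, since it runs in the context of the emCute thread in the 2-thread model described earlier.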

struct emcute_sub emcute_sub_t

Data-structure for keeping track of topics we register to.

int emcute_con(sock_udp_ep_t * remote, bool clean, const char * will_topic, const void * will_msg, size_t will_msg_len, unsigned flags)

Connect to a given MQTT-SN gateway (CONNECT)

If already connected to a gateway, call emcute_discon() first to disconnect from the current gateway.

Parameters

remote:address and port of the target MQTT-SN gateway
clean:set to true to start a clean session
will_topic:last will topic name; no last will is configured if set to NULL
will_msg:last will message content; ignored if will_topic is set to NULL
will_msg_len:length of will_msg in bytes
flags:flags used for the last will, allowed are retain and QoS

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if already connected to a gateway
  • EMCUTE_REJECT on connection refused by gateway
  • EMCUTE_TIMEOUT on connection timeout
int emcute_discon(void)

Disconnect from the gateway we are currently connected to.

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if not connected to a gateway
  • EMCUTE_TIMEOUT on response timeout
int emcute_reg(emcute_topic_t * topic)

Get a topic ID for the given topic name from the gateway.

Parameters

topic:topic to register, topic.name must not be NULL

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if not connected to a gateway
  • EMCUTE_OVERFLOW if length of topic name exceeds EMCUTE_TOPIC_MAXLEN
  • EMCUTE_TIMEOUT on connection timeout
int emcute_pub(emcute_topic_t * topic, const void * buf, size_t len, unsigned flags)

Publish data on the given topic.

Parameters

topic:topic to send data to, topic must be registered (topic.id must be populated).
buf:data to publish
len:length of data in bytes
flags:flags used for publication, allowed are QoS and retain

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if not connected to a gateway
  • EMCUTE_REJECT if publish message was rejected (QoS > 0 only)
  • EMCUTE_OVERFLOW if length of data exceeds EMCUTE_BUFSIZE
  • EMCUTE_TIMEOUT on connection timeout (QoS > 0 only)
  • EMCUTE_NOTSUP on unsupported flag values
int emcute_sub(emcute_sub_t * sub, unsigned flags)

Subscribe to the given topic.

When calling this function, sub->topic.name and sub->cb must be set.

Parameters

sub:subscription context, sub->topic.name and sub->cb must not be NULL.
flags:flags used when subscribing, allowed are QoS, DUP, and topic ID type

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if not connected to a gateway
  • EMCUTE_OVERFLOW if length of topic name exceeds EMCUTE_TOPIC_MAXLEN
  • EMCUTE_TIMEOUT on connection timeout
int emcute_unsub(emcute_sub_t * sub)

Unsubscribe from the given topic.

Parameters

sub:subscription context

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if not connected to a gateway
  • EMCUTE_TIMEOUT on connection timeout
int emcute_willupd_topic(const char * topic, unsigned flags)

Update the last will topic.

Parameters

topic:new last will topic
flags:flags used for the topic, allowed are QoS and retain

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if not connected to a gateway
  • EMCUTE_OVERFLOW if length of topic name exceeds EMCUTE_TOPIC_MAXLEN
  • EMCUTE_REJECT on rejection by the gateway
  • EMCUTE_TIMEOUT on response timeout
int emcute_willupd_msg(const void * data, size_t len)

Update the last will message.

Parameters

data:new message to send on last will
len:length of data in bytes

Return values

  • EMCUTE_OK on success
  • EMCUTE_NOGW if not connected to a gateway
  • EMCUTE_OVERFLOW if length of the given message exceeds EMCUTE_BUFSIZE
  • EMCUTE_REJECT on rejection by the gateway
  • EMCUTE_TIMEOUT on response timeout
void emcute_run(uint16_t port, const char * id)

Run emCute, will ‘occupy’ the calling thread.

This function will run the emCute message receiver. It will block the thread it is running in.

Parameters

port:UDP port used for listening (default: 1883)
id:client ID (should be unique)

const char * emcute_type_str(uint8_t type)

Return the string representation of the given type value.

This function is for debugging purposes.

Parameters

type:MQTT-SN message type

Return values

  • string representation of the given type
  • ‘UNKNOWN’ on invalid type value
EMCUTE_DEFAULT_PORT

Default UDP port to listen on (also used as SRC port)

(1883U)
EMCUTE_BUFSIZE

Buffer size used for emCute’s transmit and receive buffers.

(512U)

Note

The buffer size MUST be less than 32768 on 16-bit and 8-bit platforms to prevent buffer overflows.

The overall buffer size used by emCute is this value times two (Rx + Tx).

EMCUTE_ID_MAXLEN

Maximum client ID length.

(196U)

Note

Must be less than (256 - 6) AND less than (EMCUTE_BUFSIZE - 6).

EMCUTE_TOPIC_MAXLEN

Maximum topic length.

(196U)

Note

Must be less than (256 - 6) AND less than (EMCUTE_BUFSIZE - 6).
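
The size constraints from the notes above can be checked at compile time when the defaults are overridden. This is a minimal sketch using C11 `_Static_assert`, with the defaults from this page redefined locally. `app_emcute_buffer_bytes()` is a hypothetical helper. The 32768 limit is applied unconditionally here for simplicity, although the note only requires it on 16-bit and 8-bit platforms.

```c
#include <stddef.h>

/* Defaults listed on this page; in RIOT they come from emcute.h. */
#define EMCUTE_BUFSIZE      (512U)
#define EMCUTE_ID_MAXLEN    (196U)
#define EMCUTE_TOPIC_MAXLEN (196U)

/* Only required on 16-bit and 8-bit platforms; checked everywhere here. */
_Static_assert(EMCUTE_BUFSIZE < 32768U, "EMCUTE_BUFSIZE too large");

_Static_assert(EMCUTE_ID_MAXLEN < (256U - 6U),
               "EMCUTE_ID_MAXLEN must be less than 256 - 6");
_Static_assert(EMCUTE_ID_MAXLEN < (EMCUTE_BUFSIZE - 6U),
               "EMCUTE_ID_MAXLEN must be less than EMCUTE_BUFSIZE - 6");

_Static_assert(EMCUTE_TOPIC_MAXLEN < (256U - 6U),
               "EMCUTE_TOPIC_MAXLEN must be less than 256 - 6");
_Static_assert(EMCUTE_TOPIC_MAXLEN < (EMCUTE_BUFSIZE - 6U),
               "EMCUTE_TOPIC_MAXLEN must be less than EMCUTE_BUFSIZE - 6");

/* Hypothetical helper: overall buffer memory used by emCute (Rx + Tx). */
size_t app_emcute_buffer_bytes(void)
{
    return 2U * EMCUTE_BUFSIZE;
}
```

With the defaults, the two buffers together take 1024 bytes.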

EMCUTE_KEEPALIVE

Keep-alive interval [in seconds].

(360)       /* -> 6 min */

The node communicates this interval to the gateway and sends a ping message each time this interval has elapsed.

For the default value, see spec v1.2, section 7.2 -> T_WAIT: > 5 min

EMCUTE_T_RETRY

Re-send interval [in seconds].

(15U)       /* -> 15 sec */

For the default value, see spec v1.2, section 7.2 -> T_RETRY: 10 to 15 sec

EMCUTE_N_RETRY

Number of retries when sending packets.

(3U)

For the default value, see spec v1.2, section 7.2 -> N_RETRY: 3-5
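
The two retry parameters together bound how long a blocking call can wait before giving up with EMCUTE_TIMEOUT. The arithmetic below is a rough estimate under a stated assumption, not a description of the actual implementation: it assumes one initial transmission plus EMCUTE_N_RETRY re-sends, each followed by a wait of EMCUTE_T_RETRY seconds. `worst_case_wait_s()` is a hypothetical helper.

```c
/* Defaults listed on this page; in RIOT they come from emcute.h. */
#define EMCUTE_T_RETRY (15U)
#define EMCUTE_N_RETRY (3U)

/* Hypothetical helper. Assumption: 1 initial transmission + n_retry
 * re-sends, each followed by a wait of t_retry_s seconds. */
unsigned worst_case_wait_s(unsigned t_retry_s, unsigned n_retry)
{
    return (n_retry + 1U) * t_retry_s;
}
```

Under this assumption, the defaults give (3 + 1) * 15 = 60 seconds, which is worth keeping in mind when calling emCute functions from a thread that has other deadlines.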

struct emcute_topic_t

MQTT-SN topic.

const char * name

topic string (currently ASCII only)

uint16_t id

topic id, as assigned by the gateway

struct emcute_sub

Data-structure for keeping track of topics we register to.

struct emcute_sub * next

next subscription (saved in a list)

emcute_topic_t topic

topic we subscribe to

emcute_cb_t cb

function called when receiving messages

void * arg

optional custom argument
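
Since subscriptions are chained through `next`, they form a singly linked list. The sketch below illustrates how such a list can be searched by topic ID, for example to find the callback for an incoming publish message. The types are local stand-ins mirroring the fields documented above, and `sub_find_by_id()` is a hypothetical function, not emCute's internal lookup.

```c
#include <stddef.h>
#include <stdint.h>

/* Local stand-ins mirroring the structures documented on this page. */
typedef struct {
    const char *name;
    uint16_t id;
} emcute_topic_t;

typedef void (*emcute_cb_t)(const emcute_topic_t *topic, void *data, size_t len);

typedef struct emcute_sub {
    struct emcute_sub *next;    /* next subscription (saved in a list) */
    emcute_topic_t topic;       /* topic we subscribe to */
    emcute_cb_t cb;             /* function called when receiving messages */
    void *arg;                  /* optional custom argument */
} emcute_sub_t;

/* Hypothetical helper: returns the first subscription whose topic ID
 * matches, or NULL if the list contains no such entry. */
emcute_sub_t *sub_find_by_id(emcute_sub_t *head, uint16_t id)
{
    for (emcute_sub_t *s = head; s != NULL; s = s->next) {
        if (s->topic.id == id) {
            return s;
        }
    }
    return NULL;
}
```

Because each list node is the caller's own `emcute_sub_t`, a subscription context presumably has to stay valid for as long as the subscription is active.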