public class MqttClient extends Object implements IMqttClient
This class implements the blocking IMqttClient
client interface where all actions block until they have
completed (or timed out). This implementation is compatible with all Java SE runtimes from 1.4.2 and up.
An application can connect to an MQTT server using:
To enable messages to be delivered even across network and client restarts messages need to be safely stored until the message has been delivered at the requested quality of service. A pluggable persistence mechanism is provided to store the messages.
If connecting with MqttConnectOptions.setCleanSession(boolean)
set to true it is safe to use memory
persistence as all state it cleared when a client disconnects. If connecting with cleanSession set to false, to
provide reliable message delivery then a persistent message store should be used such as the default one.
The message store interface is pluggable. Different stores can be used by implementing the
MqttClientPersistence
interface and passing it to the clients constructor.
IMqttClient
Modifier and Type | Field and Description |
---|---|
protected MqttAsyncClient |
aClient |
protected long |
timeToWait |
Constructor and Description |
---|
MqttClient(String serverURI,
String clientId)
Create an MqttClient that can be used to communicate with an MQTT server.
|
MqttClient(String serverURI,
String clientId,
MqttClientPersistence persistence)
Create an MqttClient that can be used to communicate with an MQTT server.
|
Modifier and Type | Method and Description |
---|---|
void |
close()
Close the client Releases all resource associated with the client.
|
void |
connect()
Connects to an MQTT server using the default options.
|
void |
connect(MqttConnectOptions options)
Connects to an MQTT server using the specified options.
|
IMqttToken |
connectWithResult(MqttConnectOptions options)
Connects to an MQTT server using the specified options.
|
void |
disconnect()
Disconnects from the server.
|
void |
disconnect(long quiesceTimeout)
Disconnects from the server.
|
void |
disconnectForcibly()
Disconnects from the server forcibly to reset all the states.
|
void |
disconnectForcibly(long disconnectTimeout)
Disconnects from the server forcibly to reset all the states.
|
void |
disconnectForcibly(long quiesceTimeout,
long disconnectTimeout)
Disconnects from the server forcibly to reset all the states.
|
static String |
generateClientId()
Returns a randomly generated client identifier based on the current user's login name and the system time.
|
String |
getClientId()
Returns the client ID used by this client.
|
Debug |
getDebug()
Return a debug object that can be used to help solve problems.
|
IMqttDeliveryToken[] |
getPendingDeliveryTokens()
Returns the delivery tokens for any outstanding publish operations.
|
String |
getServerURI()
Returns the address of the server used by this client, as a URI.
|
long |
getTimeToWait()
Return the maximum time to wait for an action to complete.
|
MqttTopic |
getTopic(String topic)
Get a topic object which can be used to publish messages.
|
boolean |
isConnected()
Determines if this client is currently connected to the server.
|
void |
publish(String topic,
byte[] payload,
int qos,
boolean retained)
Publishes a message to a topic on the server and return once it is delivered.
|
void |
publish(String topic,
MqttMessage message)
Publishes a message to a topic on the server.
|
void |
setCallback(MqttCallback callback)
Sets the callback listener to use for events that happen asynchronously.
|
void |
setTimeToWait(long timeToWaitInMillis)
Set the maximum time to wait for an action to complete.
|
void |
subscribe(String topicFilter)
Subscribe to a topic, which may include wildcards using a QoS of 1.
|
void |
subscribe(String[] topicFilters)
Subscribes to a one or more topics, which may include wildcards using a QoS of 1.
|
void |
subscribe(String[] topicFilters,
int[] qos)
Subscribes to multiple topics, each of which may include wildcards.
|
void |
subscribe(String topicFilter,
int qos)
Subscribe to a topic, which may include wildcards.
|
void |
unsubscribe(String topicFilter)
Requests the server unsubscribe the client from a topic.
|
void |
unsubscribe(String[] topicFilters)
Requests the server unsubscribe the client from one or more topics.
|
@Nullable protected MqttAsyncClient aClient
protected long timeToWait
public MqttClient(String serverURI, String clientId) throws MqttException
The address of a server can be specified on the constructor. Alternatively a list containing one or more servers
can be specified using the setServerURIs
method on
MqttConnectOptions.
The serverURI
parameter is typically used with the the clientId
parameter to form a
key. The key is used to store and reference messages while they are being delivered. Hence the serverURI
specified on the constructor must still be specified even if a list of servers is specified on an
MqttConnectOptions object. The serverURI on the constructor must remain the same across restarts of the client
for delivery of messages to be maintained from a given client to a given server or set of servers.
The address of the server to connect to is specified as a URI. Two types of connection are supported
tcp://
for a TCP connection and ssl://
for a TCP connection secured by SSL/TLS. For
example:
tcp://localhost:1883
ssl://localhost:8883
tcp://
" URIs, and 8883 for
ssl://
URIs.
A client identifier clientId
must be specified and be less that 65535 characters. It must be unique
across all clients connecting to the same server. The clientId is used by the server to store data related to the
client, hence it is important that the clientId remain the same when connecting to a server if durable
subscriptions or reliable messaging are required.
A convenience method is provided to generate a random client id that should satisfy this criteria -
generateClientId()
. As the client identifier is used by the server to identify a client when it
reconnects, the client must use the same identifier between connections if durable subscriptions or reliable
delivery of messages is required.
In Java SE, SSL can be configured in one of several ways, which the client will use in the following order:
SSLSocketFactory
- applications can use
MqttConnectOptions.setSocketFactory(SocketFactory)
to supply a factory with the appropriate SSL settings.
MqttConnectOptions.setSSLProperties(Properties)
.In Java ME, the platform settings are used for SSL connections.
serverURI
- the address of the server to connect to, specified as a URI. Can be overridden using
MqttConnectOptions.setServerURIs(String[])
clientId
- a client identifier that is unique on the server being connected toIllegalArgumentException
- if the URI does not start with "tcp://", "ssl://" or "local://".IllegalArgumentException
- if the clientId is null or is greater than 65535 characters in lengthMqttException
- if any other problem was encounteredpublic MqttClient(String serverURI, String clientId, @Nullable MqttClientPersistence persistence) throws MqttException
The address of a server can be specified on the constructor. Alternatively a list containing one or more servers
can be specified using the setServerURIs
method on
MqttConnectOptions.
The serverURI
parameter is typically used with the the clientId
parameter to form a
key. The key is used to store and reference messages while they are being delivered. Hence the serverURI
specified on the constructor must still be specified even if a list of servers is specified on an
MqttConnectOptions object. The serverURI on the constructor must remain the same across restarts of the client
for delivery of messages to be maintained from a given client to a given server or set of servers.
The address of the server to connect to is specified as a URI. Two types of connection are supported
tcp://
for a TCP connection and ssl://
for a TCP connection secured by SSL/TLS. For
example:
tcp://localhost:1883
ssl://localhost:8883
tcp://
" URIs, and 8883 for
ssl://
URIs.
A client identifier clientId
must be specified and be less that 65535 characters. It must be unique
across all clients connecting to the same server. The clientId is used by the server to store data related to the
client, hence it is important that the clientId remain the same when connecting to a server if durable
subscriptions or reliable messaging are required.
A convenience method is provided to generate a random client id that should satisfy this criteria -
generateClientId()
. As the client identifier is used by the server to identify a client when it
reconnects, the client must use the same identifier between connections if durable subscriptions or reliable
delivery of messages is required.
In Java SE, SSL can be configured in one of several ways, which the client will use in the following order:
SSLSocketFactory
- applications can use
MqttConnectOptions.setSocketFactory(SocketFactory)
to supply a factory with the appropriate SSL settings.
MqttConnectOptions.setSSLProperties(Properties)
.In Java ME, the platform settings are used for SSL connections.
A persistence mechanism is used to enable reliable messaging. For messages sent at qualities of service (QoS) 1
or 2 to be reliably delivered, messages must be stored (on both the client and server) until the delivery of the
message is complete. If messages are not safely stored when being delivered then a failure in the client or
server can result in lost messages. A pluggable persistence mechanism is supported via the
MqttClientPersistence
interface. An implementer of this interface that safely stores messages must be
specified in order for delivery of messages to be reliable. In addition
MqttConnectOptions.setCleanSession(boolean)
must be set to false. In the event that only QoS 0 messages
are sent or received or cleanSession is set to true then a safe store is not needed.
serverURI
- the address of the server to connect to, specified as a URI. Can be overridden using
MqttConnectOptions.setServerURIs(String[])
clientId
- a client identifier that is unique on the server being connected topersistence
- the persistence class to use to store in-flight message. If null then the default persistence
mechanism is usedIllegalArgumentException
- if the URI does not start with "tcp://", "ssl://" or "local://"IllegalArgumentException
- if the clientId is null or is greater than 65535 characters in lengthMqttException
- if any other problem was encounteredpublic void close() throws MqttException
IMqttClient
close
in interface IMqttClient
MqttException
- if the client is not disconnected.public void connect() throws MqttSecurityException, MqttException
IMqttClient
The default options are specified in MqttConnectOptions
class.
connect
in interface IMqttClient
MqttSecurityException
- when the server rejects the connect for security reasonsMqttException
- for non security related problemsIMqttClient.connect(MqttConnectOptions)
public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException
IMqttClient
The server to connect to is specified on the constructor. It is recommended to call
IMqttClient.setCallback(MqttCallback)
prior to connecting in order that messages destined for the client can be
accepted as soon as the client is connected.
This is a blocking method that returns once connect completes
connect
in interface IMqttClient
options
- a set of connection parameters that override the defaults.MqttSecurityException
- when the server rejects the connect for security reasonsMqttException
- for non security related problems including communication errorspublic IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException
IMqttClient
The server to connect to is specified on the constructor. It is recommended to call
IMqttClient.setCallback(MqttCallback)
prior to connecting in order that messages destined for the client can be
accepted as soon as the client is connected.
This is a blocking method that returns once connect completes
connectWithResult
in interface IMqttClient
options
- a set of connection parameters that override the defaults.MqttSecurityException
- when the server rejects the connect for security reasonsMqttException
- for non security related problems including communication errorspublic void disconnect() throws MqttException
IMqttClient
An attempt is made to quiesce the client allowing outstanding work to complete before disconnecting. It will wait
for a maximum of 30 seconds for work to quiesce before disconnecting. This method must not be called from inside
MqttCallback
methods.
disconnect
in interface IMqttClient
MqttException
IMqttClient.disconnect(long)
public void disconnect(long quiesceTimeout) throws MqttException
IMqttClient
The client will wait for all MqttCallback
methods to complete. It will then wait for up to the quiesce
timeout to allow for work which has already been initiated to complete - for example, it will wait for the QoS 2
flows from earlier publications to complete. When work has completed or after the quiesce timeout, the client
will disconnect from the server. If the cleanSession flag was set to false and is set to false the next time a
connection is made QoS 1 and 2 messages that were not previously delivered will be delivered.
This is a blocking method that returns once disconnect completes
disconnect
in interface IMqttClient
quiesceTimeout
- the amount of time in milliseconds to allow for existing work to finish before disconnecting. A value
of zero or less means the client will not quiesce.MqttException
- if a problem is encountered while disconnectingpublic void disconnectForcibly() throws MqttException
IMqttClient
Because the client is able to establish the TCP/IP connection to a none MQTT server and it will certainly fail to send the disconnect packet. It will wait for a maximum of 30 seconds for work to quiesce before disconnecting and wait for a maximum of 10 seconds for sending the disconnect packet to server.
disconnectForcibly
in interface IMqttClient
MqttException
- if any unexpected errorpublic void disconnectForcibly(long disconnectTimeout) throws MqttException
IMqttClient
Because the client is able to establish the TCP/IP connection to a none MQTT server and it will certainly fail to send the disconnect packet. It will wait for a maximum of 30 seconds for work to quiesce before disconnecting.
disconnectForcibly
in interface IMqttClient
disconnectTimeout
- the amount of time in milliseconds to allow send disconnect packet to server.MqttException
- if any unexpected errorpublic void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException
IMqttClient
Because the client is able to establish the TCP/IP connection to a none MQTT server and it will certainly fail to send the disconnect packet.
disconnectForcibly
in interface IMqttClient
quiesceTimeout
- the amount of time in milliseconds to allow for existing work to finish before disconnecting. A value
of zero or less means the client will not quiesce.disconnectTimeout
- the amount of time in milliseconds to allow send disconnect packet to server.MqttException
- if any unexpected errorpublic static String generateClientId()
When cleanSession is set to false, an application must ensure it uses the same client identifier when it reconnects to the server to resume state and maintain assured message delivery.
MqttConnectOptions.setCleanSession(boolean)
public String getClientId()
IMqttClient
All clients connected to the same server or server farm must have a unique ID.
getClientId
in interface IMqttClient
public Debug getDebug()
public IMqttDeliveryToken[] getPendingDeliveryTokens()
IMqttClient
If a client has been restarted and there are messages that were in the process of being delivered when the client
stopped this method will return a token for each message enabling the delivery to be tracked Alternately the
MqttCallback.deliveryComplete(IMqttDeliveryToken)
callback can be used to track the delivery of
outstanding messages.
If a client connects with cleanSession true then there will be no delivery tokens as the cleanSession option deletes all earlier state. For state to be remembered the client must connect with cleanSession set to false
getPendingDeliveryTokens
in interface IMqttClient
public String getServerURI()
IMqttClient
The format is the same as specified on the constructor.
getServerURI
in interface IMqttClient
MqttAsyncClient.MqttAsyncClient(String, String)
public long getTimeToWait()
setTimeToWait(long)
public MqttTopic getTopic(String topic)
IMqttClient
An alternative method that should be used in preference to this one when publishing a message is:
publish(String, MqttMessage)
to publish a message in a blocking manner
IMqttAsyncClient.publish(String, MqttMessage, Object, IMqttActionListener)
When building an application, the design of the topic tree should take into account the following principles of topic name syntax and semantics:
The following principles apply to the construction and content of a topic tree:
getTopic
in interface IMqttClient
topic
- the topic to use, for example "finance/stock/ibm".public boolean isConnected()
IMqttClient
isConnected
in interface IMqttClient
true
if connected, false
otherwise.public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException
IMqttClient
This is a convenience method, which will create a new MqttMessage
object with a byte array payload and
the specified QoS, and then publish it. All other values in the message will be set to the defaults.
publish
in interface IMqttClient
topic
- to deliver the message to, for example "finance/stock/ibm".payload
- the byte array to use as the payloadqos
- the Quality of Service to deliver the message at. Valid values are 0, 1 or 2.retained
- whether or not this message should be retained by the server.MqttPersistenceException
- when a problem with storing the messageMqttException
- for other errors encountered while publishing the message. For instance client not connected.IMqttClient.publish(String, MqttMessage)
,
MqttMessage.setQos(int)
,
MqttMessage.setRetained(boolean)
public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException
IMqttClient
Delivers a message to the server at the requested quality of service and returns control once the message has been delivered. In the event the connection fails or the client stops, any messages that are in the process of being delivered will be delivered once a connection is re-established to the server on condition that:
In the event that the connection breaks or the client stops it is still possible to determine when the delivery of the message completes. Prior to re-establishing the connection to the server:
IMqttClient.setCallback(MqttCallback)
callback on the client and the delivery complete callback will
be notified once a delivery of a message completes
IMqttClient.getPendingDeliveryTokens()
which will return a token for each message that is in-flight. The
token can be used to wait for delivery to complete.
When building an application, the design of the topic tree should take into account the following principles of topic name syntax and semantics:
The following principles apply to the construction and content of a topic tree:
This is a blocking method that returns once publish completes
*publish
in interface IMqttClient
topic
- to deliver the message to, for example "finance/stock/ibm".message
- to delivery to the serverMqttPersistenceException
- when a problem with storing the messageMqttException
- for other errors encountered while publishing the message. For instance client not connected.public void setCallback(MqttCallback callback)
IMqttClient
There are a number of events that listener will be notified about. These include
Other events that track the progress of an individual operation such as connect and subscribe can be tracked
using the MqttToken
passed to the operation
setCallback
in interface IMqttClient
callback
- the class to callback when for events related to the clientMqttCallback
public void setTimeToWait(long timeToWaitInMillis) throws IllegalArgumentException
Set the maximum time to wait for an action to complete before returning control to the invoking application. Control is returned when:
timeToWaitInMillis
- before the action times out. A value or 0 or -1 will wait until the action finishes and not timeout.IllegalArgumentException
public void subscribe(String topicFilter) throws MqttException
IMqttClient
subscribe
in interface IMqttClient
topicFilter
- the topic to subscribe to, which can include wildcards.MqttException
- if there was an error registering the subscription.IMqttClient.subscribe(String[], int[])
public void subscribe(String[] topicFilters) throws MqttException
IMqttClient
subscribe
in interface IMqttClient
topicFilters
- the topic to subscribe to, which can include wildcards.MqttException
- if there was an error registering the subscription.IMqttClient.subscribe(String[], int[])
public void subscribe(String[] topicFilters, int[] qos) throws MqttException
IMqttClient
The IMqttClient.setCallback(MqttCallback)
method should be called before this method, otherwise any received
messages will be discarded.
If (@link MqttConnectOptions#setCleanSession(boolean)} was set to true when when connecting to the server then the subscription remains in place until either:
If (@link MqttConnectOptions#setCleanSession(boolean)} was set to false when when connecting to the server then the subscription remains in place until either:
The "topic filter" string used when subscribing may contain special characters, which allow you to subscribe to multiple topics at once.
The topic level separator is used to introduce structure into the topic, and can therefore be specified within the topic for that purpose. The multi-level wildcard and single-level wildcard can be used for subscriptions, but they cannot be used within a topic by the publisher of a message.
The number sign (#) is a wildcard character that matches any number of levels within a topic. For example, if you
subscribe to finance/stock/ibm/#, you receive messages on these
topics:
finance / stock / ibm
finance / stock / ibm / closingprice
finance / stock / ibm / currentprice
The multi-level wildcard can represent zero or more levels. Therefore, finance/# can also match the
singular finance, where # represents zero levels. The topic level separator is meaningless in
this context, because there are no levels to separate.
The multi-level wildcard can be specified only on its own or next to the topic level separator character. Therefore, # and finance/# are both valid, but finance# is not valid. The multi-level wildcard must be the last character used within the topic tree. For example, finance/# is valid but finance/#/closingprice is not valid.
The plus sign (+) is a wildcard character that matches only one topic level. For example, finance/stock/+ matches finance/stock/ibm and finance/stock/xyz, but not finance/stock/ibm/closingprice. Also, because the single-level wildcard matches only a single level, finance/+ does not match finance.
Use the single-level wildcard at any level in the topic tree, and in conjunction with the multilevel wildcard. Specify the single-level wildcard next to the topic level separator, except when it is specified on its own. Therefore, + and finance/+ are both valid, but finance+ is not valid. The single-level wildcard can be used at the end of the topic tree or within the topic tree. For example, finance/+ and finance/+/ibm are both valid.
This is a blocking method that returns once subscribe completes
subscribe
in interface IMqttClient
topicFilters
- one or more topics to subscribe to, which can include wildcards.qos
- the maximum quality of service to subscribe each topic at.Messages published at a lower quality of
service will be received at the published QoS. Messages published at a higher quality of service will
be received using the QoS specified on the subscribe.MqttException
- if there was an error registering the subscription.public void subscribe(String topicFilter, int qos) throws MqttException
IMqttClient
subscribe
in interface IMqttClient
topicFilter
- the topic to subscribe to, which can include wildcards.qos
- the maximum quality of service at which to subscribe. Messages published at a lower quality of service
will be received at the published QoS. Messages published at a higher quality of service will be
received using the QoS specified on the subscribe.MqttException
- if there was an error registering the subscription.IMqttClient.subscribe(String[], int[])
public void unsubscribe(String topicFilter) throws MqttException
IMqttClient
unsubscribe
in interface IMqttClient
topicFilter
- the topic to unsubscribe from. It must match a topicFilter specified on the subscribe.MqttException
- if there was an error unregistering the subscription.IMqttClient.unsubscribe(String[])
public void unsubscribe(String[] topicFilters) throws MqttException
IMqttClient
Unsubcribing is the opposite of subscribing. When the server receives the unsubscribe request it looks to see if it can find a subscription for the client and then removes it. After this point the server will send no more messages to the client for this subscription.
The topic(s) specified on the unsubscribe must match the topic(s) specified in the original subscribe request for the subscribe to succeed
This is a blocking method that returns once unsubscribe completes
unsubscribe
in interface IMqttClient
topicFilters
- one or more topics to unsubscribe from. Each topicFilter must match one specified on a subscribeMqttException
- if there was an error unregistering the subscription.