mqtt wrapper: initial commit

Signed-off-by: Thomas Klaehn <tkl@blackfinn.de>
This commit is contained in:
Thomas Klaehn 2018-01-29 21:01:46 +00:00 committed by Thomas Klaehn
commit 31b6294088
2 changed files with 110 additions and 0 deletions

97
mqtt/__init__.py Normal file
View File

@ -0,0 +1,97 @@
'''
Created on Dec 19, 2016
@author: klaehn
'''
import Queue
import threading
import time
import paho.mqtt.client as mqtt_client
class Mqtt(object):
''' Wrapper class for mqtt communication '''
def __init__(self, hostname, port=1883, keepalive=60, qos=2, retain=True, subscribe=[]):
''' Constructor
@param hostname: The hostname of the mqtt broker
@param port: The port number used for mqtt transport
@param keepalive: t.b.d.
@param qos: Quality of service
@param retain: Retain messages?
@param subscribe: List of topics to subscribe to for receive.
'''
self.hostname = hostname
self.port = port
self.keepalive = keepalive
self.client = mqtt_client.Client()
self.client.on_connect = self.__on_connect
self.client.on_message = self.__on_message
self.is_connected = False
self.qos = qos
self.retain = retain
self.subscribe = subscribe
self.receive_queue = Queue.Queue()
self.rx_sema = threading.Semaphore(0)
def connect(self):
''' Connect to mqtt broker '''
if not self.is_connected:
res = self.client.connect(self.hostname, self.port, \
self.keepalive)
if res != mqtt_client.MQTT_ERR_SUCCESS:
return False
res = self.client.loop_start()
if res == mqtt_client.MQTT_ERR_INVAL:
return False
self.is_connected = True
return True
return False
def disconnect(self):
''' Disconnect from mqtt broker '''
if self.is_connected:
self.client.loop_stop()
self.client.disconnect()
self.is_connected = False
return True
return False
def transmit(self, topic, payload):
''' Transmit to subscriber via broker
@param topic: The topic to be transmitted
@param payload: The payload to be transmitted
'''
was_connected = True
if not self.is_connected:
was_connected = False
if self.connect() is False:
return False
result = self.client.publish(topic, payload, self.qos, self.retain)
if not was_connected:
self.disconnect()
if result == 0:
return True
return False
def receive(self, timeout='infinite'):
''' Receive of subscribed messages.
@param timeout: The maximum time to wait for a message to be received.
'''
expire = 0
if timeout != 'infinite':
expire = time.time() + timeout
while True:
self.rx_sema.acquire()
if not self.receive_queue.empty():
return self.receive_queue.get()
if expire > 0 and time.time() > expire:
return None
def __on_connect(self, client, userdata, flags, rc):
''' Callback for the client's on_connect '''
for subscribtion in self.subscribe:
client.subscribe(subscribtion)
def __on_message(self, client, userdata, message):
''' Callback for the clients on_message '''
self.receive_queue.put(message)
self.rx_sema.release()

13
setup.py Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
'''
Created on Jan 29, 2011
Type 'python setup.py sdist' to create the distribution,
type 'python setup.py install' to install the distribution.
@author: Thomas Klaehn <thomas.klaehn@u-blox.com>
'''
from distutils.core import setup
setup(name='mqtt', version='1.0.0', author='Thomas Klaehn',
author_email='tkl@blackfinn.de', packages=['mqtt'])