commit 31b62940880eaa2bd0065a122b9388a7d4925d62 Author: Thomas Klaehn Date: Mon Jan 29 21:01:46 2018 +0000 mqtt wrapper: initial commit Signed-off-by: Thomas Klaehn diff --git a/mqtt/__init__.py b/mqtt/__init__.py new file mode 100644 index 0000000..4ec2924 --- /dev/null +++ b/mqtt/__init__.py @@ -0,0 +1,97 @@ +''' +Created on Dec 19, 2016 + +@author: klaehn +''' +import Queue +import threading +import time +import paho.mqtt.client as mqtt_client + +class Mqtt(object): + ''' Wrapper class for mqtt communication ''' + def __init__(self, hostname, port=1883, keepalive=60, qos=2, retain=True, subscribe=[]): + ''' Constructor + @param hostname: The hostname of the mqtt broker + @param port: The port number used for mqtt transport + @param keepalive: t.b.d. + @param qos: Quality of service + @param retain: Retain messages? + @param subscribe: List of topics to subscribe to for receive. + ''' + self.hostname = hostname + self.port = port + self.keepalive = keepalive + self.client = mqtt_client.Client() + self.client.on_connect = self.__on_connect + self.client.on_message = self.__on_message + self.is_connected = False + self.qos = qos + self.retain = retain + self.subscribe = subscribe + self.receive_queue = Queue.Queue() + self.rx_sema = threading.Semaphore(0) + + def connect(self): + ''' Connect to mqtt broker ''' + if not self.is_connected: + res = self.client.connect(self.hostname, self.port, \ + self.keepalive) + if res != mqtt_client.MQTT_ERR_SUCCESS: + return False + res = self.client.loop_start() + if res == mqtt_client.MQTT_ERR_INVAL: + return False + self.is_connected = True + return True + return False + + def disconnect(self): + ''' Disconnect from mqtt broker ''' + if self.is_connected: + self.client.loop_stop() + self.client.disconnect() + self.is_connected = False + return True + return False + + def transmit(self, topic, payload): + ''' Transmit to subscriber via broker + @param topic: The topic to be transmitted + @param payload: The payload to be transmitted + ''' + was_connected = True + if not self.is_connected: + was_connected = False + if self.connect() is False: + return False + result = self.client.publish(topic, payload, self.qos, self.retain) + if not was_connected: + self.disconnect() + if result == 0: + return True + return False + + def receive(self, timeout='infinite'): + ''' Receive of subscribed messages. + @param timeout: The maximum time to wait for a message to be received. + ''' + expire = 0 + if timeout != 'infinite': + expire = time.time() + timeout + while True: + self.rx_sema.acquire() + if not self.receive_queue.empty(): + return self.receive_queue.get() + if expire > 0 and time.time() > expire: + return None + + def __on_connect(self, client, userdata, flags, rc): + ''' Callback for the client's on_connect ''' + for subscribtion in self.subscribe: + client.subscribe(subscribtion) + + def __on_message(self, client, userdata, message): + ''' Callback for the clients on_message ''' + self.receive_queue.put(message) + self.rx_sema.release() diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..5655729 --- /dev/null +++ b/setup.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +''' +Created on Jan 29, 2011 + +Type 'python setup.py sdist' to create the distribution, +type 'python setup.py install' to install the distribution. + +@author: Thomas Klaehn +''' +from distutils.core import setup + +setup(name='mqtt', version='1.0.0', author='Thomas Klaehn', + author_email='tkl@blackfinn.de', packages=['mqtt'])