diff --git a/plugins/event_source/mqtt.py b/plugins/event_source/mqtt.py index 12c50af..0f71f0e 100644 --- a/plugins/event_source/mqtt.py +++ b/plugins/event_source/mqtt.py @@ -56,29 +56,28 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]) -> None: cert_reqs=validate_certs if validate_certs is not None else True, ) - mqtt_consumer = aiomqtt.Client( + async with aiomqtt.Client( hostname=host, port=port, username=username, password=password, tls_params=tls_params if ca_certs else None, - ) - - await mqtt_consumer.connect() - - try: - async with mqtt_consumer.messages() as messages: - await mqtt_consumer.subscribe(topic) - async for message in messages: - try: - data = json.loads(message.payload.decode()) - await queue.put(data) - except json.decoder.JSONDecodeError: - logger.exception("Decoding exception for incoming message") - finally: - logger.info("Disconneccting from broker") - mqtt_consumer.disconnect() - + ) as mqtt_consumer: + + try: + async with mqtt_consumer.messages() as messages: + await mqtt_consumer.subscribe(topic) + async for message in messages: + try: + try: + data = json.loads(message.payload.decode()) + except json.decoder.JSONDecodeError: + data = dict(payload=message.payload.decode()) + await queue.put(data) + except json.decoder.JSONDecodeError: + logger.exception("Decoding exception for incoming message") + finally: + logger.info("Disconneccting from broker") if __name__ == "__main__": """MockQueue if running directly."""