aboutsummaryrefslogtreecommitdiff
path: root/pyhon/connection/mqtt.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyhon/connection/mqtt.py')
-rw-r--r--pyhon/connection/mqtt.py195
1 files changed, 116 insertions, 79 deletions
diff --git a/pyhon/connection/mqtt.py b/pyhon/connection/mqtt.py
index ce6eaed..cb1f309 100644
--- a/pyhon/connection/mqtt.py
+++ b/pyhon/connection/mqtt.py
@@ -1,3 +1,4 @@
1import asyncio
1import json 2import json
2import logging 3import logging
3import secrets 4import secrets
@@ -10,91 +11,127 @@ from pyhon import const
10from pyhon.appliance import HonAppliance 11from pyhon.appliance import HonAppliance
11 12
12if TYPE_CHECKING: 13if TYPE_CHECKING:
13 from pyhon import HonAPI 14 from pyhon import Hon
14 15
15_LOGGER = logging.getLogger(__name__) 16_LOGGER = logging.getLogger(__name__)
16 17
17appliances: list[HonAppliance] = []
18
19
20def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData) -> None:
21 _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))
22
23
24def on_lifecycle_connection_success(
25 lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
26) -> None:
27 _LOGGER.info(
28 "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
29 )
30
31
32def on_lifecycle_attempting_connect(
33 lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
34) -> None:
35 _LOGGER.info(
36 "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
37 )
38
39
40def on_lifecycle_connection_failure(
41 lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
42) -> None:
43 _LOGGER.info(
44 "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
45 )
46 18
19class MQTTClient:
20 def __init__(self, hon: "Hon"):
21 self._client: mqtt5.Client | None = None
22 self._hon = hon
23 self._api = hon.api
24 self._appliances = hon.appliances
25 self._connection = False
26 self._watchdog_task: asyncio.Task[None] | None = None
27
28 @property
29 def client(self) -> mqtt5.Client:
30 if self._client is not None:
31 return self._client
32 raise AttributeError("Client is not set")
33
34 async def create(self) -> "MQTTClient":
35 await self._start()
36 self._subscribe_appliances()
37 return self
38
39 def _on_lifecycle_stopped(
40 self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData
41 ) -> None:
42 _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))
43
44 def _on_lifecycle_connection_success(
45 self,
46 lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
47 ) -> None:
48 self._connection = True
49 _LOGGER.info(
50 "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
51 )
47 52
48def on_lifecycle_disconnection( 53 def _on_lifecycle_attempting_connect(
49 lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData, 54 self,
50) -> None: 55 lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
51 _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data)) 56 ) -> None:
57 _LOGGER.info(
58 "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
59 )
52 60
61 def _on_lifecycle_connection_failure(
62 self,
63 lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
64 ) -> None:
65 _LOGGER.info(
66 "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
67 )
53 68
54def on_publish_received(data: mqtt5.PublishReceivedData) -> None: 69 def _on_lifecycle_disconnection(
55 if not (data and data.publish_packet and data.publish_packet.payload): 70 self,
56 return 71 lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData,
57 payload = json.loads(data.publish_packet.payload.decode()) 72 ) -> None:
58 topic = data.publish_packet.topic 73 self._connection = False
59 if topic and "appliancestatus" in topic: 74 _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data))
75
76 def _on_publish_received(self, data: mqtt5.PublishReceivedData) -> None:
77 if not (data and data.publish_packet and data.publish_packet.payload):
78 return
79 payload = json.loads(data.publish_packet.payload.decode())
80 topic = data.publish_packet.topic
60 appliance = next( 81 appliance = next(
61 a for a in appliances if topic in a.info["topics"]["subscribe"] 82 a for a in self._appliances if topic in a.info["topics"]["subscribe"]
83 )
84 if topic and "appliancestatus" in topic:
85 for parameter in payload["parameters"]:
86 appliance.attributes["parameters"][parameter["parName"]].update(
87 parameter
88 )
89 appliance.sync_params_to_command("settings")
90 self._hon.notify()
91 elif topic and "connected" in topic:
92 _LOGGER.info("Connected %s", appliance.nick_name)
93 elif topic and "disconnected" in topic:
94 _LOGGER.info("Disconnected %s", appliance.nick_name)
95 elif topic and "discovery" in topic:
96 _LOGGER.info("Discovered %s", appliance.nick_name)
97 _LOGGER.info("%s - %s", topic, payload)
98
99 async def _start(self) -> None:
100 self._client = mqtt5_client_builder.websockets_with_custom_authorizer(
101 endpoint=const.AWS_ENDPOINT,
102 auth_authorizer_name=const.AWS_AUTHORIZER,
103 auth_authorizer_signature=await self._api.load_aws_token(),
104 auth_token_key_name="token",
105 auth_token_value=self._api.auth.id_token,
106 client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}",
107 on_lifecycle_stopped=self._on_lifecycle_stopped,
108 on_lifecycle_connection_success=self._on_lifecycle_connection_success,
109 on_lifecycle_attempting_connect=self._on_lifecycle_attempting_connect,
110 on_lifecycle_connection_failure=self._on_lifecycle_connection_failure,
111 on_lifecycle_disconnection=self._on_lifecycle_disconnection,
112 on_publish_received=self._on_publish_received,
62 ) 113 )
63 for parameter in payload["parameters"]: 114 self.client.start()
64 appliance.attributes["parameters"][parameter["parName"]].update(parameter) 115
65 appliance.notify() 116 def _subscribe_appliances(self) -> None:
66 _LOGGER.info("%s - %s", topic, payload) 117 for appliance in self._appliances:
67 118 self._subscribe(appliance)
68 119
69async def create_mqtt_client(api: "HonAPI") -> mqtt5.Client: 120 def _subscribe(self, appliance: HonAppliance) -> None:
70 client: mqtt5.Client = mqtt5_client_builder.websockets_with_custom_authorizer( 121 for topic in appliance.info.get("topics", {}).get("subscribe", []):
71 endpoint=const.AWS_ENDPOINT, 122 self.client.subscribe(
72 auth_authorizer_name=const.AWS_AUTHORIZER, 123 mqtt5.SubscribePacket([mqtt5.Subscription(topic)])
73 auth_authorizer_signature=await api.load_aws_token(), 124 ).result(10)
74 auth_token_key_name="token", 125 _LOGGER.info("Subscribed to topic %s", topic)
75 auth_token_value=api.auth.id_token, 126
76 client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}", 127 async def start_watchdog(self) -> None:
77 on_lifecycle_stopped=on_lifecycle_stopped, 128 if not self._watchdog_task or self._watchdog_task.done():
78 on_lifecycle_connection_success=on_lifecycle_connection_success, 129 await asyncio.create_task(self._watchdog())
79 on_lifecycle_attempting_connect=on_lifecycle_attempting_connect, 130
80 on_lifecycle_connection_failure=on_lifecycle_connection_failure, 131 async def _watchdog(self) -> None:
81 on_lifecycle_disconnection=on_lifecycle_disconnection, 132 while True:
82 on_publish_received=on_publish_received, 133 await asyncio.sleep(5)
83 ) 134 if not self._connection:
84 client.start() 135 _LOGGER.info("Restart mqtt connection")
85 return client 136 await self._start()
86 137 self._subscribe_appliances()
87
88def subscribe(client: mqtt5.Client, appliance: HonAppliance) -> None:
89 for topic in appliance.info.get("topics", {}).get("subscribe", []):
90 client.subscribe(mqtt5.SubscribePacket([mqtt5.Subscription(topic)])).result(10)
91 _LOGGER.info("Subscribed to topic %s", topic)
92
93
94async def start(api: "HonAPI", app: list[HonAppliance]) -> mqtt5.Client:
95 client = await create_mqtt_client(api)
96 global appliances # pylint: disable=global-statement
97 appliances = app
98 for appliance in appliances:
99 subscribe(client, appliance)
100 return client