diff options
Diffstat (limited to 'pyhon/connection/mqtt.py')
-rw-r--r-- | pyhon/connection/mqtt.py | 195 |
1 files changed, 116 insertions, 79 deletions
diff --git a/pyhon/connection/mqtt.py b/pyhon/connection/mqtt.py index ce6eaed..cb1f309 100644 --- a/pyhon/connection/mqtt.py +++ b/pyhon/connection/mqtt.py | |||
@@ -1,3 +1,4 @@ | |||
1 | import asyncio | ||
1 | import json | 2 | import json |
2 | import logging | 3 | import logging |
3 | import secrets | 4 | import secrets |
@@ -10,91 +11,127 @@ from pyhon import const | |||
10 | from pyhon.appliance import HonAppliance | 11 | from pyhon.appliance import HonAppliance |
11 | 12 | ||
12 | if TYPE_CHECKING: | 13 | if TYPE_CHECKING: |
13 | from pyhon import HonAPI | 14 | from pyhon import Hon |
14 | 15 | ||
15 | _LOGGER = logging.getLogger(__name__) | 16 | _LOGGER = logging.getLogger(__name__) |
16 | 17 | ||
17 | appliances: list[HonAppliance] = [] | ||
18 | |||
19 | |||
20 | def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData) -> None: | ||
21 | _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data)) | ||
22 | |||
23 | |||
24 | def on_lifecycle_connection_success( | ||
25 | lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData, | ||
26 | ) -> None: | ||
27 | _LOGGER.info( | ||
28 | "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data) | ||
29 | ) | ||
30 | |||
31 | |||
32 | def on_lifecycle_attempting_connect( | ||
33 | lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData, | ||
34 | ) -> None: | ||
35 | _LOGGER.info( | ||
36 | "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data) | ||
37 | ) | ||
38 | |||
39 | |||
40 | def on_lifecycle_connection_failure( | ||
41 | lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData, | ||
42 | ) -> None: | ||
43 | _LOGGER.info( | ||
44 | "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data) | ||
45 | ) | ||
46 | 18 | ||
19 | class MQTTClient: | ||
20 | def __init__(self, hon: "Hon"): | ||
21 | self._client: mqtt5.Client | None = None | ||
22 | self._hon = hon | ||
23 | self._api = hon.api | ||
24 | self._appliances = hon.appliances | ||
25 | self._connection = False | ||
26 | self._watchdog_task: asyncio.Task[None] | None = None | ||
27 | |||
28 | @property | ||
29 | def client(self) -> mqtt5.Client: | ||
30 | if self._client is not None: | ||
31 | return self._client | ||
32 | raise AttributeError("Client is not set") | ||
33 | |||
34 | async def create(self) -> "MQTTClient": | ||
35 | await self._start() | ||
36 | self._subscribe_appliances() | ||
37 | return self | ||
38 | |||
39 | def _on_lifecycle_stopped( | ||
40 | self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData | ||
41 | ) -> None: | ||
42 | _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data)) | ||
43 | |||
44 | def _on_lifecycle_connection_success( | ||
45 | self, | ||
46 | lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData, | ||
47 | ) -> None: | ||
48 | self._connection = True | ||
49 | _LOGGER.info( | ||
50 | "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data) | ||
51 | ) | ||
47 | 52 | ||
48 | def on_lifecycle_disconnection( | 53 | def _on_lifecycle_attempting_connect( |
49 | lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData, | 54 | self, |
50 | ) -> None: | 55 | lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData, |
51 | _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data)) | 56 | ) -> None: |
57 | _LOGGER.info( | ||
58 | "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data) | ||
59 | ) | ||
52 | 60 | ||
61 | def _on_lifecycle_connection_failure( | ||
62 | self, | ||
63 | lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData, | ||
64 | ) -> None: | ||
65 | _LOGGER.info( | ||
66 | "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data) | ||
67 | ) | ||
53 | 68 | ||
54 | def on_publish_received(data: mqtt5.PublishReceivedData) -> None: | 69 | def _on_lifecycle_disconnection( |
55 | if not (data and data.publish_packet and data.publish_packet.payload): | 70 | self, |
56 | return | 71 | lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData, |
57 | payload = json.loads(data.publish_packet.payload.decode()) | 72 | ) -> None: |
58 | topic = data.publish_packet.topic | 73 | self._connection = False |
59 | if topic and "appliancestatus" in topic: | 74 | _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data)) |
75 | |||
76 | def _on_publish_received(self, data: mqtt5.PublishReceivedData) -> None: | ||
77 | if not (data and data.publish_packet and data.publish_packet.payload): | ||
78 | return | ||
79 | payload = json.loads(data.publish_packet.payload.decode()) | ||
80 | topic = data.publish_packet.topic | ||
60 | appliance = next( | 81 | appliance = next( |
61 | a for a in appliances if topic in a.info["topics"]["subscribe"] | 82 | a for a in self._appliances if topic in a.info["topics"]["subscribe"] |
83 | ) | ||
84 | if topic and "appliancestatus" in topic: | ||
85 | for parameter in payload["parameters"]: | ||
86 | appliance.attributes["parameters"][parameter["parName"]].update( | ||
87 | parameter | ||
88 | ) | ||
89 | appliance.sync_params_to_command("settings") | ||
90 | self._hon.notify() | ||
91 | elif topic and "connected" in topic: | ||
92 | _LOGGER.info("Connected %s", appliance.nick_name) | ||
93 | elif topic and "disconnected" in topic: | ||
94 | _LOGGER.info("Disconnected %s", appliance.nick_name) | ||
95 | elif topic and "discovery" in topic: | ||
96 | _LOGGER.info("Discovered %s", appliance.nick_name) | ||
97 | _LOGGER.info("%s - %s", topic, payload) | ||
98 | |||
99 | async def _start(self) -> None: | ||
100 | self._client = mqtt5_client_builder.websockets_with_custom_authorizer( | ||
101 | endpoint=const.AWS_ENDPOINT, | ||
102 | auth_authorizer_name=const.AWS_AUTHORIZER, | ||
103 | auth_authorizer_signature=await self._api.load_aws_token(), | ||
104 | auth_token_key_name="token", | ||
105 | auth_token_value=self._api.auth.id_token, | ||
106 | client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}", | ||
107 | on_lifecycle_stopped=self._on_lifecycle_stopped, | ||
108 | on_lifecycle_connection_success=self._on_lifecycle_connection_success, | ||
109 | on_lifecycle_attempting_connect=self._on_lifecycle_attempting_connect, | ||
110 | on_lifecycle_connection_failure=self._on_lifecycle_connection_failure, | ||
111 | on_lifecycle_disconnection=self._on_lifecycle_disconnection, | ||
112 | on_publish_received=self._on_publish_received, | ||
62 | ) | 113 | ) |
63 | for parameter in payload["parameters"]: | 114 | self.client.start() |
64 | appliance.attributes["parameters"][parameter["parName"]].update(parameter) | 115 | |
65 | appliance.notify() | 116 | def _subscribe_appliances(self) -> None: |
66 | _LOGGER.info("%s - %s", topic, payload) | 117 | for appliance in self._appliances: |
67 | 118 | self._subscribe(appliance) | |
68 | 119 | ||
69 | async def create_mqtt_client(api: "HonAPI") -> mqtt5.Client: | 120 | def _subscribe(self, appliance: HonAppliance) -> None: |
70 | client: mqtt5.Client = mqtt5_client_builder.websockets_with_custom_authorizer( | 121 | for topic in appliance.info.get("topics", {}).get("subscribe", []): |
71 | endpoint=const.AWS_ENDPOINT, | 122 | self.client.subscribe( |
72 | auth_authorizer_name=const.AWS_AUTHORIZER, | 123 | mqtt5.SubscribePacket([mqtt5.Subscription(topic)]) |
73 | auth_authorizer_signature=await api.load_aws_token(), | 124 | ).result(10) |
74 | auth_token_key_name="token", | 125 | _LOGGER.info("Subscribed to topic %s", topic) |
75 | auth_token_value=api.auth.id_token, | 126 | |
76 | client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}", | 127 | async def start_watchdog(self) -> None: |
77 | on_lifecycle_stopped=on_lifecycle_stopped, | 128 | if not self._watchdog_task or self._watchdog_task.done(): |
78 | on_lifecycle_connection_success=on_lifecycle_connection_success, | 129 | await asyncio.create_task(self._watchdog()) |
79 | on_lifecycle_attempting_connect=on_lifecycle_attempting_connect, | 130 | |
80 | on_lifecycle_connection_failure=on_lifecycle_connection_failure, | 131 | async def _watchdog(self) -> None: |
81 | on_lifecycle_disconnection=on_lifecycle_disconnection, | 132 | while True: |
82 | on_publish_received=on_publish_received, | 133 | await asyncio.sleep(5) |
83 | ) | 134 | if not self._connection: |
84 | client.start() | 135 | _LOGGER.info("Restart mqtt connection") |
85 | return client | 136 | await self._start() |
86 | 137 | self._subscribe_appliances() | |
87 | |||
88 | def subscribe(client: mqtt5.Client, appliance: HonAppliance) -> None: | ||
89 | for topic in appliance.info.get("topics", {}).get("subscribe", []): | ||
90 | client.subscribe(mqtt5.SubscribePacket([mqtt5.Subscription(topic)])).result(10) | ||
91 | _LOGGER.info("Subscribed to topic %s", topic) | ||
92 | |||
93 | |||
94 | async def start(api: "HonAPI", app: list[HonAppliance]) -> mqtt5.Client: | ||
95 | client = await create_mqtt_client(api) | ||
96 | global appliances # pylint: disable=global-statement | ||
97 | appliances = app | ||
98 | for appliance in appliances: | ||
99 | subscribe(client, appliance) | ||
100 | return client | ||