aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pyhon/appliance.py11
-rw-r--r--pyhon/connection/api.py7
-rw-r--r--pyhon/connection/mqtt.py195
-rw-r--r--pyhon/hon.py15
-rw-r--r--setup.py2
5 files changed, 131 insertions, 99 deletions
diff --git a/pyhon/appliance.py b/pyhon/appliance.py
index 73992c1..3754aca 100644
--- a/pyhon/appliance.py
+++ b/pyhon/appliance.py
@@ -3,7 +3,7 @@ import logging
3import re 3import re
4from datetime import datetime, timedelta 4from datetime import datetime, timedelta
5from pathlib import Path 5from pathlib import Path
6from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload, Callable 6from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
7 7
8from pyhon import diagnose, exceptions 8from pyhon import diagnose, exceptions
9from pyhon.appliances.base import ApplianceBase 9from pyhon.appliances.base import ApplianceBase
@@ -43,7 +43,6 @@ class HonAppliance:
43 self._additional_data: Dict[str, Any] = {} 43 self._additional_data: Dict[str, Any] = {}
44 self._last_update: Optional[datetime] = None 44 self._last_update: Optional[datetime] = None
45 self._default_setting = HonParameter("", {}, "") 45 self._default_setting = HonParameter("", {}, "")
46 self._notify_function: Optional[Callable[[Any], None]] = None
47 46
48 try: 47 try:
49 self._extra: Optional[ApplianceBase] = importlib.import_module( 48 self._extra: Optional[ApplianceBase] = importlib.import_module(
@@ -313,11 +312,3 @@ class HonAppliance:
313 elif isinstance(target, HonParameterEnum): 312 elif isinstance(target, HonParameterEnum):
314 target.values = main.values 313 target.values = main.values
315 target.value = main.value 314 target.value = main.value
316
317 def subscribe(self, notify_function: Callable[[Any], None]) -> None:
318 self._notify_function = notify_function
319
320 def notify(self) -> None:
321 self.sync_params_to_command("settings")
322 if self._notify_function:
323 self._notify_function(self.attributes)
diff --git a/pyhon/connection/api.py b/pyhon/connection/api.py
index f99fd5d..788a067 100644
--- a/pyhon/connection/api.py
+++ b/pyhon/connection/api.py
@@ -7,12 +7,10 @@ from types import TracebackType
7from typing import Dict, Optional, Any, List, no_type_check, Type 7from typing import Dict, Optional, Any, List, no_type_check, Type
8 8
9from aiohttp import ClientSession 9from aiohttp import ClientSession
10from awscrt import mqtt5
11from typing_extensions import Self 10from typing_extensions import Self
12 11
13from pyhon import const, exceptions 12from pyhon import const, exceptions
14from pyhon.appliance import HonAppliance 13from pyhon.appliance import HonAppliance
15from pyhon.connection import mqtt
16from pyhon.connection.auth import HonAuth 14from pyhon.connection.auth import HonAuth
17from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler 15from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
18from pyhon.connection.handler.hon import HonConnectionHandler 16from pyhon.connection.handler.hon import HonConnectionHandler
@@ -40,7 +38,6 @@ class HonAPI:
40 self._hon_handler: Optional[HonConnectionHandler] = None 38 self._hon_handler: Optional[HonConnectionHandler] = None
41 self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None 39 self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
42 self._session: Optional[ClientSession] = session 40 self._session: Optional[ClientSession] = session
43 self._mqtt_client: mqtt5.Client | None = None
44 41
45 async def __aenter__(self) -> Self: 42 async def __aenter__(self) -> Self:
46 return await self.create() 43 return await self.create()
@@ -269,10 +266,6 @@ class HonAPI:
269 result: Dict[str, Any] = await response.json() 266 result: Dict[str, Any] = await response.json()
270 return result 267 return result
271 268
272 async def subscribe_mqtt(self, appliances: list[HonAppliance]) -> None:
273 if not self._mqtt_client:
274 self._mqtt_client = await mqtt.start(self, appliances)
275
276 async def close(self) -> None: 269 async def close(self) -> None:
277 if self._hon_handler is not None: 270 if self._hon_handler is not None:
278 await self._hon_handler.close() 271 await self._hon_handler.close()
diff --git a/pyhon/connection/mqtt.py b/pyhon/connection/mqtt.py
index ce6eaed..cb1f309 100644
--- a/pyhon/connection/mqtt.py
+++ b/pyhon/connection/mqtt.py
@@ -1,3 +1,4 @@
1import asyncio
1import json 2import json
2import logging 3import logging
3import secrets 4import secrets
@@ -10,91 +11,127 @@ from pyhon import const
10from pyhon.appliance import HonAppliance 11from pyhon.appliance import HonAppliance
11 12
12if TYPE_CHECKING: 13if TYPE_CHECKING:
13 from pyhon import HonAPI 14 from pyhon import Hon
14 15
15_LOGGER = logging.getLogger(__name__) 16_LOGGER = logging.getLogger(__name__)
16 17
17appliances: list[HonAppliance] = []
18
19
20def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData) -> None:
21 _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))
22
23
24def on_lifecycle_connection_success(
25 lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
26) -> None:
27 _LOGGER.info(
28 "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
29 )
30
31
32def on_lifecycle_attempting_connect(
33 lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
34) -> None:
35 _LOGGER.info(
36 "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
37 )
38
39
40def on_lifecycle_connection_failure(
41 lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
42) -> None:
43 _LOGGER.info(
44 "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
45 )
46 18
19class MQTTClient:
20 def __init__(self, hon: "Hon"):
21 self._client: mqtt5.Client | None = None
22 self._hon = hon
23 self._api = hon.api
24 self._appliances = hon.appliances
25 self._connection = False
26 self._watchdog_task: asyncio.Task[None] | None = None
27
28 @property
29 def client(self) -> mqtt5.Client:
30 if self._client is not None:
31 return self._client
32 raise AttributeError("Client is not set")
33
34 async def create(self) -> "MQTTClient":
35 await self._start()
36 self._subscribe_appliances()
37 return self
38
39 def _on_lifecycle_stopped(
40 self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData
41 ) -> None:
42 _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data))
43
44 def _on_lifecycle_connection_success(
45 self,
46 lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
47 ) -> None:
48 self._connection = True
49 _LOGGER.info(
50 "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data)
51 )
47 52
48def on_lifecycle_disconnection( 53 def _on_lifecycle_attempting_connect(
49 lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData, 54 self,
50) -> None: 55 lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData,
51 _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data)) 56 ) -> None:
57 _LOGGER.info(
58 "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data)
59 )
52 60
61 def _on_lifecycle_connection_failure(
62 self,
63 lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData,
64 ) -> None:
65 _LOGGER.info(
66 "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data)
67 )
53 68
54def on_publish_received(data: mqtt5.PublishReceivedData) -> None: 69 def _on_lifecycle_disconnection(
55 if not (data and data.publish_packet and data.publish_packet.payload): 70 self,
56 return 71 lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData,
57 payload = json.loads(data.publish_packet.payload.decode()) 72 ) -> None:
58 topic = data.publish_packet.topic 73 self._connection = False
59 if topic and "appliancestatus" in topic: 74 _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data))
75
76 def _on_publish_received(self, data: mqtt5.PublishReceivedData) -> None:
77 if not (data and data.publish_packet and data.publish_packet.payload):
78 return
79 payload = json.loads(data.publish_packet.payload.decode())
80 topic = data.publish_packet.topic
60 appliance = next( 81 appliance = next(
61 a for a in appliances if topic in a.info["topics"]["subscribe"] 82 a for a in self._appliances if topic in a.info["topics"]["subscribe"]
83 )
84 if topic and "appliancestatus" in topic:
85 for parameter in payload["parameters"]:
86 appliance.attributes["parameters"][parameter["parName"]].update(
87 parameter
88 )
89 appliance.sync_params_to_command("settings")
90 self._hon.notify()
91 elif topic and "connected" in topic:
92 _LOGGER.info("Connected %s", appliance.nick_name)
93 elif topic and "disconnected" in topic:
94 _LOGGER.info("Disconnected %s", appliance.nick_name)
95 elif topic and "discovery" in topic:
96 _LOGGER.info("Discovered %s", appliance.nick_name)
97 _LOGGER.info("%s - %s", topic, payload)
98
99 async def _start(self) -> None:
100 self._client = mqtt5_client_builder.websockets_with_custom_authorizer(
101 endpoint=const.AWS_ENDPOINT,
102 auth_authorizer_name=const.AWS_AUTHORIZER,
103 auth_authorizer_signature=await self._api.load_aws_token(),
104 auth_token_key_name="token",
105 auth_token_value=self._api.auth.id_token,
106 client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}",
107 on_lifecycle_stopped=self._on_lifecycle_stopped,
108 on_lifecycle_connection_success=self._on_lifecycle_connection_success,
109 on_lifecycle_attempting_connect=self._on_lifecycle_attempting_connect,
110 on_lifecycle_connection_failure=self._on_lifecycle_connection_failure,
111 on_lifecycle_disconnection=self._on_lifecycle_disconnection,
112 on_publish_received=self._on_publish_received,
62 ) 113 )
63 for parameter in payload["parameters"]: 114 self.client.start()
64 appliance.attributes["parameters"][parameter["parName"]].update(parameter) 115
65 appliance.notify() 116 def _subscribe_appliances(self) -> None:
66 _LOGGER.info("%s - %s", topic, payload) 117 for appliance in self._appliances:
67 118 self._subscribe(appliance)
68 119
69async def create_mqtt_client(api: "HonAPI") -> mqtt5.Client: 120 def _subscribe(self, appliance: HonAppliance) -> None:
70 client: mqtt5.Client = mqtt5_client_builder.websockets_with_custom_authorizer( 121 for topic in appliance.info.get("topics", {}).get("subscribe", []):
71 endpoint=const.AWS_ENDPOINT, 122 self.client.subscribe(
72 auth_authorizer_name=const.AWS_AUTHORIZER, 123 mqtt5.SubscribePacket([mqtt5.Subscription(topic)])
73 auth_authorizer_signature=await api.load_aws_token(), 124 ).result(10)
74 auth_token_key_name="token", 125 _LOGGER.info("Subscribed to topic %s", topic)
75 auth_token_value=api.auth.id_token, 126
76 client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}", 127 async def start_watchdog(self) -> None:
77 on_lifecycle_stopped=on_lifecycle_stopped, 128 if not self._watchdog_task or self._watchdog_task.done():
78 on_lifecycle_connection_success=on_lifecycle_connection_success, 129 await asyncio.create_task(self._watchdog())
79 on_lifecycle_attempting_connect=on_lifecycle_attempting_connect, 130
80 on_lifecycle_connection_failure=on_lifecycle_connection_failure, 131 async def _watchdog(self) -> None:
81 on_lifecycle_disconnection=on_lifecycle_disconnection, 132 while True:
82 on_publish_received=on_publish_received, 133 await asyncio.sleep(5)
83 ) 134 if not self._connection:
84 client.start() 135 _LOGGER.info("Restart mqtt connection")
85 return client 136 await self._start()
86 137 self._subscribe_appliances()
87
88def subscribe(client: mqtt5.Client, appliance: HonAppliance) -> None:
89 for topic in appliance.info.get("topics", {}).get("subscribe", []):
90 client.subscribe(mqtt5.SubscribePacket([mqtt5.Subscription(topic)])).result(10)
91 _LOGGER.info("Subscribed to topic %s", topic)
92
93
94async def start(api: "HonAPI", app: list[HonAppliance]) -> mqtt5.Client:
95 client = await create_mqtt_client(api)
96 global appliances # pylint: disable=global-statement
97 appliances = app
98 for appliance in appliances:
99 subscribe(client, appliance)
100 return client
diff --git a/pyhon/hon.py b/pyhon/hon.py
index acdd92c..5d2da5d 100644
--- a/pyhon/hon.py
+++ b/pyhon/hon.py
@@ -2,7 +2,7 @@ import asyncio
2import logging 2import logging
3from pathlib import Path 3from pathlib import Path
4from types import TracebackType 4from types import TracebackType
5from typing import List, Optional, Dict, Any, Type 5from typing import List, Optional, Dict, Any, Type, Callable
6 6
7from aiohttp import ClientSession 7from aiohttp import ClientSession
8from typing_extensions import Self 8from typing_extensions import Self
@@ -10,6 +10,7 @@ from typing_extensions import Self
10from pyhon.appliance import HonAppliance 10from pyhon.appliance import HonAppliance
11from pyhon.connection.api import HonAPI 11from pyhon.connection.api import HonAPI
12from pyhon.connection.api import TestAPI 12from pyhon.connection.api import TestAPI
13from pyhon.connection.mqtt import MQTTClient
13from pyhon.exceptions import NoAuthenticationException 14from pyhon.exceptions import NoAuthenticationException
14 15
15_LOGGER = logging.getLogger(__name__) 16_LOGGER = logging.getLogger(__name__)
@@ -33,6 +34,8 @@ class Hon:
33 self._test_data_path: Path = test_data_path or Path().cwd() 34 self._test_data_path: Path = test_data_path or Path().cwd()
34 self._mobile_id: str = mobile_id 35 self._mobile_id: str = mobile_id
35 self._refresh_token: str = refresh_token 36 self._refresh_token: str = refresh_token
37 self._mqtt_client: MQTTClient | None = None
38 self._notify_function: Optional[Callable[[Any], None]] = None
36 39
37 async def __aenter__(self) -> Self: 40 async def __aenter__(self) -> Self:
38 return await self.create() 41 return await self.create()
@@ -120,7 +123,15 @@ class Hon:
120 api = TestAPI(test_data) 123 api = TestAPI(test_data)
121 for appliance in await api.load_appliances(): 124 for appliance in await api.load_appliances():
122 await self._create_appliance(appliance, api) 125 await self._create_appliance(appliance, api)
123 await self.api.subscribe_mqtt(self.appliances) 126 if not self._mqtt_client:
127 self._mqtt_client = await MQTTClient(self).create()
128
129 def subscribe_updates(self, notify_function: Callable[[Any], None]) -> None:
130 self._notify_function = notify_function
131
132 def notify(self) -> None:
133 if self._notify_function:
134 self._notify_function(None)
124 135
125 async def close(self) -> None: 136 async def close(self) -> None:
126 await self.api.close() 137 await self.api.close()
diff --git a/setup.py b/setup.py
index 3d5d5e7..022bf16 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ with open("README.md", "r", encoding="utf-8") as f:
7 7
8setup( 8setup(
9 name="pyhOn", 9 name="pyhOn",
10 version="0.17.1", 10 version="0.17.2",
11 author="Andre Basche", 11 author="Andre Basche",
12 description="Control hOn devices with python", 12 description="Control hOn devices with python",
13 long_description=long_description, 13 long_description=long_description,