Compare commits

...

80 Commits

Author SHA1 Message Date
Andre Basche
327d4a1814
Update README.md 2024-01-15 10:36:18 +01:00
Andre Basche
4dc60c29ee Fix crash in loading attributs Andre0512/hon#134 2024-01-11 01:21:38 +01:00
Andre Basche
7bc9e718a0
Merge pull request #17 from MiguelAngelLV/main
Use class attribute to check active
2023-11-26 13:43:44 +01:00
Mguel Ángel
d4e5793186 Fix check active oven 2023-11-26 13:36:41 +01:00
Andre Basche
e9f2bb9f4f Send program name Andre0512/hon#124 2023-11-21 02:23:53 +01:00
Andre Basche
ea81e2891f Bump version 2023-11-20 17:51:32 +01:00
Andre Basche
c71f8f17f5 Fix error in wh 2023-11-20 17:50:28 +01:00
Andre Basche
27d974abba Fix dependencies 2023-11-20 16:38:39 +01:00
Andre Basche
ab18e45f97 Add python3.12 support 2023-11-19 23:53:21 +01:00
Andre Basche
e44b9b6773 Update mypy 2023-10-12 22:26:12 +02:00
Andre Basche
5c650d722b Bump version 2023-10-12 22:16:02 +02:00
Vadym
6ae40481e3
Issue with sync_command (#16)
* Added water heater appliance. Added ability to send only mandatory parameters

* fixed build

* formatting

* cleanup

* cleanup

* reformatting

* Added ability to send specific parameters. Useful in case the command has many not mandatory parameters and you want to send only one/few

* cleanup

* Fixed code style

* sync_command - fixed typos, skip to sync(actually reset) parameters of different types. Improved WaterHeater appliance

* cleanup

* cleanup

* clean code style

* check if base parameter is mandatory

* Reverted back sync_command, send mandatory parameters beside with specified

---------

Co-authored-by: Vadym Melnychuk <vme@primexm.com>
2023-10-12 16:43:41 +02:00
Andre Basche
ff8ae160bb Fix empty prStr 2023-10-06 01:29:38 +02:00
Andre Basche
658e80a8f4 Change dependencies to variable major version 2023-10-02 03:21:51 +02:00
Andre Basche
ffba85bf0d Bump version 2023-10-02 01:38:54 +02:00
Andre Basche
10c8d961c4 Support new style rules hon#112 2023-10-02 01:38:40 +02:00
Andre Basche
61dd470588 Set versions of dependant packages to 'compatible releases' 2023-07-27 19:16:23 +02:00
Andre Basche
1ed81c2a77 Simplify get favorites 2023-07-24 02:33:45 +02:00
Andre Basche
e4dc3cb1d0 Next try to add py.typed in package 2023-07-24 01:47:45 +02:00
Andre Basche
2523069ce9 Fix false name caused by chatgpt's wrong advice 2023-07-23 23:18:09 +02:00
Andre Basche
eeb458cb1b Add py.typed into package 2023-07-23 22:54:46 +02:00
Andre Basche
2764700bc7 Bump version 2023-07-23 21:56:16 +02:00
Andre Basche
e6c796e822 Improve type hints 2023-07-23 21:55:42 +02:00
Andre Basche
454f2d8916 Use equal mypy cnofig as home assistant 2023-07-22 12:39:50 +02:00
Andre Basche
59ca8b6caf Not loading favorite if base program renamed 2023-07-22 11:53:39 +02:00
Andre Basche
44c55c681d Update requirements 2023-07-20 23:55:40 +02:00
Andre Basche
cfee10df5f Improve logging for test api 2023-07-20 23:52:46 +02:00
Andre Basche
e0774677eb Add and apply some mypy rules 2023-07-20 23:52:07 +02:00
Andre Basche
fc60d15e60 Fix error for fridge without quickmode 2023-07-19 23:55:37 +02:00
Andre Basche
8ef8c0405d Fix empty value in settings 2023-07-19 19:52:21 +02:00
Andre Basche
5575b240e1 Bump version 2023-07-18 21:32:12 +02:00
Andre Basche
22367242a2 Add flake8 config 2023-07-18 21:31:26 +02:00
Vadym
4f7d4860db
Water Heater. Ability to send only mandatory parameters (#14)
* Added water heater appliance. Added ability to send only mandatory parameters

* fixed build

* formatting

* cleanup

* cleanup

* reformatting

* Added ability to send specific parameters. Useful in case the command has many not mandatory parameters and you want to send only one/few

* cleanup

* Fixed code style

---------

Co-authored-by: Vadym Melnychuk <vme@primexm.com>
2023-07-18 21:26:11 +02:00
Andre Basche
5a778373b6 Enable more pylint checks 2023-07-16 05:53:23 +02:00
Andre Basche
e1c8bc5835 Reduce complexity and line length for flake8 2023-07-16 04:42:29 +02:00
Andre Basche
442e7a07dd Fix pylint check 2023-07-14 00:40:48 +02:00
Andre Basche
b0e3b15ff0 Add pylint checks 2023-07-12 19:36:32 +02:00
Andre Basche
2788a3ac91 Reset commands to defaults 2023-07-12 00:05:27 +02:00
Andre Basche
bc7e8994c9 Bump version 2023-07-11 00:21:37 +02:00
Andre Basche
8ca40d7ad0 Add missing requirement typing-extensions 2023-07-10 23:59:55 +02:00
Andre Basche
9a6a07fd46 Sync enum values of commands 2023-07-10 23:58:24 +02:00
Andre Basche
f1818bbc5d Bump version 2023-07-09 23:59:27 +02:00
Andre Basche
3d5c8405ea Improve error handling 2023-07-09 23:58:55 +02:00
Andre Basche
e234ef3bbb Remove old code in ov hon#88 2023-07-09 01:36:03 +02:00
Andre Basche
e00e147ecd Bump version 2023-07-01 16:29:29 +02:00
Andre Basche
26bc35c8a6 Fix issue in locking attribute updates 2023-07-01 16:27:50 +02:00
Andre Basche
17d73cdeb8 Sync parameter to settings 2023-07-01 16:04:34 +02:00
Andre Basche
a10ab4423e Add stricter mypy rules 2023-07-01 14:59:09 +02:00
Andre Basche
0553e6c17d Improvements 2023-07-01 14:31:37 +02:00
Andre Basche
44f40c531e Fix some minor issues 2023-06-29 22:08:51 +02:00
Andre Basche
4e88bc7a9f Fix error in archive again 2023-06-29 18:49:33 +02:00
Andre Basche
b5d8a90d79 Fix messed up parameters in request 2023-06-28 20:25:52 +02:00
Andre Basche
52837f16e3 Improve appliance import 2023-06-28 19:45:37 +02:00
Andre Basche
2a6b040193 Add import parameter 2023-06-28 19:17:17 +02:00
Andre Basche
9eb99f283b Add more type hints 2023-06-28 19:02:11 +02:00
Andre Basche
ad0d065b03 Fix creating wrong zip archive 2023-06-26 02:19:07 +02:00
Andre Basche
924e2c240d Fix wrong archive link 2023-06-25 18:30:29 +02:00
Andre Basche
76bd189e7b Bump version 2023-06-25 17:49:03 +02:00
Andre Basche
ef67188b93 Create data archive and use it to test 2023-06-25 17:30:15 +02:00
Andre Basche
66cb7bcc24 Merge branch 'refactor2' 2023-06-22 00:03:07 +02:00
Andre Basche
c25e898b42 Bump version 2023-06-21 19:56:09 +02:00
Andre Basche
55966dd52f Fix typeerror hon#77 2023-06-21 18:02:07 +02:00
Andre Basche
8c65a37f29 Add command loader class 2023-06-15 02:16:03 +02:00
Andre Basche
1ca89995a2 Lock attributes 2023-06-13 00:39:18 +02:00
Andre Basche
f6139db0b5 Use class for attributes 2023-06-13 00:12:29 +02:00
Andre Basche
310d1bafd7 Improve rule parsing 2023-06-10 06:47:37 +02:00
Andre Basche
9e35dcf9cf Don't send optional program rules 2023-06-10 06:40:55 +02:00
Andre Basche
f9d0fa4ae8 Add wine cellar 2023-06-09 22:52:33 +02:00
Andre Basche
11988c73a6 Fix command parameter issue hon#63 2023-06-09 02:09:20 +02:00
Andre Basche
7b51caecca Improve update performance 2023-06-08 19:52:08 +02:00
Andre Basche
38d09e2ef5 Fix step is 0 hon#60 2023-06-08 14:20:08 +02:00
Andre Basche
3c7ad3f395 Fix changed hOn login 2023-06-07 02:27:02 +02:00
Andre Basche
31c03faca8 Get program name 2023-05-29 19:05:37 +02:00
Andre Basche
a081ef1f97 Add favourites to progams hon#47 2023-05-28 19:24:02 +02:00
Andre Basche
4888f2b1d0 Add oven climate support 2023-05-28 17:41:20 +02:00
Andre Basche
7c6ac15901 Add and improve fridge 2023-05-28 07:37:38 +02:00
Andre Basche
eea79e28b9 Fix for fridge program names 2023-05-22 01:07:55 +02:00
Andre Basche
ecbf438889 Bump version 2023-05-21 20:48:27 +02:00
Andre Basche
9cd12e3234 Fixes for fridge 2023-05-21 20:33:08 +02:00
Andre Basche
c2765fe953 Add rule handling 2023-05-21 02:25:43 +02:00
44 changed files with 1632 additions and 545 deletions

3
.flake8 Normal file
View File

@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
max-complexity = 7

View File

@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11"]
python-version: ["3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v3
@ -28,15 +28,13 @@ jobs:
python -m pip install -r requirements_dev.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
flake8 . --count --statistics
- name: Type check with mypy
run: |
mypy pyhon/
# - name: Analysing the code with pylint
# run: |
# pylint --max-line-length 88 $(git ls-files '*.py')
- name: Analysing the code with pylint
run: |
pylint $(git ls-files '*.py')
- name: Check black style
run: |
black . --check

9
.pylintrc Normal file
View File

@ -0,0 +1,9 @@
[MESSAGES CONTROL]
disable=missing-docstring
[FORMAT]
max-args=6
max-attributes=8
max-line-length=88

1
MANIFEST.in Normal file
View File

@ -0,0 +1 @@
include pyhon/py.typed

View File

@ -1,3 +1,14 @@
# Announcement: I have to take the project down in the next few days
> Dear User,
>
> We are writing to inform you that we have discovered two Home Assistant integration plug-ins developed by you ( https://github.com/Andre0512/hon and https://github.com/Andre0512/pyhOn ) that are in violation of our terms of service. Specifically, the plug-ins are using our services in an unauthorized manner which is causing significant economic harm to our Company.
> We take the protection of our intellectual property very seriously and demand that you immediately cease and desist all illegal activities related to the development and distribution of these plug-ins. We also request that you remove the plug-ins from all stores and code hosting platforms where they are currently available.
> Please be advised that we will take all necessary legal action to protect our interests if you fail to comply with this notice. We reserve the right to pursue all available remedies, including but not limited to monetary damages, injunctive relief, and attorney's fees.
> We strongly urge you to take immediate action to rectify this situation and avoid any further legal action. If you have any questions or concerns, please do not hesitate to contact us.
>
> Haier Europe Security and Governance Department
**This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!**
# pyhOn
@ -107,3 +118,6 @@ This library is used for the custom [HomeAssistant Integration "Haier hOn"](http
## Contribution
Any kind of contribution is welcome!
| Please add your appliances data to our [hon-test-data collection](https://github.com/Andre0512/hon-test-data). <br/>This helps us to develop new features and not to break compatibility in newer versions. |
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

24
mypy.ini Normal file
View File

@ -0,0 +1,24 @@
[mypy]
check_untyped_defs = true
disallow_any_generics = true
disallow_any_unimported = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
disable_error_code = annotation-unchecked
enable_error_code = ignore-without-code, redundant-self, truthy-iterable
follow_imports = silent
local_partial_types = true
no_implicit_optional = true
no_implicit_reexport = true
show_error_codes = true
strict_concatenate = false
strict_equality = true
warn_incomplete_stub = true
warn_redundant_casts = true
warn_return_any = true
warn_unreachable = true
warn_unused_configs = true
warn_unused_ignores = true

View File

@ -6,16 +6,18 @@ import logging
import sys
from getpass import getpass
from pathlib import Path
from typing import Tuple, Dict, Any
if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent))
from pyhon import Hon, HonAPI, helper
# pylint: disable=wrong-import-position
from pyhon import Hon, HonAPI, diagnose, printer
_LOGGER = logging.getLogger(__name__)
def get_arguments():
def get_arguments() -> Dict[str, Any]:
"""Get parsed arguments."""
parser = argparse.ArgumentParser(description="pyhOn: Command Line Utility")
parser.add_argument("-u", "--user", help="user for haier hOn account")
@ -24,17 +26,25 @@ def get_arguments():
keys = subparser.add_parser("keys", help="print as key format")
keys.add_argument("keys", help="print as key format", action="store_true")
keys.add_argument("--all", help="print also full keys", action="store_true")
translate = subparser.add_parser(
export = subparser.add_parser("export")
export.add_argument("export", help="export pyhon data", action="store_true")
export.add_argument("--zip", help="create zip archive", action="store_true")
export.add_argument("--anonymous", help="anonymize data", action="store_true")
export.add_argument("directory", nargs="?", default=Path().cwd())
translation = subparser.add_parser(
"translate", help="print available translation keys"
)
translate.add_argument(
translation.add_argument(
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
)
translate.add_argument("--json", help="print as json", action="store_true")
translation.add_argument("--json", help="print as json", action="store_true")
parser.add_argument(
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
)
return vars(parser.parse_args())
async def translate(language, json_output=False):
async def translate(language: str, json_output: bool = False) -> None:
async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language)
if json_output:
@ -47,41 +57,55 @@ async def translate(language, json_output=False):
.replace("\\r", "")
)
keys = json.loads(clean_keys)
print(helper.pretty_print(keys))
print(printer.pretty_print(keys))
async def main():
args = get_arguments()
if language := args.get("translate"):
await translate(language, json_output=args.get("json"))
return
def get_login_data(args: Dict[str, str]) -> Tuple[str, str]:
if not (user := args["user"]):
user = input("User for hOn account: ")
if not (password := args["password"]):
password = getpass("Password for hOn account: ")
async with Hon(user, password) as hon:
return user, password
async def main() -> None:
args = get_arguments()
if language := args.get("translate"):
await translate(language, json_output=args.get("json", ""))
return
async with Hon(
*get_login_data(args), test_data_path=Path(args.get("import", ""))
) as hon:
for device in hon.appliances:
if args.get("export"):
anonymous = args.get("anonymous", False)
path = Path(args.get("directory", "."))
if not args.get("zip"):
for file in await diagnose.appliance_data(device, path, anonymous):
print(f"Created {file}")
else:
archive = await diagnose.zip_archive(device, path, anonymous)
print(f"Created {archive}")
continue
print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10)
if args.get("keys"):
data = device.data.copy()
attr = "get" if args.get("all") else "pop"
print(
helper.key_print(
data["attributes"].__getattribute__(attr)("parameters")
)
printer.key_print(getattr(data["attributes"], attr)("parameters"))
)
print(helper.key_print(data.__getattribute__(attr)("appliance")))
print(helper.key_print(data))
print(printer.key_print(getattr(data, attr)("appliance")))
print(printer.key_print(data))
print(
helper.pretty_print(
helper.create_command(device.commands, concat=True)
printer.pretty_print(
printer.create_commands(device.commands, concat=True)
)
)
else:
print(device.diagnose(" "))
print(diagnose.yaml_export(device))
def start():
def start() -> None:
try:
asyncio.run(main())
except KeyboardInterrupt:

View File

@ -1,23 +1,29 @@
import importlib
import json
import logging
from contextlib import suppress
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any
from typing import TYPE_CHECKING
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
from pyhon import helper
from pyhon import diagnose, exceptions
from pyhon.appliances.base import ApplianceBase
from pyhon.attributes import HonAttribute
from pyhon.command_loader import HonCommandLoader
from pyhon.commands import HonCommand
from pyhon.parameter.base import HonParameter
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
if TYPE_CHECKING:
from pyhon import HonAPI
_LOGGER = logging.getLogger(__name__)
T = TypeVar("T")
# pylint: disable=too-many-public-methods,too-many-instance-attributes
class HonAppliance:
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
@ -26,107 +32,136 @@ class HonAppliance:
) -> None:
if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info: Dict = info
self._info: Dict[str, Any] = info
self._api: Optional[HonAPI] = api
self._appliance_model: Dict = {}
self._appliance_model: Dict[str, Any] = {}
self._commands: Dict = {}
self._statistics: Dict = {}
self._attributes: Dict = {}
self._commands: Dict[str, HonCommand] = {}
self._statistics: Dict[str, Any] = {}
self._attributes: Dict[str, Any] = {}
self._zone: int = zone
self._additional_data: Dict[str, Any] = {}
self._last_update = None
self._last_update: Optional[datetime] = None
self._default_setting = HonParameter("", {}, "")
try:
self._extra = importlib.import_module(
self._extra: Optional[ApplianceBase] = importlib.import_module(
f"pyhon.appliances.{self.appliance_type.lower()}"
).Appliance(self)
except ModuleNotFoundError:
self._extra = None
def __getitem__(self, item):
def _get_nested_item(self, item: str) -> Any:
result: List[Any] | Dict[str, Any] = self.data
for key in item.split("."):
if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)]
elif isinstance(result, dict):
result = result[key]
return result
def __getitem__(self, item: str) -> Any:
if self._zone:
item += f"Z{self._zone}"
if "." in item:
result = self.data
for key in item.split("."):
if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)]
else:
result = result[key]
return result
return self._get_nested_item(item)
if item in self.data:
return self.data[item]
if item in self.attributes["parameters"]:
return self.attributes["parameters"].get(item)
return self.attributes["parameters"][item].value
return self.info[item]
def get(self, item, default=None):
@overload
def get(self, item: str, default: None = None) -> Any:
...
@overload
def get(self, item: str, default: T) -> T:
...
def get(self, item: str, default: Optional[T] = None) -> Any:
try:
return self[item]
except (KeyError, IndexError):
return default
def _check_name_zone(self, name: str, frontend: bool = True) -> str:
middle = " Z" if frontend else "_z"
if (attribute := self._info.get(name, "")) and self._zone:
return f"{attribute}{middle}{self._zone}"
zone = " Z" if frontend else "_z"
attribute: str = self._info.get(name, "")
if attribute and self._zone:
return f"{attribute}{zone}{self._zone}"
return attribute
@property
def appliance_model_id(self) -> str:
return self._info.get("applianceModelId", "")
return str(self._info.get("applianceModelId", ""))
@property
def appliance_type(self) -> str:
return self._info.get("applianceTypeName", "")
return str(self._info.get("applianceTypeName", ""))
@property
def mac_address(self) -> str:
return self.info.get("macAddress", "")
return str(self.info.get("macAddress", ""))
@property
def unique_id(self) -> str:
return self._check_name_zone("macAddress", frontend=False)
default_mac = "xx-xx-xx-xx-xx-xx"
import_name = f"{self.appliance_type.lower()}_{self.appliance_model_id}"
result = self._check_name_zone("macAddress", frontend=False)
result = result.replace(default_mac, import_name)
return result
@property
def model_name(self) -> str:
return self._check_name_zone("modelName")
@property
def brand(self) -> str:
brand = self._check_name_zone("brand")
return brand[0].upper() + brand[1:]
@property
def nick_name(self) -> str:
return self._check_name_zone("nickName")
result = self._check_name_zone("nickName")
if not result or re.findall("^[xX1\\s-]+$", result):
return self.model_name
return result
@property
def code(self) -> str:
if code := self.info.get("code"):
code: str = self.info.get("code", "")
if code:
return code
serial_number = self.info.get("serialNumber", "")
serial_number: str = self.info.get("serialNumber", "")
return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
@property
def commands_options(self):
return self._appliance_model.get("options")
def model_id(self) -> int:
return int(self._info.get("applianceModelId", 0))
@property
def commands(self):
def options(self) -> Dict[str, Any]:
return dict(self._appliance_model.get("options", {}))
@property
def commands(self) -> Dict[str, HonCommand]:
return self._commands
@property
def attributes(self):
def attributes(self) -> Dict[str, Any]:
return self._attributes
@property
def statistics(self):
def statistics(self) -> Dict[str, Any]:
return self._statistics
@property
def info(self):
def info(self) -> Dict[str, Any]:
return self._info
@property
def additional_data(self):
def additional_data(self) -> Dict[str, Any]:
return self._additional_data
@property
@ -134,111 +169,52 @@ class HonAppliance:
return self._zone
@property
def api(self) -> Optional["HonAPI"]:
def api(self) -> "HonAPI":
"""api connection object"""
if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api
async def _recover_last_command_states(self):
command_history = await self.api.command_history(self)
for name, command in self._commands.items():
last = next(
(
index
for (index, d) in enumerate(command_history)
if d.get("command", {}).get("commandName") == name
),
None,
)
if last is None:
continue
parameters = command_history[last].get("command", {}).get("parameters", {})
if command.categories and (
parameters.get("program") or parameters.get("category")
):
if parameters.get("program"):
command.category = parameters.pop("program").split(".")[-1].lower()
else:
command.category = parameters.pop("category")
command = self.commands[name]
for key, data in command.settings.items():
if (
not isinstance(data, HonParameterFixed)
and parameters.get(key) is not None
):
with suppress(ValueError):
data.value = parameters.get(key)
async def load_commands(self) -> None:
command_loader = HonCommandLoader(self.api, self)
await command_loader.load_commands()
self._commands = command_loader.commands
self._additional_data = command_loader.additional_data
self._appliance_model = command_loader.appliance_data
self.sync_params_to_command("settings")
def _get_categories(self, command, data):
categories = {}
for category, value in data.items():
result = self._get_command(value, command, category, categories)
if result:
if "PROGRAM" in category:
category = category.split(".")[-1].lower()
categories[category] = result[0]
if categories:
if "setParameters" in categories:
return [categories["setParameters"]]
return [list(categories.values())[0]]
return []
def _get_commands(self, data):
commands = []
for command, value in data.items():
commands += self._get_command(value, command, "")
return {c.name: c for c in commands}
def _get_command(self, data, command="", category="", categories=None):
commands = []
if isinstance(data, dict):
if data.get("description") and data.get("protocolType", None):
commands += [
HonCommand(
command,
data,
self,
category_name=category,
categories=categories,
)
]
async def load_attributes(self) -> None:
attributes = await self.api.load_attributes(self)
for name, values in attributes.pop("shadow", {}).get("parameters", {}).items():
if name in self._attributes.get("parameters", {}):
self._attributes["parameters"][name].update(values)
else:
commands += self._get_categories(command, data)
elif category:
self._additional_data.setdefault(command, {})[category] = data
else:
self._additional_data[command] = data
return commands
self._attributes.setdefault("parameters", {})[name] = HonAttribute(
values
)
self._attributes |= attributes
if self._extra:
self._attributes = self._extra.attributes(self._attributes)
async def load_commands(self):
raw = await self.api.load_commands(self)
self._appliance_model = raw.pop("applianceModel")
raw.pop("dictionaryId", None)
self._commands = self._get_commands(raw)
await self._recover_last_command_states()
async def load_attributes(self):
self._attributes = await self.api.load_attributes(self)
for name, values in self._attributes.pop("shadow").get("parameters").items():
self._attributes.setdefault("parameters", {})[name] = values["parNewVal"]
async def load_statistics(self):
async def load_statistics(self) -> None:
self._statistics = await self.api.load_statistics(self)
self._statistics |= await self.api.load_maintenance(self)
async def update(self):
async def update(self, force: bool = False) -> None:
now = datetime.now()
if not self._last_update or self._last_update < now - timedelta(
seconds=self._MINIMAL_UPDATE_INTERVAL
):
min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
if force or not self._last_update or self._last_update < min_age:
self._last_update = now
await self.load_attributes()
self.sync_params_to_command("settings")
@property
def command_parameters(self):
def command_parameters(self) -> Dict[str, Dict[str, str | float]]:
return {n: c.parameter_value for n, c in self._commands.items()}
@property
def settings(self):
result = {}
def settings(self) -> Dict[str, Parameter]:
result: Dict[str, Parameter] = {}
for name, command in self._commands.items():
for key in command.setting_keys:
setting = command.settings.get(key, self._default_setting)
@ -248,7 +224,7 @@ class HonAppliance:
return result
@property
def available_settings(self):
def available_settings(self) -> List[str]:
result = []
for name, command in self._commands.items():
for key in command.setting_keys:
@ -256,69 +232,85 @@ class HonAppliance:
return result
@property
def data(self):
def data(self) -> Dict[str, Any]:
result = {
"attributes": self.attributes,
"appliance": self.info,
"statistics": self.statistics,
"additional_data": self._additional_data,
**self.command_parameters,
**self.attributes,
}
if self._extra:
return self._extra.data(result)
return result
def diagnose(self, whitespace=" ", command_only=False):
data = {
"attributes": self.attributes.copy(),
"appliance": self.info,
"statistics": self.statistics,
"additional_data": self._additional_data,
}
if command_only:
data.pop("attributes")
data.pop("appliance")
data.pop("statistics")
data |= {n: c.parameter_groups for n, c in self._commands.items()}
extra = {n: c.data for n, c in self._commands.items() if c.data}
if extra:
data |= {"extra_command_data": extra}
for sensible in ["PK", "SK", "serialNumber", "coords", "device"]:
data.get("appliance", {}).pop(sensible, None)
result = helper.pretty_print({"data": data}, whitespace=whitespace)
result += helper.pretty_print(
{"commands": helper.create_command(self.commands)},
whitespace=whitespace,
)
return result.replace(self.mac_address, "xx-xx-xx-xx-xx-xx")
class HonApplianceTest(HonAppliance):
def __init__(self, name):
super().__init__(None, {})
self._name = name
self.load_commands()
self._info = self._appliance_model
def load_commands(self):
device = Path(__file__).parent / "test_data" / f"{self._name}.json"
with open(str(device)) as f:
raw = json.loads(f.read())
self._appliance_model = raw.pop("applianceModel")
raw.pop("dictionaryId", None)
self._commands = self._get_commands(raw)
async def update(self):
return
@property
def nick_name(self) -> str:
return self._name
def diagnose(self) -> str:
return diagnose.yaml_export(self, anonymous=True)
@property
def unique_id(self) -> str:
return self._name
async def data_archive(self, path: Path) -> str:
return await diagnose.zip_archive(self, path, anonymous=True)
@property
def mac_address(self) -> str:
return "xx-xx-xx-xx-xx-xx"
def sync_command_to_params(self, command_name: str) -> None:
if not (command := self.commands.get(command_name)):
return
for key in self.attributes.get("parameters", {}):
if new := command.parameters.get(key):
self.attributes["parameters"][key].update(
str(new.intern_value), shield=True
)
def sync_params_to_command(self, command_name: str) -> None:
if not (command := self.commands.get(command_name)):
return
for key in command.setting_keys:
if (
new := self.attributes.get("parameters", {}).get(key)
) is None or new.value == "":
continue
setting = command.settings[key]
try:
if not isinstance(setting, HonParameterRange):
command.settings[key].value = str(new.value)
else:
command.settings[key].value = float(new.value)
except ValueError as error:
_LOGGER.info("Can't set %s - %s", key, error)
continue
def sync_command(
self,
main: str,
target: Optional[List[str] | str] = None,
to_sync: Optional[List[str] | bool] = None,
) -> None:
base: Optional[HonCommand] = self.commands.get(main)
if not base:
return
for command, data in self.commands.items():
if command == main or target and command not in target:
continue
for name, target_param in data.parameters.items():
if not (base_param := base.parameters.get(name)):
continue
if to_sync and (
(isinstance(to_sync, list) and name not in to_sync)
or not base_param.mandatory
):
continue
self.sync_parameter(base_param, target_param)
def sync_parameter(self, main: Parameter, target: Parameter) -> None:
if isinstance(main, HonParameterRange) and isinstance(
target, HonParameterRange
):
target.max = main.max
target.min = main.min
target.step = main.step
elif isinstance(target, HonParameterRange):
target.max = int(main.value)
target.min = int(main.value)
target.step = 1
elif isinstance(target, HonParameterEnum):
target.values = main.values
target.value = main.value

25
pyhon/appliances/base.py Normal file
View File

@ -0,0 +1,25 @@
from typing import Dict, Any, TYPE_CHECKING
from pyhon.parameter.program import HonParameterProgram
if TYPE_CHECKING:
from pyhon.appliance import HonAppliance
class ApplianceBase:
def __init__(self, appliance: "HonAppliance"):
self.parent = appliance
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
program_name = "No Program"
if program := int(str(data.get("parameters", {}).get("prCode", "0"))):
if start_cmd := self.parent.settings.get("startProgram.program"):
if isinstance(start_cmd, HonParameterProgram) and (
ids := start_cmd.ids
):
program_name = ids.get(program, program_name)
data["programName"] = program_name
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

View File

@ -1,12 +1,13 @@
class Appliance:
def __init__(self, appliance):
self.parent = appliance
# pylint: disable=duplicate-code
from typing import Any, Dict
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
return data
def settings(self, settings):
return settings

View File

@ -1,17 +1,16 @@
class Appliance:
def __init__(self, appliance):
self.parent = appliance
from typing import Any, Dict
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["temp"] = "0"
data["attributes"]["parameters"]["onOffStatus"] = "0"
data["attributes"]["parameters"]["remoteCtrValid"] = "0"
data["attributes"]["parameters"]["remainingTimeMM"] = "0"
from pyhon.appliances.base import ApplianceBase
data["active"] = data["attributes"]["parameters"]["onOffStatus"] == "1"
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["temp"].value = 0
data["parameters"]["onOffStatus"].value = 0
data["parameters"]["remoteCtrValid"].value = 0
data["parameters"]["remainingTimeMM"].value = 0
data["active"] = data["parameters"]["onOffStatus"].value == 1
return data
def settings(self, settings):
return settings

25
pyhon/appliances/ref.py Normal file
View File

@ -0,0 +1,25 @@
from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data["parameters"]["holidayMode"] == "1":
data["modeZ1"] = "holiday"
elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ1"] = "auto_set"
elif data["parameters"].get("quickModeZ1") == "1":
data["modeZ1"] = "super_cool"
else:
data["modeZ1"] = "no_mode"
if data["parameters"].get("quickModeZ2") == "1":
data["modeZ2"] = "super_freeze"
elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ2"] = "auto_set"
else:
data["modeZ2"] = "no_mode"
return data

View File

@ -1,21 +1,20 @@
# pylint: disable=duplicate-code
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.fixed import HonParameterFixed
class Appliance:
def __init__(self, appliance):
self.parent = appliance
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
if program := int(data["attributes"]["parameters"]["prCode"]):
ids = self.parent.settings["startProgram.program"].ids
data["programName"] = ids.get(program, "")
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
return data
def settings(self, settings):
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
dry_level = settings.get("startProgram.dryLevel")
if isinstance(dry_level, HonParameterFixed) and dry_level.value == "11":
settings.pop("startProgram.dryLevel", None)

5
pyhon/appliances/wc.py Normal file
View File

@ -0,0 +1,5 @@
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
pass

View File

@ -1,13 +1,17 @@
class Appliance:
def __init__(self, appliance):
self.parent = appliance
# pylint: disable=duplicate-code
from typing import Dict, Any
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
return data
def settings(self, settings):
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

16
pyhon/appliances/wh.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.base import HonParameter
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
parameter = data.get("parameters", {}).get("onOffStatus")
is_class = isinstance(parameter, HonParameter)
data["active"] = parameter.value == 1 if is_class else parameter == 1
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

View File

@ -1,13 +1,17 @@
class Appliance:
def __init__(self, appliance):
self.parent = appliance
# pylint: disable=duplicate-code
from typing import Any, Dict
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
return data
def settings(self, settings):
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

58
pyhon/attributes.py Normal file
View File

@ -0,0 +1,58 @@
from datetime import datetime, timedelta
from typing import Optional, Final, Dict
from pyhon.helper import str_to_float
class HonAttribute:
_LOCK_TIMEOUT: Final = 10
def __init__(self, data: Dict[str, str] | str):
self._value: str = ""
self._last_update: Optional[datetime] = None
self._lock_timestamp: Optional[datetime] = None
self.update(data)
@property
def value(self) -> float | str:
"""Attribute value"""
try:
return str_to_float(self._value)
except ValueError:
return self._value
@value.setter
def value(self, value: str) -> None:
self._value = value
@property
def last_update(self) -> Optional[datetime]:
"""Timestamp of last api update"""
return self._last_update
@property
def lock(self) -> bool:
"""Shows if value changes are forbidden"""
if not self._lock_timestamp:
return False
lock_until = self._lock_timestamp + timedelta(seconds=self._LOCK_TIMEOUT)
return lock_until >= datetime.utcnow()
def update(self, data: Dict[str, str] | str, shield: bool = False) -> bool:
if self.lock and not shield:
return False
if shield:
self._lock_timestamp = datetime.utcnow()
if isinstance(data, str):
self.value = data
return True
self.value = data.get("parNewVal", "")
if last_update := data.get("lastUpdate"):
try:
self._last_update = datetime.fromisoformat(last_update)
except ValueError:
self._last_update = None
return True
def __str__(self) -> str:
return self._value

227
pyhon/command_loader.py Normal file
View File

@ -0,0 +1,227 @@
import asyncio
from contextlib import suppress
from copy import copy
from typing import Dict, Any, Optional, TYPE_CHECKING, List
from pyhon.commands import HonCommand
from pyhon.exceptions import NoAuthenticationException
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
if TYPE_CHECKING:
from pyhon import HonAPI
from pyhon.appliance import HonAppliance
class HonCommandLoader:
"""Loads and parses hOn command data"""
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
self._api: "HonAPI" = api
self._appliance: "HonAppliance" = appliance
self._api_commands: Dict[str, Any] = {}
self._favourites: List[Dict[str, Any]] = []
self._command_history: List[Dict[str, Any]] = []
self._commands: Dict[str, HonCommand] = {}
self._appliance_data: Dict[str, Any] = {}
self._additional_data: Dict[str, Any] = {}
@property
def api(self) -> "HonAPI":
"""api connection object"""
if self._api is None:
raise NoAuthenticationException("Missing hOn login")
return self._api
@property
def appliance(self) -> "HonAppliance":
"""appliance object"""
return self._appliance
@property
def commands(self) -> Dict[str, HonCommand]:
"""Get list of hon commands"""
return self._commands
@property
def appliance_data(self) -> Dict[str, Any]:
"""Get command appliance data"""
return self._appliance_data
@property
def additional_data(self) -> Dict[str, Any]:
"""Get command additional data"""
return self._additional_data
async def load_commands(self) -> None:
"""Trigger loading of command data"""
await self._load_data()
self._appliance_data = self._api_commands.pop("applianceModel", {})
self._get_commands()
self._add_favourites()
self._recover_last_command_states()
async def _load_commands(self) -> None:
self._api_commands = await self._api.load_commands(self._appliance)
async def _load_favourites(self) -> None:
self._favourites = await self._api.load_favourites(self._appliance)
async def _load_command_history(self) -> None:
self._command_history = await self._api.load_command_history(self._appliance)
async def _load_data(self) -> None:
"""Callback parallel all relevant data"""
await asyncio.gather(
*[
self._load_commands(),
self._load_favourites(),
self._load_command_history(),
]
)
@staticmethod
def _is_command(data: Dict[str, Any]) -> bool:
"""Check if dict can be parsed as command"""
return (
data.get("description") is not None and data.get("protocolType") is not None
)
@staticmethod
def _clean_name(category: str) -> str:
"""Clean up category name"""
if "PROGRAM" in category:
return category.split(".")[-1].lower()
return category
def _get_commands(self) -> None:
"""Generates HonCommand dict from api data"""
commands = []
for name, data in self._api_commands.items():
if command := self._parse_command(data, name):
commands.append(command)
self._commands = {c.name: c for c in commands}
def _parse_command(
self,
data: Dict[str, Any] | str,
command_name: str,
categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "",
) -> Optional[HonCommand]:
"""Try to crate HonCommand object"""
if not isinstance(data, dict):
self._additional_data[command_name] = data
return None
if self._is_command(data):
return HonCommand(
command_name,
data,
self._appliance,
category_name=category_name,
categories=categories,
)
if category := self._parse_categories(data, command_name):
return category
return None
def _parse_categories(
self, data: Dict[str, Any], command_name: str
) -> Optional[HonCommand]:
"""Parse categories and create reference to other"""
categories: Dict[str, HonCommand] = {}
for category, value in data.items():
if command := self._parse_command(
value, command_name, category_name=category, categories=categories
):
categories[self._clean_name(category)] = command
if categories:
# setParameters should be at first place
if "setParameters" in categories:
return categories["setParameters"]
return list(categories.values())[0]
return None
def _get_last_command_index(self, name: str) -> Optional[int]:
"""Get index of last command execution"""
return next(
(
index
for (index, d) in enumerate(self._command_history)
if d.get("command", {}).get("commandName") == name
),
None,
)
def _set_last_category(
self, command: HonCommand, name: str, parameters: Dict[str, Any]
) -> HonCommand:
"""Set category to last state"""
if command.categories:
if program := parameters.pop("program", None):
command.category = self._clean_name(program)
elif category := parameters.pop("category", None):
command.category = category
else:
return command
return self.commands[name]
return command
def _recover_last_command_states(self) -> None:
"""Set commands to last state"""
for name, command in self.commands.items():
if (last_index := self._get_last_command_index(name)) is None:
continue
last_command = self._command_history[last_index]
parameters = last_command.get("command", {}).get("parameters", {})
command = self._set_last_category(command, name, parameters)
for key, data in command.settings.items():
if parameters.get(key) is None:
continue
with suppress(ValueError):
data.value = parameters.get(key)
def _add_favourites(self) -> None:
"""Patch program categories with favourites"""
for favourite in self._favourites:
name, command_name, base = self._get_favourite_info(favourite)
if not base:
continue
base_command: HonCommand = copy(base)
self._update_base_command_with_data(base_command, favourite)
self._update_base_command_with_favourite(base_command)
self._update_program_categories(command_name, name, base_command)
def _get_favourite_info(
self, favourite: Dict[str, Any]
) -> tuple[str, str, HonCommand | None]:
name: str = favourite.get("favouriteName", {})
command = favourite.get("command", {})
command_name: str = command.get("commandName", "")
program_name = self._clean_name(command.get("programName", ""))
base_command = self.commands[command_name].categories.get(program_name)
return name, command_name, base_command
def _update_base_command_with_data(
self, base_command: HonCommand, command: Dict[str, Any]
) -> None:
for data in command.values():
if isinstance(data, str):
continue
for key, value in data.items():
if not (parameter := base_command.parameters.get(key)):
continue
with suppress(ValueError):
parameter.value = value
def _update_base_command_with_favourite(self, base_command: HonCommand) -> None:
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
base_command.parameters.update(favourite=extra_param)
def _update_program_categories(
self, command_name: str, name: str, base_command: HonCommand
) -> None:
program = base_command.parameters["program"]
if isinstance(program, HonParameterProgram):
program.set_value(name)
self.commands[command_name].categories[name] = base_command

View File

@ -1,16 +1,22 @@
import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING, Union
from pyhon import exceptions
from pyhon.exceptions import ApiError, NoAuthenticationException
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
from pyhon.rules import HonRuleSet
from pyhon.typedefs import Parameter
if TYPE_CHECKING:
from pyhon import HonAPI
from pyhon.appliance import HonAppliance
_LOGGER = logging.getLogger(__name__)
class HonCommand:
def __init__(
@ -21,33 +27,39 @@ class HonCommand:
categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "",
):
self._api: Optional[HonAPI] = appliance.api
self._appliance: "HonAppliance" = appliance
self._name: str = name
self._api: Optional[HonAPI] = None
self._appliance: "HonAppliance" = appliance
self._categories: Optional[Dict[str, "HonCommand"]] = categories
self._category_name: str = category_name
self._description: str = attributes.pop("description", "")
self._protocol_type: str = attributes.pop("protocolType", "")
self._parameters: Dict[str, HonParameter] = {}
self._parameters: Dict[str, Parameter] = {}
self._data: Dict[str, Any] = {}
self._available_settings: Dict[str, HonParameter] = {}
self._rules: List[HonRuleSet] = []
attributes.pop("description", "")
attributes.pop("protocolType", "")
self._load_parameters(attributes)
def __repr__(self) -> str:
return f"{self._name} command"
@property
def name(self):
def name(self) -> str:
return self._name
@property
def api(self) -> "HonAPI":
if self._api is None and self._appliance:
self._api = self._appliance.api
if self._api is None:
raise exceptions.NoAuthenticationException
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api
@property
def data(self):
def appliance(self) -> "HonAppliance":
return self._appliance
@property
def data(self) -> Dict[str, Any]:
return self._data
@property
@ -62,21 +74,43 @@ class HonCommand:
def parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
result: Dict[str, Dict[str, Union[str, float]]] = {}
for name, parameter in self._parameters.items():
result.setdefault(parameter.group, {})[name] = parameter.value
result.setdefault(parameter.group, {})[name] = parameter.intern_value
return result
@property
def mandatory_parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
result: Dict[str, Dict[str, Union[str, float]]] = {}
for name, parameter in self._parameters.items():
if parameter.mandatory:
result.setdefault(parameter.group, {})[name] = parameter.intern_value
return result
@property
def parameter_value(self) -> Dict[str, Union[str, float]]:
return {n: p.value for n, p in self._parameters.items()}
def _load_parameters(self, attributes):
def _load_parameters(self, attributes: Dict[str, Dict[str, Any] | Any]) -> None:
for key, items in attributes.items():
if not isinstance(items, dict):
_LOGGER.info("Loading Attributes - Skipping %s", str(items))
continue
for name, data in items.items():
self._create_parameters(data, name, key)
for rule in self._rules:
rule.patch()
def _create_parameters(self, data: Dict, name: str, parameter: str) -> None:
def _create_parameters(
self, data: Dict[str, Any], name: str, parameter: str
) -> None:
if name == "zoneMap" and self._appliance.zone:
data["default"] = self._appliance.zone
if data.get("category") == "rule":
if "fixedValue" in data:
self._rules.append(HonRuleSet(self, data["fixedValue"]))
elif "enumValues" in data:
self._rules.append(HonRuleSet(self, data["enumValues"]))
else:
_LOGGER.warning("Rule not supported: %s", data)
match data.get("typology"):
case "range":
self._parameters[name] = HonParameterRange(name, data, parameter)
@ -91,12 +125,41 @@ class HonCommand:
name = "program" if "PROGRAM" in self._category_name else "category"
self._parameters[name] = HonParameterProgram(name, self, "custom")
async def send(self) -> bool:
params = self.parameter_groups.get("parameters", {})
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
return await self.api.send_command(
self._appliance, self._name, params, ancillary_params
async def send(self, only_mandatory: bool = False) -> bool:
grouped_params = (
self.mandatory_parameter_groups if only_mandatory else self.parameter_groups
)
params = grouped_params.get("parameters", {})
return await self.send_parameters(params)
async def send_specific(self, param_names: List[str]) -> bool:
params: Dict[str, str | float] = {}
for key, parameter in self._parameters.items():
if key in param_names or parameter.mandatory:
params[key] = parameter.value
return await self.send_parameters(params)
async def send_parameters(self, params: Dict[str, str | float]) -> bool:
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
ancillary_params.pop("programRules", None)
if "prStr" in params:
params["prStr"] = self._category_name.upper()
self.appliance.sync_command_to_params(self.name)
try:
result = await self.api.send_command(
self._appliance,
self._name,
params,
ancillary_params,
self._category_name,
)
if not result:
_LOGGER.error(result)
raise ApiError("Can't send command")
except NoAuthenticationException:
_LOGGER.error("No Authentication")
return False
return result
@property
def categories(self) -> Dict[str, "HonCommand"]:
@ -120,7 +183,7 @@ class HonCommand:
)
@staticmethod
def _more_options(first: HonParameter, second: HonParameter):
def _more_options(first: Parameter, second: Parameter) -> Parameter:
if isinstance(first, HonParameterFixed) and not isinstance(
second, HonParameterFixed
):
@ -130,8 +193,8 @@ class HonCommand:
return first
@property
def available_settings(self) -> Dict[str, HonParameter]:
result: Dict[str, HonParameter] = {}
def available_settings(self) -> Dict[str, Parameter]:
result: Dict[str, Parameter] = {}
for command in self.categories.values():
for name, parameter in command.parameters.items():
if name in result:
@ -139,3 +202,7 @@ class HonCommand:
else:
result[name] = parameter
return result
def reset(self) -> None:
for parameter in self._parameters.values():
parameter.reset()

View File

@ -1,7 +1,10 @@
import json
import logging
from datetime import datetime
from typing import Dict, Optional
from pathlib import Path
from pprint import pformat
from types import TracebackType
from typing import Dict, Optional, Any, List, no_type_check, Type
from aiohttp import ClientSession
from typing_extensions import Self
@ -34,7 +37,12 @@ class HonAPI:
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()
@property
@ -44,13 +52,13 @@ class HonAPI:
return self._hon.auth
@property
def _hon(self):
def _hon(self) -> HonConnectionHandler:
if self._hon_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_handler
@property
def _hon_anonymous(self):
def _hon_anonymous(self) -> HonAnonymousConnectionHandler:
if self._hon_anonymous_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_anonymous_handler
@ -65,12 +73,18 @@ class HonAPI:
).create()
return self
async def load_appliances(self) -> Dict:
async def load_appliances(self) -> List[Dict[str, Any]]:
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
return await resp.json()
result = await resp.json()
if result:
appliances: List[Dict[str, Any]] = result.get("payload", {}).get(
"appliances", {}
)
return appliances
return []
async def load_commands(self, appliance: HonAppliance) -> Dict:
params: Dict = {
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str | int] = {
"applianceType": appliance.appliance_type,
"applianceModelId": appliance.appliance_model_id,
"macAddress": appliance.mac_address,
@ -86,92 +100,105 @@ class HonAPI:
params["series"] = series
url: str = f"{const.API_URL}/commands/v1/retrieve"
async with self._hon.get(url, params=params) as response:
result: Dict = (await response.json()).get("payload", {})
result: Dict[str, Any] = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0":
_LOGGER.error(await response.json())
return {}
return result
async def command_history(self, appliance: HonAppliance) -> Dict:
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
)
async with self._hon.get(url) as response:
result: Dict = await response.json()
if not result or not result.get("payload"):
return {}
return result["payload"]["history"]
result: Dict[str, Any] = await response.json()
if not result or not result.get("payload"):
return []
command_history: List[Dict[str, Any]] = result["payload"]["history"]
return command_history
async def command_favourites(self, appliance: HonAppliance) -> Dict:
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/favourite"
)
async with self._hon.get(url) as response:
result: Dict = await response.json()
if not result or not result.get("payload"):
return {}
return result["payload"]["favourites"]
result: Dict[str, Any] = await response.json()
if not result or not result.get("payload"):
return []
favourites: List[Dict[str, Any]] = result["payload"]["favourites"]
return favourites
async def last_activity(self, appliance: HonAppliance) -> Dict:
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params: Dict = {"macAddress": appliance.mac_address}
params: Dict[str, str] = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
result: Dict = await response.json()
if result and (activity := result.get("attributes")):
return activity
result: Dict[str, Any] = await response.json()
if result:
activity: Dict[str, Any] = result.get("attributes", "")
if activity:
return activity
return {}
async def appliance_model(self, appliance: HonAppliance) -> Dict:
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/appliance-model"
params: Dict = {
"code": appliance.info["code"],
params: Dict[str, str] = {
"code": appliance.code,
"macAddress": appliance.mac_address,
}
async with self._hon.get(url, params=params) as response:
result: Dict = await response.json()
if result and (activity := result.get("attributes")):
return activity
result: Dict[str, Any] = await response.json()
if result:
appliance_data: Dict[str, Any] = result.get("payload", {}).get(
"applianceModel", {}
)
return appliance_data
return {}
async def load_attributes(self, appliance: HonAppliance) -> Dict:
params: Dict = {
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
"category": "CYCLE",
}
url: str = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
attributes: Dict[str, Any] = (await response.json()).get("payload", {})
return attributes
async def load_statistics(self, appliance: HonAppliance) -> Dict:
params: Dict = {
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
}
url: str = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
statistics: Dict[str, Any] = (await response.json()).get("payload", {})
return statistics
async def load_maintenance(self, appliance: HonAppliance):
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
url = f"{const.API_URL}/commands/v1/maintenance-cycle"
params = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
maintenance: Dict[str, Any] = (await response.json()).get("payload", {})
return maintenance
async def send_command(
self,
appliance: HonAppliance,
command: str,
parameters: Dict,
ancillary_parameters: Dict,
parameters: Dict[str, Any],
ancillary_parameters: Dict[str, Any],
program_name: str = "",
) -> bool:
now: str = datetime.utcnow().isoformat()
data: Dict = {
data: Dict[str, Any] = {
"macAddress": appliance.mac_address,
"timestamp": f"{now[:-3]}Z",
"commandName": command,
"transactionId": f"{appliance.mac_address}_{now[:-3]}Z",
"applianceOptions": appliance.commands_options,
"applianceOptions": appliance.options,
"device": self._hon.device.get(mobile=True),
"attributes": {
"channel": "mobileApp",
@ -182,25 +209,29 @@ class HonAPI:
"parameters": parameters,
"applianceType": appliance.appliance_type,
}
if command == "startProgram" and program_name:
data.update({"programName": program_name.upper()})
url: str = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as response:
json_data: Dict = await response.json()
json_data: Dict[str, Any] = await response.json()
if json_data.get("payload", {}).get("resultCode") == "0":
return True
_LOGGER.error(await response.text())
_LOGGER.error("%s - Payload:\n%s", url, pformat(data))
return False
async def appliance_configuration(self) -> Dict:
async def appliance_configuration(self) -> Dict[str, Any]:
url: str = f"{const.API_URL}/config/v1/program-list-rules"
async with self._hon_anonymous.get(url) as response:
result: Dict = await response.json()
if result and (data := result.get("payload")):
return data
return {}
result: Dict[str, Any] = await response.json()
data: Dict[str, Any] = result.get("payload", {})
return data
async def app_config(self, language: str = "en", beta: bool = True) -> Dict:
async def app_config(
self, language: str = "en", beta: bool = True
) -> Dict[str, Any]:
url: str = f"{const.API_URL}/app-config"
payload_data: Dict = {
payload_data: Dict[str, str | int] = {
"languageCode": language,
"beta": beta,
"appVersion": const.APP_VERSION,
@ -208,20 +239,95 @@ class HonAPI:
}
payload: str = json.dumps(payload_data, separators=(",", ":"))
async with self._hon_anonymous.post(url, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")):
return data
return {}
result = await response.json()
data: Dict[str, Any] = result.get("payload", {})
return data
async def translation_keys(self, language: str = "en") -> Dict:
async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response:
if result := await response.json():
return result
return {}
if not (url := config.get("language", {}).get("jsonPath")):
return {}
async with self._hon_anonymous.get(url) as response:
result: Dict[str, Any] = await response.json()
return result
async def close(self) -> None:
if self._hon_handler is not None:
await self._hon_handler.close()
if self._hon_anonymous_handler is not None:
await self._hon_anonymous_handler.close()
class TestAPI(HonAPI):
def __init__(self, path: Path):
super().__init__()
self._anonymous = True
self._path: Path = path
def _load_json(self, appliance: HonAppliance, file: str) -> Dict[str, Any]:
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
if not (path := self._path / directory / f"{file}.json").exists():
_LOGGER.warning("Can't open %s", str(path))
return {}
with open(path, "r", encoding="utf-8") as json_file:
text = json_file.read()
try:
data: Dict[str, Any] = json.loads(text)
return data
except json.decoder.JSONDecodeError as error:
_LOGGER.error("%s - %s", str(path), error)
return {}
async def load_appliances(self) -> List[Dict[str, Any]]:
result = []
for appliance in self._path.glob("*/"):
file = appliance / "appliance_data.json"
with open(file, "r", encoding="utf-8") as json_file:
try:
result.append(json.loads(json_file.read()))
except json.decoder.JSONDecodeError as error:
_LOGGER.error("%s - %s", str(file), error)
return result
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "commands")
@no_type_check
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
return self._load_json(appliance, "command_history")
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
return []
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
return {}
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "appliance_data")
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "attributes")
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "statistics")
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "maintenance")
async def send_command(
self,
appliance: HonAppliance,
command: str,
parameters: Dict[str, Any],
ancillary_parameters: Dict[str, Any],
program_name: str = "",
) -> bool:
_LOGGER.info(
"%s - %s - %s",
str(parameters),
str(ancillary_parameters),
str(program_name),
)
return True

View File

@ -6,14 +6,16 @@ import urllib
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional
from typing import Dict, Optional, Any, List
from urllib import parse
from urllib.parse import quote
import aiohttp
from aiohttp import ClientResponse
from yarl import URL
from pyhon import const, exceptions
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger(__name__)
@ -25,41 +27,52 @@ class HonLoginData:
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict] = None
loaded: Optional[Dict[str, Any]] = None
@dataclass
class HonAuthData:
access_token: str = ""
refresh_token: str = ""
cognito_token: str = ""
id_token: str = ""
class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7
def __init__(self, session, email, password, device) -> None:
def __init__(
self,
session: aiohttp.ClientSession,
email: str,
password: str,
device: HonDevice,
) -> None:
self._session = session
self._request = HonAuthConnectionHandler(session)
self._login_data = HonLoginData()
self._login_data.email = email
self._login_data.password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
self._device = device
self._expires: datetime = datetime.utcnow()
self._auth = HonAuthData()
@property
def cognito_token(self) -> str:
return self._cognito_token
return self._auth.cognito_token
@property
def id_token(self) -> str:
return self._id_token
return self._auth.id_token
@property
def access_token(self) -> str:
return self._access_token
return self._auth.access_token
@property
def refresh_token(self) -> str:
return self._refresh_token
return self._auth.refresh_token
def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@ -107,7 +120,8 @@ class HonAuth:
async with self._request.get(url) as response:
text = await response.text()
self._expires = datetime.utcnow()
if not (login_url := re.findall("url = '(.+?)'", text)):
login_url: List[str] = re.findall("url = '(.+?)'", text)
if not login_url:
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
@ -120,7 +134,7 @@ class HonAuth:
await self._error_logger(response)
return new_location
async def _handle_redirects(self, login_url) -> str:
async def _handle_redirects(self, login_url: str) -> str:
redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
@ -134,9 +148,7 @@ class HonAuth:
fw_uid, loaded_str = context[0]
self._login_data.fw_uid = fw_uid
self._login_data.loaded = json.loads(loaded_str)
self._login_data.url = login_url.replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
self._login_data.url = login_url.replace(const.AUTH_API, "")
return True
await self._error_logger(response)
return False
@ -149,8 +161,8 @@ class HonAuth:
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": quote(self._login_data.email),
"password": quote(self._login_data.password),
"username": self._login_data.email,
"password": self._login_data.password,
"startUrl": start_url,
},
}
@ -172,24 +184,25 @@ class HonAuth:
async with self._request.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
data="&".join(f"{k}={quote(json.dumps(v))}" for k, v in data.items()),
params=params,
) as response:
if response.status == 200:
with suppress(json.JSONDecodeError, KeyError):
result = await response.json()
return result["events"][0]["attributes"]["values"]["url"]
url: str = result["events"][0]["attributes"]["values"]["url"]
return url
await self._error_logger(response)
return ""
def _parse_token_data(self, text: str) -> bool:
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
self._auth.access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
self._auth.refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
return True if access_token and refresh_token and id_token else False
self._auth.id_token = id_token[0]
return bool(access_token and refresh_token and id_token)
async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response:
@ -210,7 +223,7 @@ class HonAuth:
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
)
url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0]
url = const.AUTH_API + url_search[0]
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
@ -221,7 +234,7 @@ class HonAuth:
return True
async def _api_auth(self) -> bool:
post_headers = {"id-token": self._id_token}
post_headers = {"id-token": self._auth.id_token}
data = self._device.get()
async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
@ -231,8 +244,8 @@ class HonAuth:
except json.JSONDecodeError:
await self._error_logger(response)
return False
self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
if not self._cognito_token:
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
if not self._auth.cognito_token:
_LOGGER.error(json_data)
raise exceptions.HonAuthenticationError()
return True
@ -254,7 +267,7 @@ class HonAuth:
async def refresh(self) -> bool:
params = {
"client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token,
"refresh_token": self._auth.refresh_token,
"grant_type": "refresh_token",
}
async with self._request.post(
@ -265,14 +278,14 @@ class HonAuth:
return False
data = await response.json()
self._expires = datetime.utcnow()
self._id_token = data["id_token"]
self._access_token = data["access_token"]
self._auth.id_token = data["id_token"]
self._auth.access_token = data["access_token"]
return await self._api_auth()
def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._request.called_urls = []
self._cognito_token = ""
self._id_token = ""
self._access_token = ""
self._refresh_token = ""
self._auth.cognito_token = ""
self._auth.id_token = ""
self._auth.access_token = ""
self._auth.refresh_token = ""

View File

@ -21,7 +21,7 @@ class HonDevice:
return self._os_version
@property
def os(self) -> str:
def os_type(self) -> str:
return self._os
@property
@ -32,12 +32,14 @@ class HonDevice:
def mobile_id(self) -> str:
return self._mobile_id
def get(self, mobile: bool = False) -> Dict:
result = {
def get(self, mobile: bool = False) -> Dict[str, str | int]:
result: Dict[str, str | int] = {
"appVersion": self.app_version,
"mobileId": self.mobile_id,
"os": self.os,
"os": self.os_type,
"osVersion": self.os_version,
"deviceModel": self.device_model,
}
return (result | {"mobileOs": result.pop("os")}) if mobile else result
if mobile:
result |= {"mobileOs": result.pop("os", "")}
return result

View File

@ -1,21 +1,27 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Callable, Dict
from typing import Dict, Any
import aiohttp
from yarl import URL
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler):
_HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
_HEADERS: Dict[str, str] = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator:
async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
async with method(url, *args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response

View File

@ -1,12 +1,14 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, List, Tuple
from typing import Optional, List, Tuple, Any, Dict
import aiohttp
from yarl import URL
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
@ -28,9 +30,9 @@ class HonAuthConnectionHandler(ConnectionHandler):
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
self._called_urls.append((response.status, response.request_info.url))
async with method(url, *args, **kwargs) as response:
self._called_urls.append((response.status, str(response.request_info.url)))
yield response

View File

@ -1,18 +1,21 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
from types import TracebackType
from typing import Optional, Dict, Type, Any
import aiohttp
from typing_extensions import Self
from yarl import URL
from pyhon import const, exceptions
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class ConnectionHandler:
_HEADERS: Dict = {
_HEADERS: Dict[str, str] = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
}
@ -24,32 +27,52 @@ class ConnectionHandler:
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()
@property
def session(self) -> aiohttp.ClientSession:
if self._session is None:
raise exceptions.NoSessionException
return self._session
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
raise NotImplementedError
@asynccontextmanager
async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.get, *args, **kwargs) as response:
async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]:
async with method(url, *args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
async def get(
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.post, *args, **kwargs) as response:
args = self._session.get, *args
async with self._intercept(*args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
args = self._session.post, *args
async with self._intercept(*args, **kwargs) as response:
yield response
async def close(self) -> None:

View File

@ -2,15 +2,17 @@ import json
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
from typing import Optional, Dict, Any
import aiohttp
from typing_extensions import Self
from yarl import URL
from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
@ -41,10 +43,10 @@ class HonConnectionHandler(ConnectionHandler):
async def create(self) -> Self:
await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device)
self._auth = HonAuth(self.session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers: Dict) -> Dict:
async def _check_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token
@ -53,17 +55,18 @@ class HonConnectionHandler(ConnectionHandler):
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
self, method: Callback, url: str | URL, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
loop: int = kwargs.pop("loop", 0)
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
async with method(url, *args, **kwargs) as response:
if (
self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...")
await self.auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
method, url, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
@ -77,7 +80,7 @@ class HonConnectionHandler(ConnectionHandler):
)
await self.create()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
method, url, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
@ -92,11 +95,11 @@ class HonConnectionHandler(ConnectionHandler):
try:
await response.json()
yield response
except json.JSONDecodeError:
except json.JSONDecodeError as exc:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Decode Error")
raise HonAuthenticationError("Decode Error") from exc

View File

@ -1,10 +1,12 @@
AUTH_API = "https://he-accounts.force.com/SmartHome"
AUTH_API = "https://account2.hon-smarthome.com"
API_URL = "https://api-iot.he.services"
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
APP = "hon"
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
APP_VERSION = "1.53.7"
CLIENT_ID = (
"3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9."
"HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
)
APP_VERSION = "2.4.7"
OS_VERSION = 31
OS = "android"
DEVICE_MODEL = "exynos9820"

99
pyhon/diagnose.py Normal file
View File

@ -0,0 +1,99 @@
import asyncio
import json
import re
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, List, Tuple
from pyhon import printer
if TYPE_CHECKING:
from pyhon.appliance import HonAppliance
def anonymize_data(data: str) -> str:
default_date = "1970-01-01T00:00:00.0Z"
default_mac = "xx-xx-xx-xx-xx-xx"
data = re.sub("[0-9A-Fa-f]{2}(-[0-9A-Fa-f]{2}){5}", default_mac, data)
data = re.sub("[\\d-]{10}T[\\d:]{8}(.\\d+)?Z", default_date, data)
for sensible in [
"serialNumber",
"code",
"nickName",
"mobileId",
"PK",
"SK",
"lat",
"lng",
]:
for match in re.findall(f'"{sensible}.*?":\\s"?(.+?)"?,?\\n', data):
replace = re.sub("[a-z]", "x", match)
replace = re.sub("[A-Z]", "X", replace)
replace = re.sub("\\d", "1", replace)
data = data.replace(match, replace)
return data
async def load_data(appliance: "HonAppliance", topic: str) -> Tuple[str, str]:
return topic, await getattr(appliance.api, f"load_{topic}")(appliance)
def write_to_json(data: str, topic: str, path: Path, anonymous: bool = False) -> Path:
json_data = json.dumps(data, indent=4)
if anonymous:
json_data = anonymize_data(json_data)
file = path / f"{topic}.json"
with open(file, "w", encoding="utf-8") as json_file:
json_file.write(json_data)
return file
async def appliance_data(
appliance: "HonAppliance", path: Path, anonymous: bool = False
) -> List[Path]:
requests = [
"commands",
"attributes",
"command_history",
"statistics",
"maintenance",
"appliance_data",
]
path /= f"{appliance.appliance_type}_{appliance.model_id}".lower()
path.mkdir(parents=True, exist_ok=True)
api_data = await asyncio.gather(*[load_data(appliance, name) for name in requests])
return [write_to_json(data, topic, path, anonymous) for topic, data in api_data]
async def zip_archive(
appliance: "HonAppliance", path: Path, anonymous: bool = False
) -> str:
data = await appliance_data(appliance, path, anonymous)
archive = data[0].parent
shutil.make_archive(str(archive), "zip", archive)
shutil.rmtree(archive)
return f"{archive.stem}.zip"
def yaml_export(appliance: "HonAppliance", anonymous: bool = False) -> str:
data = {
"attributes": appliance.attributes.copy(),
"appliance": appliance.info,
"statistics": appliance.statistics,
"additional_data": appliance.additional_data,
}
data |= {n: c.parameter_groups for n, c in appliance.commands.items()}
extra = {n: c.data for n, c in appliance.commands.items() if c.data}
if extra:
data |= {"extra_command_data": extra}
if anonymous:
for sensible in ["serialNumber", "coords"]:
data.get("appliance", {}).pop(sensible, None)
result = printer.pretty_print({"data": data})
if commands := printer.create_commands(appliance.commands):
result += printer.pretty_print({"commands": commands})
if rules := printer.create_rules(appliance.commands):
result += printer.pretty_print({"rules": rules})
if anonymous:
result = anonymize_data(result)
return result

View File

@ -12,3 +12,7 @@ class NoSessionException(Exception):
class NoAuthenticationException(Exception):
pass
class ApiError(Exception):
pass

View File

@ -1,63 +1,5 @@
def key_print(data, key="", start=True):
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
)
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.available_settings.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
def str_to_float(string: str | float) -> float:
try:
return int(string)
except ValueError:
return float(str(string).replace(",", "."))

View File

@ -1,13 +1,16 @@
import asyncio
import logging
from pathlib import Path
from types import TracebackType
from typing import List, Optional, Dict, Any, Type
from aiohttp import ClientSession
from typing_extensions import Self
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance
from pyhon.connection.api import HonAPI
from pyhon.connection.api import TestAPI
from pyhon.exceptions import NoAuthenticationException
_LOGGER = logging.getLogger(__name__)
@ -18,12 +21,14 @@ class Hon:
email: Optional[str] = "",
password: Optional[str] = "",
session: Optional[ClientSession] = None,
test_data_path: Optional[Path] = None,
):
self._email: Optional[str] = email
self._password: Optional[str] = password
self._session: ClientSession | None = session
self._appliances: List[HonAppliance] = []
self._api: Optional[HonAPI] = None
self._test_data_path: Path = test_data_path or Path().cwd()
async def __aenter__(self) -> Self:
return await self.create()
@ -39,7 +44,7 @@ class Hon:
@property
def api(self) -> HonAPI:
if self._api is None:
raise exceptions.NoAuthenticationException
raise NoAuthenticationException
return self._api
@property
@ -66,11 +71,13 @@ class Hon:
return self._appliances
@appliances.setter
def appliances(self, appliances) -> None:
def appliances(self, appliances: List[HonAppliance]) -> None:
self._appliances = appliances
async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None:
appliance = HonAppliance(self._api, appliance_data, zone=zone)
async def _create_appliance(
self, appliance_data: Dict[str, Any], api: HonAPI, zone: int = 0
) -> None:
appliance = HonAppliance(api, appliance_data, zone=zone)
if appliance.mac_address == "":
return
try:
@ -87,12 +94,20 @@ class Hon:
self._appliances.append(appliance)
async def setup(self) -> None:
appliance: Dict
for appliance in (await self.api.load_appliances())["payload"]["appliances"]:
appliances = await self.api.load_appliances()
for appliance in appliances:
if (zones := int(appliance.get("zone", "0"))) > 1:
for zone in range(zones):
await self._create_appliance(appliance.copy(), zone=zone + 1)
await self._create_appliance(appliance)
await self._create_appliance(
appliance.copy(), self.api, zone=zone + 1
)
await self._create_appliance(appliance, self.api)
if (
test_data := self._test_data_path / "hon-test-data" / "test_data"
).exists() or (test_data := test_data / "test_data").exists():
api = TestAPI(test_data)
for appliance in await api.load_appliances():
await self._create_appliance(appliance, api)
async def close(self) -> None:
await self.api.close()

View File

@ -1,14 +1,27 @@
from typing import Dict, Any, List
from typing import Dict, Any, List, Tuple, Callable, TYPE_CHECKING
if TYPE_CHECKING:
from pyhon.rules import HonRule
class HonParameter:
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
self._key = key
self._category: str = attributes.get("category", "")
self._typology: str = attributes.get("typology", "")
self._mandatory: int = attributes.get("mandatory", 0)
self._attributes = attributes
self._category: str = ""
self._typology: str = ""
self._mandatory: int = 0
self._value: str | float = ""
self._group: str = group
self._triggers: Dict[
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
] = {}
self._set_attributes()
def _set_attributes(self) -> None:
self._category = self._attributes.get("category", "")
self._typology = self._attributes.get("typology", "")
self._mandatory = self._attributes.get("mandatory", 0)
@property
def key(self) -> str:
@ -18,6 +31,15 @@ class HonParameter:
def value(self) -> str | float:
return self._value if self._value is not None else "0"
@value.setter
def value(self, value: str | float) -> None:
self._value = value
self.check_trigger(value)
@property
def intern_value(self) -> str:
return str(self.value)
@property
def values(self) -> List[str]:
return [str(self.value)]
@ -37,3 +59,40 @@ class HonParameter:
@property
def group(self) -> str:
return self._group
def add_trigger(
self, value: str, func: Callable[["HonRule"], None], data: "HonRule"
) -> None:
if self._value == value:
func(data)
self._triggers.setdefault(value, []).append((func, data))
def check_trigger(self, value: str | float) -> None:
triggers = {str(k).lower(): v for k, v in self._triggers.items()}
if str(value).lower() in triggers:
for trigger in triggers[str(value)]:
func, args = trigger
func(args)
@property
def triggers(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for value, rules in self._triggers.items():
for _, rule in rules:
if rule.extras:
param = result.setdefault(value, {})
for extra_key, extra_value in rule.extras.items():
param = param.setdefault(extra_key, {}).setdefault(
extra_value, {}
)
else:
param = result.setdefault(value, {})
if fixed_value := rule.param_data.get("fixedValue"):
param[rule.param_key] = fixed_value
else:
param[rule.param_key] = rule.param_data.get("defaultValue", "")
return result
def reset(self) -> None:
self._set_attributes()

View File

@ -3,29 +3,49 @@ from typing import Dict, Any, List
from pyhon.parameter.base import HonParameter
def clean_value(value: str | float) -> str:
return str(value).strip("[]").replace("|", "_").lower()
class HonParameterEnum(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._default = attributes.get("defaultValue")
self._value = self._default or "0"
self._values: List[str] = attributes.get("enumValues", [])
if self._default and str(self._default) not in self.values:
self._default: str | float = ""
self._value: str | float = ""
self._values: List[str] = []
self._set_attributes()
if self._default and clean_value(self._default.strip("[]")) not in self.values:
self._values.append(self._default)
def _set_attributes(self) -> None:
super()._set_attributes()
self._default = self._attributes.get("defaultValue", "")
self._value = self._default or "0"
self._values = self._attributes.get("enumValues", [])
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> {self.values})"
@property
def values(self) -> List[str]:
return [str(value) for value in self._values]
return [clean_value(value) for value in self._values]
@values.setter
def values(self, values: List[str]) -> None:
self._values = values
@property
def intern_value(self) -> str:
return str(self._value) if self._value is not None else str(self.values[0])
@property
def value(self) -> str | float:
return self._value if self._value is not None else self.values[0]
return clean_value(self._value) if self._value is not None else self.values[0]
@value.setter
def value(self, value: str) -> None:
if value in self.values:
self._value = value
self.check_trigger(value)
else:
raise ValueError(f"Allowed values {self._value}")
raise ValueError(f"Allowed values: {self._values} But was: {value}")

View File

@ -6,16 +6,22 @@ from pyhon.parameter.base import HonParameter
class HonParameterFixed(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._value = attributes.get("fixedValue", None)
self._value: str | float = ""
self._set_attributes()
def _set_attributes(self) -> None:
super()._set_attributes()
self._value = self._attributes.get("fixedValue", "")
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> fixed)"
@property
def value(self) -> str | float:
return self._value if self._value is not None else "0"
return self._value if self._value != "" else "0"
@value.setter
def value(self, value: str | float) -> None:
# Fixed values seems being not so fixed as thought
self._value = value
self.check_trigger(value)

View File

@ -28,18 +28,29 @@ class HonParameterProgram(HonParameterEnum):
if value in self.values:
self._command.category = value
else:
raise ValueError(f"Allowed values {self.values}")
raise ValueError(f"Allowed values: {self.values} But was: {value}")
@property
def values(self) -> List[str]:
values = [v for v in self._programs if all(f not in v for f in self._FILTER)]
return sorted(values)
@values.setter
def values(self, values: List[str]) -> None:
raise ValueError("Cant set values {values}")
@property
def ids(self):
values = {
int(p.parameters["prCode"].value): n
for i, (n, p) in enumerate(self._programs.items())
if "iot_" not in n and p.parameters.get("prCode")
}
def ids(self) -> Dict[int, str]:
values: Dict[int, str] = {}
for name, parameter in self._programs.items():
if "iot_" in name:
continue
if parameter.parameters.get("prCode"):
continue
if (fav := parameter.parameters.get("favourite")) and fav.value == "1":
continue
values[int(parameter.parameters["prCode"].value)] = name
return dict(sorted(values.items()))
def set_value(self, value: str) -> None:
self._value = value

View File

@ -1,54 +1,71 @@
from typing import Dict, Any, List
from pyhon.helper import str_to_float
from pyhon.parameter.base import HonParameter
def str_to_float(string: str | float) -> float:
try:
return int(string)
except ValueError:
return float(str(string).replace(",", "."))
class HonParameterRange(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._min: float = str_to_float(attributes["minimumValue"])
self._max: float = str_to_float(attributes["maximumValue"])
self._step: float = str_to_float(attributes["incrementValue"])
self._default: float = str_to_float(attributes.get("defaultValue", self._min))
self._value: float = self._default
self._min: float = 0
self._max: float = 0
self._step: float = 0
self._default: float = 0
self._value: float = 0
self._set_attributes()
def __repr__(self):
return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])"
def _set_attributes(self) -> None:
super()._set_attributes()
self._min = str_to_float(self._attributes.get("minimumValue", 0))
self._max = str_to_float(self._attributes.get("maximumValue", 0))
self._step = str_to_float(self._attributes.get("incrementValue", 0))
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
self._value = self._default
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
@property
def min(self) -> float:
return self._min
@min.setter
def min(self, mini: float) -> None:
self._min = mini
@property
def max(self) -> float:
return self._max
@max.setter
def max(self, maxi: float) -> None:
self._max = maxi
@property
def step(self) -> float:
if not self._step:
return 1
return self._step
@step.setter
def step(self, step: float) -> None:
self._step = step
@property
def value(self) -> float:
return self._value if self._value is not None else self._min
def value(self) -> str | float:
return self._value if self._value is not None else self.min
@value.setter
def value(self, value: float) -> None:
def value(self, value: str | float) -> None:
value = str_to_float(value)
if self._min <= value <= self._max and not (value - self._min) % self._step:
if self.min <= value <= self.max and not ((value - self.min) * 100) % (
self.step * 100
):
self._value = value
self.check_trigger(value)
else:
raise ValueError(
f"Allowed: min {self._min} max {self._max} step {self._step}"
)
allowed = f"min {self.min} max {self.max} step {self.step}"
raise ValueError(f"Allowed: {allowed} But was: {value}")
@property
def values(self) -> List[str]:

87
pyhon/printer.py Normal file
View File

@ -0,0 +1,87 @@
from typing import Dict, Any, TYPE_CHECKING, List
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
if TYPE_CHECKING:
from pyhon.commands import HonCommand
def key_print(data: Any, key: str = "", start: bool = True) -> str:
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(
data: Any,
key: str = "",
intend: int = 0,
is_list: bool = False,
whitespace: str = " ",
) -> str:
result = ""
space = whitespace * intend
if isinstance(data, (dict, list)) and key:
result += f"{space}{'- ' if is_list else ''}{key}:\n"
intend += 1
if isinstance(data, list):
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
for i, (list_key, value) in enumerate(sorted(data.items())):
result += pretty_print(
value,
key=list_key,
intend=intend + (is_list if i else 0),
is_list=is_list and not i,
whitespace=whitespace,
)
else:
result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_commands(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
if isinstance(data, HonParameterEnum):
value: List[str] | Dict[str, str | float] = data.values
elif isinstance(data, HonParameterRange):
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
def create_rules(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
value = data.triggers
if not value:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result

0
pyhon/py.typed Normal file
View File

145
pyhon/rules.py Normal file
View File

@ -0,0 +1,145 @@
from dataclasses import dataclass
from typing import List, Dict, TYPE_CHECKING, Any, Optional
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
if TYPE_CHECKING:
from pyhon.commands import HonCommand
from pyhon.parameter.base import HonParameter
@dataclass
class HonRule:
trigger_key: str
trigger_value: str
param_key: str
param_data: Dict[str, Any]
extras: Optional[Dict[str, str]] = None
class HonRuleSet:
def __init__(self, command: "HonCommand", rule: Dict[str, Any]):
self._command: "HonCommand" = command
self._rules: Dict[str, List[HonRule]] = {}
self._parse_rule(rule)
@property
def rules(self) -> Dict[str, List[HonRule]]:
return self._rules
def _parse_rule(self, rule: Dict[str, Any]) -> None:
for param_key, params in rule.items():
param_key = self._command.appliance.options.get(param_key, param_key)
for trigger_key, trigger_data in params.items():
self._parse_conditions(param_key, trigger_key, trigger_data)
def _parse_conditions(
self,
param_key: str,
trigger_key: str,
trigger_data: Dict[str, Any],
extra: Optional[Dict[str, str]] = None,
) -> None:
trigger_key = trigger_key.replace("@", "")
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
for multi_trigger_value, param_data in trigger_data.items():
for trigger_value in multi_trigger_value.split("|"):
if isinstance(param_data, dict) and "typology" in param_data:
self._create_rule(
param_key, trigger_key, trigger_value, param_data, extra
)
elif isinstance(param_data, dict):
if extra is None:
extra = {}
extra[trigger_key] = trigger_value
for extra_key, extra_data in param_data.items():
self._parse_conditions(param_key, extra_key, extra_data, extra)
else:
param_data = {"typology": "fixed", "fixedValue": param_data}
self._create_rule(
param_key, trigger_key, trigger_value, param_data, extra
)
def _create_rule(
self,
param_key: str,
trigger_key: str,
trigger_value: str,
param_data: Dict[str, Any],
extras: Optional[Dict[str, str]] = None,
) -> None:
if param_data.get("fixedValue") == f"@{param_key}":
return
self._rules.setdefault(trigger_key, []).append(
HonRule(trigger_key, trigger_value, param_key, param_data, extras)
)
def _duplicate_for_extra_conditions(self) -> None:
new: Dict[str, List[HonRule]] = {}
for rules in self._rules.values():
for rule in rules:
if rule.extras is None:
continue
for key, value in rule.extras.items():
extras = rule.extras.copy()
extras.pop(key)
extras[rule.trigger_key] = rule.trigger_value
new.setdefault(key, []).append(
HonRule(key, value, rule.param_key, rule.param_data, extras)
)
for key, rules in new.items():
for rule in rules:
self._rules.setdefault(key, []).append(rule)
def _extra_rules_matches(self, rule: HonRule) -> bool:
if rule.extras:
for key, value in rule.extras.items():
if not self._command.parameters.get(key):
return False
if str(self._command.parameters.get(key)) != str(value):
return False
return True
def _apply_fixed(self, param: Parameter, value: str | float) -> None:
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}:
param.values = [str(value)]
param.value = str(value)
elif isinstance(param, HonParameterRange):
if float(value) < param.min:
param.min = float(value)
elif float(value) > param.max:
param.max = float(value)
param.value = float(value)
return
param.value = str(value)
def _apply_enum(self, param: Parameter, rule: HonRule) -> None:
if not isinstance(param, HonParameterEnum):
return
if enum_values := rule.param_data.get("enumValues"):
param.values = enum_values.split("|")
if default_value := rule.param_data.get("defaultValue"):
param.value = default_value
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
def apply(rule: HonRule) -> None:
if not self._extra_rules_matches(rule):
return
if not (param := self._command.parameters.get(rule.param_key)):
return
if fixed_value := rule.param_data.get("fixedValue", ""):
self._apply_fixed(param, fixed_value)
elif rule.param_data.get("typology") == "enum":
self._apply_enum(param, rule)
parameter.add_trigger(data.trigger_value, apply, data)
def patch(self) -> None:
self._duplicate_for_extra_conditions()
for name, parameter in self._command.parameters.items():
if name not in self._rules:
continue
for data in self._rules.get(name, []):
self._add_trigger(parameter, data)

27
pyhon/typedefs.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Union, Any, TYPE_CHECKING, Protocol
import aiohttp
from yarl import URL
if TYPE_CHECKING:
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
class Callback(Protocol): # pylint: disable=too-few-public-methods
def __call__(
self, url: str | URL, *args: Any, **kwargs: Any
) -> aiohttp.client._RequestContextManager:
...
Parameter = Union[
"HonParameter",
"HonParameterRange",
"HonParameterEnum",
"HonParameterFixed",
"HonParameterProgram",
]

View File

@ -1,2 +1,3 @@
aiohttp==3.8.4
yarl==1.8.2
aiohttp>=3.8
yarl>=1.8
typing-extensions>=4.8

View File

@ -1,4 +1,5 @@
black==23.3.0
flake8==6.0.0
mypy==1.2.0
pylint==2.17.2
black>=22.12
flake8>=6.0
mypy>=0.991
pylint>=2.15
setuptools>=62.3

View File

@ -2,12 +2,12 @@
from setuptools import setup, find_packages
with open("README.md", "r") as f:
with open("README.md", "r", encoding="utf-8") as f:
long_description = f.read()
setup(
name="pyhOn",
version="0.10.10",
version="0.15.15",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,
@ -21,7 +21,7 @@ setup(
packages=find_packages(),
include_package_data=True,
python_requires=">=3.10",
install_requires=["aiohttp"],
install_requires=["aiohttp>=3.8", "typing-extensions>=4.8", "yarl>=1.8"],
classifiers=[
"Development Status :: 4 - Beta",
"Environment :: Console",
@ -30,6 +30,7 @@ setup(
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
],
entry_points={