Compare commits
57 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
327d4a1814 | ||
|
4dc60c29ee | ||
|
7bc9e718a0 | ||
|
d4e5793186 | ||
|
e9f2bb9f4f | ||
|
ea81e2891f | ||
|
c71f8f17f5 | ||
|
27d974abba | ||
|
ab18e45f97 | ||
|
e44b9b6773 | ||
|
5c650d722b | ||
|
6ae40481e3 | ||
|
ff8ae160bb | ||
|
658e80a8f4 | ||
|
ffba85bf0d | ||
|
10c8d961c4 | ||
|
61dd470588 | ||
|
1ed81c2a77 | ||
|
e4dc3cb1d0 | ||
|
2523069ce9 | ||
|
eeb458cb1b | ||
|
2764700bc7 | ||
|
e6c796e822 | ||
|
454f2d8916 | ||
|
59ca8b6caf | ||
|
44c55c681d | ||
|
cfee10df5f | ||
|
e0774677eb | ||
|
fc60d15e60 | ||
|
8ef8c0405d | ||
|
5575b240e1 | ||
|
22367242a2 | ||
|
4f7d4860db | ||
|
5a778373b6 | ||
|
e1c8bc5835 | ||
|
442e7a07dd | ||
|
b0e3b15ff0 | ||
|
2788a3ac91 | ||
|
bc7e8994c9 | ||
|
8ca40d7ad0 | ||
|
9a6a07fd46 | ||
|
f1818bbc5d | ||
|
3d5c8405ea | ||
|
e234ef3bbb | ||
|
e00e147ecd | ||
|
26bc35c8a6 | ||
|
17d73cdeb8 | ||
|
a10ab4423e | ||
|
0553e6c17d | ||
|
44f40c531e | ||
|
4e88bc7a9f | ||
|
b5d8a90d79 | ||
|
52837f16e3 | ||
|
2a6b040193 | ||
|
9eb99f283b | ||
|
ad0d065b03 | ||
|
924e2c240d |
12
.github/workflows/python-check.yml
vendored
12
.github/workflows/python-check.yml
vendored
@ -13,7 +13,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ["3.10", "3.11"]
|
||||
python-version: ["3.10", "3.11", "3.12"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
@ -28,15 +28,13 @@ jobs:
|
||||
python -m pip install -r requirements_dev.txt
|
||||
- name: Lint with flake8
|
||||
run: |
|
||||
# stop the build if there are Python syntax errors or undefined names
|
||||
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
|
||||
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
|
||||
flake8 . --count --statistics
|
||||
- name: Type check with mypy
|
||||
run: |
|
||||
mypy pyhon/
|
||||
# - name: Analysing the code with pylint
|
||||
# run: |
|
||||
# pylint --max-line-length 88 $(git ls-files '*.py')
|
||||
- name: Analysing the code with pylint
|
||||
run: |
|
||||
pylint $(git ls-files '*.py')
|
||||
- name: Check black style
|
||||
run: |
|
||||
black . --check
|
||||
|
9
.pylintrc
Normal file
9
.pylintrc
Normal file
@ -0,0 +1,9 @@
|
||||
[MESSAGES CONTROL]
|
||||
|
||||
disable=missing-docstring
|
||||
|
||||
[FORMAT]
|
||||
|
||||
max-args=6
|
||||
max-attributes=8
|
||||
max-line-length=88
|
1
MANIFEST.in
Normal file
1
MANIFEST.in
Normal file
@ -0,0 +1 @@
|
||||
include pyhon/py.typed
|
11
README.md
11
README.md
@ -1,3 +1,14 @@
|
||||
# Announcement: I have to take the project down in the next few days
|
||||
> Dear User,
|
||||
>
|
||||
> We are writing to inform you that we have discovered two Home Assistant integration plug-ins developed by you ( https://github.com/Andre0512/hon and https://github.com/Andre0512/pyhOn ) that are in violation of our terms of service. Specifically, the plug-ins are using our services in an unauthorized manner which is causing significant economic harm to our Company.
|
||||
> We take the protection of our intellectual property very seriously and demand that you immediately cease and desist all illegal activities related to the development and distribution of these plug-ins. We also request that you remove the plug-ins from all stores and code hosting platforms where they are currently available.
|
||||
> Please be advised that we will take all necessary legal action to protect our interests if you fail to comply with this notice. We reserve the right to pursue all available remedies, including but not limited to monetary damages, injunctive relief, and attorney's fees.
|
||||
> We strongly urge you to take immediate action to rectify this situation and avoid any further legal action. If you have any questions or concerns, please do not hesitate to contact us.
|
||||
>
|
||||
> Haier Europe Security and Governance Department
|
||||
|
||||
|
||||
**This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!**
|
||||
|
||||
# pyhOn
|
||||
|
24
mypy.ini
Normal file
24
mypy.ini
Normal file
@ -0,0 +1,24 @@
|
||||
[mypy]
|
||||
check_untyped_defs = true
|
||||
disallow_any_generics = true
|
||||
disallow_any_unimported = true
|
||||
disallow_incomplete_defs = true
|
||||
disallow_subclassing_any = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_decorators = true
|
||||
disallow_untyped_defs = true
|
||||
disable_error_code = annotation-unchecked
|
||||
enable_error_code = ignore-without-code, redundant-self, truthy-iterable
|
||||
follow_imports = silent
|
||||
local_partial_types = true
|
||||
no_implicit_optional = true
|
||||
no_implicit_reexport = true
|
||||
show_error_codes = true
|
||||
strict_concatenate = false
|
||||
strict_equality = true
|
||||
warn_incomplete_stub = true
|
||||
warn_redundant_casts = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
warn_unused_configs = true
|
||||
warn_unused_ignores = true
|
@ -6,16 +6,18 @@ import logging
|
||||
import sys
|
||||
from getpass import getpass
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Dict, Any
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from pyhon import Hon, HonAPI, helper, diagnose
|
||||
# pylint: disable=wrong-import-position
|
||||
from pyhon import Hon, HonAPI, diagnose, printer
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_arguments():
|
||||
def get_arguments() -> Dict[str, Any]:
|
||||
"""Get parsed arguments."""
|
||||
parser = argparse.ArgumentParser(description="pyhOn: Command Line Utility")
|
||||
parser.add_argument("-u", "--user", help="user for haier hOn account")
|
||||
@ -29,17 +31,20 @@ def get_arguments():
|
||||
export.add_argument("--zip", help="create zip archive", action="store_true")
|
||||
export.add_argument("--anonymous", help="anonymize data", action="store_true")
|
||||
export.add_argument("directory", nargs="?", default=Path().cwd())
|
||||
translate = subparser.add_parser(
|
||||
translation = subparser.add_parser(
|
||||
"translate", help="print available translation keys"
|
||||
)
|
||||
translate.add_argument(
|
||||
translation.add_argument(
|
||||
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
|
||||
)
|
||||
translate.add_argument("--json", help="print as json", action="store_true")
|
||||
translation.add_argument("--json", help="print as json", action="store_true")
|
||||
parser.add_argument(
|
||||
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
|
||||
)
|
||||
return vars(parser.parse_args())
|
||||
|
||||
|
||||
async def translate(language, json_output=False):
|
||||
async def translate(language: str, json_output: bool = False) -> None:
|
||||
async with HonAPI(anonymous=True) as hon:
|
||||
keys = await hon.translation_keys(language)
|
||||
if json_output:
|
||||
@ -52,10 +57,10 @@ async def translate(language, json_output=False):
|
||||
.replace("\\r", "")
|
||||
)
|
||||
keys = json.loads(clean_keys)
|
||||
print(helper.pretty_print(keys))
|
||||
print(printer.pretty_print(keys))
|
||||
|
||||
|
||||
def get_login_data(args):
|
||||
def get_login_data(args: Dict[str, str]) -> Tuple[str, str]:
|
||||
if not (user := args["user"]):
|
||||
user = input("User for hOn account: ")
|
||||
if not (password := args["password"]):
|
||||
@ -63,44 +68,44 @@ def get_login_data(args):
|
||||
return user, password
|
||||
|
||||
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
args = get_arguments()
|
||||
if language := args.get("translate"):
|
||||
await translate(language, json_output=args.get("json"))
|
||||
await translate(language, json_output=args.get("json", ""))
|
||||
return
|
||||
async with Hon(*get_login_data(args)) as hon:
|
||||
async with Hon(
|
||||
*get_login_data(args), test_data_path=Path(args.get("import", ""))
|
||||
) as hon:
|
||||
for device in hon.appliances:
|
||||
if args.get("export"):
|
||||
anonymous = args.get("anonymous", False)
|
||||
path = Path(args.get("directory"))
|
||||
path = Path(args.get("directory", "."))
|
||||
if not args.get("zip"):
|
||||
for file in await diagnose.appliance_data(device, path, anonymous):
|
||||
print(f"Created {file}")
|
||||
else:
|
||||
file = await diagnose.zip_archive(device, path, anonymous)
|
||||
print(f"Created {file}")
|
||||
archive = await diagnose.zip_archive(device, path, anonymous)
|
||||
print(f"Created {archive}")
|
||||
continue
|
||||
print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10)
|
||||
if args.get("keys"):
|
||||
data = device.data.copy()
|
||||
attr = "get" if args.get("all") else "pop"
|
||||
print(
|
||||
helper.key_print(
|
||||
data["attributes"].__getattribute__(attr)("parameters")
|
||||
)
|
||||
printer.key_print(getattr(data["attributes"], attr)("parameters"))
|
||||
)
|
||||
print(helper.key_print(data.__getattribute__(attr)("appliance")))
|
||||
print(helper.key_print(data))
|
||||
print(printer.key_print(getattr(data, attr)("appliance")))
|
||||
print(printer.key_print(data))
|
||||
print(
|
||||
helper.pretty_print(
|
||||
helper.create_command(device.commands, concat=True)
|
||||
printer.pretty_print(
|
||||
printer.create_commands(device.commands, concat=True)
|
||||
)
|
||||
)
|
||||
else:
|
||||
print(diagnose.yaml_export(device))
|
||||
|
||||
|
||||
def start():
|
||||
def start() -> None:
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
|
@ -1,22 +1,29 @@
|
||||
import importlib
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, TYPE_CHECKING
|
||||
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
|
||||
|
||||
from pyhon import diagnose
|
||||
from pyhon import diagnose, exceptions
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
from pyhon.attributes import HonAttribute
|
||||
from pyhon.command_loader import HonCommandLoader
|
||||
from pyhon.commands import HonCommand
|
||||
from pyhon.parameter.base import HonParameter
|
||||
from pyhon.parameter.enum import HonParameterEnum
|
||||
from pyhon.parameter.range import HonParameterRange
|
||||
from pyhon.typedefs import Parameter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon import HonAPI
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
# pylint: disable=too-many-public-methods,too-many-instance-attributes
|
||||
class HonAppliance:
|
||||
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
|
||||
|
||||
@ -25,69 +32,85 @@ class HonAppliance:
|
||||
) -> None:
|
||||
if attributes := info.get("attributes"):
|
||||
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
|
||||
self._info: Dict = info
|
||||
self._info: Dict[str, Any] = info
|
||||
self._api: Optional[HonAPI] = api
|
||||
self._appliance_model: Dict = {}
|
||||
self._appliance_model: Dict[str, Any] = {}
|
||||
|
||||
self._commands: Dict[str, HonCommand] = {}
|
||||
self._statistics: Dict = {}
|
||||
self._attributes: Dict = {}
|
||||
self._statistics: Dict[str, Any] = {}
|
||||
self._attributes: Dict[str, Any] = {}
|
||||
self._zone: int = zone
|
||||
self._additional_data: Dict[str, Any] = {}
|
||||
self._last_update = None
|
||||
self._last_update: Optional[datetime] = None
|
||||
self._default_setting = HonParameter("", {}, "")
|
||||
|
||||
try:
|
||||
self._extra = importlib.import_module(
|
||||
self._extra: Optional[ApplianceBase] = importlib.import_module(
|
||||
f"pyhon.appliances.{self.appliance_type.lower()}"
|
||||
).Appliance(self)
|
||||
except ModuleNotFoundError:
|
||||
self._extra = None
|
||||
|
||||
def __getitem__(self, item):
|
||||
def _get_nested_item(self, item: str) -> Any:
|
||||
result: List[Any] | Dict[str, Any] = self.data
|
||||
for key in item.split("."):
|
||||
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
||||
result = result[int(key)]
|
||||
elif isinstance(result, dict):
|
||||
result = result[key]
|
||||
return result
|
||||
|
||||
def __getitem__(self, item: str) -> Any:
|
||||
if self._zone:
|
||||
item += f"Z{self._zone}"
|
||||
if "." in item:
|
||||
result = self.data
|
||||
for key in item.split("."):
|
||||
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
||||
result = result[int(key)]
|
||||
else:
|
||||
result = result[key]
|
||||
return result
|
||||
return self._get_nested_item(item)
|
||||
if item in self.data:
|
||||
return self.data[item]
|
||||
if item in self.attributes["parameters"]:
|
||||
return self.attributes["parameters"][item].value
|
||||
return self.info[item]
|
||||
|
||||
def get(self, item, default=None):
|
||||
@overload
|
||||
def get(self, item: str, default: None = None) -> Any:
|
||||
...
|
||||
|
||||
@overload
|
||||
def get(self, item: str, default: T) -> T:
|
||||
...
|
||||
|
||||
def get(self, item: str, default: Optional[T] = None) -> Any:
|
||||
try:
|
||||
return self[item]
|
||||
except (KeyError, IndexError):
|
||||
return default
|
||||
|
||||
def _check_name_zone(self, name: str, frontend: bool = True) -> str:
|
||||
middle = " Z" if frontend else "_z"
|
||||
if (attribute := self._info.get(name, "")) and self._zone:
|
||||
return f"{attribute}{middle}{self._zone}"
|
||||
zone = " Z" if frontend else "_z"
|
||||
attribute: str = self._info.get(name, "")
|
||||
if attribute and self._zone:
|
||||
return f"{attribute}{zone}{self._zone}"
|
||||
return attribute
|
||||
|
||||
@property
|
||||
def appliance_model_id(self) -> str:
|
||||
return self._info.get("applianceModelId", "")
|
||||
return str(self._info.get("applianceModelId", ""))
|
||||
|
||||
@property
|
||||
def appliance_type(self) -> str:
|
||||
return self._info.get("applianceTypeName", "")
|
||||
return str(self._info.get("applianceTypeName", ""))
|
||||
|
||||
@property
|
||||
def mac_address(self) -> str:
|
||||
return self.info.get("macAddress", "")
|
||||
return str(self.info.get("macAddress", ""))
|
||||
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
return self._check_name_zone("macAddress", frontend=False)
|
||||
default_mac = "xx-xx-xx-xx-xx-xx"
|
||||
import_name = f"{self.appliance_type.lower()}_{self.appliance_model_id}"
|
||||
result = self._check_name_zone("macAddress", frontend=False)
|
||||
result = result.replace(default_mac, import_name)
|
||||
return result
|
||||
|
||||
@property
|
||||
def model_name(self) -> str:
|
||||
@ -95,45 +118,50 @@ class HonAppliance:
|
||||
|
||||
@property
|
||||
def brand(self) -> str:
|
||||
return self._check_name_zone("brand")
|
||||
brand = self._check_name_zone("brand")
|
||||
return brand[0].upper() + brand[1:]
|
||||
|
||||
@property
|
||||
def nick_name(self) -> str:
|
||||
return self._check_name_zone("nickName")
|
||||
result = self._check_name_zone("nickName")
|
||||
if not result or re.findall("^[xX1\\s-]+$", result):
|
||||
return self.model_name
|
||||
return result
|
||||
|
||||
@property
|
||||
def code(self) -> str:
|
||||
if code := self.info.get("code"):
|
||||
code: str = self.info.get("code", "")
|
||||
if code:
|
||||
return code
|
||||
serial_number = self.info.get("serialNumber", "")
|
||||
serial_number: str = self.info.get("serialNumber", "")
|
||||
return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
|
||||
|
||||
@property
|
||||
def model_id(self) -> int:
|
||||
return self._info.get("applianceModelId", 0)
|
||||
return int(self._info.get("applianceModelId", 0))
|
||||
|
||||
@property
|
||||
def options(self):
|
||||
return self._appliance_model.get("options", {})
|
||||
def options(self) -> Dict[str, Any]:
|
||||
return dict(self._appliance_model.get("options", {}))
|
||||
|
||||
@property
|
||||
def commands(self) -> Dict[str, HonCommand]:
|
||||
return self._commands
|
||||
|
||||
@property
|
||||
def attributes(self):
|
||||
def attributes(self) -> Dict[str, Any]:
|
||||
return self._attributes
|
||||
|
||||
@property
|
||||
def statistics(self):
|
||||
def statistics(self) -> Dict[str, Any]:
|
||||
return self._statistics
|
||||
|
||||
@property
|
||||
def info(self):
|
||||
def info(self) -> Dict[str, Any]:
|
||||
return self._info
|
||||
|
||||
@property
|
||||
def additional_data(self):
|
||||
def additional_data(self) -> Dict[str, Any]:
|
||||
return self._additional_data
|
||||
|
||||
@property
|
||||
@ -141,50 +169,52 @@ class HonAppliance:
|
||||
return self._zone
|
||||
|
||||
@property
|
||||
def api(self) -> Optional["HonAPI"]:
|
||||
def api(self) -> "HonAPI":
|
||||
"""api connection object"""
|
||||
if self._api is None:
|
||||
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||
return self._api
|
||||
|
||||
async def load_commands(self):
|
||||
async def load_commands(self) -> None:
|
||||
command_loader = HonCommandLoader(self.api, self)
|
||||
await command_loader.load_commands()
|
||||
self._commands = command_loader.commands
|
||||
self._additional_data = command_loader.additional_data
|
||||
self._appliance_model = command_loader.appliance_data
|
||||
self.sync_params_to_command("settings")
|
||||
|
||||
async def load_attributes(self):
|
||||
self._attributes = await self.api.load_attributes(self)
|
||||
for name, values in self._attributes.pop("shadow").get("parameters").items():
|
||||
async def load_attributes(self) -> None:
|
||||
attributes = await self.api.load_attributes(self)
|
||||
for name, values in attributes.pop("shadow", {}).get("parameters", {}).items():
|
||||
if name in self._attributes.get("parameters", {}):
|
||||
self._attributes["parameters"][name].update(values)
|
||||
else:
|
||||
self._attributes.setdefault("parameters", {})[name] = HonAttribute(
|
||||
values
|
||||
)
|
||||
self._attributes |= attributes
|
||||
if self._extra:
|
||||
self._attributes = self._extra.attributes(self._attributes)
|
||||
|
||||
async def load_statistics(self):
|
||||
async def load_statistics(self) -> None:
|
||||
self._statistics = await self.api.load_statistics(self)
|
||||
self._statistics |= await self.api.load_maintenance(self)
|
||||
|
||||
async def update(self, force=False):
|
||||
async def update(self, force: bool = False) -> None:
|
||||
now = datetime.now()
|
||||
if (
|
||||
force
|
||||
or not self._last_update
|
||||
or self._last_update
|
||||
< now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
||||
):
|
||||
min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
||||
if force or not self._last_update or self._last_update < min_age:
|
||||
self._last_update = now
|
||||
await self.load_attributes()
|
||||
self.sync_params_to_command("settings")
|
||||
|
||||
@property
|
||||
def command_parameters(self):
|
||||
def command_parameters(self) -> Dict[str, Dict[str, str | float]]:
|
||||
return {n: c.parameter_value for n, c in self._commands.items()}
|
||||
|
||||
@property
|
||||
def settings(self):
|
||||
result = {}
|
||||
def settings(self) -> Dict[str, Parameter]:
|
||||
result: Dict[str, Parameter] = {}
|
||||
for name, command in self._commands.items():
|
||||
for key in command.setting_keys:
|
||||
setting = command.settings.get(key, self._default_setting)
|
||||
@ -194,7 +224,7 @@ class HonAppliance:
|
||||
return result
|
||||
|
||||
@property
|
||||
def available_settings(self):
|
||||
def available_settings(self) -> List[str]:
|
||||
result = []
|
||||
for name, command in self._commands.items():
|
||||
for key in command.setting_keys:
|
||||
@ -202,7 +232,7 @@ class HonAppliance:
|
||||
return result
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
def data(self) -> Dict[str, Any]:
|
||||
result = {
|
||||
"attributes": self.attributes,
|
||||
"appliance": self.info,
|
||||
@ -220,31 +250,67 @@ class HonAppliance:
|
||||
async def data_archive(self, path: Path) -> str:
|
||||
return await diagnose.zip_archive(self, path, anonymous=True)
|
||||
|
||||
def sync_to_params(self, command_name):
|
||||
command: HonCommand = self.commands.get(command_name)
|
||||
for key, value in self.attributes.get("parameters", {}).items():
|
||||
if isinstance(value, str) and (new := command.parameters.get(key)):
|
||||
def sync_command_to_params(self, command_name: str) -> None:
|
||||
if not (command := self.commands.get(command_name)):
|
||||
return
|
||||
for key in self.attributes.get("parameters", {}):
|
||||
if new := command.parameters.get(key):
|
||||
self.attributes["parameters"][key].update(
|
||||
str(new.intern_value), shield=True
|
||||
)
|
||||
|
||||
def sync_command(self, main, target=None) -> None:
|
||||
def sync_params_to_command(self, command_name: str) -> None:
|
||||
if not (command := self.commands.get(command_name)):
|
||||
return
|
||||
for key in command.setting_keys:
|
||||
if (
|
||||
new := self.attributes.get("parameters", {}).get(key)
|
||||
) is None or new.value == "":
|
||||
continue
|
||||
setting = command.settings[key]
|
||||
try:
|
||||
if not isinstance(setting, HonParameterRange):
|
||||
command.settings[key].value = str(new.value)
|
||||
else:
|
||||
command.settings[key].value = float(new.value)
|
||||
except ValueError as error:
|
||||
_LOGGER.info("Can't set %s - %s", key, error)
|
||||
continue
|
||||
|
||||
def sync_command(
|
||||
self,
|
||||
main: str,
|
||||
target: Optional[List[str] | str] = None,
|
||||
to_sync: Optional[List[str] | bool] = None,
|
||||
) -> None:
|
||||
base: Optional[HonCommand] = self.commands.get(main)
|
||||
if not base:
|
||||
return
|
||||
for command, data in self.commands.items():
|
||||
if command == main or target and command not in target:
|
||||
continue
|
||||
for name, parameter in data.parameters.items():
|
||||
if base_value := base.parameters.get(name):
|
||||
if isinstance(base_value, HonParameterRange) and isinstance(
|
||||
parameter, HonParameterRange
|
||||
):
|
||||
parameter.max = base_value.max
|
||||
parameter.min = base_value.min
|
||||
parameter.step = base_value.step
|
||||
elif isinstance(parameter, HonParameterRange):
|
||||
parameter.max = int(base_value.value)
|
||||
parameter.min = int(base_value.value)
|
||||
parameter.step = 1
|
||||
parameter.value = base_value.value
|
||||
|
||||
for name, target_param in data.parameters.items():
|
||||
if not (base_param := base.parameters.get(name)):
|
||||
continue
|
||||
if to_sync and (
|
||||
(isinstance(to_sync, list) and name not in to_sync)
|
||||
or not base_param.mandatory
|
||||
):
|
||||
continue
|
||||
self.sync_parameter(base_param, target_param)
|
||||
|
||||
def sync_parameter(self, main: Parameter, target: Parameter) -> None:
|
||||
if isinstance(main, HonParameterRange) and isinstance(
|
||||
target, HonParameterRange
|
||||
):
|
||||
target.max = main.max
|
||||
target.min = main.min
|
||||
target.step = main.step
|
||||
elif isinstance(target, HonParameterRange):
|
||||
target.max = int(main.value)
|
||||
target.min = int(main.value)
|
||||
target.step = 1
|
||||
elif isinstance(target, HonParameterEnum):
|
||||
target.values = main.values
|
||||
target.value = main.value
|
||||
|
@ -1,15 +1,25 @@
|
||||
from typing import Dict, Any, TYPE_CHECKING
|
||||
|
||||
from pyhon.parameter.program import HonParameterProgram
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon.appliance import HonAppliance
|
||||
|
||||
|
||||
class ApplianceBase:
|
||||
def __init__(self, appliance):
|
||||
def __init__(self, appliance: "HonAppliance"):
|
||||
self.parent = appliance
|
||||
|
||||
def attributes(self, data):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
program_name = "No Program"
|
||||
if program := int(str(data.get("parameters", {}).get("prCode", "0"))):
|
||||
if start_cmd := self.parent.settings.get("startProgram.program"):
|
||||
if ids := start_cmd.ids:
|
||||
if isinstance(start_cmd, HonParameterProgram) and (
|
||||
ids := start_cmd.ids
|
||||
):
|
||||
program_name = ids.get(program, program_name)
|
||||
data["programName"] = program_name
|
||||
return data
|
||||
|
||||
def settings(self, settings):
|
||||
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return settings
|
||||
|
@ -1,8 +1,11 @@
|
||||
# pylint: disable=duplicate-code
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
data["parameters"]["machMode"].value = "0"
|
||||
|
@ -1,19 +1,16 @@
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
data["parameters"]["temp"].value = "0"
|
||||
data["parameters"]["onOffStatus"].value = "0"
|
||||
data["parameters"]["remoteCtrValid"].value = "0"
|
||||
data["parameters"]["remainingTimeMM"].value = "0"
|
||||
|
||||
data["active"] = data["parameters"]["onOffStatus"] == "1"
|
||||
|
||||
if program := int(data["parameters"]["prCode"]):
|
||||
ids = self.parent.settings["startProgram.program"].ids
|
||||
data["programName"] = ids.get(program, "")
|
||||
data["parameters"]["temp"].value = 0
|
||||
data["parameters"]["onOffStatus"].value = 0
|
||||
data["parameters"]["remoteCtrValid"].value = 0
|
||||
data["parameters"]["remainingTimeMM"].value = 0
|
||||
|
||||
data["active"] = data["parameters"]["onOffStatus"].value == 1
|
||||
return data
|
||||
|
@ -1,19 +1,21 @@
|
||||
from typing import Dict, Any
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
if data["parameters"]["holidayMode"] == "1":
|
||||
data["modeZ1"] = "holiday"
|
||||
elif data["parameters"]["intelligenceMode"] == "1":
|
||||
data["modeZ1"] = "auto_set"
|
||||
elif data["parameters"]["quickModeZ1"] == "1":
|
||||
elif data["parameters"].get("quickModeZ1") == "1":
|
||||
data["modeZ1"] = "super_cool"
|
||||
else:
|
||||
data["modeZ1"] = "no_mode"
|
||||
|
||||
if data["parameters"]["quickModeZ2"] == "1":
|
||||
if data["parameters"].get("quickModeZ2") == "1":
|
||||
data["modeZ2"] = "super_freeze"
|
||||
elif data["parameters"]["intelligenceMode"] == "1":
|
||||
data["modeZ2"] = "auto_set"
|
||||
|
@ -1,9 +1,12 @@
|
||||
# pylint: disable=duplicate-code
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
from pyhon.parameter.fixed import HonParameterFixed
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
data["parameters"]["machMode"].value = "0"
|
||||
@ -11,7 +14,7 @@ class Appliance(ApplianceBase):
|
||||
data["pause"] = data["parameters"]["machMode"] == "3"
|
||||
return data
|
||||
|
||||
def settings(self, settings):
|
||||
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
|
||||
dry_level = settings.get("startProgram.dryLevel")
|
||||
if isinstance(dry_level, HonParameterFixed) and dry_level.value == "11":
|
||||
settings.pop("startProgram.dryLevel", None)
|
||||
|
@ -1,8 +1,11 @@
|
||||
# pylint: disable=duplicate-code
|
||||
from typing import Dict, Any
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
data["parameters"]["machMode"].value = "0"
|
||||
@ -10,5 +13,5 @@ class Appliance(ApplianceBase):
|
||||
data["pause"] = data["parameters"]["machMode"] == "3"
|
||||
return data
|
||||
|
||||
def settings(self, settings):
|
||||
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return settings
|
||||
|
16
pyhon/appliances/wh.py
Normal file
16
pyhon/appliances/wh.py
Normal file
@ -0,0 +1,16 @@
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
from pyhon.parameter.base import HonParameter
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
parameter = data.get("parameters", {}).get("onOffStatus")
|
||||
is_class = isinstance(parameter, HonParameter)
|
||||
data["active"] = parameter.value == 1 if is_class else parameter == 1
|
||||
return data
|
||||
|
||||
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return settings
|
@ -1,8 +1,11 @@
|
||||
# pylint: disable=duplicate-code
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
data["parameters"]["machMode"].value = "0"
|
||||
@ -10,5 +13,5 @@ class Appliance(ApplianceBase):
|
||||
data["pause"] = data["parameters"]["machMode"] == "3"
|
||||
return data
|
||||
|
||||
def settings(self, settings):
|
||||
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return settings
|
||||
|
@ -7,7 +7,7 @@ from pyhon.helper import str_to_float
|
||||
class HonAttribute:
|
||||
_LOCK_TIMEOUT: Final = 10
|
||||
|
||||
def __init__(self, data):
|
||||
def __init__(self, data: Dict[str, str] | str):
|
||||
self._value: str = ""
|
||||
self._last_update: Optional[datetime] = None
|
||||
self._lock_timestamp: Optional[datetime] = None
|
||||
@ -22,7 +22,7 @@ class HonAttribute:
|
||||
return self._value
|
||||
|
||||
@value.setter
|
||||
def value(self, value) -> None:
|
||||
def value(self, value: str) -> None:
|
||||
self._value = value
|
||||
|
||||
@property
|
||||
|
@ -4,24 +4,25 @@ from copy import copy
|
||||
from typing import Dict, Any, Optional, TYPE_CHECKING, List
|
||||
|
||||
from pyhon.commands import HonCommand
|
||||
from pyhon.exceptions import NoAuthenticationException
|
||||
from pyhon.parameter.fixed import HonParameterFixed
|
||||
from pyhon.parameter.program import HonParameterProgram
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon import HonAPI, exceptions
|
||||
from pyhon import HonAPI
|
||||
from pyhon.appliance import HonAppliance
|
||||
|
||||
|
||||
class HonCommandLoader:
|
||||
"""Loads and parses hOn command data"""
|
||||
|
||||
def __init__(self, api, appliance):
|
||||
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
|
||||
self._api: "HonAPI" = api
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._api_commands: Dict[str, Any] = {}
|
||||
self._favourites: List[Dict[str, Any]] = []
|
||||
self._command_history: List[Dict[str, Any]] = []
|
||||
self._commands: Dict[str, HonCommand] = {}
|
||||
self._api: "HonAPI" = api
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._appliance_data: Dict[str, Any] = {}
|
||||
self._additional_data: Dict[str, Any] = {}
|
||||
|
||||
@ -29,7 +30,7 @@ class HonCommandLoader:
|
||||
def api(self) -> "HonAPI":
|
||||
"""api connection object"""
|
||||
if self._api is None:
|
||||
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||
raise NoAuthenticationException("Missing hOn login")
|
||||
return self._api
|
||||
|
||||
@property
|
||||
@ -52,25 +53,25 @@ class HonCommandLoader:
|
||||
"""Get command additional data"""
|
||||
return self._additional_data
|
||||
|
||||
async def load_commands(self):
|
||||
async def load_commands(self) -> None:
|
||||
"""Trigger loading of command data"""
|
||||
await self._load_data()
|
||||
self._appliance_data = self._api_commands.pop("applianceModel")
|
||||
self._appliance_data = self._api_commands.pop("applianceModel", {})
|
||||
self._get_commands()
|
||||
self._add_favourites()
|
||||
self._recover_last_command_states()
|
||||
|
||||
async def _load_commands(self):
|
||||
async def _load_commands(self) -> None:
|
||||
self._api_commands = await self._api.load_commands(self._appliance)
|
||||
|
||||
async def _load_favourites(self):
|
||||
async def _load_favourites(self) -> None:
|
||||
self._favourites = await self._api.load_favourites(self._appliance)
|
||||
|
||||
async def _load_command_history(self):
|
||||
async def _load_command_history(self) -> None:
|
||||
self._command_history = await self._api.load_command_history(self._appliance)
|
||||
|
||||
async def _load_data(self):
|
||||
"""Request parallel all relevant data"""
|
||||
async def _load_data(self) -> None:
|
||||
"""Callback parallel all relevant data"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
self._load_commands(),
|
||||
@ -102,14 +103,24 @@ class HonCommandLoader:
|
||||
self._commands = {c.name: c for c in commands}
|
||||
|
||||
def _parse_command(
|
||||
self, data: Dict[str, Any] | str, command_name: str, **kwargs
|
||||
self,
|
||||
data: Dict[str, Any] | str,
|
||||
command_name: str,
|
||||
categories: Optional[Dict[str, "HonCommand"]] = None,
|
||||
category_name: str = "",
|
||||
) -> Optional[HonCommand]:
|
||||
"""Try to crate HonCommand object"""
|
||||
if not isinstance(data, dict):
|
||||
self._additional_data[command_name] = data
|
||||
return None
|
||||
if self._is_command(data):
|
||||
return HonCommand(command_name, data, self._appliance, **kwargs)
|
||||
return HonCommand(
|
||||
command_name,
|
||||
data,
|
||||
self._appliance,
|
||||
category_name=category_name,
|
||||
categories=categories,
|
||||
)
|
||||
if category := self._parse_categories(data, command_name):
|
||||
return category
|
||||
return None
|
||||
@ -120,8 +131,9 @@ class HonCommandLoader:
|
||||
"""Parse categories and create reference to other"""
|
||||
categories: Dict[str, HonCommand] = {}
|
||||
for category, value in data.items():
|
||||
kwargs = {"category_name": category, "categories": categories}
|
||||
if command := self._parse_command(value, command_name, **kwargs):
|
||||
if command := self._parse_command(
|
||||
value, command_name, category_name=category, categories=categories
|
||||
):
|
||||
categories[self._clean_name(category)] = command
|
||||
if categories:
|
||||
# setParameters should be at first place
|
||||
@ -172,22 +184,44 @@ class HonCommandLoader:
|
||||
def _add_favourites(self) -> None:
|
||||
"""Patch program categories with favourites"""
|
||||
for favourite in self._favourites:
|
||||
name = favourite.get("favouriteName", {})
|
||||
command = favourite.get("command", {})
|
||||
command_name = command.get("commandName", "")
|
||||
program_name = self._clean_name(command.get("programName", ""))
|
||||
base: HonCommand = copy(
|
||||
self.commands[command_name].categories[program_name]
|
||||
)
|
||||
for data in command.values():
|
||||
if isinstance(data, str):
|
||||
name, command_name, base = self._get_favourite_info(favourite)
|
||||
if not base:
|
||||
continue
|
||||
base_command: HonCommand = copy(base)
|
||||
self._update_base_command_with_data(base_command, favourite)
|
||||
self._update_base_command_with_favourite(base_command)
|
||||
self._update_program_categories(command_name, name, base_command)
|
||||
|
||||
def _get_favourite_info(
|
||||
self, favourite: Dict[str, Any]
|
||||
) -> tuple[str, str, HonCommand | None]:
|
||||
name: str = favourite.get("favouriteName", {})
|
||||
command = favourite.get("command", {})
|
||||
command_name: str = command.get("commandName", "")
|
||||
program_name = self._clean_name(command.get("programName", ""))
|
||||
base_command = self.commands[command_name].categories.get(program_name)
|
||||
return name, command_name, base_command
|
||||
|
||||
def _update_base_command_with_data(
|
||||
self, base_command: HonCommand, command: Dict[str, Any]
|
||||
) -> None:
|
||||
for data in command.values():
|
||||
if isinstance(data, str):
|
||||
continue
|
||||
for key, value in data.items():
|
||||
if not (parameter := base_command.parameters.get(key)):
|
||||
continue
|
||||
for key, value in data.items():
|
||||
if parameter := base.parameters.get(key):
|
||||
with suppress(ValueError):
|
||||
parameter.value = value
|
||||
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
||||
base.parameters.update(favourite=extra_param)
|
||||
if isinstance(program := base.parameters["program"], HonParameterProgram):
|
||||
program.set_value(name)
|
||||
self.commands[command_name].categories[name] = base
|
||||
with suppress(ValueError):
|
||||
parameter.value = value
|
||||
|
||||
def _update_base_command_with_favourite(self, base_command: HonCommand) -> None:
|
||||
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
||||
base_command.parameters.update(favourite=extra_param)
|
||||
|
||||
def _update_program_categories(
|
||||
self, command_name: str, name: str, base_command: HonCommand
|
||||
) -> None:
|
||||
program = base_command.parameters["program"]
|
||||
if isinstance(program, HonParameterProgram):
|
||||
program.set_value(name)
|
||||
self.commands[command_name].categories[name] = base_command
|
||||
|
@ -9,6 +9,7 @@ from pyhon.parameter.fixed import HonParameterFixed
|
||||
from pyhon.parameter.program import HonParameterProgram
|
||||
from pyhon.parameter.range import HonParameterRange
|
||||
from pyhon.rules import HonRuleSet
|
||||
from pyhon.typedefs import Parameter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon import HonAPI
|
||||
@ -26,28 +27,29 @@ class HonCommand:
|
||||
categories: Optional[Dict[str, "HonCommand"]] = None,
|
||||
category_name: str = "",
|
||||
):
|
||||
self._api: Optional[HonAPI] = appliance.api
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._name: str = name
|
||||
self._api: Optional[HonAPI] = None
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._categories: Optional[Dict[str, "HonCommand"]] = categories
|
||||
self._category_name: str = category_name
|
||||
self._description: str = attributes.pop("description", "")
|
||||
self._protocol_type: str = attributes.pop("protocolType", "")
|
||||
self._parameters: Dict[str, HonParameter] = {}
|
||||
self._parameters: Dict[str, Parameter] = {}
|
||||
self._data: Dict[str, Any] = {}
|
||||
self._available_settings: Dict[str, HonParameter] = {}
|
||||
self._rules: List[HonRuleSet] = []
|
||||
attributes.pop("description", "")
|
||||
attributes.pop("protocolType", "")
|
||||
self._load_parameters(attributes)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self._name} command"
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def api(self) -> "HonAPI":
|
||||
if self._api is None and self._appliance:
|
||||
self._api = self._appliance.api
|
||||
if self._api is None:
|
||||
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||
return self._api
|
||||
@ -57,7 +59,7 @@ class HonCommand:
|
||||
return self._appliance
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
def data(self) -> Dict[str, Any]:
|
||||
return self._data
|
||||
|
||||
@property
|
||||
@ -75,25 +77,40 @@ class HonCommand:
|
||||
result.setdefault(parameter.group, {})[name] = parameter.intern_value
|
||||
return result
|
||||
|
||||
@property
|
||||
def mandatory_parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
|
||||
result: Dict[str, Dict[str, Union[str, float]]] = {}
|
||||
for name, parameter in self._parameters.items():
|
||||
if parameter.mandatory:
|
||||
result.setdefault(parameter.group, {})[name] = parameter.intern_value
|
||||
return result
|
||||
|
||||
@property
|
||||
def parameter_value(self) -> Dict[str, Union[str, float]]:
|
||||
return {n: p.value for n, p in self._parameters.items()}
|
||||
|
||||
def _load_parameters(self, attributes):
|
||||
def _load_parameters(self, attributes: Dict[str, Dict[str, Any] | Any]) -> None:
|
||||
for key, items in attributes.items():
|
||||
if not isinstance(items, dict):
|
||||
_LOGGER.info("Loading Attributes - Skipping %s", str(items))
|
||||
continue
|
||||
for name, data in items.items():
|
||||
self._create_parameters(data, name, key)
|
||||
for rule in self._rules:
|
||||
rule.patch()
|
||||
|
||||
def _create_parameters(self, data: Dict, name: str, parameter: str) -> None:
|
||||
def _create_parameters(
|
||||
self, data: Dict[str, Any], name: str, parameter: str
|
||||
) -> None:
|
||||
if name == "zoneMap" and self._appliance.zone:
|
||||
data["default"] = self._appliance.zone
|
||||
if data.get("category") == "rule":
|
||||
if "fixedValue" not in data:
|
||||
_LOGGER.error("Rule not supported: %s", data)
|
||||
else:
|
||||
if "fixedValue" in data:
|
||||
self._rules.append(HonRuleSet(self, data["fixedValue"]))
|
||||
elif "enumValues" in data:
|
||||
self._rules.append(HonRuleSet(self, data["enumValues"]))
|
||||
else:
|
||||
_LOGGER.warning("Rule not supported: %s", data)
|
||||
match data.get("typology"):
|
||||
case "range":
|
||||
self._parameters[name] = HonParameterRange(name, data, parameter)
|
||||
@ -108,14 +125,33 @@ class HonCommand:
|
||||
name = "program" if "PROGRAM" in self._category_name else "category"
|
||||
self._parameters[name] = HonParameterProgram(name, self, "custom")
|
||||
|
||||
async def send(self) -> bool:
|
||||
params = self.parameter_groups.get("parameters", {})
|
||||
async def send(self, only_mandatory: bool = False) -> bool:
|
||||
grouped_params = (
|
||||
self.mandatory_parameter_groups if only_mandatory else self.parameter_groups
|
||||
)
|
||||
params = grouped_params.get("parameters", {})
|
||||
return await self.send_parameters(params)
|
||||
|
||||
async def send_specific(self, param_names: List[str]) -> bool:
|
||||
params: Dict[str, str | float] = {}
|
||||
for key, parameter in self._parameters.items():
|
||||
if key in param_names or parameter.mandatory:
|
||||
params[key] = parameter.value
|
||||
return await self.send_parameters(params)
|
||||
|
||||
async def send_parameters(self, params: Dict[str, str | float]) -> bool:
|
||||
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
|
||||
ancillary_params.pop("programRules", None)
|
||||
self.appliance.sync_to_params(self.name)
|
||||
if "prStr" in params:
|
||||
params["prStr"] = self._category_name.upper()
|
||||
self.appliance.sync_command_to_params(self.name)
|
||||
try:
|
||||
result = await self.api.send_command(
|
||||
self._appliance, self._name, params, ancillary_params
|
||||
self._appliance,
|
||||
self._name,
|
||||
params,
|
||||
ancillary_params,
|
||||
self._category_name,
|
||||
)
|
||||
if not result:
|
||||
_LOGGER.error(result)
|
||||
@ -147,7 +183,7 @@ class HonCommand:
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _more_options(first: HonParameter, second: HonParameter):
|
||||
def _more_options(first: Parameter, second: Parameter) -> Parameter:
|
||||
if isinstance(first, HonParameterFixed) and not isinstance(
|
||||
second, HonParameterFixed
|
||||
):
|
||||
@ -157,8 +193,8 @@ class HonCommand:
|
||||
return first
|
||||
|
||||
@property
|
||||
def available_settings(self) -> Dict[str, HonParameter]:
|
||||
result: Dict[str, HonParameter] = {}
|
||||
def available_settings(self) -> Dict[str, Parameter]:
|
||||
result: Dict[str, Parameter] = {}
|
||||
for command in self.categories.values():
|
||||
for name, parameter in command.parameters.items():
|
||||
if name in result:
|
||||
@ -166,3 +202,7 @@ class HonCommand:
|
||||
else:
|
||||
result[name] = parameter
|
||||
return result
|
||||
|
||||
def reset(self) -> None:
|
||||
for parameter in self._parameters.values():
|
||||
parameter.reset()
|
||||
|
@ -3,7 +3,8 @@ import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from typing import Dict, Optional, Any, List, no_type_check
|
||||
from types import TracebackType
|
||||
from typing import Dict, Optional, Any, List, no_type_check, Type
|
||||
|
||||
from aiohttp import ClientSession
|
||||
from typing_extensions import Self
|
||||
@ -36,7 +37,12 @@ class HonAPI:
|
||||
async def __aenter__(self) -> Self:
|
||||
return await self.create()
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc: Optional[BaseException],
|
||||
traceback: Optional[TracebackType],
|
||||
) -> None:
|
||||
await self.close()
|
||||
|
||||
@property
|
||||
@ -46,13 +52,13 @@ class HonAPI:
|
||||
return self._hon.auth
|
||||
|
||||
@property
|
||||
def _hon(self):
|
||||
def _hon(self) -> HonConnectionHandler:
|
||||
if self._hon_handler is None:
|
||||
raise exceptions.NoAuthenticationException
|
||||
return self._hon_handler
|
||||
|
||||
@property
|
||||
def _hon_anonymous(self):
|
||||
def _hon_anonymous(self) -> HonAnonymousConnectionHandler:
|
||||
if self._hon_anonymous_handler is None:
|
||||
raise exceptions.NoAuthenticationException
|
||||
return self._hon_anonymous_handler
|
||||
@ -69,12 +75,16 @@ class HonAPI:
|
||||
|
||||
async def load_appliances(self) -> List[Dict[str, Any]]:
|
||||
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
|
||||
if result := await resp.json():
|
||||
return result.get("payload", {}).get("appliances", {})
|
||||
result = await resp.json()
|
||||
if result:
|
||||
appliances: List[Dict[str, Any]] = result.get("payload", {}).get(
|
||||
"appliances", {}
|
||||
)
|
||||
return appliances
|
||||
return []
|
||||
|
||||
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
params: Dict = {
|
||||
params: Dict[str, str | int] = {
|
||||
"applianceType": appliance.appliance_type,
|
||||
"applianceModelId": appliance.appliance_model_id,
|
||||
"macAddress": appliance.mac_address,
|
||||
@ -90,7 +100,7 @@ class HonAPI:
|
||||
params["series"] = series
|
||||
url: str = f"{const.API_URL}/commands/v1/retrieve"
|
||||
async with self._hon.get(url, params=params) as response:
|
||||
result: Dict = (await response.json()).get("payload", {})
|
||||
result: Dict[str, Any] = (await response.json()).get("payload", {})
|
||||
if not result or result.pop("resultCode") != "0":
|
||||
_LOGGER.error(await response.json())
|
||||
return {}
|
||||
@ -103,76 +113,87 @@ class HonAPI:
|
||||
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
|
||||
)
|
||||
async with self._hon.get(url) as response:
|
||||
result: Dict = await response.json()
|
||||
if not result or not result.get("payload"):
|
||||
return []
|
||||
return result["payload"]["history"]
|
||||
result: Dict[str, Any] = await response.json()
|
||||
if not result or not result.get("payload"):
|
||||
return []
|
||||
command_history: List[Dict[str, Any]] = result["payload"]["history"]
|
||||
return command_history
|
||||
|
||||
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
|
||||
url: str = (
|
||||
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/favourite"
|
||||
)
|
||||
async with self._hon.get(url) as response:
|
||||
result: Dict = await response.json()
|
||||
if not result or not result.get("payload"):
|
||||
return []
|
||||
return result["payload"]["favourites"]
|
||||
result: Dict[str, Any] = await response.json()
|
||||
if not result or not result.get("payload"):
|
||||
return []
|
||||
favourites: List[Dict[str, Any]] = result["payload"]["favourites"]
|
||||
return favourites
|
||||
|
||||
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
|
||||
params: Dict = {"macAddress": appliance.mac_address}
|
||||
params: Dict[str, str] = {"macAddress": appliance.mac_address}
|
||||
async with self._hon.get(url, params=params) as response:
|
||||
result: Dict = await response.json()
|
||||
if result and (activity := result.get("attributes")):
|
||||
return activity
|
||||
result: Dict[str, Any] = await response.json()
|
||||
if result:
|
||||
activity: Dict[str, Any] = result.get("attributes", "")
|
||||
if activity:
|
||||
return activity
|
||||
return {}
|
||||
|
||||
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
url: str = f"{const.API_URL}/commands/v1/appliance-model"
|
||||
params: Dict = {
|
||||
params: Dict[str, str] = {
|
||||
"code": appliance.code,
|
||||
"macAddress": appliance.mac_address,
|
||||
}
|
||||
async with self._hon.get(url, params=params) as response:
|
||||
result: Dict = await response.json()
|
||||
result: Dict[str, Any] = await response.json()
|
||||
if result:
|
||||
return result.get("payload", {}).get("applianceModel", {})
|
||||
appliance_data: Dict[str, Any] = result.get("payload", {}).get(
|
||||
"applianceModel", {}
|
||||
)
|
||||
return appliance_data
|
||||
return {}
|
||||
|
||||
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
params: Dict = {
|
||||
params: Dict[str, str] = {
|
||||
"macAddress": appliance.mac_address,
|
||||
"applianceType": appliance.appliance_type,
|
||||
"category": "CYCLE",
|
||||
}
|
||||
url: str = f"{const.API_URL}/commands/v1/context"
|
||||
async with self._hon.get(url, params=params) as response:
|
||||
return (await response.json()).get("payload", {})
|
||||
attributes: Dict[str, Any] = (await response.json()).get("payload", {})
|
||||
return attributes
|
||||
|
||||
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
params: Dict = {
|
||||
params: Dict[str, str] = {
|
||||
"macAddress": appliance.mac_address,
|
||||
"applianceType": appliance.appliance_type,
|
||||
}
|
||||
url: str = f"{const.API_URL}/commands/v1/statistics"
|
||||
async with self._hon.get(url, params=params) as response:
|
||||
return (await response.json()).get("payload", {})
|
||||
statistics: Dict[str, Any] = (await response.json()).get("payload", {})
|
||||
return statistics
|
||||
|
||||
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
url = f"{const.API_URL}/commands/v1/maintenance-cycle"
|
||||
params = {"macAddress": appliance.mac_address}
|
||||
async with self._hon.get(url, params=params) as response:
|
||||
return (await response.json()).get("payload", {})
|
||||
maintenance: Dict[str, Any] = (await response.json()).get("payload", {})
|
||||
return maintenance
|
||||
|
||||
async def send_command(
|
||||
self,
|
||||
appliance: HonAppliance,
|
||||
command: str,
|
||||
parameters: Dict,
|
||||
ancillary_parameters: Dict,
|
||||
parameters: Dict[str, Any],
|
||||
ancillary_parameters: Dict[str, Any],
|
||||
program_name: str = "",
|
||||
) -> bool:
|
||||
now: str = datetime.utcnow().isoformat()
|
||||
data: Dict = {
|
||||
data: Dict[str, Any] = {
|
||||
"macAddress": appliance.mac_address,
|
||||
"timestamp": f"{now[:-3]}Z",
|
||||
"commandName": command,
|
||||
@ -188,9 +209,11 @@ class HonAPI:
|
||||
"parameters": parameters,
|
||||
"applianceType": appliance.appliance_type,
|
||||
}
|
||||
if command == "startProgram" and program_name:
|
||||
data.update({"programName": program_name.upper()})
|
||||
url: str = f"{const.API_URL}/commands/v1/send"
|
||||
async with self._hon.post(url, json=data) as response:
|
||||
json_data: Dict = await response.json()
|
||||
json_data: Dict[str, Any] = await response.json()
|
||||
if json_data.get("payload", {}).get("resultCode") == "0":
|
||||
return True
|
||||
_LOGGER.error(await response.text())
|
||||
@ -200,16 +223,15 @@ class HonAPI:
|
||||
async def appliance_configuration(self) -> Dict[str, Any]:
|
||||
url: str = f"{const.API_URL}/config/v1/program-list-rules"
|
||||
async with self._hon_anonymous.get(url) as response:
|
||||
result: Dict = await response.json()
|
||||
if result and (data := result.get("payload")):
|
||||
return data
|
||||
return {}
|
||||
result: Dict[str, Any] = await response.json()
|
||||
data: Dict[str, Any] = result.get("payload", {})
|
||||
return data
|
||||
|
||||
async def app_config(
|
||||
self, language: str = "en", beta: bool = True
|
||||
) -> Dict[str, Any]:
|
||||
url: str = f"{const.API_URL}/app-config"
|
||||
payload_data: Dict = {
|
||||
payload_data: Dict[str, str | int] = {
|
||||
"languageCode": language,
|
||||
"beta": beta,
|
||||
"appVersion": const.APP_VERSION,
|
||||
@ -217,17 +239,17 @@ class HonAPI:
|
||||
}
|
||||
payload: str = json.dumps(payload_data, separators=(",", ":"))
|
||||
async with self._hon_anonymous.post(url, data=payload) as response:
|
||||
if (result := await response.json()) and (data := result.get("payload")):
|
||||
return data
|
||||
return {}
|
||||
result = await response.json()
|
||||
data: Dict[str, Any] = result.get("payload", {})
|
||||
return data
|
||||
|
||||
async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
|
||||
config = await self.app_config(language=language)
|
||||
if url := config.get("language", {}).get("jsonPath"):
|
||||
async with self._hon_anonymous.get(url) as response:
|
||||
if result := await response.json():
|
||||
return result
|
||||
return {}
|
||||
if not (url := config.get("language", {}).get("jsonPath")):
|
||||
return {}
|
||||
async with self._hon_anonymous.get(url) as response:
|
||||
result: Dict[str, Any] = await response.json()
|
||||
return result
|
||||
|
||||
async def close(self) -> None:
|
||||
if self._hon_handler is not None:
|
||||
@ -237,24 +259,34 @@ class HonAPI:
|
||||
|
||||
|
||||
class TestAPI(HonAPI):
|
||||
def __init__(self, path):
|
||||
def __init__(self, path: Path):
|
||||
super().__init__()
|
||||
self._anonymous = True
|
||||
self._path: Path = path
|
||||
|
||||
def _load_json(self, appliance: HonAppliance, file) -> Dict[str, Any]:
|
||||
def _load_json(self, appliance: HonAppliance, file: str) -> Dict[str, Any]:
|
||||
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
|
||||
path = f"{self._path}/{directory}/{file}.json"
|
||||
if not (path := self._path / directory / f"{file}.json").exists():
|
||||
_LOGGER.warning("Can't open %s", str(path))
|
||||
return {}
|
||||
with open(path, "r", encoding="utf-8") as json_file:
|
||||
return json.loads(json_file.read())
|
||||
text = json_file.read()
|
||||
try:
|
||||
data: Dict[str, Any] = json.loads(text)
|
||||
return data
|
||||
except json.decoder.JSONDecodeError as error:
|
||||
_LOGGER.error("%s - %s", str(path), error)
|
||||
return {}
|
||||
|
||||
async def load_appliances(self) -> List[Dict[str, Any]]:
|
||||
result = []
|
||||
for appliance in self._path.glob("*/"):
|
||||
with open(
|
||||
appliance / "appliance_data.json", "r", encoding="utf-8"
|
||||
) as json_file:
|
||||
result.append(json.loads(json_file.read()))
|
||||
file = appliance / "appliance_data.json"
|
||||
with open(file, "r", encoding="utf-8") as json_file:
|
||||
try:
|
||||
result.append(json.loads(json_file.read()))
|
||||
except json.decoder.JSONDecodeError as error:
|
||||
_LOGGER.error("%s - %s", str(file), error)
|
||||
return result
|
||||
|
||||
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
@ -288,7 +320,14 @@ class TestAPI(HonAPI):
|
||||
self,
|
||||
appliance: HonAppliance,
|
||||
command: str,
|
||||
parameters: Dict,
|
||||
ancillary_parameters: Dict,
|
||||
parameters: Dict[str, Any],
|
||||
ancillary_parameters: Dict[str, Any],
|
||||
program_name: str = "",
|
||||
) -> bool:
|
||||
_LOGGER.info(
|
||||
"%s - %s - %s",
|
||||
str(parameters),
|
||||
str(ancillary_parameters),
|
||||
str(program_name),
|
||||
)
|
||||
return True
|
||||
|
@ -6,14 +6,16 @@ import urllib
|
||||
from contextlib import suppress
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Optional
|
||||
from typing import Dict, Optional, Any, List
|
||||
from urllib import parse
|
||||
from urllib.parse import quote
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import ClientResponse
|
||||
from yarl import URL
|
||||
|
||||
from pyhon import const, exceptions
|
||||
from pyhon.connection.device import HonDevice
|
||||
from pyhon.connection.handler.auth import HonAuthConnectionHandler
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -25,41 +27,52 @@ class HonLoginData:
|
||||
email: str = ""
|
||||
password: str = ""
|
||||
fw_uid: str = ""
|
||||
loaded: Optional[Dict] = None
|
||||
loaded: Optional[Dict[str, Any]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class HonAuthData:
|
||||
access_token: str = ""
|
||||
refresh_token: str = ""
|
||||
cognito_token: str = ""
|
||||
id_token: str = ""
|
||||
|
||||
|
||||
class HonAuth:
|
||||
_TOKEN_EXPIRES_AFTER_HOURS = 8
|
||||
_TOKEN_EXPIRE_WARNING_HOURS = 7
|
||||
|
||||
def __init__(self, session, email, password, device) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
session: aiohttp.ClientSession,
|
||||
email: str,
|
||||
password: str,
|
||||
device: HonDevice,
|
||||
) -> None:
|
||||
self._session = session
|
||||
self._request = HonAuthConnectionHandler(session)
|
||||
self._login_data = HonLoginData()
|
||||
self._login_data.email = email
|
||||
self._login_data.password = password
|
||||
self._access_token = ""
|
||||
self._refresh_token = ""
|
||||
self._cognito_token = ""
|
||||
self._id_token = ""
|
||||
self._device = device
|
||||
self._expires: datetime = datetime.utcnow()
|
||||
self._auth = HonAuthData()
|
||||
|
||||
@property
|
||||
def cognito_token(self) -> str:
|
||||
return self._cognito_token
|
||||
return self._auth.cognito_token
|
||||
|
||||
@property
|
||||
def id_token(self) -> str:
|
||||
return self._id_token
|
||||
return self._auth.id_token
|
||||
|
||||
@property
|
||||
def access_token(self) -> str:
|
||||
return self._access_token
|
||||
return self._auth.access_token
|
||||
|
||||
@property
|
||||
def refresh_token(self) -> str:
|
||||
return self._refresh_token
|
||||
return self._auth.refresh_token
|
||||
|
||||
def _check_token_expiration(self, hours: int) -> bool:
|
||||
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
|
||||
@ -107,7 +120,8 @@ class HonAuth:
|
||||
async with self._request.get(url) as response:
|
||||
text = await response.text()
|
||||
self._expires = datetime.utcnow()
|
||||
if not (login_url := re.findall("url = '(.+?)'", text)):
|
||||
login_url: List[str] = re.findall("url = '(.+?)'", text)
|
||||
if not login_url:
|
||||
if "oauth/done#access_token=" in text:
|
||||
self._parse_token_data(text)
|
||||
raise exceptions.HonNoAuthenticationNeeded()
|
||||
@ -120,7 +134,7 @@ class HonAuth:
|
||||
await self._error_logger(response)
|
||||
return new_location
|
||||
|
||||
async def _handle_redirects(self, login_url) -> str:
|
||||
async def _handle_redirects(self, login_url: str) -> str:
|
||||
redirect1 = await self._manual_redirect(login_url)
|
||||
redirect2 = await self._manual_redirect(redirect1)
|
||||
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
|
||||
@ -176,18 +190,19 @@ class HonAuth:
|
||||
if response.status == 200:
|
||||
with suppress(json.JSONDecodeError, KeyError):
|
||||
result = await response.json()
|
||||
return result["events"][0]["attributes"]["values"]["url"]
|
||||
url: str = result["events"][0]["attributes"]["values"]["url"]
|
||||
return url
|
||||
await self._error_logger(response)
|
||||
return ""
|
||||
|
||||
def _parse_token_data(self, text: str) -> bool:
|
||||
if access_token := re.findall("access_token=(.*?)&", text):
|
||||
self._access_token = access_token[0]
|
||||
self._auth.access_token = access_token[0]
|
||||
if refresh_token := re.findall("refresh_token=(.*?)&", text):
|
||||
self._refresh_token = refresh_token[0]
|
||||
self._auth.refresh_token = refresh_token[0]
|
||||
if id_token := re.findall("id_token=(.*?)&", text):
|
||||
self._id_token = id_token[0]
|
||||
return True if access_token and refresh_token and id_token else False
|
||||
self._auth.id_token = id_token[0]
|
||||
return bool(access_token and refresh_token and id_token)
|
||||
|
||||
async def _get_token(self, url: str) -> bool:
|
||||
async with self._request.get(url) as response:
|
||||
@ -219,7 +234,7 @@ class HonAuth:
|
||||
return True
|
||||
|
||||
async def _api_auth(self) -> bool:
|
||||
post_headers = {"id-token": self._id_token}
|
||||
post_headers = {"id-token": self._auth.id_token}
|
||||
data = self._device.get()
|
||||
async with self._request.post(
|
||||
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
|
||||
@ -229,8 +244,8 @@ class HonAuth:
|
||||
except json.JSONDecodeError:
|
||||
await self._error_logger(response)
|
||||
return False
|
||||
self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
||||
if not self._cognito_token:
|
||||
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
||||
if not self._auth.cognito_token:
|
||||
_LOGGER.error(json_data)
|
||||
raise exceptions.HonAuthenticationError()
|
||||
return True
|
||||
@ -252,7 +267,7 @@ class HonAuth:
|
||||
async def refresh(self) -> bool:
|
||||
params = {
|
||||
"client_id": const.CLIENT_ID,
|
||||
"refresh_token": self._refresh_token,
|
||||
"refresh_token": self._auth.refresh_token,
|
||||
"grant_type": "refresh_token",
|
||||
}
|
||||
async with self._request.post(
|
||||
@ -263,14 +278,14 @@ class HonAuth:
|
||||
return False
|
||||
data = await response.json()
|
||||
self._expires = datetime.utcnow()
|
||||
self._id_token = data["id_token"]
|
||||
self._access_token = data["access_token"]
|
||||
self._auth.id_token = data["id_token"]
|
||||
self._auth.access_token = data["access_token"]
|
||||
return await self._api_auth()
|
||||
|
||||
def clear(self) -> None:
|
||||
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
|
||||
self._request.called_urls = []
|
||||
self._cognito_token = ""
|
||||
self._id_token = ""
|
||||
self._access_token = ""
|
||||
self._refresh_token = ""
|
||||
self._auth.cognito_token = ""
|
||||
self._auth.id_token = ""
|
||||
self._auth.access_token = ""
|
||||
self._auth.refresh_token = ""
|
||||
|
@ -21,7 +21,7 @@ class HonDevice:
|
||||
return self._os_version
|
||||
|
||||
@property
|
||||
def os(self) -> str:
|
||||
def os_type(self) -> str:
|
||||
return self._os
|
||||
|
||||
@property
|
||||
@ -32,12 +32,14 @@ class HonDevice:
|
||||
def mobile_id(self) -> str:
|
||||
return self._mobile_id
|
||||
|
||||
def get(self, mobile: bool = False) -> Dict:
|
||||
result = {
|
||||
def get(self, mobile: bool = False) -> Dict[str, str | int]:
|
||||
result: Dict[str, str | int] = {
|
||||
"appVersion": self.app_version,
|
||||
"mobileId": self.mobile_id,
|
||||
"os": self.os,
|
||||
"os": self.os_type,
|
||||
"osVersion": self.os_version,
|
||||
"deviceModel": self.device_model,
|
||||
}
|
||||
return (result | {"mobileOs": result.pop("os")}) if mobile else result
|
||||
if mobile:
|
||||
result |= {"mobileOs": result.pop("os", "")}
|
||||
return result
|
||||
|
@ -1,21 +1,27 @@
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Callable, Dict
|
||||
from typing import Dict, Any
|
||||
|
||||
import aiohttp
|
||||
from yarl import URL
|
||||
|
||||
from pyhon import const
|
||||
from pyhon.connection.handler.base import ConnectionHandler
|
||||
from pyhon.typedefs import Callback
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HonAnonymousConnectionHandler(ConnectionHandler):
|
||||
_HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
|
||||
_HEADERS: Dict[str, str] = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
|
||||
|
||||
@asynccontextmanager
|
||||
async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator:
|
||||
async def _intercept(
|
||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
|
||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
|
||||
async with method(*args, **kwargs) as response:
|
||||
async with method(url, *args, **kwargs) as response:
|
||||
if response.status == 403:
|
||||
_LOGGER.error("Can't authenticate anymore")
|
||||
yield response
|
||||
|
@ -1,12 +1,14 @@
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional, Callable, List, Tuple
|
||||
from typing import Optional, List, Tuple, Any, Dict
|
||||
|
||||
import aiohttp
|
||||
from yarl import URL
|
||||
|
||||
from pyhon import const
|
||||
from pyhon.connection.handler.base import ConnectionHandler
|
||||
from pyhon.typedefs import Callback
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -28,9 +30,9 @@ class HonAuthConnectionHandler(ConnectionHandler):
|
||||
|
||||
@asynccontextmanager
|
||||
async def _intercept(
|
||||
self, method: Callable, *args, loop: int = 0, **kwargs
|
||||
) -> AsyncIterator:
|
||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
|
||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
|
||||
async with method(*args, **kwargs) as response:
|
||||
self._called_urls.append((response.status, response.request_info.url))
|
||||
async with method(url, *args, **kwargs) as response:
|
||||
self._called_urls.append((response.status, str(response.request_info.url)))
|
||||
yield response
|
||||
|
@ -1,18 +1,21 @@
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional, Callable, Dict
|
||||
from types import TracebackType
|
||||
from typing import Optional, Dict, Type, Any
|
||||
|
||||
import aiohttp
|
||||
from typing_extensions import Self
|
||||
from yarl import URL
|
||||
|
||||
from pyhon import const, exceptions
|
||||
from pyhon.typedefs import Callback
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConnectionHandler:
|
||||
_HEADERS: Dict = {
|
||||
_HEADERS: Dict[str, str] = {
|
||||
"user-agent": const.USER_AGENT,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
@ -24,32 +27,52 @@ class ConnectionHandler:
|
||||
async def __aenter__(self) -> Self:
|
||||
return await self.create()
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc: Optional[BaseException],
|
||||
traceback: Optional[TracebackType],
|
||||
) -> None:
|
||||
await self.close()
|
||||
|
||||
@property
|
||||
def session(self) -> aiohttp.ClientSession:
|
||||
if self._session is None:
|
||||
raise exceptions.NoSessionException
|
||||
return self._session
|
||||
|
||||
async def create(self) -> Self:
|
||||
if self._create_session:
|
||||
self._session = aiohttp.ClientSession()
|
||||
return self
|
||||
|
||||
@asynccontextmanager
|
||||
def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
@asynccontextmanager
|
||||
async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
if self._session is None:
|
||||
raise exceptions.NoSessionException()
|
||||
response: aiohttp.ClientResponse
|
||||
async with self._intercept(self._session.get, *args, **kwargs) as response:
|
||||
async def _intercept(
|
||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
|
||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
async with method(url, *args, **kwargs) as response:
|
||||
yield response
|
||||
|
||||
@asynccontextmanager
|
||||
async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
async def get(
|
||||
self, *args: Any, **kwargs: Any
|
||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
if self._session is None:
|
||||
raise exceptions.NoSessionException()
|
||||
response: aiohttp.ClientResponse
|
||||
async with self._intercept(self._session.post, *args, **kwargs) as response:
|
||||
args = self._session.get, *args
|
||||
async with self._intercept(*args, **kwargs) as response:
|
||||
yield response
|
||||
|
||||
@asynccontextmanager
|
||||
async def post(
|
||||
self, *args: Any, **kwargs: Any
|
||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
if self._session is None:
|
||||
raise exceptions.NoSessionException()
|
||||
response: aiohttp.ClientResponse
|
||||
args = self._session.post, *args
|
||||
async with self._intercept(*args, **kwargs) as response:
|
||||
yield response
|
||||
|
||||
async def close(self) -> None:
|
||||
|
@ -2,15 +2,17 @@ import json
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional, Callable, Dict
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
import aiohttp
|
||||
from typing_extensions import Self
|
||||
from yarl import URL
|
||||
|
||||
from pyhon.connection.auth import HonAuth
|
||||
from pyhon.connection.device import HonDevice
|
||||
from pyhon.connection.handler.base import ConnectionHandler
|
||||
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
|
||||
from pyhon.typedefs import Callback
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -41,10 +43,10 @@ class HonConnectionHandler(ConnectionHandler):
|
||||
|
||||
async def create(self) -> Self:
|
||||
await super().create()
|
||||
self._auth = HonAuth(self._session, self._email, self._password, self._device)
|
||||
self._auth = HonAuth(self.session, self._email, self._password, self._device)
|
||||
return self
|
||||
|
||||
async def _check_headers(self, headers: Dict) -> Dict:
|
||||
async def _check_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
|
||||
if not (self.auth.cognito_token and self.auth.id_token):
|
||||
await self.auth.authenticate()
|
||||
headers["cognito-token"] = self.auth.cognito_token
|
||||
@ -53,17 +55,18 @@ class HonConnectionHandler(ConnectionHandler):
|
||||
|
||||
@asynccontextmanager
|
||||
async def _intercept(
|
||||
self, method: Callable, *args, loop: int = 0, **kwargs
|
||||
) -> AsyncIterator:
|
||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Any
|
||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
loop: int = kwargs.pop("loop", 0)
|
||||
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
|
||||
async with method(*args, **kwargs) as response:
|
||||
async with method(url, *args, **kwargs) as response:
|
||||
if (
|
||||
self.auth.token_expires_soon or response.status in [401, 403]
|
||||
) and loop == 0:
|
||||
_LOGGER.info("Try refreshing token...")
|
||||
await self.auth.refresh()
|
||||
async with self._intercept(
|
||||
method, *args, loop=loop + 1, **kwargs
|
||||
method, url, *args, loop=loop + 1, **kwargs
|
||||
) as result:
|
||||
yield result
|
||||
elif (
|
||||
@ -77,7 +80,7 @@ class HonConnectionHandler(ConnectionHandler):
|
||||
)
|
||||
await self.create()
|
||||
async with self._intercept(
|
||||
method, *args, loop=loop + 1, **kwargs
|
||||
method, url, *args, loop=loop + 1, **kwargs
|
||||
) as result:
|
||||
yield result
|
||||
elif loop >= 2:
|
||||
@ -92,11 +95,11 @@ class HonConnectionHandler(ConnectionHandler):
|
||||
try:
|
||||
await response.json()
|
||||
yield response
|
||||
except json.JSONDecodeError:
|
||||
except json.JSONDecodeError as exc:
|
||||
_LOGGER.warning(
|
||||
"%s - JsonDecodeError %s - %s",
|
||||
response.request_info.url,
|
||||
response.status,
|
||||
await response.text(),
|
||||
)
|
||||
raise HonAuthenticationError("Decode Error")
|
||||
raise HonAuthenticationError("Decode Error") from exc
|
||||
|
@ -2,9 +2,11 @@ AUTH_API = "https://account2.hon-smarthome.com"
|
||||
API_URL = "https://api-iot.he.services"
|
||||
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
|
||||
APP = "hon"
|
||||
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static
|
||||
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
||||
APP_VERSION = "2.0.10"
|
||||
CLIENT_ID = (
|
||||
"3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9."
|
||||
"HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
||||
)
|
||||
APP_VERSION = "2.4.7"
|
||||
OS_VERSION = 31
|
||||
OS = "android"
|
||||
DEVICE_MODEL = "exynos9820"
|
||||
|
@ -5,7 +5,7 @@ import shutil
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, List, Tuple
|
||||
|
||||
from pyhon import helper
|
||||
from pyhon import printer
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon.appliance import HonAppliance
|
||||
@ -29,7 +29,7 @@ def anonymize_data(data: str) -> str:
|
||||
for match in re.findall(f'"{sensible}.*?":\\s"?(.+?)"?,?\\n', data):
|
||||
replace = re.sub("[a-z]", "x", match)
|
||||
replace = re.sub("[A-Z]", "X", replace)
|
||||
replace = re.sub("\\d", "0", replace)
|
||||
replace = re.sub("\\d", "1", replace)
|
||||
data = data.replace(match, replace)
|
||||
return data
|
||||
|
||||
@ -38,7 +38,7 @@ async def load_data(appliance: "HonAppliance", topic: str) -> Tuple[str, str]:
|
||||
return topic, await getattr(appliance.api, f"load_{topic}")(appliance)
|
||||
|
||||
|
||||
def write_to_json(data: str, topic: str, path: Path, anonymous: bool = False):
|
||||
def write_to_json(data: str, topic: str, path: Path, anonymous: bool = False) -> Path:
|
||||
json_data = json.dumps(data, indent=4)
|
||||
if anonymous:
|
||||
json_data = anonymize_data(json_data)
|
||||
@ -65,14 +65,17 @@ async def appliance_data(
|
||||
return [write_to_json(data, topic, path, anonymous) for topic, data in api_data]
|
||||
|
||||
|
||||
async def zip_archive(appliance: "HonAppliance", path: Path, anonymous: bool = False):
|
||||
async def zip_archive(
|
||||
appliance: "HonAppliance", path: Path, anonymous: bool = False
|
||||
) -> str:
|
||||
data = await appliance_data(appliance, path, anonymous)
|
||||
shutil.make_archive(str(path), "zip", path)
|
||||
shutil.rmtree(path)
|
||||
return f"{data[0].parent.stem}.zip"
|
||||
archive = data[0].parent
|
||||
shutil.make_archive(str(archive), "zip", archive)
|
||||
shutil.rmtree(archive)
|
||||
return f"{archive.stem}.zip"
|
||||
|
||||
|
||||
def yaml_export(appliance: "HonAppliance", anonymous=False) -> str:
|
||||
def yaml_export(appliance: "HonAppliance", anonymous: bool = False) -> str:
|
||||
data = {
|
||||
"attributes": appliance.attributes.copy(),
|
||||
"appliance": appliance.info,
|
||||
@ -86,12 +89,11 @@ def yaml_export(appliance: "HonAppliance", anonymous=False) -> str:
|
||||
if anonymous:
|
||||
for sensible in ["serialNumber", "coords"]:
|
||||
data.get("appliance", {}).pop(sensible, None)
|
||||
data = {
|
||||
"data": data,
|
||||
"commands": helper.create_command(appliance.commands),
|
||||
"rules": helper.create_rules(appliance.commands),
|
||||
}
|
||||
result = helper.pretty_print(data)
|
||||
result = printer.pretty_print({"data": data})
|
||||
if commands := printer.create_commands(appliance.commands):
|
||||
result += printer.pretty_print({"commands": commands})
|
||||
if rules := printer.create_rules(appliance.commands):
|
||||
result += printer.pretty_print({"rules": rules})
|
||||
if anonymous:
|
||||
result = anonymize_data(result)
|
||||
return result
|
||||
|
@ -1,80 +1,3 @@
|
||||
def key_print(data, key="", start=True):
|
||||
result = ""
|
||||
if isinstance(data, list):
|
||||
for i, value in enumerate(data):
|
||||
result += key_print(value, key=f"{key}.{i}", start=False)
|
||||
elif isinstance(data, dict):
|
||||
for k, value in sorted(data.items()):
|
||||
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
|
||||
else:
|
||||
result += f"{key}: {data}\n"
|
||||
return result
|
||||
|
||||
|
||||
# yaml.dump() would be done the same, but needs an additional dependency...
|
||||
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
|
||||
result = ""
|
||||
if isinstance(data, list):
|
||||
if key:
|
||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
||||
intend += 1
|
||||
for i, value in enumerate(data):
|
||||
result += pretty_print(
|
||||
value, intend=intend, is_list=True, whitespace=whitespace
|
||||
)
|
||||
elif isinstance(data, dict):
|
||||
if key:
|
||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
||||
intend += 1
|
||||
for i, (key, value) in enumerate(sorted(data.items())):
|
||||
if is_list and not i:
|
||||
result += pretty_print(
|
||||
value, key=key, intend=intend, is_list=True, whitespace=whitespace
|
||||
)
|
||||
elif is_list:
|
||||
result += pretty_print(
|
||||
value, key=key, intend=intend + 1, whitespace=whitespace
|
||||
)
|
||||
else:
|
||||
result += pretty_print(
|
||||
value, key=key, intend=intend, whitespace=whitespace
|
||||
)
|
||||
else:
|
||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
||||
return result
|
||||
|
||||
|
||||
def create_command(commands, concat=False):
|
||||
result = {}
|
||||
for name, command in commands.items():
|
||||
for parameter, data in command.available_settings.items():
|
||||
if data.typology == "enum":
|
||||
value = data.values
|
||||
elif data.typology == "range":
|
||||
value = {"min": data.min, "max": data.max, "step": data.step}
|
||||
else:
|
||||
continue
|
||||
if not concat:
|
||||
result.setdefault(name, {})[parameter] = value
|
||||
else:
|
||||
result[f"{name}.{parameter}"] = value
|
||||
return result
|
||||
|
||||
|
||||
def create_rules(commands, concat=False):
|
||||
result = {}
|
||||
for name, command in commands.items():
|
||||
for parameter, data in command.available_settings.items():
|
||||
value = data.triggers
|
||||
if not value:
|
||||
continue
|
||||
if not concat:
|
||||
result.setdefault(name, {})[parameter] = value
|
||||
else:
|
||||
result[f"{name}.{parameter}"] = value
|
||||
return result
|
||||
|
||||
|
||||
def str_to_float(string: str | float) -> float:
|
||||
try:
|
||||
return int(string)
|
||||
|
@ -7,9 +7,10 @@ from typing import List, Optional, Dict, Any, Type
|
||||
from aiohttp import ClientSession
|
||||
from typing_extensions import Self
|
||||
|
||||
from pyhon import HonAPI, exceptions
|
||||
from pyhon.appliance import HonAppliance
|
||||
from pyhon.connection.api import HonAPI
|
||||
from pyhon.connection.api import TestAPI
|
||||
from pyhon.exceptions import NoAuthenticationException
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -43,7 +44,7 @@ class Hon:
|
||||
@property
|
||||
def api(self) -> HonAPI:
|
||||
if self._api is None:
|
||||
raise exceptions.NoAuthenticationException
|
||||
raise NoAuthenticationException
|
||||
return self._api
|
||||
|
||||
@property
|
||||
@ -70,11 +71,11 @@ class Hon:
|
||||
return self._appliances
|
||||
|
||||
@appliances.setter
|
||||
def appliances(self, appliances) -> None:
|
||||
def appliances(self, appliances: List[HonAppliance]) -> None:
|
||||
self._appliances = appliances
|
||||
|
||||
async def _create_appliance(
|
||||
self, appliance_data: Dict[str, Any], api: HonAPI, zone=0
|
||||
self, appliance_data: Dict[str, Any], api: HonAPI, zone: int = 0
|
||||
) -> None:
|
||||
appliance = HonAppliance(api, appliance_data, zone=zone)
|
||||
if appliance.mac_address == "":
|
||||
|
@ -7,12 +7,21 @@ if TYPE_CHECKING:
|
||||
class HonParameter:
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
self._key = key
|
||||
self._category: str = attributes.get("category", "")
|
||||
self._typology: str = attributes.get("typology", "")
|
||||
self._mandatory: int = attributes.get("mandatory", 0)
|
||||
self._attributes = attributes
|
||||
self._category: str = ""
|
||||
self._typology: str = ""
|
||||
self._mandatory: int = 0
|
||||
self._value: str | float = ""
|
||||
self._group: str = group
|
||||
self._triggers: Dict[str, List[Tuple[Callable, "HonRule"]]] = {}
|
||||
self._triggers: Dict[
|
||||
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
|
||||
] = {}
|
||||
self._set_attributes()
|
||||
|
||||
def _set_attributes(self) -> None:
|
||||
self._category = self._attributes.get("category", "")
|
||||
self._typology = self._attributes.get("typology", "")
|
||||
self._mandatory = self._attributes.get("mandatory", 0)
|
||||
|
||||
@property
|
||||
def key(self) -> str:
|
||||
@ -51,20 +60,23 @@ class HonParameter:
|
||||
def group(self) -> str:
|
||||
return self._group
|
||||
|
||||
def add_trigger(self, value, func, data):
|
||||
def add_trigger(
|
||||
self, value: str, func: Callable[["HonRule"], None], data: "HonRule"
|
||||
) -> None:
|
||||
if self._value == value:
|
||||
func(data)
|
||||
self._triggers.setdefault(value, []).append((func, data))
|
||||
|
||||
def check_trigger(self, value) -> None:
|
||||
if str(value) in self._triggers:
|
||||
for trigger in self._triggers[str(value)]:
|
||||
def check_trigger(self, value: str | float) -> None:
|
||||
triggers = {str(k).lower(): v for k, v in self._triggers.items()}
|
||||
if str(value).lower() in triggers:
|
||||
for trigger in triggers[str(value)]:
|
||||
func, args = trigger
|
||||
func(args)
|
||||
|
||||
@property
|
||||
def triggers(self):
|
||||
result = {}
|
||||
def triggers(self) -> Dict[str, Any]:
|
||||
result: Dict[str, Any] = {}
|
||||
for value, rules in self._triggers.items():
|
||||
for _, rule in rules:
|
||||
if rule.extras:
|
||||
@ -81,3 +93,6 @@ class HonParameter:
|
||||
param[rule.param_key] = rule.param_data.get("defaultValue", "")
|
||||
|
||||
return result
|
||||
|
||||
def reset(self) -> None:
|
||||
self._set_attributes()
|
||||
|
@ -3,19 +3,26 @@ from typing import Dict, Any, List
|
||||
from pyhon.parameter.base import HonParameter
|
||||
|
||||
|
||||
def clean_value(value):
|
||||
def clean_value(value: str | float) -> str:
|
||||
return str(value).strip("[]").replace("|", "_").lower()
|
||||
|
||||
|
||||
class HonParameterEnum(HonParameter):
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
super().__init__(key, attributes, group)
|
||||
self._default = attributes.get("defaultValue")
|
||||
self._value = self._default or "0"
|
||||
self._values: List[str] = attributes.get("enumValues", [])
|
||||
self._default: str | float = ""
|
||||
self._value: str | float = ""
|
||||
self._values: List[str] = []
|
||||
self._set_attributes()
|
||||
if self._default and clean_value(self._default.strip("[]")) not in self.values:
|
||||
self._values.append(self._default)
|
||||
|
||||
def _set_attributes(self) -> None:
|
||||
super()._set_attributes()
|
||||
self._default = self._attributes.get("defaultValue", "")
|
||||
self._value = self._default or "0"
|
||||
self._values = self._attributes.get("enumValues", [])
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__} (<{self.key}> {self.values})"
|
||||
|
||||
@ -24,7 +31,7 @@ class HonParameterEnum(HonParameter):
|
||||
return [clean_value(value) for value in self._values]
|
||||
|
||||
@values.setter
|
||||
def values(self, values) -> None:
|
||||
def values(self, values: List[str]) -> None:
|
||||
self._values = values
|
||||
|
||||
@property
|
||||
@ -41,4 +48,4 @@ class HonParameterEnum(HonParameter):
|
||||
self._value = value
|
||||
self.check_trigger(value)
|
||||
else:
|
||||
raise ValueError(f"Allowed values {self._values}")
|
||||
raise ValueError(f"Allowed values: {self._values} But was: {value}")
|
||||
|
@ -6,14 +6,19 @@ from pyhon.parameter.base import HonParameter
|
||||
class HonParameterFixed(HonParameter):
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
super().__init__(key, attributes, group)
|
||||
self._value = attributes.get("fixedValue", None)
|
||||
self._value: str | float = ""
|
||||
self._set_attributes()
|
||||
|
||||
def _set_attributes(self) -> None:
|
||||
super()._set_attributes()
|
||||
self._value = self._attributes.get("fixedValue", "")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__} (<{self.key}> fixed)"
|
||||
|
||||
@property
|
||||
def value(self) -> str | float:
|
||||
return self._value if self._value is not None else "0"
|
||||
return self._value if self._value != "" else "0"
|
||||
|
||||
@value.setter
|
||||
def value(self, value: str | float) -> None:
|
||||
|
@ -28,7 +28,7 @@ class HonParameterProgram(HonParameterEnum):
|
||||
if value in self.values:
|
||||
self._command.category = value
|
||||
else:
|
||||
raise ValueError(f"Allowed values {self.values}")
|
||||
raise ValueError(f"Allowed values: {self.values} But was: {value}")
|
||||
|
||||
@property
|
||||
def values(self) -> List[str]:
|
||||
@ -36,19 +36,21 @@ class HonParameterProgram(HonParameterEnum):
|
||||
return sorted(values)
|
||||
|
||||
@values.setter
|
||||
def values(self, values) -> None:
|
||||
return
|
||||
def values(self, values: List[str]) -> None:
|
||||
raise ValueError("Cant set values {values}")
|
||||
|
||||
@property
|
||||
def ids(self) -> Dict[int, str]:
|
||||
values = {
|
||||
int(p.parameters["prCode"].value): n
|
||||
for i, (n, p) in enumerate(self._programs.items())
|
||||
if "iot_" not in n
|
||||
and p.parameters.get("prCode")
|
||||
and not ((fav := p.parameters.get("favourite")) and fav.value == "1")
|
||||
}
|
||||
values: Dict[int, str] = {}
|
||||
for name, parameter in self._programs.items():
|
||||
if "iot_" in name:
|
||||
continue
|
||||
if parameter.parameters.get("prCode"):
|
||||
continue
|
||||
if (fav := parameter.parameters.get("favourite")) and fav.value == "1":
|
||||
continue
|
||||
values[int(parameter.parameters["prCode"].value)] = name
|
||||
return dict(sorted(values.items()))
|
||||
|
||||
def set_value(self, value: str):
|
||||
def set_value(self, value: str) -> None:
|
||||
self._value = value
|
||||
|
@ -7,13 +7,22 @@ from pyhon.parameter.base import HonParameter
|
||||
class HonParameterRange(HonParameter):
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
super().__init__(key, attributes, group)
|
||||
self._min: float = str_to_float(attributes["minimumValue"])
|
||||
self._max: float = str_to_float(attributes["maximumValue"])
|
||||
self._step: float = str_to_float(attributes["incrementValue"])
|
||||
self._default: float = str_to_float(attributes.get("defaultValue", self.min))
|
||||
self._value: float = self._default
|
||||
self._min: float = 0
|
||||
self._max: float = 0
|
||||
self._step: float = 0
|
||||
self._default: float = 0
|
||||
self._value: float = 0
|
||||
self._set_attributes()
|
||||
|
||||
def __repr__(self):
|
||||
def _set_attributes(self) -> None:
|
||||
super()._set_attributes()
|
||||
self._min = str_to_float(self._attributes.get("minimumValue", 0))
|
||||
self._max = str_to_float(self._attributes.get("maximumValue", 0))
|
||||
self._step = str_to_float(self._attributes.get("incrementValue", 0))
|
||||
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
|
||||
self._value = self._default
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
|
||||
|
||||
@property
|
||||
@ -49,11 +58,14 @@ class HonParameterRange(HonParameter):
|
||||
@value.setter
|
||||
def value(self, value: str | float) -> None:
|
||||
value = str_to_float(value)
|
||||
if self.min <= value <= self.max and not (value - self.min) % self.step:
|
||||
if self.min <= value <= self.max and not ((value - self.min) * 100) % (
|
||||
self.step * 100
|
||||
):
|
||||
self._value = value
|
||||
self.check_trigger(value)
|
||||
else:
|
||||
raise ValueError(f"Allowed: min {self.min} max {self.max} step {self.step}")
|
||||
allowed = f"min {self.min} max {self.max} step {self.step}"
|
||||
raise ValueError(f"Allowed: {allowed} But was: {value}")
|
||||
|
||||
@property
|
||||
def values(self) -> List[str]:
|
||||
|
87
pyhon/printer.py
Normal file
87
pyhon/printer.py
Normal file
@ -0,0 +1,87 @@
|
||||
from typing import Dict, Any, TYPE_CHECKING, List
|
||||
|
||||
from pyhon.parameter.enum import HonParameterEnum
|
||||
from pyhon.parameter.range import HonParameterRange
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon.commands import HonCommand
|
||||
|
||||
|
||||
def key_print(data: Any, key: str = "", start: bool = True) -> str:
|
||||
result = ""
|
||||
if isinstance(data, list):
|
||||
for i, value in enumerate(data):
|
||||
result += key_print(value, key=f"{key}.{i}", start=False)
|
||||
elif isinstance(data, dict):
|
||||
for k, value in sorted(data.items()):
|
||||
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
|
||||
else:
|
||||
result += f"{key}: {data}\n"
|
||||
return result
|
||||
|
||||
|
||||
# yaml.dump() would be done the same, but needs an additional dependency...
|
||||
def pretty_print(
|
||||
data: Any,
|
||||
key: str = "",
|
||||
intend: int = 0,
|
||||
is_list: bool = False,
|
||||
whitespace: str = " ",
|
||||
) -> str:
|
||||
result = ""
|
||||
space = whitespace * intend
|
||||
if isinstance(data, (dict, list)) and key:
|
||||
result += f"{space}{'- ' if is_list else ''}{key}:\n"
|
||||
intend += 1
|
||||
if isinstance(data, list):
|
||||
for i, value in enumerate(data):
|
||||
result += pretty_print(
|
||||
value, intend=intend, is_list=True, whitespace=whitespace
|
||||
)
|
||||
elif isinstance(data, dict):
|
||||
for i, (list_key, value) in enumerate(sorted(data.items())):
|
||||
result += pretty_print(
|
||||
value,
|
||||
key=list_key,
|
||||
intend=intend + (is_list if i else 0),
|
||||
is_list=is_list and not i,
|
||||
whitespace=whitespace,
|
||||
)
|
||||
else:
|
||||
result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
||||
return result
|
||||
|
||||
|
||||
def create_commands(
|
||||
commands: Dict[str, "HonCommand"], concat: bool = False
|
||||
) -> Dict[str, Any]:
|
||||
result: Dict[str, Any] = {}
|
||||
for name, command in commands.items():
|
||||
for parameter, data in command.available_settings.items():
|
||||
if isinstance(data, HonParameterEnum):
|
||||
value: List[str] | Dict[str, str | float] = data.values
|
||||
elif isinstance(data, HonParameterRange):
|
||||
value = {"min": data.min, "max": data.max, "step": data.step}
|
||||
else:
|
||||
continue
|
||||
if not concat:
|
||||
result.setdefault(name, {})[parameter] = value
|
||||
else:
|
||||
result[f"{name}.{parameter}"] = value
|
||||
return result
|
||||
|
||||
|
||||
def create_rules(
|
||||
commands: Dict[str, "HonCommand"], concat: bool = False
|
||||
) -> Dict[str, Any]:
|
||||
result: Dict[str, Any] = {}
|
||||
for name, command in commands.items():
|
||||
for parameter, data in command.available_settings.items():
|
||||
value = data.triggers
|
||||
if not value:
|
||||
continue
|
||||
if not concat:
|
||||
result.setdefault(name, {})[parameter] = value
|
||||
else:
|
||||
result[f"{name}.{parameter}"] = value
|
||||
return result
|
0
pyhon/py.typed
Normal file
0
pyhon/py.typed
Normal file
102
pyhon/rules.py
102
pyhon/rules.py
@ -3,9 +3,11 @@ from typing import List, Dict, TYPE_CHECKING, Any, Optional
|
||||
|
||||
from pyhon.parameter.enum import HonParameterEnum
|
||||
from pyhon.parameter.range import HonParameterRange
|
||||
from pyhon.typedefs import Parameter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon.commands import HonCommand
|
||||
from pyhon.parameter.base import HonParameter
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -18,18 +20,28 @@ class HonRule:
|
||||
|
||||
|
||||
class HonRuleSet:
|
||||
def __init__(self, command: "HonCommand", rule):
|
||||
def __init__(self, command: "HonCommand", rule: Dict[str, Any]):
|
||||
self._command: "HonCommand" = command
|
||||
self._rules: Dict[str, List[HonRule]] = {}
|
||||
self._parse_rule(rule)
|
||||
|
||||
def _parse_rule(self, rule):
|
||||
@property
|
||||
def rules(self) -> Dict[str, List[HonRule]]:
|
||||
return self._rules
|
||||
|
||||
def _parse_rule(self, rule: Dict[str, Any]) -> None:
|
||||
for param_key, params in rule.items():
|
||||
param_key = self._command.appliance.options.get(param_key, param_key)
|
||||
for trigger_key, trigger_data in params.items():
|
||||
self._parse_conditions(param_key, trigger_key, trigger_data)
|
||||
|
||||
def _parse_conditions(self, param_key, trigger_key, trigger_data, extra=None):
|
||||
def _parse_conditions(
|
||||
self,
|
||||
param_key: str,
|
||||
trigger_key: str,
|
||||
trigger_data: Dict[str, Any],
|
||||
extra: Optional[Dict[str, str]] = None,
|
||||
) -> None:
|
||||
trigger_key = trigger_key.replace("@", "")
|
||||
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
|
||||
for multi_trigger_value, param_data in trigger_data.items():
|
||||
@ -44,18 +56,28 @@ class HonRuleSet:
|
||||
extra[trigger_key] = trigger_value
|
||||
for extra_key, extra_data in param_data.items():
|
||||
self._parse_conditions(param_key, extra_key, extra_data, extra)
|
||||
else:
|
||||
param_data = {"typology": "fixed", "fixedValue": param_data}
|
||||
self._create_rule(
|
||||
param_key, trigger_key, trigger_value, param_data, extra
|
||||
)
|
||||
|
||||
def _create_rule(
|
||||
self, param_key, trigger_key, trigger_value, param_data, extras=None
|
||||
):
|
||||
self,
|
||||
param_key: str,
|
||||
trigger_key: str,
|
||||
trigger_value: str,
|
||||
param_data: Dict[str, Any],
|
||||
extras: Optional[Dict[str, str]] = None,
|
||||
) -> None:
|
||||
if param_data.get("fixedValue") == f"@{param_key}":
|
||||
return
|
||||
self._rules.setdefault(trigger_key, []).append(
|
||||
HonRule(trigger_key, trigger_value, param_key, param_data, extras)
|
||||
)
|
||||
|
||||
def _duplicate_for_extra_conditions(self):
|
||||
new = {}
|
||||
def _duplicate_for_extra_conditions(self) -> None:
|
||||
new: Dict[str, List[HonRule]] = {}
|
||||
for rules in self._rules.values():
|
||||
for rule in rules:
|
||||
if rule.extras is None:
|
||||
@ -71,35 +93,53 @@ class HonRuleSet:
|
||||
for rule in rules:
|
||||
self._rules.setdefault(key, []).append(rule)
|
||||
|
||||
def _add_trigger(self, parameter, data):
|
||||
def apply(rule: HonRule):
|
||||
if rule.extras is not None:
|
||||
for key, value in rule.extras.items():
|
||||
if str(self._command.parameters.get(key)) != str(value):
|
||||
return
|
||||
if param := self._command.parameters.get(rule.param_key):
|
||||
if value := rule.param_data.get("fixedValue", ""):
|
||||
if isinstance(param, HonParameterEnum) and set(param.values) != {
|
||||
str(value)
|
||||
}:
|
||||
param.values = [str(value)]
|
||||
elif isinstance(param, HonParameterRange):
|
||||
param.value = float(value)
|
||||
return
|
||||
param.value = str(value)
|
||||
elif rule.param_data.get("typology") == "enum":
|
||||
if isinstance(param, HonParameterEnum):
|
||||
if enum_values := rule.param_data.get("enumValues"):
|
||||
param.values = enum_values.split("|")
|
||||
if default_value := rule.param_data.get("defaultValue"):
|
||||
param.value = default_value
|
||||
def _extra_rules_matches(self, rule: HonRule) -> bool:
|
||||
if rule.extras:
|
||||
for key, value in rule.extras.items():
|
||||
if not self._command.parameters.get(key):
|
||||
return False
|
||||
if str(self._command.parameters.get(key)) != str(value):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _apply_fixed(self, param: Parameter, value: str | float) -> None:
|
||||
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}:
|
||||
param.values = [str(value)]
|
||||
param.value = str(value)
|
||||
elif isinstance(param, HonParameterRange):
|
||||
if float(value) < param.min:
|
||||
param.min = float(value)
|
||||
elif float(value) > param.max:
|
||||
param.max = float(value)
|
||||
param.value = float(value)
|
||||
return
|
||||
param.value = str(value)
|
||||
|
||||
def _apply_enum(self, param: Parameter, rule: HonRule) -> None:
|
||||
if not isinstance(param, HonParameterEnum):
|
||||
return
|
||||
if enum_values := rule.param_data.get("enumValues"):
|
||||
param.values = enum_values.split("|")
|
||||
if default_value := rule.param_data.get("defaultValue"):
|
||||
param.value = default_value
|
||||
|
||||
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
|
||||
def apply(rule: HonRule) -> None:
|
||||
if not self._extra_rules_matches(rule):
|
||||
return
|
||||
if not (param := self._command.parameters.get(rule.param_key)):
|
||||
return
|
||||
if fixed_value := rule.param_data.get("fixedValue", ""):
|
||||
self._apply_fixed(param, fixed_value)
|
||||
elif rule.param_data.get("typology") == "enum":
|
||||
self._apply_enum(param, rule)
|
||||
|
||||
parameter.add_trigger(data.trigger_value, apply, data)
|
||||
|
||||
def patch(self):
|
||||
def patch(self) -> None:
|
||||
self._duplicate_for_extra_conditions()
|
||||
for name, parameter in self._command.parameters.items():
|
||||
if name not in self._rules:
|
||||
continue
|
||||
for data in self._rules.get(name):
|
||||
for data in self._rules.get(name, []):
|
||||
self._add_trigger(parameter, data)
|
||||
|
27
pyhon/typedefs.py
Normal file
27
pyhon/typedefs.py
Normal file
@ -0,0 +1,27 @@
|
||||
from typing import Union, Any, TYPE_CHECKING, Protocol
|
||||
|
||||
import aiohttp
|
||||
from yarl import URL
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon.parameter.base import HonParameter
|
||||
from pyhon.parameter.enum import HonParameterEnum
|
||||
from pyhon.parameter.fixed import HonParameterFixed
|
||||
from pyhon.parameter.program import HonParameterProgram
|
||||
from pyhon.parameter.range import HonParameterRange
|
||||
|
||||
|
||||
class Callback(Protocol): # pylint: disable=too-few-public-methods
|
||||
def __call__(
|
||||
self, url: str | URL, *args: Any, **kwargs: Any
|
||||
) -> aiohttp.client._RequestContextManager:
|
||||
...
|
||||
|
||||
|
||||
Parameter = Union[
|
||||
"HonParameter",
|
||||
"HonParameterRange",
|
||||
"HonParameterEnum",
|
||||
"HonParameterFixed",
|
||||
"HonParameterProgram",
|
||||
]
|
@ -1,2 +1,3 @@
|
||||
aiohttp==3.8.4
|
||||
yarl==1.8.2
|
||||
aiohttp>=3.8
|
||||
yarl>=1.8
|
||||
typing-extensions>=4.8
|
||||
|
@ -1,4 +1,5 @@
|
||||
black==23.3.0
|
||||
flake8==6.0.0
|
||||
mypy==1.2.0
|
||||
pylint==2.17.2
|
||||
black>=22.12
|
||||
flake8>=6.0
|
||||
mypy>=0.991
|
||||
pylint>=2.15
|
||||
setuptools>=62.3
|
||||
|
7
setup.py
7
setup.py
@ -2,12 +2,12 @@
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
with open("README.md", "r") as f:
|
||||
with open("README.md", "r", encoding="utf-8") as f:
|
||||
long_description = f.read()
|
||||
|
||||
setup(
|
||||
name="pyhOn",
|
||||
version="0.14.0",
|
||||
version="0.15.15",
|
||||
author="Andre Basche",
|
||||
description="Control hOn devices with python",
|
||||
long_description=long_description,
|
||||
@ -21,7 +21,7 @@ setup(
|
||||
packages=find_packages(),
|
||||
include_package_data=True,
|
||||
python_requires=">=3.10",
|
||||
install_requires=["aiohttp"],
|
||||
install_requires=["aiohttp>=3.8", "typing-extensions>=4.8", "yarl>=1.8"],
|
||||
classifiers=[
|
||||
"Development Status :: 4 - Beta",
|
||||
"Environment :: Console",
|
||||
@ -30,6 +30,7 @@ setup(
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
],
|
||||
entry_points={
|
||||
|
Loading…
Reference in New Issue
Block a user