feat: add cloud api for SmartHome app

This commit is contained in:
sususweet
2025-11-11 14:24:10 +08:00
parent 7476d6ad1d
commit 37ace3d764
3 changed files with 123 additions and 46 deletions

View File

@@ -258,6 +258,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry):
protocol=info.get(CONF_PROTOCOL) or 2, protocol=info.get(CONF_PROTOCOL) or 2,
model=info.get(CONF_MODEL), model=info.get(CONF_MODEL),
subtype=info.get(CONF_MODEL_NUMBER), subtype=info.get(CONF_MODEL_NUMBER),
manufacturer_code=info.get(CONF_MANUFACTURER_CODE),
sn=info.get(CONF_SN), sn=info.get(CONF_SN),
sn8=info.get(CONF_SN8), sn8=info.get(CONF_SN8),
lua_file=file, lua_file=file,

View File

@@ -27,6 +27,7 @@ clouds = {
"app_key": "ac21b9f9cbfe4ca5a88562ef25e2b768", "app_key": "ac21b9f9cbfe4ca5a88562ef25e2b768",
"iot_key": bytes.fromhex(format(7882822598523843940, 'x')).decode(), "iot_key": bytes.fromhex(format(7882822598523843940, 'x')).decode(),
"hmac_key": bytes.fromhex(format(117390035944627627450677220413733956185864939010425, 'x')).decode(), "hmac_key": bytes.fromhex(format(117390035944627627450677220413733956185864939010425, 'x')).decode(),
# "api_url": "https://mp-eu-prod.appsmb.com/mas/v5/app/proxy?alias=",
"api_url": "https://mp-prod.appsmb.com/mas/v5/app/proxy?alias=", "api_url": "https://mp-prod.appsmb.com/mas/v5/app/proxy?alias=",
}, },
} }
@@ -636,6 +637,55 @@ class MSmartHomeCloud(MideaCloud):
fp.write(stream) fp.write(stream)
return fnm return fnm
async def get_device_status(
self,
appliance_code: int,
device_type: int,
sn: str,
model_number: str | None,
manufacturer_code: str = "0000",
query: dict = {}
) -> dict | None:
data = {
"clientType": "1",
"appId": self.APP_ID,
"format": "2",
"deviceId": self._device_id,
"iotAppId": self.APP_ID,
"applianceMFCode": manufacturer_code,
"applianceType": "0x%02X" % device_type,
"modelNumber": model_number,
"applianceSn": self._security.aes_encrypt_with_fixed_key(sn.encode("ascii")).hex(),
"version": "0",
"encryptedType ": "2",
"applianceCode": appliance_code,
"command": {
"query": query
}
}
if response := await self._api_request(
endpoint="/v1/device/status/lua/get",
data=data
):
# 预期返回形如 { ... 状态键 ... }
return response
return None
async def send_device_control(self, appliance_code: int, control: dict, status: dict | None = None) -> bool:
data = {
"applianceCode": str(appliance_code),
"command": {
"control": control
}
}
if status and isinstance(status, dict):
data["command"]["status"] = status
response = await self._api_request(
endpoint="/v1/device/lua/control",
data=data
)
return response is not None
def get_midea_cloud(cloud_name: str, session: ClientSession, account: str, password: str) -> MideaCloud | None: def get_midea_cloud(cloud_name: str, session: ClientSession, account: str, password: str) -> MideaCloud | None:
cloud = None cloud = None

View File

@@ -3,7 +3,7 @@ import socket
import traceback import traceback
from enum import IntEnum from enum import IntEnum
from .cloud import MideaCloud from .cloud import MideaCloud, MSmartHomeCloud
from .security import LocalSecurity, MSGTYPE_HANDSHAKE_REQUEST, MSGTYPE_ENCRYPTED_REQUEST from .security import LocalSecurity, MSGTYPE_HANDSHAKE_REQUEST, MSGTYPE_ENCRYPTED_REQUEST
from .packet_builder import PacketBuilder from .packet_builder import PacketBuilder
from .message import MessageQuestCustom from .message import MessageQuestCustom
@@ -42,6 +42,7 @@ class MiedaDevice(threading.Thread):
protocol: int, protocol: int,
model: str | None, model: str | None,
subtype: int | None, subtype: int | None,
manufacturer_code: str | None,
connected: bool, connected: bool,
sn: str | None, sn: str | None,
sn8: str | None, sn8: str | None,
@@ -65,6 +66,7 @@ class MiedaDevice(threading.Thread):
self._subtype = subtype self._subtype = subtype
self._sn = sn self._sn = sn
self._sn8 = sn8 self._sn8 = sn8
self._manufacturer_code = manufacturer_code
self._attributes = { self._attributes = {
"device_type": "T0x%02X" % device_type, "device_type": "T0x%02X" % device_type,
"sn": sn, "sn": sn,
@@ -269,48 +271,70 @@ class MiedaDevice(threading.Thread):
async def refresh_status(self): async def refresh_status(self):
for query in self._queries: for query in self._queries:
if self._lua_runtime is not None: cloud = self._cloud
if query_cmd := self._lua_runtime.build_query(query): if cloud and hasattr(cloud, "get_device_status"):
await self._build_send(query_cmd) if isinstance(cloud, MSmartHomeCloud):
status = await cloud.get_device_status(
appliance_code=self._device_id,
device_type=self.device_type,
sn=self.sn,
model_number=self.subtype,
manufacturer_code=self._manufacturer_code,
query=query
)
else:
status = await cloud.get_device_status(
appliance_code=self._device_id,
query=query
)
def _parse_cloud_message(self, decrypted): self._parse_cloud_message(status)
# if self._lua_runtime is not None:
# if query_cmd := self._lua_runtime.build_query(query):
# try:
# await self._build_send(query_cmd)
# return
# except Exception as e:
# traceback.print_exc()
def _parse_cloud_message(self, status):
# MideaLogger.debug(f"Received: {decrypted}") # MideaLogger.debug(f"Received: {decrypted}")
if status := self._lua_runtime.decode_status(dec_string_to_bytes(decrypted).hex()): new_status = {}
MideaLogger.debug(f"Decoded: {status}") for single in status.keys():
new_status = {} value = status.get(single)
for single in status.keys(): if single not in self._attributes or self._attributes[single] != value:
value = status.get(single) self._attributes[single] = value
if single not in self._attributes or self._attributes[single] != value: new_status[single] = value
self._attributes[single] = value if len(new_status) > 0:
new_status[single] = value for c in self._calculate_get:
if len(new_status) > 0: lvalue = c.get("lvalue")
for c in self._calculate_get: rvalue = c.get("rvalue")
lvalue = c.get("lvalue") if lvalue and rvalue:
rvalue = c.get("rvalue") calculate = False
if lvalue and rvalue: for s, v in new_status.items():
calculate = False if rvalue.find(f"[{s}]") >= 0:
for s, v in new_status.items(): calculate = True
if rvalue.find(f"[{s}]") >= 0: break
calculate = True if calculate:
break calculate_str1 = \
if calculate: (f"{lvalue.replace('[', 'self._attributes[').replace("]", "\"]")} = "
calculate_str1 = \ f"{rvalue.replace('[', 'float(self._attributes[').replace(']', "\"])")}") \
(f"{lvalue.replace('[', 'self._attributes[').replace("]", "\"]")} = " .replace("[", "[\"")
f"{rvalue.replace('[', 'float(self._attributes[').replace(']', "\"])")}") \ calculate_str2 = \
.replace("[", "[\"") (f"{lvalue.replace('[', 'new_status[').replace("]", "\"]")} = "
calculate_str2 = \ f"{rvalue.replace('[', 'float(self._attributes[').replace(']', "\"])")}") \
(f"{lvalue.replace('[', 'new_status[').replace("]", "\"]")} = " .replace("[", "[\"")
f"{rvalue.replace('[', 'float(self._attributes[').replace(']', "\"])")}") \ try:
.replace("[", "[\"") exec(calculate_str1)
try: exec(calculate_str2)
exec(calculate_str1) except Exception as e:
exec(calculate_str2) traceback.print_exc()
except Exception as e: MideaLogger.warning(
traceback.print_exc() f"Calculation Error: {lvalue} = {rvalue}, calculate_str1: {calculate_str1}, calculate_str2: {calculate_str2}",
MideaLogger.warning( self._device_id
f"Calculation Error: {lvalue} = {rvalue}, calculate_str1: {calculate_str1}, calculate_str2: {calculate_str2}", self._device_id )
) self._update_all(new_status)
self._update_all(new_status)
return ParseMessageResult.SUCCESS return ParseMessageResult.SUCCESS
def _parse_message(self, msg): def _parse_message(self, msg):
@@ -372,11 +396,13 @@ class MiedaDevice(threading.Thread):
async def _send_message(self, data): async def _send_message(self, data):
if reply := await self._cloud.send_cloud(self._device_id, data): if reply := await self._cloud.send_cloud(self._device_id, data):
result = self._parse_cloud_message(reply) if reply_dec := self._lua_runtime.decode_status(dec_string_to_bytes(reply).hex()):
if result == ParseMessageResult.ERROR: MideaLogger.debug(f"Decoded: {reply_dec}")
MideaLogger.debug(f"Message 'ERROR' received") result = self._parse_cloud_message(reply_dec)
elif result == ParseMessageResult.SUCCESS: if result == ParseMessageResult.ERROR:
timeout_counter = 0 MideaLogger.debug(f"Message 'ERROR' received")
elif result == ParseMessageResult.SUCCESS:
timeout_counter = 0
# if self._protocol == 3: # if self._protocol == 3:
# self._send_message_v3(data, msg_type=MSGTYPE_ENCRYPTED_REQUEST) # self._send_message_v3(data, msg_type=MSGTYPE_ENCRYPTED_REQUEST)