Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions tests/test_xiaomi.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
import zhaquirks.xiaomi.aqara.motion_acn001
import zhaquirks.xiaomi.aqara.motion_agl02
import zhaquirks.xiaomi.aqara.motion_agl04
from zhaquirks.xiaomi.aqara.motion_agl8 import (
AqaraFP300ManuCluster,
FP300DetectionRangeCluster,
)
import zhaquirks.xiaomi.aqara.motion_aq2
import zhaquirks.xiaomi.aqara.motion_aq2b
import zhaquirks.xiaomi.aqara.plug
Expand Down Expand Up @@ -2282,3 +2286,170 @@ async def test_lumi_magnet_sensor_aq2_bad_direction(zigpy_device_from_quirk, cap

# Our matching logic should be forgiving
assert listener.attribute_updates == [(0, t.Bool.true)]


def test_fp300_clusters(zigpy_device_from_v2_quirk):
"""Test Aqara FP300 clusters."""
# create device with endpoint 1 only and verify we don't get a KeyError
device = zigpy_device_from_v2_quirk(AQARA, "lumi.sensor_occupy.agl8")

assert AqaraFP300ManuCluster.cluster_id in device.endpoints[1].in_clusters
assert FP300DetectionRangeCluster.cluster_id in device.endpoints[1].in_clusters


def test_aqara_fp300_battery_from_e1_tlv(zigpy_device_from_v2_quirk):
"""Test Aqara FP300 battery reporting from Aqara E1 TLV (0x00F7)."""

device = zigpy_device_from_v2_quirk(AQARA, "lumi.sensor_occupy.agl8")

manu_cluster = device.endpoints[1].in_clusters[AqaraFP300ManuCluster.cluster_id]

power_cluster = device.endpoints[1].power
power_listener = ClusterListener(power_cluster)

zcl_power_voltage_id = PowerConfiguration.AttributeDefs.battery_voltage.id
zcl_power_percent_id = (
PowerConfiguration.AttributeDefs.battery_percentage_remaining.id
)

manu_cluster.update_attribute(
XIAOMI_AQARA_ATTRIBUTE_E1,
create_aqara_attr_report({23: 306, 24: 100}),
)

assert len(power_listener.attribute_updates) == 2
assert power_listener.attribute_updates[0][0] == zcl_power_voltage_id
assert power_listener.attribute_updates[0][1] == 3.1

assert power_listener.attribute_updates[1][0] == zcl_power_percent_id
assert power_listener.attribute_updates[1][1] == 200 # 100 % * 2


@pytest.mark.parametrize(
"raw_payload, expected_segments",
(
(t.LVBytes(bytes.fromhex("0003ffffff")), [True, True, True, True, True, True]),
(t.LVBytes(bytes.fromhex("0003f0ffff")), [False, True, True, True, True, True]),
(
t.LVBytes(bytes.fromhex("000300ffff")),
[False, False, True, True, True, True],
),
(
t.LVBytes(bytes.fromhex("000300f0ff")),
[False, False, False, True, True, True],
),
(
t.LVBytes(bytes.fromhex("00030000ff")),
[False, False, False, False, True, True],
),
(
t.LVBytes(bytes.fromhex("00030000f0")),
[False, False, False, False, False, True],
),
(
t.LVBytes(bytes.fromhex("0003000000")),
[False, False, False, False, False, False],
),
(
t.LVBytes(),
[True, True, True, True, True, True],
), # invalid case defaults to all enabled
(
123,
[True, True, True, True, True, True],
), # invalid case defaults to all enabled
),
)
def test_aqara_fp300_detection_range_decode(
zigpy_device_from_v2_quirk, raw_payload, expected_segments
):
"""Test FP300 detection range decoding from 0x019A into 6 local switches."""

device = zigpy_device_from_v2_quirk(AQARA, "lumi.sensor_occupy.agl8")

manu_cluster = device.endpoints[1].in_clusters[AqaraFP300ManuCluster.cluster_id]
dr_cluster = device.endpoints[1].in_clusters[FP300DetectionRangeCluster.cluster_id]

manu_cluster.update_attribute(
AqaraFP300ManuCluster.AttributeDefs.detection_range_raw.id,
raw_payload,
)

seg_ids = [
FP300DetectionRangeCluster.AttributeDefs.range_0_1m.id,
FP300DetectionRangeCluster.AttributeDefs.range_1_2m.id,
FP300DetectionRangeCluster.AttributeDefs.range_2_3m.id,
FP300DetectionRangeCluster.AttributeDefs.range_3_4m.id,
FP300DetectionRangeCluster.AttributeDefs.range_4_5m.id,
FP300DetectionRangeCluster.AttributeDefs.range_5_6m.id,
]

actual = [bool(dr_cluster._attr_cache.get(attr_id, False)) for attr_id in seg_ids]
assert actual == expected_segments


@pytest.mark.parametrize(
"prefix, segments, expected_mask",
(
(0x0300, [True, True, True, True, True, True], 0xFFFFFF),
(0x0300, [False, True, True, True, True, True], 0xFFFFF0),
(0x0300, [False, False, True, True, True, True], 0xFFFF00),
(0x0300, [False, False, False, True, True, True], 0xFFF000),
(0x0300, [False, False, False, False, True, True], 0xFF0000),
(0x0300, [False, False, False, False, False, True], 0xF00000),
(0x0300, [False, False, False, False, False, False], 0x000000),
("not_an_int", [False, False, True, True, True, True], 0xFFFF00),
),
)
@pytest.mark.asyncio
async def test_aqara_fp300_detection_range_encode(
zigpy_device_from_v2_quirk, prefix, segments, expected_mask
):
"""Test FP300 detection range encoding from 6 switches into raw 0x019A via write_attributes."""

device = zigpy_device_from_v2_quirk(AQARA, "lumi.sensor_occupy.agl8")

manu_cluster = device.endpoints[1].in_clusters[AqaraFP300ManuCluster.cluster_id]
dr_cluster = device.endpoints[1].in_clusters[FP300DetectionRangeCluster.cluster_id]

manu_cluster._write_attributes = mock.AsyncMock()

# Prefix im LocalDataCluster setzen (wie Quirk-Default: 0x0300)
dr_cluster._update_attribute(
FP300DetectionRangeCluster.AttributeDefs.prefix.id,
prefix,
)

seg_ids = [
FP300DetectionRangeCluster.AttributeDefs.range_0_1m.id,
FP300DetectionRangeCluster.AttributeDefs.range_1_2m.id,
FP300DetectionRangeCluster.AttributeDefs.range_2_3m.id,
FP300DetectionRangeCluster.AttributeDefs.range_3_4m.id,
FP300DetectionRangeCluster.AttributeDefs.range_4_5m.id,
FP300DetectionRangeCluster.AttributeDefs.range_5_6m.id,
]
attr_dict = dict(zip(seg_ids, segments))

prefix_int = 0x0300
expected_bytes = prefix_int.to_bytes(2, "little") + expected_mask.to_bytes(
3, "little"
)

expected_attr_def = manu_cluster.find_attribute(
AqaraFP300ManuCluster.AttributeDefs.detection_range_raw.id
)
expected = foundation.Attribute(
AqaraFP300ManuCluster.AttributeDefs.detection_range_raw.id,
foundation.TypeValue(),
)
expected.value.type = foundation.DataType.from_python_type(
expected_attr_def.type
).type_id
expected.value.value = expected_attr_def.type(expected_bytes)

await dr_cluster.write_attributes(attr_dict, manufacturer=0x115F)

manu_cluster._write_attributes.assert_awaited_with(
[expected],
manufacturer=0x115F,
)
Loading
Loading