Update
I have uploaded a working version of the connector code on github : https://github.com/jon1012/mihome
What is it ?
The Xiaomi smart home solution is a low-cost zigbee ecosystem of devices that can stay on for a very long time with deep sleep on little batteries.
Best part about it is the price, less than 40 euros for the gateway (hub), and less than 20 for the individual sensors (you can maybe even find better prices online, just avoid gearbest!).
Available sensors :
- Door Sensor
- Motion sensor (Body sensor)
- Wireless switch (a button that supports clicks, double clicks and long presses)
- Magic Cube (I haven't got one to test yet)
- Probably others
1st attempt
This is my naïve test according to some specs found on the web.
# First experiment: send one fixed UDP packet to the gateway, dump the raw
# reply, then derive a candidate key/iv pair with MD5.
import binascii
import hashlib  # was missing: hashlib.md5 is used at the bottom of the script
import socket

UDP_IP = "192.168.0.107"
UDP_PORT_FROM = 54322
UDP_PORT = 54321
MESSAGE = binascii.unhexlify('21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff')

print("UDP target IP:", UDP_IP)
print("UDP target port:", UDP_PORT)
print("message:\n\t", binascii.hexlify(MESSAGE))

# The context manager closes the socket once the single exchange is done.
with socket.socket(socket.AF_INET,  # Internet
                   socket.SOCK_DGRAM) as sock:  # UDP
    sock.bind(("0.0.0.0", UDP_PORT_FROM))
    sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
    data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes

print("received message:\n\t", binascii.hexlify(data))

# NOTE(review): this slice keeps only 4 hex digits (2 bytes) of the reply;
# confirm which bytes of the answer actually carry the token.
token = binascii.hexlify(data)[28:32]
print("token:", token)  # is it the key really ?

# now, tentative stuff...
magic = [0x20, 0x59, 0x5C, 0xD3, 0x24, 0x10, 0x5D, 0x54,
         0x14, 0xC6, 0xD4, 0xE3, 0xC8, 0x80, 0xC6, 0xF0]
key = hashlib.md5(bytes(magic)).digest()
iv = hashlib.md5(key + bytes(magic)).digest()
# here be dragons...
2nd attempt
A few weeks ago, someone published a project to interface the aqara bridge (xiaomi hub, found the aqara name a bit by surprise) with homebridge from apple.
It explains that you should activate developer mode as explained in this link: http://bbs.xiaomi.cn/t-13198850 (in Chinese).
Some important points :
- You should activate "local network mode"
- Get a password
- Get a mac address
Test code (working to receive raw messages) :
# Second experiment: join the gateway's multicast group and print every raw
# datagram that arrives on it.
import socket
import binascii
import struct

UDP_IP = "192.168.0.107"
UDP_PORT_FROM = 54322
UDP_PORT = 54321
MULTICAST_PORT = 9898
SERVER_PORT = 4321
MULTICAST_ADDRESS = '224.0.0.50'
SOCKET_BUFSIZE = 1024
MESSAGE = binascii.unhexlify('21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff')

# Plain IPv4 UDP socket, bound on every interface to the multicast port.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", MULTICAST_PORT))

# Membership request: packed group address followed by INADDR_ANY.
group = socket.inet_aton(MULTICAST_ADDRESS)
mreq = struct.pack("=4sl", group, socket.INADDR_ANY)

# Options are applied in the same order as the original one-call-per-line code.
for level, option, value in (
    (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32),
    (socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1),
    (socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE),
    (socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq),
):
    sock.setsockopt(level, option, value)

# Receive forever; each read takes at most SOCKET_BUFSIZE (1024) bytes.
while True:
    data, addr = sock.recvfrom(SOCKET_BUFSIZE)
    print("received message:", data)
Example of messages received :
{'cmd': 'heartbeat', 'token': 'ydfnrsd4cjgIVcTj', 'model': 'gateway', 'sid': 'f0b429b3d7cc', 'data': '{"ip":"192.168.0.107"}', 'short_id': '0'}
{'cmd': 'report', 'model': 'motion', 'data': '{"status":"motion"}', 'short_id': 23233, 'sid': '158d000118863e'}
Small Proof-of-concept MQTT bridge
This is a work in progress small proof of concept mqtt bridge for xiaomi router with local protocol enabled :
# Just a note, please credit me (Jonathan Schemoul, HackSpark.fr) if you use this code.
# Oh, and I'm going to make a real project out of it (I'm working on it), so you'll be able to really use it soon.
import socket
import binascii
import struct
import json

import paho.mqtt.client as mqtt

MULTICAST_PORT = 9898
SERVER_PORT = 4321
MULTICAST_ADDRESS = '224.0.0.50'
SOCKET_BUFSIZE = 1024

MQTT_SERVER = "192.168.0.149"
MQTT_PORT = 1883

PATH_FMT = "xiaomi/{model}/{sid}/{prop}"  # short_id or sid ?


def prepare_socket():
    """Create the UDP socket, bind it to the multicast port and join the group.

    Returns:
        The configured socket, ready for ``recvfrom``.
    """
    udp = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    udp.bind(("0.0.0.0", MULTICAST_PORT))

    # Membership request: packed group address followed by INADDR_ANY.
    membership = struct.pack(
        "=4sl",
        socket.inet_aton(MULTICAST_ADDRESS),
        socket.INADDR_ANY,
    )
    options = (
        (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32),
        (socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1),
        (socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE),
        (socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership),
    )
    for level, name, value in options:
        udp.setsockopt(level, name, value)
    return udp


def prepare_mqtt():
    """Create an MQTT client connected to MQTT_SERVER:MQTT_PORT (keepalive 60).

    Returns:
        The connected paho client.
    """
    broker = mqtt.Client()
    broker.connect(MQTT_SERVER, MQTT_PORT, 60)
    return broker


LAST_TOKEN = None


def push_data(client, model, sid, cmd, data):
    """Publish each key/value pair of *data* on its own topic.

    The topic is built from PATH_FMT; every command type is forwarded
    (there is no filtering on *cmd*).
    """
    for prop, value in data.items():
        topic = PATH_FMT.format(model=model, sid=sid, cmd=cmd, prop=prop)
        client.publish(topic, payload=value)


def handle_incoming_data(client, payload):
    """Forward a decoded message to MQTT and remember the last token seen.

    Only messages whose ``data`` field is a string are forwarded; that
    string is itself parsed as JSON before being pushed.
    """
    global LAST_TOKEN

    raw = payload.get('data', None)
    if isinstance(raw, str):
        push_data(client,
                  payload["model"],
                  payload["sid"],
                  payload["cmd"],
                  json.loads(raw))

    if "token" in payload:
        LAST_TOKEN = payload['token']


if __name__ == "__main__":
    sock = prepare_socket()
    client = prepare_mqtt()
    while True:
        # Each read takes at most SOCKET_BUFSIZE (1024) bytes.
        data, addr = sock.recvfrom(SOCKET_BUFSIZE)
        try:
            handle_incoming_data(client, json.loads(data.decode("utf-8")))
        except Exception as e:
            # Keep listening whatever goes wrong with a single message.
            print("Can't handle message %r (%r)" % (data, e))
Example push in the mqtt log :
xiaomi/switch/158d00010d53a6/status double_click
xiaomi/switch/158d00010d53a6/status click
xiaomi/switch/158d00010d53a6/status click
xiaomi/switch/158d00010d53a6/status long_click_press
xiaomi/switch/158d00010d53a6/status long_click_release
xiaomi/switch/158d00010d53a6/status click
xiaomi/motion/158d000118863e/status motion
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/magnet/158d000105b898/status open
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/magnet/158d000105b898/status close
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
158d00010d53a6 is a button switch
158d000118863e is a motion sensor
and 158d000105b898 is a door sensor (magnet)
finally, obviously, f0b429b3d7cc is the gateway.
It workkkssss, finally ! :D
Now, on to a real project with config files and all on github, I'll post link here once it's there :)
Oh, and obviously, next step is to add those sensors to the excellent home-assistant app.
Getting There (new edit) :
I still have to convert to asyncio, but discovery is here and protocol code is decoupled from mqtt (first step toward home assistant plugin)
import socket
import binascii
import struct
import json

MQTT_SERVER = "192.168.0.149"
MQTT_PORT = 1883

PATH_FMT = "xiaomi/{model}/{sid}/{prop}"  # short_id or sid ?


def prepare_mqtt():
    """Create an MQTT client connected to MQTT_SERVER:MQTT_PORT (keepalive 60).

    Returns:
        The connected paho client.
    """
    # Imported here rather than at module level so that XiaomiConnector can
    # be imported and used without paho installed.
    import paho.mqtt.client as mqtt

    client = mqtt.Client()
    client.connect(MQTT_SERVER, MQTT_PORT, 60)
    return client


def push_data(client, model, sid, cmd, data):
    """Publish each key/value pair of *data* on its own topic, with QoS 0.

    The topic is built from PATH_FMT.
    """
    for key, value in data.items():
        path = PATH_FMT.format(model=model, sid=sid, cmd=cmd, prop=key)
        client.publish(path, payload=value, qos=0)


class XiaomiConnector:
    """Listen to the gateway's multicast messages and keep track of nodes.

    Attributes are plain dictionaries:
      * ``nodes``       maps a sid to what is known about that node;
      * ``last_tokens`` maps a sid to the last token received from it.
    """

    MULTICAST_PORT = 9898
    SERVER_PORT = 4321

    MULTICAST_ADDRESS = '224.0.0.50'
    SOCKET_BUFSIZE = 1024

    def __init__(self, data_callback=None, auto_discover=True):
        """Open the socket and start with empty node and token tables.

        Args:
            data_callback: optional callable invoked as
                ``data_callback(model, sid, cmd, data)`` for heartbeat,
                report and read_ack messages.
            auto_discover: accepted for interface compatibility; it is not
                used by this class at the moment.
        """
        self.data_callback = data_callback
        self.last_tokens = dict()
        self.socket = self._prepare_socket()
        self.nodes = dict()

    def _prepare_socket(self):
        """Create the UDP socket, bind it and join the multicast group."""
        sock = socket.socket(socket.AF_INET,  # Internet
                             socket.SOCK_DGRAM)  # UDP

        sock.bind(("0.0.0.0", self.MULTICAST_PORT))

        # Membership request: packed group address followed by INADDR_ANY.
        mreq = struct.pack("=4sl", socket.inet_aton(self.MULTICAST_ADDRESS),
                           socket.INADDR_ANY)

        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
                        self.SOCKET_BUFSIZE)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

        return sock

    def check_incoming(self):
        """Block until one datagram arrives, then decode and handle it.

        A message that cannot be decoded or handled is reported on stdout
        and otherwise ignored, so the caller's loop keeps running.
        """
        print("checking")
        data, addr = self.socket.recvfrom(self.SOCKET_BUFSIZE)
        try:
            payload = json.loads(data.decode("utf-8"))
            print(payload)
            self.handle_incoming_data(payload)
        except Exception as e:
            # The original had a leftover `raise` in front of this print,
            # which made the message unreachable and killed the listener.
            print("Can't handle message %r (%r)" % (data, e))

    def handle_incoming_data(self, payload):
        """Update internal state from one decoded message.

        Args:
            payload: the decoded JSON message (a dict). Its ``data`` field,
                when it is a string, is itself parsed as JSON.
        """
        if isinstance(payload.get('data', None), str):
            cmd = payload["cmd"]

            if cmd in ["heartbeat", "report", "read_ack"]:
                if self.data_callback is not None:
                    self.data_callback(payload["model"],
                                       payload["sid"],
                                       payload["cmd"],
                                       json.loads(payload["data"]))

            if cmd == "read_ack" and payload["sid"] not in self.nodes:
                self.nodes[payload["sid"]] = dict(model=payload["model"])

            if cmd == "heartbeat" and payload["sid"] not in self.nodes:
                # First heartbeat from this sid: ask it for its id list and
                # record what the heartbeat itself tells us.
                self.request_sids(payload["sid"])
                self.nodes[payload["sid"]] = json.loads(payload["data"])
                self.nodes[payload["sid"]]["model"] = payload["model"]
                self.nodes[payload["sid"]]["sensors"] = []

            if cmd == "get_id_list_ack":
                device_sids = json.loads(payload["data"])
                self.nodes[payload["sid"]]["nodes"] = device_sids

                # Ask every listed device for its current status.
                for sid in device_sids:
                    self.request_current_status(sid)

        if "token" in payload:
            self.last_tokens[payload["sid"]] = payload['token']

    def request_sids(self, sid):
        """Send a ``get_id_list`` command for the given sid."""
        # The key must be the literal string "sid"; the original used the
        # variable itself as the key.
        self.send_command({"cmd": "get_id_list", "sid": sid})

    def request_current_status(self, device_sid):
        """Send a ``read`` command for the given device sid."""
        self.send_command({"cmd": "read", "sid": device_sid})

    def send_command(self, data):
        """JSON-encode *data* and send it to the multicast address and port."""
        self.socket.sendto(json.dumps(data).encode("utf-8"),
                           (self.MULTICAST_ADDRESS, self.MULTICAST_PORT))

    def get_nodes(self):
        """Return the dictionary of known nodes, keyed by sid."""
        return self.nodes


if __name__ == "__main__":
    client = prepare_mqtt()
    cb = lambda m, s, c, d: push_data(client, m, s, c, d)
    connector = XiaomiConnector(data_callback=cb)

    while True:
        connector.check_incoming()
Links:
- NEW: homebridge-aqara (link the xiaomi hub to homebridge) : https://github.com/snOOrz/homebridge-aqara/blob/master/README.md
- Protocol description ? http://drops.xmd5.com/static/drops/tips-10450.html
- Android sdk : https://github.com/MiEcosystem/mijiaSDK
- Previous try (on the plug that seems to have the same api) : https://github.com/DameMirai/DameStudy/wiki/Xiaomi-socket-protocol-finding-%5Bfailed-now...%5D