Update
I have uploaded a working version of the connector code on github : https://github.com/jon1012/mihome
What is it ?
The Xiaomi smart home solution is a low-cost zigbee ecosystem of devices that can stay on for a very long time with deep sleep on little batteries.
Best part about it is the price, less than 40 euros for the gateway (hub), and less than 20 for the individual sensors (you can maybe even find better prices online, just avoid gearbest!).
Available sensors :
Door Sensor
Motion sensor (Body sensor)
Wireless switch (a button that supports clicks, double clicks and long presses)
- Magic Cube (I haven't got one to test yet)
- Probably others
1st tentative
This is my naïve test according to some specs found on the web.
# First experiment: send a fixed 32-byte probe packet to the gateway over UDP
# and dump whatever comes back.
import binascii
import hashlib  # required by the md5 calls at the bottom of the script
import socket

UDP_IP = "192.168.0.107"
UDP_PORT_FROM = 54322  # local port the probe is sent from
UDP_PORT = 54321       # destination port on UDP_IP
MESSAGE = binascii.unhexlify(
    '21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff')

print("UDP target IP:", UDP_IP)
print("UDP target port:", UDP_PORT)
print("message:\n\t", binascii.hexlify(MESSAGE))

# The context manager closes the socket as soon as the reply has been read.
with socket.socket(socket.AF_INET,               # Internet
                   socket.SOCK_DGRAM) as sock:   # UDP
    sock.bind(("0.0.0.0", UDP_PORT_FROM))
    sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
    data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes

print("received message:\n\t", binascii.hexlify(data))
# NOTE(review): this slice keeps hex characters 28..31 of the reply, i.e. only
# bytes 14 and 15 -- confirm against the protocol where the token really is.
token = binascii.hexlify(data)[28:32]
print("token:", token)  # is it the key really ?

# now, tentative stuff...
# key = md5(magic) and iv = md5(key + magic); neither is used yet.
magic = [0x20, 0x59, 0x5C, 0xD3, 0x24, 0x10, 0x5D, 0x54,
         0x14, 0xC6, 0xD4, 0xE3, 0xC8, 0x80, 0xC6, 0xF0]
key = hashlib.md5(bytes(magic)).digest()
iv = hashlib.md5(key + bytes(magic)).digest()
# here be dragons...
2nd tentative
A few weeks ago, someone published a project to interface the Aqara bridge (the Xiaomi hub; I discovered the Aqara name a bit by surprise) with Homebridge, the open-source bridge for Apple HomeKit.
It explains that you should activate developer mode as explained in this link : http://bbs.xiaomi.cn/t-13198850 (in Chinese).
Some important points :
- You should activate "local network mode"
- Get a password
- Get a mac address
Test code (working to receive raw messages) :
import socket
import binascii
import struct
UDP_IP = "192.168.0.107"
UDP_PORT_FROM = 54322
UDP_PORT = 54321
MULTICAST_PORT = 9898
SERVER_PORT = 4321
MULTICAST_ADDRESS = '224.0.0.50'
SOCKET_BUFSIZE = 1024
MESSAGE = binascii.unhexlify('21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff')
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.bind(("0.0.0.0", MULTICAST_PORT))
mreq = struct.pack("=4sl", socket.inet_aton(MULTICAST_ADDRESS), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
data, addr = sock.recvfrom(SOCKET_BUFSIZE) # buffer size is 1024 bytes
print ("received message:", data)Example of messages received :
{'cmd': 'heartbeat', 'token': 'ydfnrsd4cjgIVcTj', 'model': 'gateway', 'sid': 'f0b429b3d7cc', 'data': '{"ip":"192.168.0.107"}', 'short_id': '0'}
{'cmd': 'report', 'model': 'motion', 'data': '{"status":"motion"}', 'short_id': 23233, 'sid': '158d000118863e'}
Small Proof-of-concept MQTT bridge
This is a work-in-progress, small proof-of-concept MQTT bridge for the Xiaomi gateway with the local protocol enabled :
# Just a note, please credit me (Jonathan Schemoul, HackSpark.fr) if you use this code.
# Oh, and I'm going to make a real project out of it (I'm working on it), so you'll be able to really use it soon.
import socket
import binascii
import struct
import json
import paho.mqtt.client as mqtt
MULTICAST_PORT = 9898
SERVER_PORT = 4321
MULTICAST_ADDRESS = '224.0.0.50'
SOCKET_BUFSIZE = 1024
MQTT_SERVER = "192.168.0.149"
MQTT_PORT = 1883
PATH_FMT = "xiaomi/{model}/{sid}/{prop}" # short_id or sid ?
def prepare_socket():
    """Create the UDP socket used to receive the gateway's multicast traffic.

    The socket is bound to ``MULTICAST_PORT`` on all interfaces and joined to
    the ``MULTICAST_ADDRESS`` group.

    Returns:
        socket.socket: the bound socket, already a member of the group.

    Raises:
        OSError: if binding or configuring the socket fails; the socket is
            closed before the error propagates.
    """
    sock = socket.socket(socket.AF_INET,      # Internet
                         socket.SOCK_DGRAM)   # UDP
    try:
        sock.bind(("0.0.0.0", MULTICAST_PORT))
        # Membership request: group address followed by the local interface
        # (INADDR_ANY).
        mreq = struct.pack("=4sl", socket.inet_aton(MULTICAST_ADDRESS),
                           socket.INADDR_ANY)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    except OSError:
        # Do not leak the descriptor when setup fails half-way.
        sock.close()
        raise
    return sock
def prepare_mqtt():
    """Connect to the MQTT broker at ``MQTT_SERVER:MQTT_PORT``.

    Returns:
        The connected paho ``mqtt.Client``, with its network loop running in
        a background thread.
    """
    client = mqtt.Client()
    client.connect(MQTT_SERVER, MQTT_PORT, keepalive=60)
    # The main loop blocks on the UDP socket, so let paho service the
    # connection (keep-alive pings, reconnects) from its own thread.
    client.loop_start()
    return client
LAST_TOKEN = None
def push_data(client, model, sid, cmd, data):
    """Publish each property of a device message on its own MQTT topic.

    Args:
        client: object exposing ``publish(topic, payload=...)``.
        model: device model string from the message.
        sid: device id string from the message.
        cmd: command name from the message; passed to ``PATH_FMT.format`` so
            a format containing ``{cmd}`` would work, unused by the current
            format.
        data: mapping of property name to value; one message is published
            per item.
    """
    # Every command is forwarded the same way for now (no special handling
    # of "report" versus "heartbeat").
    for key, value in data.items():
        path = PATH_FMT.format(model=model,
                               sid=sid,
                               cmd=cmd,
                               prop=key)
        client.publish(path, payload=value)
def handle_incoming_data(client, payload):
    """Forward a decoded gateway message to MQTT and remember its token.

    Args:
        client: MQTT client handed to ``push_data``.
        payload: the decoded JSON message (a dict). When its ``data`` entry
            is a string, it is parsed as JSON and pushed to MQTT.

    Side effects:
        Updates the module-level ``LAST_TOKEN`` whenever the message carries
        a ``token`` entry.
    """
    global LAST_TOKEN

    if isinstance(payload.get('data', None), str):
        push_data(client,
                  payload["model"],
                  payload["sid"],
                  payload["cmd"],
                  json.loads(payload["data"]))

    # The token does not depend on the presence of a data entry.
    if "token" in payload:
        LAST_TOKEN = payload['token']
if __name__ == "__main__":
sock = prepare_socket()
client = prepare_mqtt()
while True:
data, addr = sock.recvfrom(SOCKET_BUFSIZE) # buffer size is 1024 bytes
try:
payload = json.loads(data.decode("utf-8"))
handle_incoming_data(client, payload)
except Exception as e:
print("Can't handle message %r (%r)" % (data, e))Example push in the mqtt log :
xiaomi/switch/158d00010d53a6/status double_click
xiaomi/switch/158d00010d53a6/status click
xiaomi/switch/158d00010d53a6/status click
xiaomi/switch/158d00010d53a6/status long_click_press
xiaomi/switch/158d00010d53a6/status long_click_release
xiaomi/switch/158d00010d53a6/status click
xiaomi/motion/158d000118863e/status motion
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/magnet/158d000105b898/status open
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/magnet/158d000105b898/status close
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
xiaomi/gateway/f0b429b3d7cc/ip 192.168.0.107
158d00010d53a6 is a button switch
158d000118863e is a motion sensor
and 158d000105b898 is a door sensor (magnet)
finally, obviously, f0b429b3d7cc is the gateway.
It workkkssss, finally ! :D
Now, on to a real project with config files and all on github, I'll post link here once it's there :)
Oh, and obviously, next step is to add those sensors to the excellent home-assistant app.
Getting There (new edit) :
I still have to convert to asyncio, but discovery is here and protocol code is decoupled from mqtt (first step toward home assistant plugin)
import socket
import binascii
import struct
import json
import paho.mqtt.client as mqtt
MQTT_SERVER = "192.168.0.149"
MQTT_PORT = 1883
PATH_FMT = "xiaomi/{model}/{sid}/{prop}" # short_id or sid ?
def prepare_mqtt():
    """Connect to the MQTT broker at ``MQTT_SERVER:MQTT_PORT``.

    Returns:
        The connected paho ``mqtt.Client``, with its network loop running in
        a background thread.
    """
    client = mqtt.Client()
    client.connect(MQTT_SERVER, MQTT_PORT, keepalive=60)
    # The caller blocks on the UDP socket, so let paho service the
    # connection (keep-alive pings, reconnects) from its own thread.
    client.loop_start()
    return client
def push_data(client, model, sid, cmd, data):
    """Publish each property of a device message on its own MQTT topic.

    Args:
        client: object exposing ``publish(topic, payload=..., qos=...)``.
        model: device model string from the message.
        sid: device id string from the message.
        cmd: command name from the message; passed to ``PATH_FMT.format`` so
            a format containing ``{cmd}`` would work, unused by the current
            format.
        data: mapping of property name to value; one message is published
            per item, with QoS 0.
    """
    for key, value in data.items():
        path = PATH_FMT.format(model=model,
                               sid=sid,
                               cmd=cmd,
                               prop=key)
        client.publish(path, payload=value, qos=0)
class XiaomiConnector:
    """Receive a Xiaomi gateway's multicast messages and track its devices.

    Attributes set by ``__init__``:
        data_callback: optional callable ``(model, sid, cmd, data)`` invoked
            for "heartbeat", "report" and "read_ack" messages.
        last_tokens: dict mapping a sid to the last token seen for it.
        nodes: dict mapping a sid to what is known about that device.
        socket: the UDP socket joined to the multicast group.
    """

    MULTICAST_PORT = 9898
    SERVER_PORT = 4321  # NOTE(review): not used anywhere in this class
    MULTICAST_ADDRESS = '224.0.0.50'
    SOCKET_BUFSIZE = 1024

    def __init__(self, data_callback=None, auto_discover=True):
        """Open the multicast socket and initialise the device registry.

        Args:
            data_callback: optional callable ``(model, sid, cmd, data)``.
            auto_discover: accepted for API compatibility; currently has no
                effect.
        """
        self.data_callback = data_callback
        self.last_tokens = dict()
        self.socket = self._prepare_socket()
        self.nodes = dict()

    def _prepare_socket(self):
        """Return a UDP socket bound to the port and joined to the group.

        Raises:
            OSError: if binding or configuring the socket fails; the socket
                is closed before the error propagates.
        """
        sock = socket.socket(socket.AF_INET,      # Internet
                             socket.SOCK_DGRAM)   # UDP
        try:
            sock.bind(("0.0.0.0", self.MULTICAST_PORT))
            # Membership request: group address followed by the local
            # interface (INADDR_ANY).
            mreq = struct.pack("=4sl",
                               socket.inet_aton(self.MULTICAST_ADDRESS),
                               socket.INADDR_ANY)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
                            self.SOCKET_BUFSIZE)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        except OSError:
            # Do not leak the descriptor when setup fails half-way.
            sock.close()
            raise
        return sock

    def check_incoming(self):
        """Block until one datagram arrives, then decode and handle it.

        A datagram that is not valid UTF-8 JSON is reported on stdout and
        skipped. Errors raised while handling a decoded message propagate to
        the caller.
        """
        print("checking")
        data, addr = self.socket.recvfrom(self.SOCKET_BUFSIZE)
        try:
            payload = json.loads(data.decode("utf-8"))
        except ValueError as e:
            # UnicodeDecodeError and json.JSONDecodeError both derive from
            # ValueError.
            print("Can't handle message %r (%r)" % (data, e))
            return
        print(payload)
        self.handle_incoming_data(payload)

    def handle_incoming_data(self, payload):
        """Update the registry from one decoded message and notify the callback.

        Args:
            payload: the decoded JSON message (a dict). Messages whose
                ``data`` entry is not a string only have their token stored.
        """
        if isinstance(payload.get('data', None), str):
            cmd = payload["cmd"]
            sid = payload["sid"]

            if cmd in ["heartbeat", "report", "read_ack"]:
                if self.data_callback is not None:
                    self.data_callback(payload["model"],
                                       sid,
                                       cmd,
                                       json.loads(payload["data"]))

            if cmd == "read_ack" and sid not in self.nodes:
                self.nodes[sid] = dict(model=payload["model"])

            if cmd == "heartbeat" and sid not in self.nodes:
                # First heartbeat from this sid: ask for its device list
                # and record what the heartbeat says about it.
                self.request_sids(sid)
                self.nodes[sid] = json.loads(payload["data"])
                self.nodes[sid]["model"] = payload["model"]
                self.nodes[sid]["sensors"] = []

            if cmd == "get_id_list_ack":
                device_sids = json.loads(payload["data"])
                # setdefault: the ack may arrive for a sid that has no
                # registry entry yet.
                self.nodes.setdefault(sid, {})["nodes"] = device_sids
                for device_sid in device_sids:
                    self.request_current_status(device_sid)

        # The token does not depend on the presence of a data entry.
        if "token" in payload:
            self.last_tokens[payload["sid"]] = payload['token']

    def request_sids(self, sid):
        """Send a "get_id_list" command for the given sid."""
        self.send_command({"cmd": "get_id_list", "sid": sid})

    def request_current_status(self, device_sid):
        """Send a "read" command for the given device sid."""
        self.send_command({"cmd": "read", "sid": device_sid})

    def send_command(self, data):
        """Serialise ``data`` as UTF-8 JSON and send it to the multicast group."""
        self.socket.sendto(json.dumps(data).encode("utf-8"),
                           (self.MULTICAST_ADDRESS, self.MULTICAST_PORT))

    def get_nodes(self):
        """Return the registry dict (the live object, not a copy)."""
        return self.nodes
if __name__ == "__main__":
client = prepare_mqtt()
cb = lambda m, s, c, d: push_data(client, m, s, c, d)
connector = XiaomiConnector(data_callback=cb)
while True:
connector.check_incoming()Links:
- NEW: homebridge-aqara (link the xiaomi hub to homebridge) : https://github.com/snOOrz/homebridge-aqara/blob/master/README.md
- Protocol description ? http://drops.xmd5.com/static/drops/tips-10450.html
- Android sdk : https://github.com/MiEcosystem/mijiaSDK
- Previous try (on the plug that seems to have the same api) : https://github.com/DameMirai/DameStudy/wiki/Xiaomi-socket-protocol-finding-%5Bfailed-now...%5D