░█▀█░▀█▀░█▀▄░█▄█░█▀█░█▀▀░█░█░░░░█▀▄░█▀▀
░█▀█░░█░░█▀▄░█░█░█▀█░█░░░█▀▄░░░░█░█░█▀▀
░▀░▀░▀▀▀░▀░▀░▀░▀░▀░▀░▀▀▀░▀░▀░▀░░▀▀░░▀▀▀


# Integrating Kaiser Nienhaus Roller Blinds into Home Assistant

Category: Hacking

Begin

The holiday season often brings the joy of family gatherings, and with it a recurring role as the family IT support. This year’s project was integrating a Kaiser Nienhaus Wi-Fi roller blind into Home Assistant. Since no official integration exists and documentation about the communication protocol is lacking, the task required reverse engineering, crafting a proof of concept, and building a custom Home Assistant component.

## Using the Official Tools (KN Connect)

The first step was to use the official app. After installing the KN Connect app, registering an account, and adding the roller blind to the Wi-Fi network, basic operations (open/close) worked seamlessly. Testing showed that the roller blind could still be controlled locally after its internet access was blocked by a firewall. This confirmed the possibility of a local solution and set the stage for further exploration.

## Reverse Engineering the KN Connect App

Scanning the roller blind with `nmap` revealed nothing useful. The KN Connect app, however, turned out to use two communication mechanisms: MQTT and a multicast-based protocol. The multicast service runs over UDP on ports `32100` and `32101` and uses the multicast group `238.0.0.18`. A simple listener that joins this group was enough to intercept the messages exchanged between the app and the roller blind.

Code


#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
import socket

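MCAST_GRP = "238.0.0.18"
MCAST_PORT = 32100

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# Allow other programs to bind the same port while the listener runs
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", MCAST_PORT))
# Join the multicast group; 0.0.0.0 lets the OS pick the interface
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(MCAST_GRP) + socket.inet_aton("0.0.0.0"))

while True:
  data, addr = sock.recvfrom(1024)
  # errors="replace" keeps the listener running if a datagram is not valid UTF-8
  print(f"Message from {addr}: {data.decode(errors='replace')}")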
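
The raw output of the listener is easier to read once each datagram is decoded. The following is a minimal sketch, assuming the intercepted datagrams are JSON objects shaped like the `WriteDevice` command used in the proof of concept. `summarize` and the `<redacted>` marker are illustrative names, not part of the KN Connect protocol. The `AccessToken` is masked so that captures can be shared without leaking it.

```python
import json


def summarize(datagram: bytes) -> dict:
  """Decode one captured datagram and return its fields with the token masked."""
  text = datagram.decode("utf-8", errors="replace")
  try:
    fields = json.loads(text)
  except json.JSONDecodeError:
    # Not JSON: hand back the text unchanged so nothing is lost
    return {"raw": text}
  if not isinstance(fields, dict):
    return {"raw": text}
  if "AccessToken" in fields:
    fields["AccessToken"] = "<redacted>"
  return fields


# Placeholder message in the shape used by the proof of concept
sample = (
  b'{"msgType":"WriteDevice","mac":"XXXXXXXXXXXX","deviceType":"22000002",'
  b'"AccessToken":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX","msgID":"20241227120000000",'
  b'"data":{"operation":0}}'
)
print(summarize(sample))
```

In the listener loop, `print(summarize(data))` could take the place of the plain `data.decode()` call.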


## Proof of Concept

With the protocol understood, a script was written to send commands directly to the roller blind using the multicast service. The `AccessToken` is critical for authenticating commands. Below is a Python script for closing the blind:

Code


#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024

import socket
import struct
import datetime

# Multicast group and port
multicast_group = '238.0.0.18'
port = 32100

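# msgID: local time as a zero-padded digit string (YYYYMMDDhhmmss plus milliseconds),
# so every ID has the same width and a later ID always sorts higher
now = datetime.datetime.now()
datetime_string = str.encode(
  now.strftime("%Y%m%d%H%M%S") + f"{now.microsecond // 1000:03d}"
)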

CLOSE = b"0" # operation code: 0 closes (1 opens, 2 stops)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
group = socket.inet_aton(multicast_group)
mreq = struct.pack('4s4s', group, socket.inet_aton('0.0.0.0'))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

message = (
  b'{"msgType":"WriteDevice","mac":"XXXXXXXXXXXX","deviceType":"22000002",'
  b'"AccessToken":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX","msgID":"'
  + datetime_string
  + b'","data":{"operation":'
  + CLOSE
  + b'}}'
)

try:
  sock.sendto(message, (multicast_group, port))
finally:
  sock.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq)
  sock.close()


This script successfully commands the roller blind to close. Opening and stopping work the same way with a different value in the `operation` field: `1` opens the blind and `2` stops it, while `0` closes it.

## Integrating the Proof of Concept into Home Assistant
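
A component needs all three operations, and they differ only in the `operation` value, so the message can be assembled by one helper. This is a sketch rather than the author's code: `build_command`, `make_msg_id` and `OPERATIONS` are hypothetical names, the field layout is copied from the script above, and the operation codes are the ones used by the component later in this article. The zero-padded `msgID` is a choice of this sketch so that every ID has the same width; what the blind actually requires of `msgID` is not documented here.

```python
import datetime
import json
from typing import Optional

# Operation codes as used by the component in this article
OPERATIONS = {"close": 0, "open": 1, "stop": 2}


def make_msg_id(now: datetime.datetime) -> str:
  """Return YYYYMMDDhhmmss followed by three digits of milliseconds."""
  return now.strftime("%Y%m%d%H%M%S") + f"{now.microsecond // 1000:03d}"


def build_command(mac: str, token: str, action: str,
                  now: Optional[datetime.datetime] = None) -> bytes:
  """Build a WriteDevice message for "open", "close" or "stop"."""
  if now is None:
    now = datetime.datetime.now()
  payload = {
    "msgType": "WriteDevice",
    "mac": mac,
    "deviceType": "22000002",
    "AccessToken": token,
    "msgID": make_msg_id(now),
    "data": {"operation": OPERATIONS[action]},  # KeyError for unknown actions
  }
  # Compact separators reproduce the whitespace-free layout of the original message
  return json.dumps(payload, separators=(",", ":")).encode()


# Placeholder MAC and token, fixed timestamp for a reproducible example
example = build_command(
  "XXXXXXXXXXXX", "X" * 32, "open",
  datetime.datetime(2024, 12, 27, 9, 5, 3, 7000),
)
print(example.decode())
```

Building the message with `json.dumps` instead of byte concatenation avoids mixing `str` and `bytes` and takes care of quoting.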

To integrate the roller blind into Home Assistant, a custom component was created. The Docker container running Home Assistant couldn't use UDP multicast, so the component sends its commands as unicast UDP directly to the blind's IP address instead. Assigning the roller blind a static IP address in the DHCP server keeps that address stable.

## Steps to Create the Integration
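
A useful first step is to confirm that the blind reacts to unicast at all, before any Home Assistant code is involved. The sketch below assumes the blind accepts the same JSON message on UDP port `32100` when it is addressed directly, as described above. `send_command` is a hypothetical helper name.

```python
import socket


def send_command(payload: bytes, ip: str, port: int = 32100) -> int:
  """Send one datagram straight to the given address and return the bytes sent."""
  # A plain UDP socket is enough for unicast; no multicast membership is needed
  with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    return sock.sendto(payload, (ip, port))


# Usage (not executed here): send the proof-of-concept message to the blind
# send_command(message, "192.168.0.2")
```

`192.168.0.2` is the placeholder address that also appears in the component; it stands for whatever static address the DHCP server hands to the blind.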

The manifest, saved as `custom_components/kn_cover/manifest.json`:

Code


{
 "domain": "kn_cover",
 "name": "KN Cover",
 "codeowners": ["Friedrich Hust"],
 "dependencies": [],
 "documentation": "https://airmack.de",
 "iot_class": "assumed_state",
 "requirements": [],
 "version": "0.9.0"
}



The cover platform, saved as `custom_components/kn_cover/cover.py`:

Code


#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
from __future__ import annotations
import datetime
import logging
import socket
import struct

from homeassistant.components.cover import (
  CoverDeviceClass,
  CoverEntity,
  CoverEntityFeature,
  CoverState,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType

_LOGGER = logging.getLogger(__name__)
IP = "192.168.0.2" # <--- ADD IP HERE
TOKEN = b"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" # <--- ADD TOKEN HERE


def setup_platform(
  hass: HomeAssistant,
  config: ConfigType,
  add_entities: AddEntitiesCallback,
  discovery_info: DiscoveryInfoType | None = None
) -> None:
  """Set up the cover platform."""
  # Commands go to the blind's own IP address instead of the multicast group
  add_entities([KNCover(IP, 32100)])


class KNCover(CoverEntity):
  """A Kaiser Nienhaus roller blind controlled by UDP messages."""

  _attr_device_class = CoverDeviceClass.BLIND
  _attr_supported_features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
  _attr_is_closing = False
  _attr_is_opening = False
  _attr_assumed_state = True # the state is only inferred from the last command sent
  _attr_has_entity_name = True
  _attr_name = "KN Cover"

  # Values of the "operation" field
  CLOSE = b"0"
  OPEN = b"1"
  STOP = b"2"

  def __init__(self, ip: str = "238.0.0.18", port: int = 32100):
    self._attr_unique_id = "xxxxxxxxxxxx" # <--- ADD MAC HERE
    self._state = None
    self.ip = ip
    self.port = port
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.accessToken = TOKEN
    # Send an open command at startup, so the assumed state starts out as open
    self.open_cover()

  @property
  def is_closed(self) -> bool:
    """Return True if the last command sent was a close command."""
    return self._state == CoverState.CLOSED

  def __del__(self):
    self.sock.close()

  def send(self, operation: bytes) -> None:
    """Build a WriteDevice message for the given operation and send it."""
    # The unique ID is a str, so it is encoded before being joined with bytes
    message = (
      b'{"msgType":"WriteDevice","mac":"' + self._attr_unique_id.encode()
      + b'","deviceType":"22000002","AccessToken":"' + self.accessToken
      + b'","msgID":"' + self.getDate()
      + b'","data":{"operation":' + operation + b'}}'
    )
    self.sock.sendto(message, (self.ip, self.port))

  def open_cover(self, **kwargs):
    """Open the cover."""
    self._state = CoverState.OPEN
    self.send(KNCover.OPEN)

  def stop_cover(self, **kwargs):
    """Stop the cover."""
    self._state = CoverState.OPEN # guess: a stopped blind is treated as open
    self.send(KNCover.STOP)

  def close_cover(self, **kwargs):
    """Close the cover."""
    self._state = CoverState.CLOSED
    self.send(KNCover.CLOSE)

  def getDate(self) -> bytes:
    """Return the current local time as a zero-padded digit string for msgID."""
    now = datetime.datetime.now()
    # YYYYMMDDhhmmss plus three digits of milliseconds, always 17 characters
    return str.encode(now.strftime("%Y%m%d%H%M%S") + f"{now.microsecond // 1000:03d}")


The entry in `configuration.yaml` that loads the platform:

Code


cover:
  - platform: kn_cover



# Latest article


=> gemini://airmack.de/single/67613b8c19d5343156cb43e58148e8bff7f911e91ba889620694894749c2fd25.gmi Integrating Kaiser Nienhaus Roller Blinds into Home Assistant, created on 03 January 2025
