I've expanded my Lutron Caseta system with a 2nd dimmer switch (I'm not a huge fan of the switches, the buttons feel kind of cheap and they can be confusing to use) but they are offer 2-way communication, they work reliably and they do not require a neutral wire which is a big plus.for older homes like mine.
So my Lutron App stopped working a while ago because my hub needed an update which I figured was a bad idea(TM) and since I rarely used the app anyway it didn't seem that important.
Then I decided to do an all around firmware update including the hub and my fears were confirmed. Since a pro hub that has an API costs twice as much I wasn't really running out to replace what I had.
Lutron has removed the SSH access (well it's still listening on the SSH port but it doesn't accept the key that was floating around the internet anymore) and replaced it's app communication with a SSL based web type service called LEAP.
On the plus side this method is more secure than having a single key that works on every hub, but on the down side you need their server to create a certificate for you now to gain access to the LEAP service.
Fortunately the folks at Home Assistant along with the folks over a Github maintaining the pylutron-caseta module have released an update and a script for fetching the key/cert files for your hub from Lutron.
So first you need to get your private key file, as well as a certificate with your devices MAC in it signed by Lutron and the CA certificate from Lutron. You will also need the local CA certificate of your hub. These are all valid until 2038 so you should really only need to do this once if you block your device from internet access which I recommend you do. I'm not sure what will happen when the certs expire but you may need to offer up a fake NTP server with an old date if Lutron isn't around or has discontinued this service by then.
There is a little Python script you can get here which will have you sign into your Lutron account and get the OAuth string so it can request the required keys and certificates. It will then write out the files for you. If you are reading this I suggest you get these files even if you don't need them as they may make it harder to get in the future.
The newest pylutron_caseta on Github has been written for Python 3 and I really don't know enough Py3 to re-write my whole app... I fought with it for a while and I found an intermediate update which will switch the existing Py2.7 code from SSH to SSL, so I have updated my library and re-written a small portion of my code to work with the new one.
One item of note is that you need to "ping" the hub every once in a while (I believe 15 mins is the timeout) via the web interface or it will stop sending you fresh data and you will need to close and re-open the connection.
I had a hell of a time finding any documentation, support or example code for this library so there was a lot of trial and error involved as well as digging through source and HA's modules.
This code works on both Windows and Linux but you will need Python 2.7.9 or higher as older versions don't support the SSL version used by the hub.
So here is the step by step guide with code:
1) Download the zip from HA linked above to get your 3 certificates and your private key. Run the script and follow the instructions. It will confirm it was able to communicate with your bridge and output the following files: caseta.key, caseta-bridge.crt and caseta.crt.
2) Create the smarbridge.py module with the following code:
# I did not write this code, this came from Github, I just added the ping stuff to it.
"""Provides an API to interact with the Lutron Caseta Smart Bridge."""
import json
import logging
import threading
import ssl
import socket
#from pylutron_caseta import _LEAP_DEVICE_TYPES
_LEAP_DEVICE_TYPES = {'light': ['WallDimmer', 'PlugInDimmer'],
'switch': ['WallSwitch'],
'cover': ['SerenaHoneycombShade', 'SerenaRollerShade',
'TriathlonHoneycombShade',
'TriathlonRollerShade', 'QsWirelessShade'],
'sensor': ['Pico1Button', 'Pico2Button',
'Pico2ButtonRaiseLower', 'Pico3Button',
'Pico3ButtonRaiseLower', 'Pico4Button',
'Pico4ButtonScene', 'Pico4ButtonZone',
'Pico4Button2Group', 'FourGroupRemote']}
_LOG = logging.getLogger('smartbridge')
_LOG.setLevel(logging.DEBUG)
class Smartbridge:
"""
A representation of the Lutron Caseta Smart Bridge.
It uses an SSH interface known as the LEAP server.
"""
def __init__(self, hostname, keyfile, certfile, ca_certs):
"""Initialize the Smart Bridge."""
self.devices = {}
self.scenes = {}
self._hostname = hostname
self._keyfile = keyfile
self._certfile = certfile
self._ca_certs = ca_certs
self.logged_in = False
self._ssl_sock = None
self._login()
self._load_devices()
self._load_scenes()
_LOG.debug(self.devices)
_LOG.debug(self.scenes)
monitor = threading.Thread(target=self._monitor)
monitor.setDaemon(True)
monitor.start()
for _id in self.devices:
self.get_value(_id)
self._subscribers = {}
def add_subscriber(self, device_id, callback_):
"""
Add a listener to be notified of state changes.
:param device_id: device id, e.g. 5
:param callback_: callback to invoke
"""
self._subscribers[device_id] = callback_
def get_devices(self):
"""Will return all known devices connected to the Smart Bridge."""
return self.devices
def get_devices_by_domain(self, domain):
"""
Return a list of devices for the given domain.
:param domain: one of 'light', 'switch', 'cover' or 'sensor'
:returns list of zero or more of the devices
"""
devs = []
# return immediately if not a supported domain
if domain not in _LEAP_DEVICE_TYPES:
return devs
# loop over all devices and check their type
for device_id in self.devices:
if self.devices[device_id]['type'] in _LEAP_DEVICE_TYPES[domain]:
devs.append(self.devices[device_id])
return devs
def get_devices_by_type(self, type_):
"""
Will return all devices of a given device type.
:param type_: LEAP device type, e.g. WallSwitch
"""
devs = []
for device_id in self.devices:
if self.devices[device_id]['type'] == type_:
devs.append(self.devices[device_id])
return devs
def get_devices_by_types(self, types):
"""
Will return all devices of for a list of given device types.
:param types: list of LEAP device types such as WallSwitch, WallDimmer
"""
devs = []
for device_id in self.devices:
if self.devices[device_id]['type'] in types:
devs.append(self.devices[device_id])
return devs
def get_device_by_id(self, device_id):
"""
Will return a device with the given ID.
:param device_id: device id, e.g. 5
"""
return self.devices[device_id]
def get_scenes(self):
"""Will return all known scenes from the Smart Bridge."""
return self.scenes
def get_scene_by_id(self, scene_id):
"""
Will return a scene with the given scene ID.
:param scene_id: scene id, e.g 23
"""
return self.scenes[scene_id]
def ping(self):
"""
Pings the device to keep alive
"""
cmd = '{"CommuniqueType":"ReadRequest",' \
'"Header":{"Url":"/server/1/status/ping"}}\n'
return self._send_command(cmd)
def get_value(self, device_id):
"""
Will return the current level value for the device with the given ID.
:param device_id: device id, e.g. 5
:returns level value from 0 to 100
:rtype int
"""
zone_id = self._get_zone_id(device_id)
cmd = '{"CommuniqueType":"ReadRequest",' \
'"Header":{"Url":"/zone/%s/status"}}\n' % zone_id
if zone_id:
return self._send_command(cmd)
def is_connected(self):
"""Will return True if currently connected to the Smart Bridge."""
return self.logged_in
def is_on(self, device_id):
"""
Will return True is the device with the given ID is 'on'.
:param device_id: device id, e.g. 5
:returns True if level is greater than 0 level, False otherwise
"""
return self.devices[device_id]['current_state'] > 0
def set_value(self, device_id, value):
"""
Will set the value for a device with the given ID.
:param device_id: device id to set the value on
:param value: integer value from 0 to 100 to set
"""
zone_id = self._get_zone_id(device_id)
if zone_id:
cmd = '{"CommuniqueType":"CreateRequest",' \
'"Header":{"Url":"/zone/%s/commandprocessor"},' \
'"Body":{"Command":{"CommandType":"GoToLevel",' \
'"Parameter":[{"Type":"Level",' \
'"Value":%s}]}}}\n' % (zone_id, value)
return self._send_command(cmd)
def turn_on(self, device_id):
"""
Will turn 'on' the device with the given ID.
:param device_id: device id to turn on
"""
return self.set_value(device_id, 100)
def turn_off(self, device_id):
"""
Will turn 'off' the device with the given ID.
:param device_id: device id to turn off
"""
return self.set_value(device_id, 0)
def activate_scene(self, scene_id):
"""
Will activate the scene with the given ID.
:param scene_id: scene id, e.g. 23
"""
if scene_id in self.scenes:
cmd = '{"CommuniqueType":"CreateRequest",' \
'"Header":{"Url":"/virtualbutton/%s/commandprocessor"},' \
'"Body":{"Command":{"CommandType":"PressAndRelease"}}}' \
'\n' % scene_id
return self._send_command(cmd)
def _get_zone_id(self, device_id):
"""
Return the zone id for an given device.
:param device_id: device id for which to retrieve a zone id
"""
device = self.devices[device_id]
if 'zone' in device:
return device['zone']
return None
def _send_command(self, cmd):
"""Send a command to the bridge."""
self._ssl_sock.send(cmd)
def _monitor(self):
"""Event monitoring loop."""
while True:
try:
# require a certificate from the server
ssl_output = self._ssl_sock.recv(1)
response = ssl_output
while ssl_output != "\n":
ssl_output = self._ssl_sock.recv(1)
response += ssl_output
_LOG.debug(response)
resp_parts = response.split(b'\r\n')
try:
for resp in resp_parts:
if resp:
resp_json = json.loads(resp.decode("UTF-8"))
self._handle_response(resp_json)
except ValueError:
_LOG.error("Invalid response "
"from SmartBridge: " + response.decode("UTF-8"))
except Exception as e:
print "Shit something fucked up!"
print e
except ConnectionError:
self.logged_in = False
def _handle_response(self, resp_json):
"""
Handle an event from the ssl interface.
If a zone level was changed either by external means such as a Pico
remote or by a command sent from us, the new level will appear on the
SSH shell and the response is handled by this function.
:param resp_json: full JSON response from the SSH shell
"""
comm_type = resp_json['CommuniqueType']
if comm_type == 'ReadResponse':
body = resp_json['Body']
try:
if body['PingResponse'] != "":
print "*PONG*"
return 0
except:
pass
zone = body['ZoneStatus']['Zone']['href']
zone = zone[zone.rfind('/') + 1:]
level = body['ZoneStatus']['Level']
_LOG.debug('zone=%s level=%s', zone, level)
for _device_id in self.devices:
device = self.devices[_device_id]
if 'zone' in device:
if zone == device['zone']:
device['current_state'] = level
if _device_id in self._subscribers:
self._subscribers[_device_id]()
def _login(self):
"""Connect and login to the Smart Bridge LEAP server using SSL."""
if self.logged_in:
return
_LOG.debug("Connecting to Smart Bridge via SSL")
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# require a certificate from the server
self._ssl_sock = ssl.wrap_socket(connection,
keyfile=self._keyfile,
certfile=self._certfile,
ca_certs=self._ca_certs,
cert_reqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_TLSv1_2)
self._ssl_sock.connect((self._hostname, 8081))
_LOG.debug("Successfully connected to Smart Bridge.")
self.logged_in = True
def _load_devices(self):
"""Load the device list from the SSL LEAP server interface."""
_LOG.debug("Loading devices")
self._ssl_sock.send(
'{"CommuniqueType":"ReadRequest","Header":{"Url":"/device"}}\n')
ssl_output = self._ssl_sock.recv(1)
response = ssl_output
while ssl_output != "\n":
ssl_output = self._ssl_sock.recv(1)
response += ssl_output
_LOG.debug(response)
device_json = json.loads(response.decode("UTF-8"))
for device in device_json['Body']['Devices']:
_LOG.debug(device)
device_id = device['href'][device['href'].rfind('/') + 1:]
device_zone = None
if 'LocalZones' in device:
device_zone = device['LocalZones'][0]['href']
device_zone = device_zone[device_zone.rfind('/') + 1:]
device_name = device['Name']
device_type = device['DeviceType']
self.devices[device_id] = {'device_id': device_id,
'name': device_name,
'type': device_type,
'zone': device_zone,
'current_state': -1}
def _load_scenes(self):
"""
Load the scenes from the Smart Bridge.
Scenes are known as virtual buttons in the SSL LEAP interface.
"""
_LOG.debug("Loading scenes from the Smart Bridge")
self._ssl_sock.send(
'{"CommuniqueType":"ReadRequest","Header":'
'{"Url":"/virtualbutton"}}\n')
ssl_output = self._ssl_sock.recv(1)
response = ssl_output
while ssl_output != "\n":
ssl_output = self._ssl_sock.recv(1)
response += ssl_output
_LOG.debug(response)
scene_json = json.loads(response.decode("UTF-8"))
for scene in scene_json['Body']['VirtualButtons']:
_LOG.debug(scene)
if scene['IsProgrammed']:
scene_id = scene['href'][scene['href'].rfind('/') + 1:]
scene_name = scene['Name']
self.scenes[scene_id] = {'scene_id': scene_id,
'name': scene_name}
import smartbridge, time
# This will open a connection to your bridge. You will need to put in the static IP of your bridge in here #and you can add paths to the crt files in needed.
hub = smartbridge.Smartbridge('IP OF YOUR BRIDGE', 'caseta.key', 'caseta.crt', 'caseta-bridge.crt')
lastping = time.time()
while 1:
# Check if the hub has been pinged in the last minute, if not go ahead and ping it
if time.time() > lastping + 60:
print "*PING*"
lastping = time.time()
hub.ping() # keep alive
for item in fetch:
print fetch[item] # this will print out the dict of each known device.
print "Name: %s Brightness: %s" % (fetch[item]['name'],fetch[item]['current_state']) # this
# will print out the name and brightness (0-100) of each device in the list
hub.set_value(item, 100) # This will set each item to 100% brightness
# "Item" in this case is the DID or Device ID, and the 100 above is the brightness level of #that device. You can easily check for a specific device 'name' and only alter that device.
# The Smartbridge module supports other functions too but there doesn't seem to be # documentation on them and I think most use cases don't need them. This should be #enough to get most people started fetching real-time status data and controlling lights, if #you want to dig into the code above to see what else it can do be my guest!
4) That should be enough to get you started, if you found this info useful please leave a comment and let me know.
Huge thanks to gurumitts & mdonoughe for their time and effort in figuring this out even though his readme is crap :P
Important Links:
https://github.com/mdonoughe/pylutron-caseta Github Page for Pylutron_caseta
https://home-assistant.io/components/lutron_caseta/ Home Assistant Lutron Caseta module page
This doesn't work for me. I get an inscrutible ssl error.
ReplyDeleteThe certs work in homeassistant. The latest version of the library looks like it was developed specifically as a home assistant asyncio component.
I've used homeassistant, but now I want to make something lightweight.
Hey Dude thanks for taking the time to put this together. It helps a great deal.
ReplyDeleteFrom the comment above its not working so i'm a bit hesitant to try it. If you can kindly provide some feedback that would be great. Thanks Moe.
Looks like the code has changed and the functions are a bit different. For anyone who's stumbling upon this in the future, the following works. The 'current_state' doesn't look valid, nor does 'fan_speed', but it at least connects and gets a list of all of the devices. Hopefully additional functionality shouldn't be too hard to figure out:
ReplyDelete# git clone https://github.com/gurumitts/pylutron-caseta.git
# cd pylutron-caseta
# python3 setup.py install
import asyncio
HUB_IP_ADDRESS='192.168.0.xx'
async def lutron():
from pylutron_caseta.smartbridge import Smartbridge
bridge = Smartbridge.create_tls(HUB_IP_ADDRESS, './lutron_certs/caseta.key', './lutron_certs/caseta.crt', './lutron_certs/caseta-bridge.crt')
await bridge.connect()
fetch = bridge.get_devices()
for item in fetch:
print(fetch[item]) # this will print out the dict of each known device.
print("Name: %s" % (fetch[item]['name']))
asyncio.get_event_loop().run_until_complete(lutron())