#!/usr/bin/env python3

#         Python Stream Deck Library
#      Released under the MIT license
#
#   dean [at] fourwalledcubicle [dot] com
#         www.fourwalledcubicle.com
#

# Example script showing basic library usage - updating key images with new
# tiles generated at runtime, and responding to button state change events.

"""
This is my annotation of this file to get an understanding of how we are
supposed to interact with the Stream Deck. Note that I will for simplicity's
sake only consider the original stream deck for code expansion.

Note that if you ever see something like self.device, this refers to the
HIDAPI object
"""

import os
import threading

from PIL import Image, ImageDraw, ImageFont
from StreamDeck.DeviceManager import DeviceManager
from StreamDeck.ImageHelpers import PILHelper

# Folder location of image assets used by this example.
ASSETS_PATH = os.path.join(os.path.dirname(__file__), "Assets")


# Generates a custom tile with run-time generated text and custom image via the
# PIL module.
def render_key_image(deck, icon_filename, font_filename, label_text):
    """Render one key tile: a scaled icon with a text label underneath.

    Args:
        deck: Stream Deck object the tile is generated for.
        icon_filename: Path of the image file used as the key icon.
        font_filename: Path of the TrueType font used for the label.
        label_text: Text drawn near the bottom edge of the key.

    Returns:
        The rendered tile as returned by ``PILHelper.to_native_key_format``.
    """
    # Resize the source image asset to best-fit the dimensions of a single key,
    # leaving a margin at the bottom so that we can draw the key title
    # afterwards. The icon is opened in a context manager so its file handle
    # is released as soon as the scaled copy exists, instead of leaking one
    # handle per redraw.
    # NOTE(review): assumes create_scaled_key_image returns an image that does
    # not need the source file to stay open - confirm against the library.
    with Image.open(icon_filename) as icon:
        image = PILHelper.create_scaled_key_image(deck, icon, margins=[0, 0, 20, 0])

    # Load a custom TrueType font and use it to overlay the key index, draw key
    # label onto the image a few pixels from the bottom of the key.
    draw = ImageDraw.Draw(image)
    font = ImageFont.truetype(font_filename, 14)
    draw.text((image.width / 2, image.height - 5), text=label_text, font=font, anchor="ms", fill="white")

    return PILHelper.to_native_key_format(deck, image)


# Returns styling information for a key based on its position and state.
def get_key_style(deck, key, state):
    """Return the styling information for a key given its index and state.

    Args:
        deck: Stream Deck object; only ``deck.key_count()`` is used here.
        key: Index of the key being styled.
        state: Truthy when the key is pressed, falsy when it is released.

    Returns:
        A dict with the keys ``"name"``, ``"icon"``, ``"font"`` and
        ``"label"``; ``"icon"`` and ``"font"`` are paths below ``ASSETS_PATH``.
    """
    # Last button in the example application is the exit button.
    exit_key_index = deck.key_count() - 1

    if key == exit_key_index:
        name = "exit"
        icon = "{}.png".format("Exit")
        font = "Roboto-Regular.ttf"
        label = "Bye" if state else "Exit"
    else:
        name = "emoji"
        icon = "{}.png".format("Pressed" if state else "Released")
        font = "Roboto-Regular.ttf"
        label = "Pressed!" if state else "Key {}".format(key)

    return {
        "name": name,
        "icon": os.path.join(ASSETS_PATH, icon),
        "font": os.path.join(ASSETS_PATH, font),
        "label": label
    }


# Creates a new key image based on the key index, style and current key state
# and updates the image on the StreamDeck.
def update_key_image(deck, key, state):
    """Regenerate the image for one key and push it to the deck.

    Args:
        deck: Stream Deck object whose key is updated.
        key: Index of the key to update.
        state: Truthy when the key is pressed, falsy when it is released.
    """
    # Determine what icon and label to use on the generated key.
    key_style = get_key_style(deck, key, state)

    # Generate the custom key with the requested image and label.
    image = render_key_image(deck, key_style["icon"], key_style["font"], key_style["label"])

    # Use a scoped-with on the deck to ensure we're the only thread using it
    # right now.
    with deck:
        # Update requested key with the generated image.
        deck.set_key_image(key, image)


# Prints key state change information, updates the key image and performs any
# associated actions when a key is pressed.
"""
Seems that any callback function must take the arguments (deck, key, state).
"""
def key_change_callback(deck, key, state):
    """Handle a key state change reported by the deck.

    Prints the new state, redraws the key, and when the exit key is pressed
    resets and closes the deck.

    Args:
        deck: Stream Deck object that reported the change.
        key: Index of the key whose state changed.
        state: Truthy when the key is now pressed, falsy when released.
    """
    # Print new key state
    print("Deck {} Key {} = {}".format(deck.id(), key, state), flush=True)

    # Update the key image based on the new key state.
    update_key_image(deck, key, state)

    # Check if the key is changing to the pressed state.
    if state:
        key_style = get_key_style(deck, key, state)

        # When an exit button is pressed, close the application.
        if key_style["name"] == "exit":
            # Use a scoped-with on the deck to ensure we're the only thread
            # using it right now.
            with deck:
                # Reset deck, clearing all button images.
                deck.reset()

                # Close deck handle, terminating internal worker threads.
                deck.close()


if __name__ == "__main__":
    """
    Breakdown of the line below:
    What this does is it goes into DeviceManager.py, creates a giant list of
    all of the possible devices that it may connect to, then use the
    enumerate() method provided by hidapi to check if there is a device with
    said ids connected to the machine.

    We receive essentially a list of tuples (once the list is passed through
    the built-in enumerate() in the for loop below). The first item is the
    index, the next is an OBJECT representing the StreamDeck (not directly
    interacting with the stuff)
    """
    streamdecks = DeviceManager().enumerate()  # So this is a list of Stream Deck objects

    print("Found {} Stream Deck(s).\n".format(len(streamdecks)))

    # Now we are going through each device that we detected and attempt to
    # open it and establish a connection
    for index, deck in enumerate(streamdecks):
        # This example only works with devices that have screens.
        if not deck.is_visual():
            continue

        """
        Expansion of deck.open:
            self.device.open()              // Open the device (this calls hidapi's open function)
            self._reset_key_stream()        // Part of clean up to prevent corruption
            self._setup_reader(self._read)  // I believe this sets up the communication channel with the device
                                            // Check bottom of file for explanation of self._read

        Expansion of deck.reset:
            payload = bytearray(17)
            payload[0:2] = [0x0B, 0x63]
            self.device.write_feature(payload)

        /************************************************/
        Expansion of _reset_key_stream:
            payload = bytearray(self.IMAGE_REPORT_LENGTH)
            payload[0] = 0x02
            self.device.write(payload)

        /************************************************/
        Expansion of _setup_reader, it receives one argument, some function called "callback":
            if self.read_thread is not None:    // Does our read thread exist? Proceed with if body if it exists
                self.run_read_thread = False    // Interesting
                try:
                    self.read_thread.join()     // We wait for the read thread to exit
                except RuntimeError:
                    pass

            if callback is not None:
                self.run_read_thread = True     // We have run the read thread
                self.read_thread = threading.Thread(target=callback)    // set the read thread's work
                self.read_thread.daemon = True  // We consider the read thread a daemon???
                self.read_thread.start()        // Start the read thread

        /************************************************/
        Breakdown
        So in our list of things that we got from the DeviceManager().enumerate()
        call above, 'deck' is an object that represents data about the streamdeck.
        so self.device.open() attempts to open the device.
        """
        deck.open()
        deck.reset()

        print("Opened '{}' device (serial number: '{}', fw: '{}')".format(
            deck.deck_type(), deck.get_serial_number(), deck.get_firmware_version()
        ))

        # Set initial screen brightness to 30%.
        deck.set_brightness(30)

        # Set initial key images.
        for key in range(deck.key_count()):
            update_key_image(deck, key, False)

        # Register callback function for when a key state changes.
        """
        This just modifies deck.key_callback
        """
        deck.set_key_callback(key_change_callback)

    # Wait until all application threads have terminated (for this example,
    # this is when all deck handles are closed). This runs after the loop so
    # that every visual deck has been opened before we start blocking.
    for t in threading.enumerate():
        # A thread cannot join itself (threading raises RuntimeError), so skip
        # the current thread explicitly rather than swallowing the exception.
        if t is threading.current_thread():
            continue

        t.join()


"""
Expansion of _read (no parameters other than self)
/************************************************/
while self.run_read_thread:     // While the read thread is supposed to be running
    try:
        control_states = self._read_control_states()
        if control_states is None:
            time.sleep(1.0 / self.read_poll_hz)
            continue

        if ControlType.KEY in control_states:   // What this is asking is, "did we poll for any KEY changes?"
                                                // I guess some stream deck devices don't have keys so they shouldn't have this check
            for k, (old, new) in enumerate(zip(self.last_key_states, control_states[ControlType.KEY])):
                if old == new:      // If the state didn't change
                    continue

                self.last_key_states[k] = new

                if self.key_callback is not None:   // So we have confirmed a key change and are executing the callback
                    self.key_callback(self, k, new)

        elif ControlType.DIAL in control_states:
            if DialEventType.PUSH in control_states[ControlType.DIAL]:
                for k, (old, new) in enumerate(zip(self.last_dial_states,
                                                   control_states[ControlType.DIAL][DialEventType.PUSH])):
                    if old == new:
                        continue

                    self.last_dial_states[k] = new

                    if self.dial_callback is not None:
                        self.dial_callback(self, k, DialEventType.PUSH, new)

            if DialEventType.TURN in control_states[ControlType.DIAL]:
                for k, amount in enumerate(control_states[ControlType.DIAL][DialEventType.TURN]):
                    if amount == 0:
                        continue

                    if self.dial_callback is not None:
                        self.dial_callback(self, k, DialEventType.TURN, amount)

        elif ControlType.TOUCHSCREEN in control_states:
            if self.touchscreen_callback is not None:
                self.touchscreen_callback(self, *control_states[ControlType.TOUCHSCREEN])

    except TransportError:
        self.run_read_thread = False
        self.close()
"""

"""
Expansion of _read_control_states (note that implementation changes with different models)
/************************************************/
states = self.device.read(1 + self.KEY_COUNT)   // we are reading self.KEY_COUNT + 1 bytes
if states is None:      // This means we failed to read??
    return None

states = states[1:]     // We skip the first byte
return {                // We are returning a dict I think
    ControlType.KEY: [bool(states[s]) for s in map(self._convert_key_id_origin, range(self.KEY_COUNT))]
}

ControlType is a user defined data structure that has the following fields:
    KEY
    DIAL
    TOUCHSCREEN
KEY's value is simply set to 1. So I believe we just get an array of true and false's.
"""