diff options
-rw-r--r-- | annotated_example.py | 96 |
1 files changed, 88 insertions, 8 deletions
diff --git a/annotated_example.py b/annotated_example.py index f8e4400..98d77b1 100644 --- a/annotated_example.py +++ b/annotated_example.py @@ -11,6 +11,7 @@ # tiles generated at runtime, and responding to button state change events. """ This is my annotation of this file to get an understanding of how we are supposed to interact with the Stream Deck. Note that I will for simplicity's sake only consider the original stream deck for code expansion. +Note that if you ever see something like self.device, this refers to the HIDAPI object """ import os @@ -84,6 +85,12 @@ def update_key_image(deck, key, state): # Prints key state change information, updates rhe key image and performs any # associated actions when a key is pressed. +""" +Seems that any call back function must use have the arguments +deck +key +state +""" def key_change_callback(deck, key, state): # Print new key state print("Deck {} Key {} = {}".format(deck.id(), key, state), flush=True) @@ -128,32 +135,36 @@ if __name__ == "__main__": self._reset_key_stream() // Part of clean up to prevent corruption self._setup_reader(self._read) // I believe this sets up the communication channel with the device + // Check bottom of file for explanation of self._read Expansion of deck.reset: payload = bytearray(17) payload[0:2] = [0x0B, 0x63] self.device.write_feature(payload) + /************************************************/ Expansion of _reset_key_stream: payload = bytearray(self.IMAGE_REPORT_LENGTH) payload[0] = 0x02 self.device.write(payload) + /************************************************/ - Expansion of _setup_reader: - if self.read_thread is not None: - self.run_read_thread = False + Expansion of _setup_reader, it receives one argument, some function called "callback": + if self.read_thread is not None: // Does our read thread exist? Proceed with if body if it exists + self.run_read_thread = False // Interesting try: - self.read_thread.join() + self.read_thread.join() // We wait for the read thread to exit except RuntimeError: pass if callback is not None: - self.run_read_thread = True - self.read_thread = threading.Thread(target=callback) - self.read_thread.daemon = True - self.read_thread.start():write + self.run_read_thread = True // We have run the read thread + self.read_thread = threading.Thread(target=callback) // set the read thread's work + self.read_thread.daemon = True // We consider the read thread a daemon??? + self.read_thread.start() // Start the read thread + /************************************************/ Breakdown So in our list of things that we got from line 112, 'deck' is an object that represents data about the streamdeck. @@ -174,6 +185,9 @@ if __name__ == "__main__": update_key_image(deck, key, False) # Register callback function for when a key state changes. + """ + This just modifies deck.key_callback + """ deck.set_key_callback(key_change_callback) # Wait until all application threads have terminated (for this example, @@ -183,3 +197,69 @@ if __name__ == "__main__": t.join() except RuntimeError: pass + +""" +Expansion of _read +/************************************************/ +while self.run_read_thread: // While the read thread is supposed to be running + try: + control_states = self._read_control_states() + if control_states is None: + time.sleep(1.0 / self.read_poll_hz) + continue + + if ControlType.KEY in control_states: // What this is asking is, "did we poll for any KEY changes?" I guess some stream deck devices don't have keys so they shouldn't have this check + for k, (old, new) in enumerate(zip(self.last_key_states, control_states[ControlType.KEY])): + if old == new: // If the state didn't change + continue + + self.last_key_states[k] = new + + if self.key_callback is not None: // So we have confirmed a key change and are execting the callback + self.key_callback(self, k, new) + + elif ControlType.DIAL in control_states: + if DialEventType.PUSH in control_states[ControlType.DIAL]: + for k, (old, new) in enumerate(zip(self.last_dial_states, control_states[ControlType.DIAL][DialEventType.PUSH])): + if old == new: + continue + + self.last_dial_states[k] = new + + if self.dial_callback is not None: + self.dial_callback(self, k, DialEventType.PUSH, new) + + if DialEventType.TURN in control_states[ControlType.DIAL]: + for k, amount in enumerate(control_states[ControlType.DIAL][DialEventType.TURN]): + if amount == 0: + continue + + if self.dial_callback is not None: + self.dial_callback(self, k, DialEventType.TURN, amount) + + elif ControlType.TOUCHSCREEN in control_states: + if self.touchscreen_callback is not None: + self.touchscreen_callback(self, *control_states[ControlType.TOUCHSCREEN]) + + except TransportError: + self.run_read_thread = False + self.close() +""" +""" +Expansion of _read_control_states (note that implementation changes with different models) +/************************************************/ +states = self.device.read(1 + self.KEY_COUNT) // we are reading self.KEY_COUNT + 1 bytes +if states is None: // This means we failed to read?? + return None + +states = states[1:] We skip the first byte +return { // We are returning a dict I think + ControlType.KEY: [bool(states[s]) for s in map(self._convert_key_id_origin, range(self.KEY_COUNT))] +} +ControlType is a user defined data structure that has the following fields: + KEY + DIAL + TOUCHSCREEN +KEY's value is simply set to 1. So I believe we just get an array of true and false's. +""" + |