====== BLOCKER CODE - GPL3 Licence ====== The Python BLOCKER! code is [[:brainbox:young-creators:technology-blocker:blocker-code|available here]] under the [[https://www.gnu.org/licenses/gpl-3.0.en.html|GPL3 Open Source License]] (Includes enhancements to control 433MHZ RC switch + keyfob devices) #!/usr/bin/env python3 #-*-coding: utf-8 -*- # ----------------------------------------------------------------- # Last Updated: 15 AUG 2017 # Modified view.js so that Ctrl key will clear/redraw screen # ----------------------------------------------------------------- # ----------------------------------------------------------------- """ BLOCKER! Monitors state of matrix rows & columns, returns X Y co-ordinates when row/column intersections (points) are active. Co-ordinates are used to simulate a virtual mouse cursor + button to add notes to a Chrome browser-based sound sequencer & synth. """ __author__ = 'schools.nsw@gmail.com (Newsy Wales)' __licence__ = 'GPL3' # ----------------------------------------------------------------- # ---------------------------------------- # HELP (references)... # ---------------------------------------- # 1. GPIO pigpio code: http://abyz.co.uk/rpi/pigpio/python.html # 2. Threading: https://www.raspberrypi.org/forums/viewtopic.php?f=32&t=137364&p=915928 # 3. Python data types: http://www.diveintopython3.net/native-datatypes.html # 4. Dictionaries: http://www.pythonforbeginners.com/dictionary/how-to-use-dictionaries-in-python/ # 5. Sets: https://en.wikibooks.org/wiki/Python_Programming/Sets # ---------------------------------------- # Simple matrix switch examples: # ---------------------------------------- # 1. https://raspberrypi.stackexchange.com/questions/14035/8x8-matrix-of-buttons # 2. 
http://crumpspot.blogspot.com.au/2013/05/using-3x4-matrix-keypad-with-raspberry.html # ---------------------------------------- # IMPORTANT TO ENSURE FULL MOUSE POSITION SYNC & FULL PAGE ZOOM: # ---------------------------------------- # Install Chrome extension iMove or Autozoom to always maximise window content # http://www.tothepc.com/archives/zoom-in-webpages-in-google-chrome/ # ---------------------------------------- # Reference: Sending shortcut keyboard commands to Chrome... # ---------------------------------------- # Chrome short-cuts... # - Close Chrome [Ctrl + Shift + q] # - Open home page in the current tab [Alt + Home] # - Return all on page to default size [Ctrl + 0] # - Zoom larger chrome (Ctl and +) [Ctrl + plus] # - Zoom smaller chrome (Ctl and - [Ctrl + minus] # - Redo (clear + redraw default notes) [Shift + Ctrl + z] # - Undo (clear notes and leave blank) [Ctrl + z] # - Maximise the current window [Alt + space + x] # - Minimise the current window [Alt + space + n] # - Toggle bookmarks bar [Ctrl + Shift + b] # + Toggle full-screen mode on or off [F11] # + Send top of page to top of window [Home] # + Go to bottom of web page [End] # + Jump to the address bar (Omnibox) [F6] # + Reload the current page [F5] or [Ctrl + r] # + Close the current TAB [Ctrl + F4] # + Reload, ignoring cached content [Shift + F5] or [Ctrl + Shift + r] # + Scroll window contents to the left! [Right] # + Scroll window contents to the right![Left] # + Scroll contents of the window up [Up] # + Scroll contents of the window down [Down] # # Synthogram short-cuts... # + Toggle play/pause key space # + redo (recover last addition) key Ctrl # + undo (remove last addition) key Shift | z # + undo (remove last 3 additions) key z z z # + clear (clear screen completely) key Delete # # To clear screen but restore clean defaults, do lots of 'undo's, then one 'redo' # before any notes are entered - Then, do a single 'undo', 'redo' [key z Ctrl] anytime.... # # Javascript key codes... 
# https://www.cambiaresearch.com/articles/15/javascript-char-codes-key-codes
# ------------------------------------------

# For os_exit, clear, external commands, json config files...
import os
import time
import subprocess
from subprocess import Popen, PIPE

# ----------------------------------------
# Store time when script started...
# ----------------------------------------
start_time = time.asctime(time.localtime(time.time()))

# ----------------------------------------
# Library to generate random sounds/images...
# ----------------------------------------
from random import randint, randrange

# ----------------------------------------
# Where xdotool does not work with all key combos, use pyuserinput...
# Delay between keystrokes. Default is 12ms
# xdotool --delay milliseconds
# xdotool list valid key names...
# xev -event keyboard
# ----------------------------------------
from pykeyboard import PyKeyboard
from pymouse import PyMouse

# Initialise mouse + keyboard classes...
k = PyKeyboard()
m = PyMouse()

# ----------------------------------------
# Are DEBUG messages printed to screen (or not)...
# ----------------------------------------
# Display error messages to std out (True|1=on False|0=off)...
# DEBUG = False
DEBUG = True
# PRINTME = False
# PRINTME = True
# DATABASE = '/tmp/flaskr.db'
# SECRET_KEY = 'development key'

# ----------------------------------------
# IMPORTANT - Raspberry Pi pigpio support:
# ----------------------------------------
# This script may be run on a Raspberry Pi or on any OS
# that can connect to a remote Raspberry PI using pigpiod.
# Raspberry Pi pigpio uses GPIOnn BCM numbering convention:
# Documentation: http://abyz.co.uk/rpi/pigpio/index.html
# http://elinux.org/RPi_Low-level_peripherals#Model_A.2B.2C_B.2B_and_B2
# ----------------------------------------

# ----------------------------------------
# 'False' if connecting to remote Raspberry Pi socket.
# ----------------------------------------
IS_LOCAL_PI = False

# ----------------------------------------
# pigpiod _433 keyfob module...
# ----------------------------------------
try:
    import pigpio
    import _433
except ImportError as err:
    # A module that cannot be imported raises ImportError (not RuntimeError).
    # Nothing below works without pigpio, so stop here with a clear message
    # rather than failing later with a NameError...
    print("Error importing pigpio! Is pigpiod daemon running?")
    raise SystemExit(1) from err

if DEBUG:
    print("\n" + '='*40)
    print("START: {0}" .format(start_time))
    print('='*40)

# ----------------------------------------
# Specify local/remote pigpio ipaddress + socket...
# ----------------------------------------
# Address of the REMOTE raspberry pi configured & running pigpiod daemon.
# Only used for the connection when IS_LOCAL_PI is False, but always defined
# so that the status messages below never hit an undefined name...
remote_pi_ip = "192.168.1.25"
remote_pi_port = "8888"

# Name of the daemon host shown in the connection status messages...
pi_host = "localhost" if IS_LOCAL_PI else remote_pi_ip


def _bbps_open_pigpio():
    """Return a new pigpio connection to the local or the remote pigpiod daemon."""
    if IS_LOCAL_PI:
        # This device is a Raspberry Pi, connect direct to local pigpiod daemon...
        return pigpio.pi()
    # The local machine is NOT a Raspberry Pi, connect via socket...
    return pigpio.pi(str(remote_pi_ip), int(remote_pi_port))


if not IS_LOCAL_PI:
    print("Connecting to remote pigpiod daemon ", remote_pi_ip)
pi = _bbps_open_pigpio()

# ----------------------------------------
# Get local ipaddress - NOTE: Must use correct ifname [eth0/eno1/...]
# ----------------------------------------
# Optional - Port for Flask/webserver if not using default port number (80)...
localport = 88
# Build host:port from localport so the two settings cannot drift apart...
hostname = "192.168.1.29:{0}".format(localport)

# ----------------------------------------
# Optional - Address of remote IP camera/video stream...
# ----------------------------------------
videohost = "192.168.1.25"

# ----------------------------------------
# Initialise loop count + timer variables...
# ----------------------------------------
count = 0
loop_num = 0
sound_num = 0
max_elapsed_time = 0
ptime = time.process_time()

# ----------------------------------------
# Exit if we fail to connect to pigpiod socket after n tries...
# ----------------------------------------
# 'pi.connected' only changes when a new connection is made, so every retry
# has to open the socket again; just sleeping would never succeed...
while not pi.connected:
    count += 1
    print(count)
    if count > 5:
        print("ERROR - Raspberry Pi pigpio not connected to {0}\n =============================\n".format(pi_host))
        # Non-zero exit status so a calling shell/service sees the failure...
        raise SystemExit(1)
    time.sleep(2.0)
    pi = _bbps_open_pigpio()

if DEBUG:
    print("SUCCESS - Raspberry Pi pigpio socket connected to {0}".format(pi_host))

# Optionally, disable connection warnings...
# pigpio.exceptions = False

# ----------------------------------------
# Create a dictionary to store BLOCKER! switch PADS (INPUT/CALLBACK SIGNALS) info...
# ----------------------------------------
# Dictionary to store GPIO socket GPIO numbers as INPUTs for BLOCKER! test callbacks.
# On old PI's, GPIO2 + GPIO3 have a pull-up resistor, so default input=high PUD_UP (low to turn on)
pads = {
    2: {'name': 'PAD CB1 [GPIO_2 p03] LOOP_1', 'note': '1', 'inout': 'INPUT', 'state': 'PUD_UP'},
    3: {'name': 'PAD CB2 [GPIO_3 p05] LOOP_2', 'note': '2', 'inout': 'INPUT', 'state': 'PUD_UP'},
    4: {'name': 'PAD CB3 [GPIO_4 p07] LOOP_3', 'note': '3', 'inout': 'INPUT', 'state': 'PUD_UP'},
    7: {'name': 'PAD CB4 [GPIO_7 p26] LOOP_4', 'note': '4', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
    8: {'name': 'PAD CB5 [GPIO_8 p24] LOOP_5', 'note': '5', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
    9: {'name': 'PAD CB6 [GPIO_9 p21] RLY_14', 'note': '14', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
    10: {'name': 'PAD CB7 [GPIO10 p19] RLY_15', 'note': '15', 'inout': 'INPUT', 'state': 'PUD_DOWN'}
}
# Alternative (relay) assignments for the same pads:
# 7: {'name': 'PAD CB4 [GPIO_7 p26] RLY_12', 'note': '12', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
# 8: {'name': 'PAD CB5 [GPIO_8 p24] RLY_13', 'note': '13', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
# 9: {'name': 'PAD CB6 [GPIO_9 p21] RLY_14', 'note': '14', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
# 10: {'name': 'PAD CB7 [GPIO10 p19] RLY_15', 'note': '15', 'inout': 'INPUT', 'state': 'PUD_DOWN'}

# HINT: IF COL/ROW NUMBERS OUT OF SYNC, EDIT THE 'note' number RATHER THAN RE-WIRING THE SWITCHES.
# Numbers 2 and 4 may need to be transposed if using straight-through/crossover CATx cables.

# ----------------------------------------
# Create a dictionary to store BLOCKER! switch matrix COLUMNS info...
# ----------------------------------------
# COLUMNS - configured below as outputs that start LOW ('0')...
columns = {
    11: {'name': 'BLOCKER COL 01 [GPIO11 p23] COL_1', 'note': '1', 'inout': 'OUTPUT', 'state': '0'},
    14: {'name': 'BLOCKER COL 03 [GPIO14 p08] COL_3', 'note': '3', 'inout': 'OUTPUT', 'state': '0'},
    15: {'name': 'BLOCKER COL 04 [GPIO15 p10] COL_4', 'note': '4', 'inout': 'OUTPUT', 'state': '0'},
    22: {'name': 'BLOCKER COL 02 [GPIO22 p15] COL_2', 'note': '2', 'inout': 'OUTPUT', 'state': '0'},
}

# ----------------------------------------
# Create a dictionary to store BLOCKER! switch matrix ROWS info...
# ----------------------------------------
# ROWS - configured below as inputs with the internal pull-down enabled...
rows = {
    23: {'name': 'BLOCKER ROW 01 [GPIO23 p16] ROW_1', 'note': '1', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
    24: {'name': 'BLOCKER ROW 02 [GPIO24 p18] ROW_2', 'note': '2', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
    25: {'name': 'BLOCKER ROW 03 [GPIO25 p22] ROW_3', 'note': '3', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
    27: {'name': 'BLOCKER ROW 04 [GPIO27 p13] ROW_4', 'note': '4', 'inout': 'INPUT', 'state': 'PUD_DOWN'},
}


# ----------------------------------------
# INITIALISE local/remote Raspberry Pi GPIOs
# Read each GPIO from above dictionaries and set them to a nominated default state...
# ----------------------------------------
def bbps_set_gpios(gpio_type="unknown"):
    """Put every GPIO described in a configuration dictionary into its default state.

    gpio_type -- a dictionary shaped like pads, columns or rows: keyed by GPIO
                 number, each entry supplying 'inout' and 'state'.

    OUTPUT entries with state '0' or '1' are switched to output mode and
    written LOW or HIGH. INPUT entries with state 'PUD_DOWN' or 'PUD_UP' are
    switched to input mode with that internal pull resistor. Any other
    combination is reported and skipped. Returns None.
    """
    if not isinstance(gpio_type, dict):
        # The "unknown" default (or any other non-dictionary) has no GPIO
        # entries to configure; report it instead of crashing on int('u')...
        print("ERROR: GPIO channel configuration is incorrect")
        return

    output_levels = {"0": 0, "1": 1}
    input_pulls = {"PUD_DOWN": pigpio.PUD_DOWN, "PUD_UP": pigpio.PUD_UP}

    for key, cfg in gpio_type.items():
        gpio = int(key)
        inout = cfg['inout']
        state = cfg['state']
        if DEBUG:
            # GPIO number first, so the message reads e.g. "GPIO11 OUTPUT 0"...
            print("Config: GPIO{0} {1} {2}".format(gpio, inout, state))

        if inout == "OUTPUT" and state in output_levels:
            # Set output initially LOW (0) or HIGH (1)...
            pi.set_mode(gpio, pigpio.OUTPUT)
            pi.write(gpio, output_levels[state])
        elif inout == "INPUT" and state in input_pulls:
            # Set input default using software pull-up/down resistor...
            pi.set_mode(gpio, pigpio.INPUT)
            pi.set_pull_up_down(gpio, input_pulls[state])
        else:
            # Name the offending GPIO; this function is not only used for rows...
            print("ERROR: GPIO{0} channel configuration is incorrect".format(gpio))


# ----------------------------------------
# Send 433 RF codes via pigpiod _433 module...
# ----------------------------------------
# 433 transmit on GPIO17 [pin11]...
gpio_433_tx = 17
# 433 receive on GPIO18 [pin12]...
gpio_433_rx = 18
# Set the 433 RF GPIOs to correct RX/TX mode...
pi.set_mode(gpio_433_tx, pigpio.OUTPUT)
pi.set_mode(gpio_433_rx, pigpio.INPUT)

# ----------------------------------------
# Absolute width + height of matrix grid (area where notes displayed on screen) in pixels.
# Settings can be modified to define/limit area on screen where notes are added...
# ----------------------------------------
# Define web page size (in pixels) of matrix grid where notes are drawn on screen.
# These are absolute starting values that may be zoomed or compressed by other functions...
grid_width = 512
grid_height = 240

# ----------------------------------------
# Limit area where notes are drawn into grid space on screen.
# Adjust grid size to tweak placement of notes on screen...
# ----------------------------------------
# A width value of 0.66 restrict notes to only left & centre parts of grid space...
# grid_width = round(grid_width * 0.66)
# A height value of 0.5 means notes only placed in top half of grid on screen...
# grid_height = round(grid_height * 0.6)
grid_height = round(0.6 * grid_height)

# ----------------------------------------
# Running note counter, and how many notes (max_notes) may be added
# before the screen is refreshed...
# ----------------------------------------
note_count = 0
max_notes = 20

# ----------------------------------------
# Matrix size: one column/row per entry in the GPIO dictionaries...
# ----------------------------------------
total_cols, total_rows = len(columns), len(rows)

# ----------------------------------------
# Zoom factor of the monitor display (say 960px page size zoomed to 200%)...
# ----------------------------------------
zoom = 2

# ----------------------------------------
# Distance in pixels from the top left of the browser window to the
# top left corner of the on-screen matrix grid...
# ----------------------------------------
x_offset = zoom * 125
y_offset = zoom * 135

# ----------------------------------------
# The note grid (say 512px by 240px) is sub-divided into 'cells'.
# Cell size depends on the zoom and on the number of columns & rows...
# ----------------------------------------
cell_width = round(grid_width * zoom / total_cols)
cell_height = round(grid_height * zoom / total_rows)

# ----------------------------------------
# Half a cell in each direction: used to pull the cursor back
# towards the centre & middle of a cell...
# ----------------------------------------
align_w = round(0.5 * cell_width)
align_h = round(0.5 * cell_height)

if DEBUG:
    grid_report = (total_cols, total_rows, cell_width, cell_height, x_offset, y_offset, zoom)
    print("\nScreen grid dimensions: Total COLS[X]:{0} ROWS[Y]:{1} Each CELL[W:{2}px x H:{3}px] [X_OFF:{4} Y_OFF:{5}] ZOOM:{6}".format(*grid_report))

# ----------------------------------------
# Switches turned on during the current session (cleared when screen refreshed)...
# ----------------------------------------
set_session_on = set()

# ----------------------------------------
# Start time, used below for the screen refresh deadline...
# ----------------------------------------
now = time.time()

# ----------------------------------------
# Screen is refreshed & default sounds restored refresh_timeout seconds from now...
# ----------------------------------------
# timeout = now + 60*3
# refresh_timeout = 180
refresh_timeout = 90
timeout = refresh_timeout + now

# Time of the most recent callback event (for any callback); starts at 'now'...
time_locked = now

# ----------------------------------------
# THIS BLOCK FOR REFERENCE ONLY - NOT USED:
# ----------------------------------------
# Define 1st threaded callback function...
# GPIO 0-31 The GPIO which has changed state
# level 0-2 0 = change to low (a falling edge)
# 1 = change to high (a rising edge)
# 2 = no level change (a watchdog timeout)
# tick 32 bit The number of microseconds since boot (resets every 72mins)
#
# A default 'tally' callback is provided which simply counts edges.
# The count may be retrieved by calling the tally function.
# The count may be reset to zero by calling the reset_tally function.
# ----------------------------------------
# ----------------------------------------
# pigpio(callbacks) call user supplied function whenever edge detected on specified GPIO...
# ----------------------------------------
# ----------------------------------------
# 433 pigpio(callbacks) call user supplied function whenever edge detected on specified GPIO...
# ---------------------------------------
def bbps_receive_rf(gpio_433_rx='18'):
    """Start receiving 433MHz codes on a GPIO.

    gpio_433_rx -- receiver GPIO number (int or numeric string).

    Received codes are passed to rx_callback(). Returns the object created by
    _433.rx() (the original returned None) so the caller can keep a handle
    on the receiver.
    """
    # Convert first: both the debug message and _433.rx() need the integer.
    # (Previously 'gpio' was printed before it was assigned and the raw,
    # possibly string, argument was handed to _433.rx)...
    gpio = int(gpio_433_rx)
    if DEBUG:
        print("\nReceive GPIO{0}".format(gpio))
    return _433.rx(pi, gpio=gpio, callback=rx_callback)


def _bbps_send_rf(gpio_433_tx, code, debug_fmt, **tx_settings):
    """Send one code through a temporary _433.tx transmitter and always cancel it."""
    gpio = int(gpio_433_tx)
    tx = _433.tx(pi, gpio=gpio, **tx_settings)
    try:
        if DEBUG:
            print(debug_fmt.format(gpio, tx, code))
        tx.send(int(code))
    finally:
        # Cancel the transmitter even if send() raised...
        tx.cancel()
    return True


def bbps_send_rf_32bit(gpio_433_tx='17', code='3850831082'):
    """Transmit a 32 bit 433MHz code.

    gpio_433_tx -- transmitter GPIO number (int or numeric string).
    code        -- code to send (int or numeric string).

    Returns True once the code has been sent.
    """
    # Command example: ./bbps-433D-tx-rx -h 127.0.0.1 -t14 -b 32 -g 8050 -0 255 -1 725 -x 3 3850831082
    return _bbps_send_rf(gpio_433_tx, code, "\tSend rf_32bit GPIO{0} TX:{1} CODE: {2}",
                         repeats=3, bits=32, gap=8050, t0=255, t1=725)


def bbps_send_rf_24bit(gpio_433_tx='17', code='5592512'):
    """Transmit a 24 bit 433MHz code.

    gpio_433_tx -- transmitter GPIO number (int or numeric string).
    code        -- code to send (int or numeric string).

    Returns True once the code has been sent.
    """
    # Command example: ./bbps-433D-tx-rx -h 127.0.0.1 -t14 -b 24 -g 9000 -0 300 -1 900 -x 3 5592512
    # NOTE(review): the timings used here (repeats=4 gap=8050 t0=255 t1=725) differ
    # from the command example above - confirm which values the relays need.
    return _bbps_send_rf(gpio_433_tx, code, "~~~ Send rf_24bit GPIO{0} TX:{1} CODE: {2}",
                         repeats=4, bits=24, gap=8050, t0=255, t1=725)


def rx_callback(code, bits, gap, t0, t1):
    """Print the details of a received 433MHz code when DEBUG is on."""
    # code = int(code)
    if DEBUG:
        print("code={} bits={} (gap={} t0={} t1={})".format(code, bits, gap, t0, t1))
    # pause()
# ----------------------------------------


# ----------------------------------------
# Define one-shot SOUNDS callback function...
# ----------------------------------------
def bbps_callback_sounds(gpio='3', level='0', tick='1', cb_num='3', tick_last='5', time_last='10'):
    """Play the one-shot sound of a pad unless that pad fired too recently.

    gpio, level, tick -- values from the pigpio callback (tick in microseconds).
    cb_num            -- callback number; selects sound-NN.mp3.
    tick_last         -- tick of this pad's previous accepted event.
    time_last         -- time.time() of this pad's previous accepted event.

    Numeric arguments may be numbers or numeric strings.
    Returns True when the sound was started, False when the event was ignored.
    """
    # Keep a running total of events...
    global sound_num, time_locked
    sound_num += 1

    # Accept strings as well as numbers (the string defaults used to raise TypeError)...
    cb_num = int(cb_num)
    time_last = float(time_last)
    # Elapsed pigpiod ticks since the last accepted event (debug output only)...
    tick_delta = int(tick) - int(tick_last)

    # Current time and intervals, in seconds...
    time_now = time.time()
    time_delta = time_now - time_last
    lock_delta = time_now - time_locked

    # Ignore any repeat event until timeout (seconds since last event) expires.
    # These are 1-3 second sounds, so the timeout is set above 1 second...
    cb_timeout = 2.25
    if DEBUG:
        print("\n\t*** Now={0} Last={1} Delta={2} Timeout={3}".format(time_now, time_last, time_delta, cb_timeout))

    # if time_delta > cb_timeout and lock_delta > cb_timeout:
    if time_delta <= cb_timeout:
        if DEBUG:
            print("\t--- Callback_{0} BLOCKED: [TIME_DELTA={1} & LOCK_DELTA {2} < CB_TIMEOUT {3}] TICK_DELTA={4}".format(cb_num, time_delta, lock_delta, cb_timeout, tick_delta))
        return False

    if DEBUG:
        print("\tCallback_{0} TIMEOUT PASSED: [TIME_DELTA={1} > {2}] TICK_DELTA={3} LEVEL={4}".format(cb_num, time_delta, cb_timeout, tick_delta, level))

    # Format leading zero for all numbers with less than 2 digits...
    file = "sound-{0:02d}.mp3".format(cb_num)
    path = '/opt/bbps/static/snd/mp3/'
    soundfile = path + file

    # Play in the background; the callback does not wait for mpg123...
    subprocess.Popen(['mpg123', '-a', 'hw:1,0', '-q', soundfile], close_fds=True)
    if DEBUG:
        print("\t+++ CALLBACK_{0} PASSED - EDGE DETECTED on GPIO{1} loops {2} event={3}".format(cb_num, gpio, file, sound_num))

    # time_locked is deliberately NOT updated here (usually only updated for loops)...
    return True
# ----------------------------------------


# ----------------------------------------
# Define double LOOPS callback function...
# ----------------------------------------
def bbps_callback_loops(gpio='2', level='0', tick='1', cb_num='1', tick_last='1', time_last='1'):
    """Play the sound loop of a pad unless a loop was started too recently.

    gpio, level, tick -- values from the pigpio callback (tick in microseconds).
    cb_num            -- callback number; selects loop-NN.mp3.
    tick_last         -- tick of this pad's previous accepted event.
    time_last         -- time.time() of this pad's previous accepted event.

    Numeric arguments may be numbers or numeric strings. A loop is only
    started when both this pad's last event and the last loop started by any
    pad (time_locked) are older than the timeout; time_locked is then updated.
    Returns True when the loop was started, False when the event was ignored.
    """
    # Keep a running total of events...
    global loop_num, time_locked
    loop_num += 1

    # Accept strings as well as numbers (the string defaults used to raise TypeError)...
    cb_num = int(cb_num)
    time_last = float(time_last)
    # Elapsed pigpiod ticks since the last accepted event (debug output only)...
    tick_delta = int(tick) - int(tick_last)

    # Current time and intervals, in seconds...
    time_now = time.time()
    time_delta = time_now - time_last
    lock_delta = time_now - time_locked

    # Ignore any repeat event until timeout (seconds since last event) expires.
    # This is a 5-10 second sound loop played twice, so wait about 20 seconds...
    cb_timeout = 20.6
    if DEBUG:
        print("\n\t*** Now={0} Last={1} Time_Delta={2} Lock_Delta={3} CB_Timeout={4}".format(time_now, time_last, time_delta, lock_delta, cb_timeout))

    if not (time_delta > cb_timeout and lock_delta > cb_timeout):
        if DEBUG:
            print("\t--- Callback_{0} BLOCKED: [TIME_DELTA={1} & LOCK_DELTA {2} < CB_TIMEOUT {3}] TICK_DELTA={4}".format(cb_num, time_delta, lock_delta, cb_timeout, tick_delta))
        return False

    if DEBUG:
        print("\tCALLBACK_{0} PASSED: [TIME_DELTA={1} & LOCK_DELTA {2} > CB_TIMEOUT{3}] TICK_DELTA={4} LEVEL={5} EVENT={6}".format(cb_num, time_delta, lock_delta, cb_timeout, tick_delta, level, loop_num))

    # Format leading zero for all numbers with less than 2 digits...
    file = "loop-{0:02d}.mp3".format(cb_num)
    path = '/opt/bbps/static/loop/'
    soundfile = path + file

    # ----------------------------------------
    # If short duration sound file, loop multiple times (mpg123 --loop N)...
    # ----------------------------------------
    repeats = {3: '3', 4: '2', 5: '3'}.get(cb_num)
    command = ['mpg123', '-a', 'hw:1,0', '-q']
    if repeats is not None:
        command += ['--loop', repeats]
    command.append(soundfile)
    subprocess.Popen(command, close_fds=True)
    if DEBUG:
        print("\t+++ CALLBACK_{0} PASSED - EDGE DETECTED on GPIO{1} loops {2} event={3}".format(cb_num, gpio, file, loop_num))

    # Update time of most recent successful callback event...
    time_locked = time_now
    time.sleep(0.5)
    return True
# ----------------------------------------


# ----------------------------------------
# Filter signal on designated callback GPIO(s)...
# NOTE: To avoid multiple events when event held, prefer 'glitch' to 'noise' filter
# ----------------------------------------
# Syntax: set_glitch_filter(user_gpio, steady)...
# Level changes on GPIO ignored until level has been stable for more than steady microseconds.
# The level is then reported. Level changes of less than steady microseconds are ignored.
# pi.set_glitch_filter(11, 200)
# Syntax: set_noise_filter(user_gpio, steady, active)...
# Level changes on GPIO are ignored until a level has been stable for steady microseconds
# Level changes on the GPIO are then reported for active microseconds, then process repeats
# pi.set_noise_filter(11, 1000, 5000)
# pi.set_noise_filter(11, 500, 2000)
# Set glitch/noise filter(s) for each pad callback...
for gpio in pads:
    gpio = int(gpio)
    # Alternatives tried before settling on a 50000 microsecond glitch filter:
    # pi.set_glitch_filter(gpio, 10000)
    # pi.set_noise_filter(11, 500, 2000)
    pi.set_glitch_filter(gpio, 50000)
# ----------------------------------------

# ----------------------------------------
# Initial 'last event' values for every callback.
# ----------------------------------------
# Each group of names starts out with one shared value (for immutable numbers
# it does not matter that the names refer to the same object).
#
# The *_tick_last values are native pigpiod ticks (in microseconds); the
# *_time_last values come from 'time.time()'.
# Callbacks 1-3 are approx 24 second (24 million micro seconds) loops.
# These startup values are replaced the first time a callback is accepted...
#
(cb1_tick_last, cb2_tick_last, cb3_tick_last,
 cb4_tick_last, cb5_tick_last) = (24000000,) * 5
(cb12_tick_last, cb13_tick_last,
 cb14_tick_last, cb15_tick_last) = (20000000,) * 4

# Reference time (t1) for the t2-t1 interval checked when a pad fires...
(cb1_time_last, cb2_time_last, cb3_time_last,
 cb4_time_last, cb5_time_last) = (time.time(),) * 5
(cb12_time_last, cb13_time_last,
 cb14_time_last, cb15_time_last) = (time.time(),) * 4


# ----------------------------------------
# Define 1st LOOP threaded callback function...
# ----------------------------------------
def bbps_callback_1(gpio='2', level='0', tick='1'):
    """Pad callback 1: ask bbps_callback_loops() to play loop 1.

    When the loop was started, remember this event's tick and time as the
    pad's 'last' values. Returns the flag from bbps_callback_loops().
    """
    global cb1_tick_last, cb1_time_last

    started = bbps_callback_loops(gpio, level, tick, '1', cb1_tick_last, cb1_time_last)
    if started:
        cb1_tick_last, cb1_time_last = int(tick), time.time()

    if DEBUG:
        print("\tCALLBACK_1 done = {0}\n".format(started))
    return started
# ---------------------------------------


# ----------------------------------------
# Define 2nd LOOP threaded callback function...
# ----------------------------------------
def bbps_callback_2(gpio='3', level='0', tick='1'):
    """Pad callback 2: ask bbps_callback_loops() to play loop 2.

    When the loop was started, remember this event's tick and time as the
    pad's 'last' values. Returns the flag from bbps_callback_loops().
    """
    global cb2_tick_last, cb2_time_last

    started = bbps_callback_loops(gpio, level, tick, '2', cb2_tick_last, cb2_time_last)
    if started:
        cb2_tick_last, cb2_time_last = int(tick), time.time()

    if DEBUG:
        print("\tCALLBACK_2 done = {0}".format(started))
    return started
# ---------------------------------------


# ----------------------------------------
# Define 3rd LOOP threaded callback function...
# ----------------------------------------
def bbps_callback_3(gpio='4', level='0', tick='1'):
    """Pad callback 3: ask bbps_callback_sounds() to play one-shot sound 3.

    When the sound was started, remember this event's tick and time as the
    pad's 'last' values. Returns the flag from bbps_callback_sounds().
    """
    global cb3_tick_last, cb3_time_last

    # To use this pad for a loop instead of a one-shot sound:
    # cb_done = bbps_callback_loops(gpio, level, tick, '3', cb3_tick_last, cb3_time_last)
    started = bbps_callback_sounds(gpio, level, tick, '3', cb3_tick_last, cb3_time_last)
    if started:
        cb3_tick_last, cb3_time_last = int(tick), time.time()

    if DEBUG:
        print("\tCALLBACK_3 done = {0}\n".format(started))
    return started
# ---------------------------------------


# ----------------------------------------
# Define 4th LOOP threaded callback function...
# ----------------------------------------
def bbps_callback_4(gpio='7', level='0', tick='1'):
    """Pad callback 4: ask bbps_callback_sounds() to play one-shot sound 4.

    When the sound was started, remember this event's tick and time as the
    pad's 'last' values. Returns the flag from bbps_callback_sounds().
    """
    global cb4_tick_last, cb4_time_last

    # To use this pad for a loop instead of a one-shot sound:
    # cb_done = bbps_callback_loops(gpio, level, tick, '4', cb4_tick_last, cb4_time_last)
    started = bbps_callback_sounds(gpio, level, tick, '4', cb4_tick_last, cb4_time_last)
    if started:
        cb4_tick_last, cb4_time_last = int(tick), time.time()

    if DEBUG:
        print("\tCALLBACK_4 done = {0}\n".format(started))
    return started
# ---------------------------------------


# ----------------------------------------
# Define 5th LOOP threaded callback function...
# ---------------------------------------- def bbps_callback_5(gpio='8', level='0', tick='1'): global cb5_tick_last, cb5_time_last # Run loops callback function... # cb_done = bbps_callback_loops(gpio, level, tick, '5', cb5_tick_last, cb5_time_last) cb_done = bbps_callback_sounds(gpio, level, tick, '5', cb5_tick_last, cb5_time_last) # Update globals & return status... if cb_done: cb5_tick_last = int(tick) cb5_time_last = time.time() # --------------------------------------- if DEBUG: print("\tCALLBACK_5 done = {0}\n".format(cb_done)) return cb_done # --------------------------------------- # ---------------------------------------- # Define 12th RF RELAY threaded callback function... # ---------------------------------------- def bbps_callback_12(gpio='7', level='0', tick='1'): global cb12_tick_last, cb12_time_last cb_done = False # ----------------------------------------------- # AK-RK045-12 4CH learning code relay - Keyfob buttons Unit #1 - 24 bit + default pins ... # ----------------------------------------------- device_type = 'rf_relays' device_id = 'relay_a' # Relays are toggle ON/OFF... device_action = 'on' # ----------------------------------------------- # Send RF code using pigpiod... # ----------------------------------------------- # Get & store current time/interval in microseconds... 
time_now = time.time() time_delta = time_now - cb12_time_last cb_timeout = 1.25 # ----------------------------------------------- # Iterate JSON dictionary to get code & send via RF (433MHZ) # "A": "8475137", "B": "8475138", "C": "8475140", "D": "8475144" # ----------------------------------------------- if time_delta > cb_timeout: # send_code = data[device_type][device_id][0][device_action] send_code = "8475137" if DEBUG: print("\nSend 433Mhz code for {0} {1} switch {2} with code: {3}".format(device_type, device_id, device_action, send_code)) cb_done = bbps_send_rf_24bit(gpio_433_tx, send_code) cb_done = True else: if DEBUG: print("CB12 timeout fail") if cb_done: cb12_tick_last = int(tick) cb12_time_last = time.time() return cb_done # ---------------------------------------- # Define 13th RF RELAY threaded callback function... # ---------------------------------------- def bbps_callback_13(gpio='25', level='0', tick='1'): global cb13_tick_last, cb13_time_last cb_done = False cb_timeout = 1.25 # AK-RK045-12 4CH learning code relay - Keyfob buttons Unit #1 - 24 bit + default pins ... device_type = 'rf_relays' device_id = 'relay_b' device_action = 'on' # ----------------------------------------------- # Send RF code using pigpiod... # ----------------------------------------------- # Get & store current time/interval in microseconds... 
time_now = time.time() time_delta = time_now - cb13_time_last # ----------------------------------------------- # Iterate JSON dictionary to get code & send via RF (433MHZ) # ----------------------------------------------- if time_delta > cb_timeout: # send_code = data[device_type][device_id][0][device_action] send_code = "8475138" if DEBUG: print("\nSend 433Mhz code for {0} {1} switch {2} with code: {3}".format(device_type, device_id, device_action, send_code)) cb_done = bbps_send_rf_24bit(gpio_433_tx, send_code) cb_done = True else: if DEBUG: print("CB13 timeout fail") if cb_done: cb13_tick_last = int(tick) cb13_time_last = time.time() return cb_done # ---------------------------------------- # Define 14th RF RELAY threaded callback function... # ---------------------------------------- def bbps_callback_14(gpio='7', level='0', tick='1'): global cb14_tick_last, cb14_time_last cb_done = False cb_timeout = 1.25 # AK-RK045-12 4CH learning code relay - Keyfob buttons Unit1 24 bit + default pins ... device_type = 'rf_relays' device_id = 'relay_c' device_action = 'on' # ----------------------------------------------- # Send RF code using pigpiod... # ----------------------------------------------- # Get & store current time/interval in microseconds... 
time_now = time.time() time_delta = time_now - cb14_time_last # ----------------------------------------------- # Iterate JSON dictionary to get code & send via RF (433MHZ) # ----------------------------------------------- if time_delta > cb_timeout: # send_code = data[device_type][device_id][0][device_action] send_code = "8475140" if DEBUG: print("\nSend 433Mhz code for {0} {1} switch {2} with code: {3}".format(device_type, device_id, device_action, send_code)) cb_done = bbps_send_rf_24bit(gpio_433_tx, send_code) cb_done = True else: if DEBUG: print("CB14 timeout fail") if cb_done: cb14_tick_last = int(tick) cb14_time_last = time.time() return cb_done # ---------------------------------------- # Define 15th RF RELAY threaded callback function... # ---------------------------------------- def bbps_callback_15(gpio='8', level='0', tick='1'): global cb15_tick_last, cb15_time_last cb_done = False # AK-RK045-12 4CH learning code relay - Keyfob buttons Unit1 24 bit + default pins ... device_type = 'rf_relays' device_id = 'relay_d' device_action = 'on' # ----------------------------------------------- # Send RF code using pigpiod... # ----------------------------------------------- # Get & store current time/interval in microseconds... 
time_now = time.time() time_delta = time_now - cb15_time_last cb_timeout = 1.25 # ----------------------------------------------- # Iterate JSON dictionary to get code & send via RF (433MHZ) # ----------------------------------------------- if time_delta > cb_timeout: # send_code = data[device_type][device_id][0][device_action] send_code = "8475144" if DEBUG: print("\nSend 433Mhz code for {0} {1} switch {2} with code: {3}".format(device_type, device_id, device_action, send_code)) cb_done = bbps_send_rf_24bit(gpio_433_tx, send_code) cb_done = True else: if DEBUG: print("CB15 timeout fail") if cb_done: cb15_tick_last = int(tick) cb15_time_last = time.time() return cb_done # ---------------------------------------- # How charlieplex/multiplex is implemented in this script... # ---------------------------------------- # 1. All column & row GPIOs are first set to INPUT HIGH with PUD_UP # 2. Next, set the left-most COLUMN to OUTPUT mode LOW & then, # for each ROW, read value: If val = 0 then switch 'on'. # Reset COLUMN to INPUT mode HIGH with PUD_UP # 3. Then repeat items 1-3 for each COLUMN... # Each co-ordinate point (matrix intersection) value is calculated # in pixels. Values are tweaked/fixed & stored in a 'lookup' dictionary. # The x y values for each row and each column are read from dictionary. # New 'notes' are drawn on screen and retained until screen refreshed by # a 'timeout' or after a pre-configured number of notes have been added. # ---------------------------------------- # ---------------------------------------- # Create dictionaries to store x-y co-ordinates... # ---------------------------------------- def bbps_lookup_init(): # ---------------------------------------- # This function initialises & stores global col row co-ordinates in dictionaries.... 
# ---------------------------------------- global col_lookup, row_lookup if DEBUG: print("\nScreen grid dimensions: Total COLS[X]:{0} ROWS[Y]:{1} Each CELL[W:{2}px x H:{3}px] [X_OFF:{4} Y_OFF:{5}] ZOOM:{6}".format(total_cols, total_rows, cell_width, cell_height, x_offset, y_offset, zoom)) if DEBUG: print("Create lookups (hash tables) of x y co-ordinates (pixels) for each col & row number") # ---------------------------------------- # Store every x y (val: pixels) and col row (key: numbers) into dictionaries for global lookup... # ---------------------------------------- # The 'key' values are the column/row numbers corresponding with real-world matrix. # Total number of cols/rows depend on the number of entries in the GPIO columns & rows dictionaries. # Dictionary indexes start at '0', but by default, col & row KEY values start at '1': # To make sure key values start at '1', configure range with start=1 & stop=len()+1 ... col_lookup = {x: ((x * cell_width ) + x_offset - align_w) for x in range(1, len(columns)+1)} row_lookup = {y: ((y * cell_height) + y_offset - align_h) for y in range(1, len(rows)+1)} # pprint.pprint(col_lookup) # pprint.pprint(row_lookup) if DEBUG: print('='*40) if DEBUG: print("X COL LOOKUP:", col_lookup) if DEBUG: print("Y ROW LOOKUP:", row_lookup) if DEBUG: print('='*40) # ---------------------------------------- # Get x y values (in pixels) for specified col row [number]... # ---------------------------------------- x = col_lookup[1] y = row_lookup[1] if DEBUG: print("\nDictionary Test: Col 1={0} Row 1={1} co-ordinates (in pixels)".format(x,y)) # ---------------------------------------- # ---------------------------------------- # Check each column:row conbination... # ---------------------------------------- def bbps_multiplex(gpio_sw=14): # The % operator gives the remainder after performing integer division. # For '11 % 2', the 11 divided by 2 is 5 with a remainder of 1, so the result here is 1. 
# 'count % 10' returns a remainder between 0 - 9: we only print every 10th val to std-out... remainder = count % 10 PRINTME = False if DEBUG and remainder > 8: PRINTME = True # ---------------------------------------- # Initialise real-time switch tracking list(s) - (simpler) than using dictionary/set(s)... # ---------------------------------------- if PRINTME: print("Initialise & clear list/dictionary ready to store (col, row, x, y) values...") switched_on = [] new_on = [] # Clear all/any entries from the 'switched_on' list/dictionary (requires python3) # Required to compare old switch status with new status at some time later... switched_on.clear() new_on.clear() # Initialise a quit/break flag (this script exits when quit flag is True/1)... quit = False # Initialise local CRC check variable... crc = 0 # ---------------------------------------- # Cycle once through each column & each row to read on/off value for each point on matrix. # The 'rows' & 'columns' dictionaries are global & only need to read, not modify here... # ---------------------------------------- for col_gpio in columns: gpio = int(col_gpio) # ---------------------------------------- # GET EACH COLUMN NUMBER (stored in the 'note' entry in 'columns' dictionary)... # ---------------------------------------- col_num = int(columns[col_gpio]['note']) # Sequentially, switch each COLUMN GPIO from default 'INPUT PUD_DOWN' to 'OUTPUT HIGH'... # pi.set_mode(gpio, pigpio.OUTPUT) pi.write(gpio, 1) #if DEBUG: print("\tSLEEP HIGH...") # Send a trigger pulse to a GPIO. 
The GPIO is set to level for pulse_len microseconds # and then reset to not level - Syntax: gpio_trigger(user_gpio, pulse_len, level) # pi.gpio_trigger(gpio, 100000, 1) PRINTME = True if PRINTME: print("\nSet NAME:{0} MATRIX COL:{1} GPIO:{2} MODE:{3} [0=IN 1=OUT] CUR STATE:{4} ".format(columns[col_gpio]['name'], col_num, col_gpio, pi.get_mode(col_gpio), pi.read(col_gpio))) # Enable delay so that humans have time to read messages - For Debug only... # time.sleep(2.5) # time.sleep(10) # time.sleep(5) # ---------------------------------------- # Read each row in this column to check if row is 'on'... # ---------------------------------------- for row_gpio in rows: gpio = int(row_gpio) # ---------------------------------------- # GET EACH ROW NUMBER (stored in the 'note' entry in 'columns' dictionary)... # ---------------------------------------- row_num = int(rows[row_gpio]['note']) # ---------------------------------------- # Concatenate col+row as unique value to check if this combo already seen in this session... # ---------------------------------------- col_row_num = str(col_num)+str(row_num) # ---------------------------------------- # If GPIO state = 1 for the row, then corresponding key/switch is 'on',... # ---------------------------------------- if pi.read(row_gpio) == 1: if PRINTME: print("+++ NAME:{0} MATRIX [COL:{1} ROW:{2}] COL_GPIO:{3} ROW_GPIO:{4} MODE:{5} [0=IN 1=OUT] CUR STATE:{6}".format(rows[row_gpio]['name'], row_num, col_num, col_gpio, row_gpio, pi.get_mode(row_gpio), pi.read(row_gpio))) # ---------------------------------------- # QUIT: If row:col combo = quit flag, set flag to True to break out of while loop... # ---------------------------------------- #if col_row_num == "44" : # if DEBUG: print("Someone pressed 'quit' COL:{0} ROW;{1}".format(str(col_num), str(row_num))) # quit = True # ---------------------------------------- # Update crc value to test later if col/row combinations changed since last iteration... 
# ---------------------------------------- crc += int(col_row_num) #if DEBUG: # ---------------------------------------- # Get x y pixel values for specified col row from col/row_lookup dictionaries... # ---------------------------------------- x = col_lookup[col_num] y = row_lookup[row_num] # ---------------------------------------- # Append ALL switches currently 'on' to the 'switched on' tracking list.. # ---------------------------------------- switched_on.append((col_num, row_num, x, y)) # ---------------------------------------- # If 'on' but NOT already in global 'set_session_on' set... # ---------------------------------------- # Need to declare here to enable write to the global 'set_session_on' set ... global set_session_on # Is this the first time this switch has been 'on' in this session? if col_row_num not in set_session_on: # Not yet in 'set_session_on' so first, add NEW switch to 'new_on' list.. # if not DEBUG: # x = col_lookup[col_num] # y = row_lookup[row_num] new_on.append((col_num, row_num, x, y)) # After 'new_on' added, add NEW switch to global 'set_session_on' tracking set... set_session_on.add(col_row_num) # ----------------------------------------------- # Optionally alert users (audibly), that a new note has been added... # ----------------------------------------------- #cmd = ['mpg123', '-q', '/opt/bbps/static/snd/mp3/alert-01-1s.mp3'] #subprocess.Popen(cmd, close_fds=True) # # OR... call a designated callback function.... # bbps_callback_1(11) # ---------------------------------------- else: # ---------------------------------------- # No switch(es) detected as 'on', for this col & row combination... # ---------------------------------------- if PRINTME: print("--- NAME:{0} COL_GPIO:{1} ROW_GPIO:{2} MODE:{3} [0=IN 1=OUT] CUR STATE:{4}".format(rows[row_gpio]['name'], col_gpio, row_gpio, pi.get_mode(row_gpio), pi.read(row_gpio))) # ---------------------------------------- # ---------------------------------------- # !!!VERY IMPORTANT!!! 
At end of each loop, MUST re-set COL_GPIO default mode = LOW: # That is, either [OUTPUT mode OUPUT='0'] or [INPUT mode 'INPUT PUD_DOWN']... # ---------------------------------------- # pi.set_mode(int(col_gpio), pigpio.INPUT) # pi.set_pull_up_down(int(col_gpio), pigpio.PUD_DOWN) # pi.set_mode(int(col_gpio), pigpio.OUTPUT) pi.write(int(col_gpio), 0) # time.sleep(0.025) if PRINTME: print("Rst COLNAME:{0} GPIO:{1} MODE:{2} (0=IN 1=OUT) STATE:{3}".format( columns[col_gpio]['name'], col_gpio, pi.get_mode(col_gpio), pi.read(col_gpio) )) # ---------------------------------------- # ---------------------------------------- # Now completed one loop through all column & row combinations... # ---------------------------------------- # if PRINTME: print(" Total number of switches 'on' in this session = ",len(set_session_on)) if PRINTME: print(" Total number of switches currently 'on' = ",len(switched_on)) # Display performance info... if DEBUG: elapsed_time = time.process_time() - ptime print(" Multiplex loop process time:", elapsed_time) # ---------------------------------------- # RETURN the quit flag, pseudo crc & switch tracking values... # ---------------------------------------- # Called by: quit, crc_new, switched_on, new_on = bbps_multiplex() return quit, crc, switched_on, new_on # ---------------------------------------- # ----------------------------------------------- # Break out of loop if user pressed key (or key combo) to request 'quit'... # ----------------------------------------------- def bbps_quit(): if DEBUG: print("Doing cleanup before quit") # bbps_chrome_shortcuts(pagetitle="BLOCKER!", shortcut="F5") # time.sleep(2.0) # bbps_chrome_shortcuts(pagetitle="BLOCKER!", shortcut="Ctrl+Shift+q") # bbps_chrome_shortcuts(pagetitle="BLOCKER!", shortcut="Ctrl+Space+n") # ----------------------------------------------- # Create a pyuserinput (pykeyboard) Alt+F4 combo to close chrome (xdotool fails)... 
# ----------------------------------------------- k.tap_key(k.function_keys[5]) time.sleep(1.0) k.press_key(k.alt_key) k.tap_key(k.function_keys[4]) k.release_key(k.alt_key) time.sleep(1.0) # To quit, requires a 'break' in loop that we were called from... # ----------------------------------------------- # ---------------------------------------- # Call a Google Chrome keyboard shortcut... # ---------------------------------------- def bbps_chrome_shortcuts(pagetitle="BLOCKER!", shortcut="F5"): if DEBUG: print("Send Chrome shortcut {0}".format(shortcut)) # Using the 'onlyvisible' parameter = block until window 'pagetitle' is seen. # Note - the 'shortcut' variable may contain one or more space-separated keys... # cmd = ['xdotool', 'search', '--onlyvisible', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'key', shortcut] cmd = ['xdotool', 'search', '--onlyvisible', '--name', pagetitle, 'windowactivate', '--sync', 'key', shortcut] subprocess.Popen(cmd, close_fds=True) # subprocess.call(cmd) # ---------------------------------------- # ---------------------------------------- # Press BLOCKER! [redraw|clear] button (to clear screen)... # ---------------------------------------- def bbps_redraw(): if DEBUG: print("Redraw by calling 'clear' or 'undo' followed by 'redo'") # cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'key', 'z', 'Ctrl'] # cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'key', 'Shift'] # cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'key', 'Delete'] cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'key', 'Ctrl'] subprocess.call(cmd) # ---------------------------------------- # ----------------------------------------------- # Show 'col, row, x, y' values for all keys currently switched_on ... # ----------------------------------------------- def bbps_status(switched_on, new_on): print(" Status check. 
Number of ALL switches now ON = ", len(switched_on)) for a, b, c, d in switched_on: print(a, b, c, d) print(" Status check. Number of NEW switches now ON = ", len(new_on)) for a, b, c, d in new_on: print(a, b, c, d) # We only need to read data from the global 'set_session_on' data set... print(" SET on:", set_session_on) # ---------------------------------------- # ---------------------------------------- # Add a new syth note at x y co-ordinates. # Each note added during a session is retained until refresh timeout... # ---------------------------------------- def bbps_add_note(x=654, y=321): # ---------------------------------------- # First, tweak lots of values related to screen display - Defaults should be OK... # ---------------------------------------- # Get the width and height of each cell... # global cell_width, cell_height, x_offset, y_offset # global note_count, start # Adjust co-ordinates so that cursor is placed into centre of a cell... # cell_width = round(grid_width * 0.5) # cell_height = round(grid_height * 0.5) # Reduce height & width of co-ordinates to clicked in [left|centre|right|middle|...] of a cell... # x = x - round(cell_width * 0.5) # y = y - round(cell_height * 0.5) # Tweak fixed left/right/centre adjustment to better fit cell location... # x = x - 20 # y = y - 10 # Generate a random integer to randomly adjust left/right/centre-l/centre-r, in range 1 to 4 inclusive... # r = randint(1, 2) # r = randint(1, 4) # Any one location may create up to 3 randomly allocated notes along x axis... # r = randint(1, 3) # x = x - round(cell_width * 0.25) # x = x + round(cell_width * 0.125 * r) # Generate a random integer to randomly add harmony/chord in range 1 to 2 inclusive... # randrange(start, stop, step) - randrange(2,5 2) returns 2 or 4 - (2,11,2) returns 2,4 6,8 or 10... # r = randrange(1, 4, 2) # r = randint(1, 2) # y = str(y - (50 * r)) # ---------------------------------------- # Place a single note into cell using x y co-ordinates... 
# ---------------------------------------- x = str(x) y = str(y) if DEBUG: print(" Insert new synth note at x:{0} y:{1}".format(x, y)) # cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'windowmove', '0', '0'] #cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'mousemove', x, y, 'click', '1'] # cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync'] #subprocess.Popen(cmd, close_fds=True) # ...OR... # Using PyUserinput (pymouse), move mouse to co-ords and click... m.click(int(x), int(y)) # ---------------------------------------- # ----------------------------------------------- # Optionally alert users (audibly), that a new note has been added... # ----------------------------------------------- # cmd = ['mpg123', '-q', '/opt/bbps/static/snd/mp3/alert-01-1s.mp3'] # subprocess.Popen(cmd, close_fds=True) # # OR... call a designated callback function.... # bbps_callback_1(11) # ---------------------------------------- # Google Chrome/Firefox browser window setup... # ---------------------------------------- # Set up GUI and transfer the row + col (X + Y) switch co-ordinates by locating # and clicking mouse cursor & button on desktop screen. # - http://tuxradar.com/content/xdotool-script-your-mouse # Screen co-ordinates start in top-left corner, where the X and Y (horizontal # and vertical) co-ordinates are 0 and 0. For a screen resolution 1024x768, # the co-ordinates for top right location are 1023 (X) and 0 (Y) and the # bottom-right is 1023 (X) and 767 (Y), and so on... # # To move the mouse pointer to the main menu button at top left corner # of screen on local desktop and then click to open it. # NOTE: 1 is the left, 2 is the middle and 3 is the right click mouse # button. Use 4 as a virtual mouse wheel up movement, and 5 for down. 
#
# To display mouse position on screen:
#   while true; do clear; xdotool getmouselocation; sleep 0.1; done
#   xdotool mousemove 0 0 click 1
#
# NOTE: Alternatively, Chrome setup could be done manually or by shell script.
# ----------------------------------------

# ----------------------------------------
# Set sane defaults for cursor co-ordinates (x_offset, y_offset are readable globals)...
# ----------------------------------------
def bbps_cursor_init():
    """Return the initial cursor position as a pair of strings (x, y).

    Uses the fixed position 280,280 when either global offset is below 10,
    otherwise x_offset + 50 and y_offset + 40.
    """
    if x_offset < 10 or y_offset < 10:
        x = str(280)
        y = str(280)
    else:
        x = str(x_offset + 50)
        y = str(y_offset + 40)

    if DEBUG: print("\nInitialise Chrome GUI focus & place mouse cursor at: x:{0} y:{1}".format(x, y))
    return x, y


# ----------------------------------------
# Optionally activate full-screen startup in kiosk mode...
# See Google Chrome (un)documented switches....
# http://peter.sh/experiments/chromium-command-line-switches/
# ----------------------------------------
def bbps_kiosk_init():
    """Wait 2 seconds, then send 'space' to the BLOCKER! window.

    NOTE(review): the Chrome kiosk command line is built but the launch
    call is commented out, so this function currently does not start
    Chrome itself.
    """
    # if DEBUG: print("Initialise Chrome...")

    # To call chrome from shell as non-root user...
    # cmd = ['/bin/xhost', 'local:blocker;', 'sudo', '-u', 'blocker', '--', '/opt/google/chrome/google-chrome', '--disable-gpu', '-user-data-dir']
    # cmd = ['/opt/google/chrome/google-chrome', '--test-type', '--disable-setuid-sandbox', '--user-data-dir', '%U']
    cmd = ['/opt/google/chrome/google-chrome', '--kiosk', '--incognito', '--test-type', '--no-sandbox', '--user-data-dir', '%U']
    # subprocess.Popen(cmd, close_fds=True)
    # subprocess.call(cmd)

    # Allow time for Chrome to start before proceeding...
    time.sleep(2.0)

    # Activate 'play' buttons (spacebar)...
    bbps_chrome_shortcuts("BLOCKER!", "space")


# ----------------------------------------
# Initialise web browser window in debug/manual full-screen mode...
# ----------------------------------------
def bbps_webpage_init():
    """Position, resize and full-screen the BLOCKER! window, then start playback.

    Reads the module-level 'x' and 'y' (set from bbps_cursor_init() before
    this function is called) as the final mouse position.
    """
    # ----------------------------------------
    # 1. Activate & move 'BLOCKER!' browser window to top left corner of screen...
    # ----------------------------------------
    if DEBUG: print("Find & activate 'BLOCKER!' & move window to top left corner of screen")

    # Allow time for chrome to start...
    time.sleep(2.0)

    # xdotool search "Google Chrome" windowactivate --sync windowmove 0 0
    cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', '--sync', 'windowmove', '0', '0']
    subprocess.call(cmd)

    # ----------------------------------------
    # 2. Open window Full Screen or to designated size...
    # ----------------------------------------
    if DEBUG: print("Find 'BLOCKER!' in window title & open window to designated size")

    # xdotool search "Google Chrome" windowsize 640 480
    # Select first BLOCKER! window (in case multiple open) & make Full Screen & mouse to top left...
    # xdotool search 'BLOCKER!' windowactivate --sync key F11 mousemove --window %1 2 88
    # xdotool search 'BLOCKER!' windowactivate --sync key F11 mousemove 2 88
    #
    # The F11 key toggles full screen (Move or similar extension may override)...
    # bbps_chrome_shortcuts(pagetitle="BLOCKER!", shortcut="F11")
    # To initialise and run full-screen:
    # First, initialise a window size less than full screen, then expand to full screen & finally, refresh page...
    # xdotool search --name 'BLOCKER!' windowactivate windowsize --sync 50% 54% key F11 key F5
    #
    # To initialise browser with reduced screen size, do cmd call [with|without] blocking...
    # str() guards against the globals holding numbers: subprocess needs string arguments.
    cmd = ['xdotool', 'search', '--name', 'BLOCKER!', 'windowactivate', 'windowsize', '--sync', '50%', '54%', 'key', 'F11', 'F5', 'mousemove', str(x), str(y)]
    # subprocess.Popen(cmd, close_fds=True)
    subprocess.call(cmd)

    # ----------------------------------------
    # 3. Activate sound sequencer & synth 'Play'='on' (after pause)....
    # ----------------------------------------
    time.sleep(1.0)
    bbps_chrome_shortcuts("BLOCKER!", "space")


# ----------------------------------------
# START RUN MAIN CODE...
# ----------------------------------------

# ----------------------------------------
# Initialise Raspberry Pi GPIO's by passing gpio_type.
# Set each column & row GPIO type to INPUT & make them all LOW 'INPUT PUD_DOWN'...
# ----------------------------------------
bbps_set_gpios(rows)
bbps_set_gpios(pads)
bbps_set_gpios(columns)
if DEBUG: print("")
time.sleep(1)

# ----------------------------------------
# Create reference(s) to a callback function(s)...
# Syntax: pi.callback(gpio, edge, func)
# ----------------------------------------
for gpio in pads:
    # Get the callback function number from the 'note' entry of the pads dictionary...
    # cb_state = pads[gpio]['state']
    cb_num = int(pads[gpio]['note'])

    # ----------------------------------------
    # Create a callback event when specified GPIO edge is detected...
    # DEBUG not required (automatically printed to std out if using pigpiod)...
    # ----------------------------------------
    if DEBUG: print("Config: cb{0}\tGPIO{1}\tbbps_callback_{0}".format(cb_num, gpio))

    # Callbacks 1-5 fire on a falling edge, 14-15 on a rising edge;
    # any other number is left without a callback.
    if cb_num == 1:
        cb1 = pi.callback(gpio, pigpio.FALLING_EDGE, bbps_callback_1)
    elif cb_num == 2:
        cb2 = pi.callback(gpio, pigpio.FALLING_EDGE, bbps_callback_2)
    elif cb_num == 3:
        cb3 = pi.callback(gpio, pigpio.FALLING_EDGE, bbps_callback_3)
    elif cb_num == 4:
        cb4 = pi.callback(gpio, pigpio.FALLING_EDGE, bbps_callback_4)
    elif cb_num == 5:
        cb5 = pi.callback(gpio, pigpio.FALLING_EDGE, bbps_callback_5)
    # elif cb_num == 12: cb12 = pi.callback(gpio, pigpio.RISING_EDGE, bbps_callback_12)
    # elif cb_num == 13: cb13 = pi.callback(gpio, pigpio.RISING_EDGE, bbps_callback_13)
    elif cb_num == 14:
        cb14 = pi.callback(gpio, pigpio.RISING_EDGE, bbps_callback_14)
    elif cb_num == 15:
        cb15 = pi.callback(gpio, pigpio.RISING_EDGE, bbps_callback_15)
    else:
        pass

# ----------------------------------------
# Setup dictionaries x y (pixels) & row col (numbers)...
# ----------------------------------------
bbps_lookup_init()

# ----------------------------------------
# Allow time to read startup messages...
# ----------------------------------------
if DEBUG: time.sleep(10)

# ----------------------------------------
# Set sane cursor starting position (pixels) on screen...
# ----------------------------------------
x, y = bbps_cursor_init()

# ----------------------------------------
# Initialise browser screen display...
# ----------------------------------------
# bbps_kiosk_init()
# ...OR...
bbps_webpage_init()

# ----------------------------------------
# Initialise [redraw|clear] screen behavior...
# ----------------------------------------
# bbps_redraw()

# ----------------------------------------
# Initialise simple CRC checker...
# ----------------------------------------
crc_new = 0

# ----------------------------------------
# Create an empty 'set' to store all switches seen 'on' during a session...
# ----------------------------------------
set_session_on = set()

# Make sure both tracking lists exist before the loop: the refresh branch
# clears them and may run before bbps_multiplex() has ever been called...
switched_on = []
new_on = []

# ----------------------------------------
# START main loop...
# ----------------------------------------
# Relies on module-level state set up earlier in the file:
# ptime, max_elapsed_time, count, timeout, note_count, max_notes, refresh_timeout.
try:
    while True:
        # -----------------------------------------------
        # In debug mode, monitor status & performance of this script...
        # -----------------------------------------------
        # process_time() returns the sum of the system and user CPU time excluding 'sleep' time
        # https://stackoverflow.com/questions/7370801/measure-time-elapsed-in-python

        # -----------------------------------------------
        # Set 'break' if mouse is moved to bottom left corner of screen...
        # -----------------------------------------------
        # xdotool behave_screen_edge bottom-left break

        # -----------------------------------------------
        # Get current time, do report, then reset timer (ignores 'sleep' times)..
        # -----------------------------------------------
        elapsed_time = time.process_time() - ptime
        max_elapsed_time = max(max_elapsed_time, elapsed_time)
        ptime = time.process_time()
        print("Total process time: {0} Max: {1}".format(elapsed_time, max_elapsed_time))
        if DEBUG: print("-----------------------------------------")

        now = time.time()
        print("\nLoop number {0} : Timeout {1}. Press Q (COL 4 + ROW 4) to quit".format(count, now - timeout))

        # -----------------------------------------------
        # Increment 'count' value for each completed loop (screen refreshed after max count)...
        # -----------------------------------------------
        count += 1

        # -----------------------------------------------
        # Simple check to detect & draw NEW note(s) on screen grid for each active SWITCH...
        # To detect change, store current 'crc_new' value for comparison with a later 'crc_new' value...
        # -----------------------------------------------
        crc_old = crc_new

        # -----------------------------------------------
        # If timeout or max_notes exceeded, then first, refresh screen...
        # -----------------------------------------------
        if note_count > max_notes or now > timeout:
            tdiff = int(now - timeout)
            if DEBUG: print("REFRESH: note count {0} or timeout (+{1}sec) reached".format(str(note_count), str(tdiff)))

            # -----------------------------------------------
            # Refresh screen & activate synth 'Play' button....
            # -----------------------------------------------
            bbps_redraw()
            # bbps_chrome_shortcuts("BLOCKER!", "F5")
            # bbps_chrome_shortcuts("BLOCKER!", "Delete")
            # bbps_chrome_shortcuts("BLOCKER!", "Ctrl")
            time.sleep(1.0)

            # -----------------------------------------------
            # Remove/re-initialise all reference to notes seen in last session...
            # -----------------------------------------------
            note_count = 1
            crc_old = 0
            crc_new = 1
            new_on.clear()
            switched_on.clear()
            set_session_on.clear()
            now = time.time()
            timeout = now + refresh_timeout

            # Pause to give screen display time to refresh properly...
            time.sleep(2.0)
            bbps_chrome_shortcuts("BLOCKER!", "space")
        else:
            # -----------------------------------------------
            # GET SWITCH STATUS FROM MULTIPLEX...
            # -----------------------------------------------
            # The returned 'switched_on[]' & 'new_on[]' lists hold (col_num, row_num, x, y) values...
            quit, crc_new, switched_on, new_on = bbps_multiplex()

            # -----------------------------------------------
            # Break out of loop if user pressed key (or key combo) to request 'quit'...
            # -----------------------------------------------
            if quit:
                if DEBUG: print("Something/someone pressed Q or quit ({0})".format(quit))
                # Call function to press keys and tidy up, then break out of loop
                bbps_quit()
                break

        # -----------------------------------------------
        # Check returned 'crc_new' with 'crc_old' value to inform if any change during last loop...
        # -----------------------------------------------
        # NOTE(review): nesting reconstructed from whitespace-collapsed text; this
        # comparison is placed at loop level (after a refresh both lists are empty,
        # so the 'changed' branch draws nothing) - confirm against the original file.
        if crc_old == crc_new:
            if DEBUG: print(" CRC (sum of row + col values) NOT changed OLD:{0} NEW:{1}".format(str(crc_old), str(crc_new)))
            # Nothing changed - so, just pause for breath & then continue...
            time.sleep(0.1)
        else:
            # -----------------------------------------------
            # Note(s) have changed - For each switch that is NEW 'on', draw a note on the screen...
            # -----------------------------------------------
            if DEBUG:
                print("!!! CRC has changed OLD:{0} NEW:{1}".format(str(crc_old), str(crc_new)))
                # Print switches_on data to screen...
                for col_val, row_val, x_val, y_val in switched_on:
                    print("* ALL on GPIOs: [COL_NUM:{0} ROW_NUM:{1}] = 'ON'".format(str(col_val), str(row_val)))

            for col_val, row_val, x_val, y_val in new_on:
                if DEBUG: print("* NEW 'on' GPIOs: [COL_NUM:{0} ROW_NUM:{1}] = 'ON'".format(str(col_val), str(row_val)))

                # The pixel values were already looked up by bbps_multiplex()...
                x = x_val
                y = y_val

                # NEW note detected, so add note to the screen display...
                bbps_add_note(x, y)
                note_count += 1
                time.sleep(0.01)

        # ----------------------------------------
        # Status check for all switches returned 'on' from last loop...
        # ----------------------------------------
        # If no notes currently 'on', clear session history 'set' before status check...
        if not switched_on:
            set_session_on.clear()

        # Print a quick status report to screen...
        # NOTE(review): the separator is assumed to belong to the DEBUG block - confirm.
        if DEBUG:
            bbps_status(switched_on, new_on)
            print("-----------------------------------------")

        # Continue looping until CTL+c pressed or Quit pressed...

# ----------------------------------------
# Error checking and cleanup...
# ----------------------------------------
except KeyboardInterrupt:
    # CTL+C is the normal way to stop; cleanup happens in 'finally'.
    pass
finally:
    # LOG.info("Switch control script ended")
    # camera.stop_recording()
    # Always release the pigpio connection: on CTL+C, on quit and on any error...
    pi.stop()

# ---------------------------------------
# Run forever, until CTL+C...
# ----------------------------------------
print("\nBye...\n")

# --------------------------------------------------------
# Requires import os..
# os._exit(1)