From 73e975dd59f0d673ae4585ef1e17f0d8450818b2 Mon Sep 17 00:00:00 2001 From: randomuser Date: Sun, 26 Feb 2023 21:40:59 -0600 Subject: [PATCH] add a stupid shell to interface with a scanner --- scanner/main.py | 149 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 scanner/main.py diff --git a/scanner/main.py b/scanner/main.py new file mode 100644 index 0000000..cc2fca4 --- /dev/null +++ b/scanner/main.py @@ -0,0 +1,149 @@ +from queue import Queue +from threading import Thread +from threading import Event +from time import sleep +import sane +from PIL import Image +import sys +import os +import select +import cv2 +import subprocess + +def is_there_line(): + # Check if there is any data available to be read from stdin + if select.select([sys.stdin], [], [], 0)[0]: + # Read the next line from stdin + return sys.stdin.readline().rstrip('\n') + else: + return None + +class Message: + def __init__(self, message, payload): + self.message = message + self.payload = payload + +def interface_thread(i, o): + queue = [] + processing = [] + scanned = [] + scanners = [] + print("<{}>$ ".format(os.getcwd()), end="") + sys.stdout.flush() + while True: + line = is_there_line() + if line: + # parse the line of input + splitted = line.split(' ') + if splitted[0] == "queue": + if splitted[1] == "add": + if splitted[2]: + if splitted[2][0] != "/": + enqueued = os.path.join(os.getcwd(), splitted[2]) + else: + enqueued = splitted[2] + queue.append(enqueued) + print("queued file {}".format(enqueued)) + else: + print("specify a file!") + elif splitted[0] == "send": + print("sending {} to scanner for scanning".format(queue[0])) + payload = { + 'save_location': queue.pop(0), + } + message = Message("scan_request", payload) + o.put(message) + elif splitted[0] == "cd": + try: + os.chdir(splitted[1]) + except IndexError: + os.chdir("/home/usr") + elif splitted[0] == "mkdir": + os.mkdir(splitted[1]) + elif splitted[0] == "ls": + files = os.listdir() + print(files) + elif splitted[0] == "feh": + subprocess.Popen(["feh", splitted[1]]) + elif splitted[0] == "scannerlist": + o.put(Message("list_scanners", None)) + data = i.get() + if data.message == "list_scanners_resp": + scanners = data.payload["scanner_names"] + for x in scanners: + print(x) + i.task_done() + elif splitted[0] == "setscanner": + payload = { + "scanner_name": splitted[1], + } + message = Message("set_scanner", payload) + o.put(message) + elif splitted[0] == "sendpredef": + o.put(Message("sendpredef", None)) + elif splitted[0] == "help": + print("""help: commands - +queue: + add: add an item to the queue +send: send the first item on the queue to be scanned +cd: change directories +mkdir: make a directory +ls: list files in directory +feh: open image viewer +scannerlist: list scanners connected +setscanner: set the scanner as the selected scanner +sendpredef: set the scanner to some predefined settings""") + + print("<{}>$ ".format(os.getcwd()), end="") + sys.stdout.flush() + + +def scanner_backend(i, o): + sane.init() + scanner = None + while True: + # Get some data + data = i.get() + if data.message == "scan_request": + save_location = data.payload['save_location'] + if scanner: + print("recieved scan_request with {}".format(save_location)) + else: + print("configure a scanner first") + i.task_done() + continue + + image = scanner.scan() + image.save(data.payload["save_location"]) + + i.task_done() + elif data.message == "list_scanners": + devs = sane.get_devices() + payload = { + 'scanner_names': [x[0] for x in devs], + } + message = Message("list_scanners_resp", payload) + o.put(message) + i.task_done() + elif data.message == "set_scanner": + scanner = sane.open(data.payload["scanner_name"]) + i.task_done() + elif data.message == "sendpredef": + # configure the scanner in these predefined ways + scanner.resolution = 200 + scanner.br_x = 320 + scanner.br_y = 320 + scanner.calibration_cache = 1 + i.task_done() + +# Create the shared queue and launch both threads +def main(): + interface_queue, scanner_queue = Queue(), Queue() + threads = [ + Thread(target=interface_thread, args=(interface_queue, scanner_queue)), + Thread(target=scanner_backend, args=(scanner_queue, interface_queue)), + ] + [ i.start() for i in threads ] + +if __name__ == "__main__": + main()