add a stupid shell to interface with a scanner

This commit is contained in:
randomuser 2023-02-26 21:40:59 -06:00
parent 2fb96d41e8
commit 73e975dd59
1 changed files with 149 additions and 0 deletions

149
scanner/main.py Normal file
View File

@ -0,0 +1,149 @@
from queue import Queue
from threading import Thread
from threading import Event
from time import sleep
import sane
from PIL import Image
import sys
import os
import select
import cv2
import subprocess
def is_there_line():
# Check if there is any data available to be read from stdin
if select.select([sys.stdin], [], [], 0)[0]:
# Read the next line from stdin
return sys.stdin.readline().rstrip('\n')
else:
return None
class Message:
def __init__(self, message, payload):
self.message = message
self.payload = payload
def interface_thread(i, o):
queue = []
processing = []
scanned = []
scanners = []
print("<{}>$ ".format(os.getcwd()), end="")
sys.stdout.flush()
while True:
line = is_there_line()
if line:
# parse the line of input
splitted = line.split(' ')
if splitted[0] == "queue":
if splitted[1] == "add":
if splitted[2]:
if splitted[2][0] != "/":
enqueued = os.path.join(os.getcwd(), splitted[2])
else:
enqueued = splitted[2]
queue.append(enqueued)
print("queued file {}".format(enqueued))
else:
print("specify a file!")
elif splitted[0] == "send":
print("sending {} to scanner for scanning".format(queue[0]))
payload = {
'save_location': queue.pop(0),
}
message = Message("scan_request", payload)
o.put(message)
elif splitted[0] == "cd":
try:
os.chdir(splitted[1])
except IndexError:
os.chdir("/home/usr")
elif splitted[0] == "mkdir":
os.mkdir(splitted[1])
elif splitted[0] == "ls":
files = os.listdir()
print(files)
elif splitted[0] == "feh":
subprocess.Popen(["feh", splitted[1]])
elif splitted[0] == "scannerlist":
o.put(Message("list_scanners", None))
data = i.get()
if data.message == "list_scanners_resp":
scanners = data.payload["scanner_names"]
for x in scanners:
print(x)
i.task_done()
elif splitted[0] == "setscanner":
payload = {
"scanner_name": splitted[1],
}
message = Message("set_scanner", payload)
o.put(message)
elif splitted[0] == "sendpredef":
o.put(Message("sendpredef", None))
elif splitted[0] == "help":
print("""help: commands -
queue:
add: add an item to the queue
send: send the first item on the queue to be scanned
cd: change directories
mkdir: make a directory
ls: list files in directory
feh: open image viewer
scannerlist: list scanners connected
setscanner: set the scanner as the selected scanner
sendpredef: set the scanner to some predefined settings""")
print("<{}>$ ".format(os.getcwd()), end="")
sys.stdout.flush()
def scanner_backend(i, o):
sane.init()
scanner = None
while True:
# Get some data
data = i.get()
if data.message == "scan_request":
save_location = data.payload['save_location']
if scanner:
print("recieved scan_request with {}".format(save_location))
else:
print("configure a scanner first")
i.task_done()
continue
image = scanner.scan()
image.save(data.payload["save_location"])
i.task_done()
elif data.message == "list_scanners":
devs = sane.get_devices()
payload = {
'scanner_names': [x[0] for x in devs],
}
message = Message("list_scanners_resp", payload)
o.put(message)
i.task_done()
elif data.message == "set_scanner":
scanner = sane.open(data.payload["scanner_name"])
i.task_done()
elif data.message == "sendpredef":
# configure the scanner in these predefined ways
scanner.resolution = 200
scanner.br_x = 320
scanner.br_y = 320
scanner.calibration_cache = 1
i.task_done()
# Create the shared queue and launch both threads
def main():
interface_queue, scanner_queue = Queue(), Queue()
threads = [
Thread(target=interface_thread, args=(interface_queue, scanner_queue)),
Thread(target=scanner_backend, args=(scanner_queue, interface_queue)),
]
[ i.start() for i in threads ]
if __name__ == "__main__":
main()