import serial import os import pty import tty import termios import atexit import threading # Open virtual serial port # Close it when the program exits def _cleanUp() -> None: pass atexit.register(_cleanUp) class MockJDS6600Device(): def __init__(self): self._master, self._slave = pty.openpty() self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) self._portName = os.ttyname(self._slave) self._channels = [False, False] self._mainThread = threading.Thread(target=self._mainLoop) self._mainThread.start() def _mainLoop(self) -> None: while True: try: print("Device reading request....") request = self._masterFile.readline().decode().strip() response = self._handleRequest(request) print("Device sending response") self._masterFile.write(response.encode()) except OSError as e: print("Exception caught, breaking out of loop") break print("Main loop returning") def _handleRequest(self, request: str) -> str: print(f"Device received request: {request}") return ":ok\r\n" def stop(self) -> None: print("STOP") self._masterFile.close() os.close(self._master) print("master closed") os.close(self._slave) print("slave closed") self._mainThread.join() print("stop returning") def checkPortConfiguration(self) -> None: iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave) # JDS6600 configuration taken from manual assert ispeed == termios.B115200 assert ospeed == termios.B115200 assert (cflag & termios.CSIZE) == termios.CS8 assert (cflag & termios.CSTOPB) == 0 assert (cflag & (termios.PARENB | termios.PARODD)) == 0 def getPortName(self) -> str: return self._portName def isOn(self, ch: int) -> bool: return self._channels[ch - 1]