import os
import pty
import re
import termios
import threading
from typing import Optional


class MockJDS6600Device:
    """Pseudo-terminal backed stand-in for a JDS6600 signal generator.

    On construction a pty pair is opened and a background thread starts
    answering requests written to the slave side.  Code under test opens
    the path returned by :meth:`getPortName` as if it were a serial port.

    Only function 20 (the on/off flags of the two channels) is modelled;
    every other well-formed request is simply acknowledged.
    """

    # Request framing: ":<opcode><function>=<args>." where opcode is one of
    # w/r/a/b, function is a decimal number and args is a comma-separated list.
    _REQUEST_PATTERN = re.compile(
        r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\."
    )

    def __init__(self):
        """Open the pty pair and start the request-handling thread."""
        self._master, self._slave = pty.openpty()
        # closefd=False: the raw descriptor is closed explicitly in stop().
        self._masterFile = os.fdopen(
            self._master, mode="r+b", closefd=False, buffering=0
        )
        self._portName = os.ttyname(self._slave)
        self._channels = [False, False]
        self._mainThread = threading.Thread(target=self._mainLoop)
        self._mainThread.start()

    def _mainLoop(self) -> None:
        """Read requests line by line and write back responses.

        The loop ends when the master side can no longer be read or
        written, which is how stop() terminates the thread.
        """
        while True:
            try:
                raw = self._masterFile.readline()
            except (OSError, ValueError):
                # OSError: descriptor closed / pty torn down.
                # ValueError: the file object itself was already closed.
                break
            if not raw:
                # Empty read means end-of-file; looping again would spin.
                break

            # Undecodable bytes must not kill the thread; they will just
            # fail to match the request pattern.
            request = raw.decode(errors="replace").strip()
            response = self._handleRequest(request)
            if response is None:
                continue

            try:
                self._masterFile.write(response.encode())
            except (OSError, ValueError):
                break

    def _handleRequest(self, request: str) -> Optional[str]:
        """Apply one request and return the reply to send, if any.

        Args:
            request: A single request line with surrounding whitespace
                already stripped, e.g. ``":w20=1,0."``.

        Returns:
            The response string (terminated with ``"\\r\\n"``), or ``None``
            when the request is malformed and nothing should be sent.
        """
        m = self._REQUEST_PATTERN.search(request)
        if not m:
            return None

        opcode = m.group("opcode")
        function = int(m.group("function"))
        args = m.group("args").split(",")

        if opcode == "w":
            if function == 20:
                if len(args) < 2:
                    # Both channel flags are required; treat a short write
                    # like any other malformed request instead of raising
                    # inside the worker thread.
                    return None
                self._channels[0] = args[0] == "1"
                self._channels[1] = args[1] == "1"
        elif opcode == "r":
            if function == 20:
                return (
                    f":r20={int(self._channels[0])},"
                    f"{int(self._channels[1])}.\r\n"
                )

        # Everything else that parsed correctly is acknowledged.
        return ":ok\r\n"

    def stop(self) -> None:
        """Close both ends of the pty and wait for the worker thread."""
        self._masterFile.close()
        os.close(self._master)
        os.close(self._slave)
        self._mainThread.join()

    def checkPortConfiguration(self) -> None:
        """Assert that the slave side is configured as 115200 baud, 8N1.

        Raises:
            AssertionError: If any of the checked settings differs.
        """
        iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(
            self._slave
        )

        # JDS6600 configuration taken from manual
        assert ispeed == termios.B115200
        assert ospeed == termios.B115200
        assert (cflag & termios.CSIZE) == termios.CS8  # 8 data bits
        assert (cflag & termios.CSTOPB) == 0  # 1 stop bit
        assert (cflag & (termios.PARENB | termios.PARODD)) == 0  # no parity

    def getPortName(self) -> str:
        """Return the device path of the slave side of the pty."""
        return self._portName

    def isOn(self, ch: int) -> bool:
        """Return whether channel *ch* (1-based) is currently switched on."""
        return self._channels[ch - 1]