diff options
Diffstat (limited to 'lab_control/test/mock_jds6600_device.py')
-rw-r--r-- | lab_control/test/mock_jds6600_device.py | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py new file mode 100644 index 0000000..22c48be --- /dev/null +++ b/lab_control/test/mock_jds6600_device.py @@ -0,0 +1,70 @@ +import serial +import os +import pty +import tty +import termios +import atexit +import threading + +# Open virtual serial port + +# Close it when the program exits +def _cleanUp() -> None: + pass +atexit.register(_cleanUp) + +class MockJDS6600Device(): + def __init__(self): + self._master, self._slave = pty.openpty() + self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) + + self._portName = os.ttyname(self._slave) + self._channels = [False, False] + + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def _mainLoop(self) -> None: + while True: + try: + print("Device reading request....") + request = self._masterFile.readline().decode().strip() + response = self._handleRequest(request) + print("Device sending response") + self._masterFile.write(response.encode()) + except OSError as e: + print("Exception caught, breaking out of loop") + break + print("Main loop returning") + + def _handleRequest(self, request: str) -> str: + print(f"Device received request: {request}") + return ":ok\r\n" + + def stop(self) -> None: + print("STOP") + self._masterFile.close() + + os.close(self._master) + print("master closed") + os.close(self._slave) + print("slave closed") + self._mainThread.join() + print("stop returning") + + def checkPortConfiguration(self) -> None: + iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave) + + # JDS6600 configuration taken from manual + assert ispeed == termios.B115200 + assert ospeed == termios.B115200 + assert (cflag & termios.CSIZE) == termios.CS8 + assert (cflag & termios.CSTOPB) == 0 + assert (cflag & (termios.PARENB | termios.PARODD)) == 0 + + def getPortName(self) -> str: + return self._portName + + def isOn(self, ch: int) -> bool: + return self._channels[ch - 1] + |