summaryrefslogtreecommitdiffstats
path: root/lab_control/test/mock_jds6600_device.py
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control/test/mock_jds6600_device.py')
-rw-r--r--lab_control/test/mock_jds6600_device.py70
1 files changed, 70 insertions, 0 deletions
diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py
new file mode 100644
index 0000000..22c48be
--- /dev/null
+++ b/lab_control/test/mock_jds6600_device.py
@@ -0,0 +1,70 @@
+import serial
+import os
+import pty
+import tty
+import termios
+import atexit
+import threading
+
+# Open virtual serial port
+
+# Close it when the program exits
+def _cleanUp() -> None:
+ pass
+atexit.register(_cleanUp)
+
+class MockJDS6600Device():
+ def __init__(self):
+ self._master, self._slave = pty.openpty()
+ self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0)
+
+ self._portName = os.ttyname(self._slave)
+ self._channels = [False, False]
+
+ self._mainThread = threading.Thread(target=self._mainLoop)
+ self._mainThread.start()
+
+ def _mainLoop(self) -> None:
+ while True:
+ try:
+ print("Device reading request....")
+ request = self._masterFile.readline().decode().strip()
+ response = self._handleRequest(request)
+ print("Device sending response")
+ self._masterFile.write(response.encode())
+ except OSError as e:
+ print("Exception caught, breaking out of loop")
+ break
+ print("Main loop returning")
+
+ def _handleRequest(self, request: str) -> str:
+ print(f"Device received request: {request}")
+ return ":ok\r\n"
+
+ def stop(self) -> None:
+ print("STOP")
+ self._masterFile.close()
+
+ os.close(self._master)
+ print("master closed")
+ os.close(self._slave)
+ print("slave closed")
+ self._mainThread.join()
+ print("stop returning")
+
+ def checkPortConfiguration(self) -> None:
+ iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave)
+
+ # JDS6600 configuration taken from manual
+ assert ispeed == termios.B115200
+ assert ospeed == termios.B115200
+ assert (cflag & termios.CSIZE) == termios.CS8
+ assert (cflag & termios.CSTOPB) == 0
+ assert (cflag & (termios.PARENB | termios.PARODD)) == 0
+
+ def getPortName(self) -> str:
+ return self._portName
+
+ def isOn(self, ch: int) -> bool:
+ return self._channels[ch - 1]
+