summaryrefslogtreecommitdiffstats
path: root/lab_control/test/mock_jds6600_device.py
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control/test/mock_jds6600_device.py')
-rw-r--r--lab_control/test/mock_jds6600_device.py43
1 files changed, 22 insertions, 21 deletions
diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py
index 22c48be..6e036d5 100644
--- a/lab_control/test/mock_jds6600_device.py
+++ b/lab_control/test/mock_jds6600_device.py
@@ -1,17 +1,8 @@
-import serial
import os
import pty
-import tty
import termios
-import atexit
import threading
-
-# Open virtual serial port
-
-# Close it when the program exits
-def _cleanUp() -> None:
- pass
-atexit.register(_cleanUp)
+import re
class MockJDS6600Device():
def __init__(self):
@@ -27,30 +18,40 @@ class MockJDS6600Device():
def _mainLoop(self) -> None:
while True:
try:
- print("Device reading request....")
request = self._masterFile.readline().decode().strip()
response = self._handleRequest(request)
- print("Device sending response")
- self._masterFile.write(response.encode())
+
+ if response is not None:
+ self._masterFile.write(response.encode())
except OSError as e:
- print("Exception caught, breaking out of loop")
break
- print("Main loop returning")
def _handleRequest(self, request: str) -> str:
- print(f"Device received request: {request}")
+ pattern = r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\."
+ m = re.search(pattern, request)
+
+ if not m:
+ return None
+
+ opcode = m.group("opcode")
+ function = int(m.group("function"))
+ args = m.group("args").split(",")
+
+ if opcode == "w":
+ if function == 20:
+ self._channels[0] = args[0] == "1"
+ self._channels[1] = args[1] == "1"
+ elif opcode == "r":
+ if function == 20:
+ return f":r20={int(self._channels[0])},{int(self._channels[1])}.\r\n"
+
return ":ok\r\n"
def stop(self) -> None:
- print("STOP")
self._masterFile.close()
-
os.close(self._master)
- print("master closed")
os.close(self._slave)
- print("slave closed")
self._mainThread.join()
- print("stop returning")
def checkPortConfiguration(self) -> None:
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave)