summaryrefslogtreecommitdiffstats
path: root/lab_control/test
diff options
context:
space:
mode:
authorEddy Pedroni <eddy@0xf7.com>2022-05-29 20:15:15 +0200
committerEddy Pedroni <eddy@0xf7.com>2022-05-29 20:15:15 +0200
commit2a7dfc2eeb182f701b8f4751e6b80a50ddafd460 (patch)
treefa8bc103cb5acfa810f7c1af898e10c1fe0df821 /lab_control/test
parenta0783e9f9698b3370c96eaab574d433e25372bc7 (diff)
JDS6600 on/off test passing
Diffstat (limited to 'lab_control/test')
-rw-r--r--lab_control/test/jds6600_test.py12
-rw-r--r--lab_control/test/mock_jds6600_device.py43
2 files changed, 28 insertions, 27 deletions
diff --git a/lab_control/test/jds6600_test.py b/lab_control/test/jds6600_test.py
index 45f841a..16deeb4 100644
--- a/lab_control/test/jds6600_test.py
+++ b/lab_control/test/jds6600_test.py
@@ -15,9 +15,9 @@ def mockDevice():
def uut(mockDevice):
uut = JDS6600(mockDevice.getPortName())
yield uut
- print("Cleaning up UUT")
+ uut.closePort()
-def tes_serialConfiguration(mockDevice):
+def test_serialConfiguration(mockDevice):
with pytest.raises(AssertionError):
mockDevice.checkPortConfiguration()
@@ -27,7 +27,7 @@ def tes_serialConfiguration(mockDevice):
def test_channelOnAndOff(uut, mockDevice):
for ch in AVAILABLE_CHANNELS:
assert not mockDevice.isOn(ch)
- #uut.setOn(ch)
- #assert mockDevice.isOn(ch)
- #uut.setOff(ch)
- #assert not mockDevice.isOn(ch)
+ uut.setOn(ch)
+ assert mockDevice.isOn(ch)
+ uut.setOff(ch)
+ assert not mockDevice.isOn(ch)
diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py
index 22c48be..6e036d5 100644
--- a/lab_control/test/mock_jds6600_device.py
+++ b/lab_control/test/mock_jds6600_device.py
@@ -1,17 +1,8 @@
-import serial
import os
import pty
-import tty
import termios
-import atexit
import threading
-
-# Open virtual serial port
-
-# Close it when the program exits
-def _cleanUp() -> None:
- pass
-atexit.register(_cleanUp)
+import re
class MockJDS6600Device():
def __init__(self):
@@ -27,30 +18,40 @@ class MockJDS6600Device():
def _mainLoop(self) -> None:
while True:
try:
- print("Device reading request....")
request = self._masterFile.readline().decode().strip()
response = self._handleRequest(request)
- print("Device sending response")
- self._masterFile.write(response.encode())
+
+ if response is not None:
+ self._masterFile.write(response.encode())
except OSError as e:
- print("Exception caught, breaking out of loop")
break
- print("Main loop returning")
def _handleRequest(self, request: str) -> str:
- print(f"Device received request: {request}")
+ pattern = r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\."
+ m = re.search(pattern, request)
+
+ if not m:
+ return None
+
+ opcode = m.group("opcode")
+ function = int(m.group("function"))
+ args = m.group("args").split(",")
+
+ if opcode == "w":
+ if function == 20:
+ self._channels[0] = args[0] == "1"
+ self._channels[1] = args[1] == "1"
+ elif opcode == "r":
+ if function == 20:
+ return f":r20={int(self._channels[0])},{int(self._channels[1])}.\r\n"
+
return ":ok\r\n"
def stop(self) -> None:
- print("STOP")
self._masterFile.close()
-
os.close(self._master)
- print("master closed")
os.close(self._slave)
- print("slave closed")
self._mainThread.join()
- print("stop returning")
def checkPortConfiguration(self) -> None:
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave)