summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lab_control/jds6600.py21
-rw-r--r--lab_control/test/jds6600_test.py12
-rw-r--r--lab_control/test/mock_jds6600_device.py43
3 files changed, 43 insertions, 33 deletions
diff --git a/lab_control/jds6600.py b/lab_control/jds6600.py
index eda8306..7e7640d 100644
--- a/lab_control/jds6600.py
+++ b/lab_control/jds6600.py
@@ -1,4 +1,5 @@
import serial
+import re
from lab_control.function_generator import FunctionGenerator
@@ -12,17 +13,25 @@ class JDS6600(FunctionGenerator):
def _sendRequest(self, opcode: str, args: str) -> str:
request = f":{opcode}={args}.\r\n"
- print(f"Script writing request to port: {request}")
self._port.write(request.encode())
- print(f"Script waiting for response")
responseRaw = self._port.readline()
return responseRaw.decode().strip()
+ def closePort(self) -> None:
+ self._port.close()
+
+ def _queryOnOff(self) -> list[int, int]:
+ response = self._sendRequest("r20", "")
+ return [response[5], response[7]]
+
def setOn(self, channel: int) -> None:
- args = ",".join(["1" if i == channel else "0" for i in [1, 2]])
- response = self._sendRequest("w20", args)
- print(f"Script got response {response}")
+ state = self._queryOnOff()
+ state[channel - 1] = '1'
+ response = self._sendRequest("w20", f"{state[0]},{state[1]}")
# TODO figure out error handling
def setOff(self, channel: int) -> None:
- pass
+ state = self._queryOnOff()
+ state[channel - 1] = '0'
+ response = self._sendRequest("w20", f"{state[0]},{state[1]}")
+ # TODO figure out error handling
diff --git a/lab_control/test/jds6600_test.py b/lab_control/test/jds6600_test.py
index 45f841a..16deeb4 100644
--- a/lab_control/test/jds6600_test.py
+++ b/lab_control/test/jds6600_test.py
@@ -15,9 +15,9 @@ def mockDevice():
def uut(mockDevice):
uut = JDS6600(mockDevice.getPortName())
yield uut
- print("Cleaning up UUT")
+ uut.closePort()
-def tes_serialConfiguration(mockDevice):
+def test_serialConfiguration(mockDevice):
with pytest.raises(AssertionError):
mockDevice.checkPortConfiguration()
@@ -27,7 +27,7 @@ def tes_serialConfiguration(mockDevice):
def test_channelOnAndOff(uut, mockDevice):
for ch in AVAILABLE_CHANNELS:
assert not mockDevice.isOn(ch)
- #uut.setOn(ch)
- #assert mockDevice.isOn(ch)
- #uut.setOff(ch)
- #assert not mockDevice.isOn(ch)
+ uut.setOn(ch)
+ assert mockDevice.isOn(ch)
+ uut.setOff(ch)
+ assert not mockDevice.isOn(ch)
diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py
index 22c48be..6e036d5 100644
--- a/lab_control/test/mock_jds6600_device.py
+++ b/lab_control/test/mock_jds6600_device.py
@@ -1,17 +1,8 @@
-import serial
import os
import pty
-import tty
import termios
-import atexit
import threading
-
-# Open virtual serial port
-
-# Close it when the program exits
-def _cleanUp() -> None:
- pass
-atexit.register(_cleanUp)
+import re
class MockJDS6600Device():
def __init__(self):
@@ -27,30 +18,40 @@ class MockJDS6600Device():
def _mainLoop(self) -> None:
while True:
try:
- print("Device reading request....")
request = self._masterFile.readline().decode().strip()
response = self._handleRequest(request)
- print("Device sending response")
- self._masterFile.write(response.encode())
+
+ if response is not None:
+ self._masterFile.write(response.encode())
except OSError as e:
- print("Exception caught, breaking out of loop")
break
- print("Main loop returning")
def _handleRequest(self, request: str) -> str:
- print(f"Device received request: {request}")
+ pattern = r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\."
+ m = re.search(pattern, request)
+
+ if not m:
+ return None
+
+ opcode = m.group("opcode")
+ function = int(m.group("function"))
+ args = m.group("args").split(",")
+
+ if opcode == "w":
+ if function == 20:
+ self._channels[0] = args[0] == "1"
+ self._channels[1] = args[1] == "1"
+ elif opcode == "r":
+ if function == 20:
+ return f":r20={int(self._channels[0])},{int(self._channels[1])}.\r\n"
+
return ":ok\r\n"
def stop(self) -> None:
- print("STOP")
self._masterFile.close()
-
os.close(self._master)
- print("master closed")
os.close(self._slave)
- print("slave closed")
self._mainThread.join()
- print("stop returning")
def checkPortConfiguration(self) -> None:
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave)