diff options
-rw-r--r-- | lab_control/jds6600.py | 21 | ||||
-rw-r--r-- | lab_control/test/jds6600_test.py | 12 | ||||
-rw-r--r-- | lab_control/test/mock_jds6600_device.py | 43 |
3 files changed, 43 insertions, 33 deletions
diff --git a/lab_control/jds6600.py b/lab_control/jds6600.py index eda8306..7e7640d 100644 --- a/lab_control/jds6600.py +++ b/lab_control/jds6600.py @@ -1,4 +1,5 @@ import serial +import re from lab_control.function_generator import FunctionGenerator @@ -12,17 +13,25 @@ class JDS6600(FunctionGenerator): def _sendRequest(self, opcode: str, args: str) -> str: request = f":{opcode}={args}.\r\n" - print(f"Script writing request to port: {request}") self._port.write(request.encode()) - print(f"Script waiting for response") responseRaw = self._port.readline() return responseRaw.decode().strip() + def closePort(self) -> None: + self._port.close() + + def _queryOnOff(self) -> list[int, int]: + response = self._sendRequest("r20", "") + return [response[5], response[7]] + def setOn(self, channel: int) -> None: - args = ",".join(["1" if i == channel else "0" for i in [1, 2]]) - response = self._sendRequest("w20", args) - print(f"Script got response {response}") + state = self._queryOnOff() + state[channel - 1] = '1' + response = self._sendRequest("w20", f"{state[0]},{state[1]}") # TODO figure out error handling def setOff(self, channel: int) -> None: - pass + state = self._queryOnOff() + state[channel - 1] = '0' + response = self._sendRequest("w20", f"{state[0]},{state[1]}") + # TODO figure out error handling diff --git a/lab_control/test/jds6600_test.py b/lab_control/test/jds6600_test.py index 45f841a..16deeb4 100644 --- a/lab_control/test/jds6600_test.py +++ b/lab_control/test/jds6600_test.py @@ -15,9 +15,9 @@ def mockDevice(): def uut(mockDevice): uut = JDS6600(mockDevice.getPortName()) yield uut - print("Cleaning up UUT") + uut.closePort() -def tes_serialConfiguration(mockDevice): +def test_serialConfiguration(mockDevice): with pytest.raises(AssertionError): mockDevice.checkPortConfiguration() @@ -27,7 +27,7 @@ def tes_serialConfiguration(mockDevice): def test_channelOnAndOff(uut, mockDevice): for ch in AVAILABLE_CHANNELS: assert not mockDevice.isOn(ch) - #uut.setOn(ch) - #assert mockDevice.isOn(ch) - #uut.setOff(ch) - #assert not mockDevice.isOn(ch) + uut.setOn(ch) + assert mockDevice.isOn(ch) + uut.setOff(ch) + assert not mockDevice.isOn(ch) diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py index 22c48be..6e036d5 100644 --- a/lab_control/test/mock_jds6600_device.py +++ b/lab_control/test/mock_jds6600_device.py @@ -1,17 +1,8 @@ -import serial import os import pty -import tty import termios -import atexit import threading - -# Open virtual serial port - -# Close it when the program exits -def _cleanUp() -> None: - pass -atexit.register(_cleanUp) +import re class MockJDS6600Device(): def __init__(self): @@ -27,30 +18,40 @@ class MockJDS6600Device(): def _mainLoop(self) -> None: while True: try: - print("Device reading request....") request = self._masterFile.readline().decode().strip() response = self._handleRequest(request) - print("Device sending response") - self._masterFile.write(response.encode()) + + if response is not None: + self._masterFile.write(response.encode()) except OSError as e: - print("Exception caught, breaking out of loop") break - print("Main loop returning") def _handleRequest(self, request: str) -> str: - print(f"Device received request: {request}") + pattern = r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\." + m = re.search(pattern, request) + + if not m: + return None + + opcode = m.group("opcode") + function = int(m.group("function")) + args = m.group("args").split(",") + + if opcode == "w": + if function == 20: + self._channels[0] = args[0] == "1" + self._channels[1] = args[1] == "1" + elif opcode == "r": + if function == 20: + return f":r20={int(self._channels[0])},{int(self._channels[1])}.\r\n" + return ":ok\r\n" def stop(self) -> None: - print("STOP") self._masterFile.close() - os.close(self._master) - print("master closed") os.close(self._slave) - print("slave closed") self._mainThread.join() - print("stop returning") def checkPortConfiguration(self) -> None: iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave) |