diff options
author | Eddy Pedroni <eddy@0xf7.com> | 2022-05-29 19:20:09 +0200 |
---|---|---|
committer | Eddy Pedroni <eddy@0xf7.com> | 2022-05-29 19:20:09 +0200 |
commit | a0783e9f9698b3370c96eaab574d433e25372bc7 (patch) | |
tree | 265540f0beb122edff0688293a968067e14c545c | |
parent | 8d4621a92e58b1466010f2ad3aeada5e04a453b2 (diff) |
Added JDS6600 initial implementation, test device doesn't work
-rw-r--r-- | lab_control/jds6600.py | 28 | ||||
-rw-r--r-- | lab_control/test/jds6600_test.py | 33 | ||||
-rw-r--r-- | lab_control/test/mock_jds6600_device.py | 70 | ||||
-rw-r--r-- | lab_control/test/mock_sds1000xe_device.py (renamed from lab_control/test/mock_sds1000xe_server.py) | 6 | ||||
-rw-r--r-- | lab_control/test/sds1000xe_test.py | 34 |
5 files changed, 151 insertions, 20 deletions
diff --git a/lab_control/jds6600.py b/lab_control/jds6600.py new file mode 100644 index 0000000..eda8306 --- /dev/null +++ b/lab_control/jds6600.py @@ -0,0 +1,28 @@ +import serial + +from lab_control.function_generator import FunctionGenerator + +class JDS6600(FunctionGenerator): + def __init__(self, portName): + self._port = serial.Serial(portName) + self._port.baudrate = 115200 + self._port.bytesize = serial.EIGHTBITS + self._port.stopbits = serial.STOPBITS_ONE + self._port.parity = serial.PARITY_NONE + + def _sendRequest(self, opcode: str, args: str) -> str: + request = f":{opcode}={args}.\r\n" + print(f"Script writing request to port: {request}") + self._port.write(request.encode()) + print(f"Script waiting for response") + responseRaw = self._port.readline() + return responseRaw.decode().strip() + + def setOn(self, channel: int) -> None: + args = ",".join(["1" if i == channel else "0" for i in [1, 2]]) + response = self._sendRequest("w20", args) + print(f"Script got response {response}") + # TODO figure out error handling + + def setOff(self, channel: int) -> None: + pass diff --git a/lab_control/test/jds6600_test.py b/lab_control/test/jds6600_test.py new file mode 100644 index 0000000..45f841a --- /dev/null +++ b/lab_control/test/jds6600_test.py @@ -0,0 +1,33 @@ +import pytest + +from lab_control.jds6600 import JDS6600 +from lab_control.test.mock_jds6600_device import MockJDS6600Device + +AVAILABLE_CHANNELS = [1, 2] + +@pytest.fixture +def mockDevice(): + d = MockJDS6600Device() + yield d + d.stop() + +@pytest.fixture +def uut(mockDevice): + uut = JDS6600(mockDevice.getPortName()) + yield uut + print("Cleaning up UUT") + +def tes_serialConfiguration(mockDevice): + with pytest.raises(AssertionError): + mockDevice.checkPortConfiguration() + + uut = JDS6600(mockDevice.getPortName()) + mockDevice.checkPortConfiguration() + +def test_channelOnAndOff(uut, mockDevice): + for ch in AVAILABLE_CHANNELS: + assert not mockDevice.isOn(ch) + #uut.setOn(ch) + #assert mockDevice.isOn(ch) + #uut.setOff(ch) + #assert not mockDevice.isOn(ch) diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py new file mode 100644 index 0000000..22c48be --- /dev/null +++ b/lab_control/test/mock_jds6600_device.py @@ -0,0 +1,70 @@ +import serial +import os +import pty +import tty +import termios +import atexit +import threading + +# Open virtual serial port + +# Close it when the program exits +def _cleanUp() -> None: + pass +atexit.register(_cleanUp) + +class MockJDS6600Device(): + def __init__(self): + self._master, self._slave = pty.openpty() + self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) + + self._portName = os.ttyname(self._slave) + self._channels = [False, False] + + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def _mainLoop(self) -> None: + while True: + try: + print("Device reading request....") + request = self._masterFile.readline().decode().strip() + response = self._handleRequest(request) + print("Device sending response") + self._masterFile.write(response.encode()) + except OSError as e: + print("Exception caught, breaking out of loop") + break + print("Main loop returning") + + def _handleRequest(self, request: str) -> str: + print(f"Device received request: {request}") + return ":ok\r\n" + + def stop(self) -> None: + print("STOP") + self._masterFile.close() + + os.close(self._master) + print("master closed") + os.close(self._slave) + print("slave closed") + self._mainThread.join() + print("stop returning") + + def checkPortConfiguration(self) -> None: + iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave) + + # JDS6600 configuration taken from manual + assert ispeed == termios.B115200 + assert ospeed == termios.B115200 + assert (cflag & termios.CSIZE) == termios.CS8 + assert (cflag & termios.CSTOPB) == 0 + assert (cflag & (termios.PARENB | termios.PARODD)) == 0 + + def getPortName(self) -> str: + return self._portName + + def isOn(self, ch: int) -> bool: + return self._channels[ch - 1] + diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_device.py index 1b72146..ce752e9 100644 --- a/lab_control/test/mock_sds1000xe_server.py +++ b/lab_control/test/mock_sds1000xe_device.py @@ -13,11 +13,11 @@ _serverSocket.bind((IP, PORT)) _serverSocket.listen(1) # Close it when the program exits -def _cleanup(): +def _cleanUp(): _serverSocket.close() -atexit.register(_cleanup) +atexit.register(_cleanUp) -class MockSDS1000XEServer: +class MockSDS1000XEDevice: def __init__(self): self._stopFlag = False self._clientSocket = None diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py index c7ec252..bf9c6c4 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_test.py @@ -1,19 +1,19 @@ import pytest from lab_control.sds1000xe import SDS1000XE -from lab_control.test.mock_sds1000xe_server import MockSDS1000XEServer +from lab_control.test.mock_sds1000xe_device import MockSDS1000XEDevice -MOCK_IP = "127.0.0.1" +MOCK_DEVICE_IP = "127.0.0.1" @pytest.fixture -def mockServer(): - s = MockSDS1000XEServer() - yield s - s.stop() +def mockDevice(): + d = MockSDS1000XEDevice() + yield d + d.stop() @pytest.fixture -def uut(mockServer): - return SDS1000XE(MOCK_IP) +def uut(mockDevice): + return SDS1000XE(MOCK_DEVICE_IP) def checkFloatMeasurement(testCases, setValue, measureValue): for channel, expectedValue in testCases: @@ -21,23 +21,23 @@ def checkFloatMeasurement(testCases, setValue, measureValue): measuredValue = measureValue(channel) assert measuredValue == expectedValue -def test_amplitudeMeasurement(uut, mockServer): +def test_amplitudeMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] - checkFloatMeasurement(testCases, mockServer.setAmplitude, uut.measureAmplitude) + checkFloatMeasurement(testCases, mockDevice.setAmplitude, uut.measureAmplitude) -def test_peakToPeakMeasurement(uut, mockServer): +def test_peakToPeakMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] - checkFloatMeasurement(testCases, mockServer.setPeakToPeak, uut.measurePeakToPeak) + checkFloatMeasurement(testCases, mockDevice.setPeakToPeak, uut.measurePeakToPeak) -def test_RMSMeasurement(uut, mockServer): +def test_RMSMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] - checkFloatMeasurement(testCases, mockServer.setRMS, uut.measureRMS) + checkFloatMeasurement(testCases, mockDevice.setRMS, uut.measureRMS) -def test_frequencyMeasurement(uut, mockServer): +def test_frequencyMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 93489.15)] - checkFloatMeasurement(testCases, mockServer.setFrequency, uut.measureFrequency) + checkFloatMeasurement(testCases, mockDevice.setFrequency, uut.measureFrequency) -def test_invalidChannel(uut, mockServer): +def test_invalidChannel(uut, mockDevice): # Channel is checked by the UUT before the request is sent testCases = [0, 5] |