From a0783e9f9698b3370c96eaab574d433e25372bc7 Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Sun, 29 May 2022 19:20:09 +0200 Subject: Added JDS6600 initial implementation, test device doesn't work --- lab_control/jds6600.py | 28 +++++++++++ lab_control/test/jds6600_test.py | 33 +++++++++++++ lab_control/test/mock_jds6600_device.py | 70 +++++++++++++++++++++++++++ lab_control/test/mock_sds1000xe_device.py | 79 +++++++++++++++++++++++++++++++ lab_control/test/mock_sds1000xe_server.py | 79 ------------------------------- lab_control/test/sds1000xe_test.py | 34 ++++++------- 6 files changed, 227 insertions(+), 96 deletions(-) create mode 100644 lab_control/jds6600.py create mode 100644 lab_control/test/jds6600_test.py create mode 100644 lab_control/test/mock_jds6600_device.py create mode 100644 lab_control/test/mock_sds1000xe_device.py delete mode 100644 lab_control/test/mock_sds1000xe_server.py (limited to 'lab_control') diff --git a/lab_control/jds6600.py b/lab_control/jds6600.py new file mode 100644 index 0000000..eda8306 --- /dev/null +++ b/lab_control/jds6600.py @@ -0,0 +1,28 @@ +import serial + +from lab_control.function_generator import FunctionGenerator + +class JDS6600(FunctionGenerator): + def __init__(self, portName): + self._port = serial.Serial(portName) + self._port.baudrate = 115200 + self._port.bytesize = serial.EIGHTBITS + self._port.stopbits = serial.STOPBITS_ONE + self._port.parity = serial.PARITY_NONE + + def _sendRequest(self, opcode: str, args: str) -> str: + request = f":{opcode}={args}.\r\n" + print(f"Script writing request to port: {request}") + self._port.write(request.encode()) + print(f"Script waiting for response") + responseRaw = self._port.readline() + return responseRaw.decode().strip() + + def setOn(self, channel: int) -> None: + args = ",".join(["1" if i == channel else "0" for i in [1, 2]]) + response = self._sendRequest("w20", args) + print(f"Script got response {response}") + # TODO figure out error handling + + def setOff(self, channel: int) -> None: + pass diff --git a/lab_control/test/jds6600_test.py b/lab_control/test/jds6600_test.py new file mode 100644 index 0000000..45f841a --- /dev/null +++ b/lab_control/test/jds6600_test.py @@ -0,0 +1,33 @@ +import pytest + +from lab_control.jds6600 import JDS6600 +from lab_control.test.mock_jds6600_device import MockJDS6600Device + +AVAILABLE_CHANNELS = [1, 2] + +@pytest.fixture +def mockDevice(): + d = MockJDS6600Device() + yield d + d.stop() + +@pytest.fixture +def uut(mockDevice): + uut = JDS6600(mockDevice.getPortName()) + yield uut + print("Cleaning up UUT") + +def tes_serialConfiguration(mockDevice): + with pytest.raises(AssertionError): + mockDevice.checkPortConfiguration() + + uut = JDS6600(mockDevice.getPortName()) + mockDevice.checkPortConfiguration() + +def test_channelOnAndOff(uut, mockDevice): + for ch in AVAILABLE_CHANNELS: + assert not mockDevice.isOn(ch) + #uut.setOn(ch) + #assert mockDevice.isOn(ch) + #uut.setOff(ch) + #assert not mockDevice.isOn(ch) diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py new file mode 100644 index 0000000..22c48be --- /dev/null +++ b/lab_control/test/mock_jds6600_device.py @@ -0,0 +1,70 @@ +import serial +import os +import pty +import tty +import termios +import atexit +import threading + +# Open virtual serial port + +# Close it when the program exits +def _cleanUp() -> None: + pass +atexit.register(_cleanUp) + +class MockJDS6600Device(): + def __init__(self): + self._master, self._slave = pty.openpty() + self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) + + self._portName = os.ttyname(self._slave) + self._channels = [False, False] + + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def _mainLoop(self) -> None: + while True: + try: + print("Device reading request....") + request = self._masterFile.readline().decode().strip() + response = self._handleRequest(request) + print("Device sending response") + self._masterFile.write(response.encode()) + except OSError as e: + print("Exception caught, breaking out of loop") + break + print("Main loop returning") + + def _handleRequest(self, request: str) -> str: + print(f"Device received request: {request}") + return ":ok\r\n" + + def stop(self) -> None: + print("STOP") + self._masterFile.close() + + os.close(self._master) + print("master closed") + os.close(self._slave) + print("slave closed") + self._mainThread.join() + print("stop returning") + + def checkPortConfiguration(self) -> None: + iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave) + + # JDS6600 configuration taken from manual + assert ispeed == termios.B115200 + assert ospeed == termios.B115200 + assert (cflag & termios.CSIZE) == termios.CS8 + assert (cflag & termios.CSTOPB) == 0 + assert (cflag & (termios.PARENB | termios.PARODD)) == 0 + + def getPortName(self) -> str: + return self._portName + + def isOn(self, ch: int) -> bool: + return self._channels[ch - 1] + diff --git a/lab_control/test/mock_sds1000xe_device.py b/lab_control/test/mock_sds1000xe_device.py new file mode 100644 index 0000000..ce752e9 --- /dev/null +++ b/lab_control/test/mock_sds1000xe_device.py @@ -0,0 +1,79 @@ +import socket +import threading +import atexit +import re + +IP = "0.0.0.0" +PORT = 5025 + +# Bind server socket when this module is included +_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +_serverSocket.bind((IP, PORT)) +_serverSocket.listen(1) + +# Close it when the program exits +def _cleanUp(): + _serverSocket.close() +atexit.register(_cleanUp) + +class MockSDS1000XEDevice: + def __init__(self): + self._stopFlag = False + self._clientSocket = None + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + # Mock measured values + self._channels = [{"AMPL" : None} for i in range(0, 4)] + + def _mainLoop(self) -> None: + self._clientSocket, _ = _serverSocket.accept() + self._clientSocket.settimeout(0.1) + + try: + while not self._stopFlag: + try: + request = self._clientSocket.recv(4096).decode() + response = self._handleRequest(request) + if response is not None: + self._clientSocket.send(response.encode()) + except TimeoutError as e: + pass + finally: + self._clientSocket.close() + + def _handleRequest(self, request: str) -> str: + m = re.search(r"C(?P\d):(?P\w+)\?\s(?P\w+)", request) + if not m: + return None + + channelIndex = int(m.group("channel")) - 1 + opcode = m.group("opcode") + + if opcode == "PAVA": + arg = m.group("arg") + value = self._channels[channelIndex].get(arg) + + if value is None: + return None + else: + response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V" + return response + + def stop(self) -> None: + self._stopFlag = True + self._mainThread.join() + + def setAmplitude(self, channel: int, value: float) -> None: + self._channels[channel - 1]["AMPL"] = value + + def setPeakToPeak(self, channel: int, value: float) -> None: + self._channels[channel - 1]["PKPK"] = value + + def setRMS(self, channel: int, value: float) -> None: + self._channels[channel - 1]["RMS"] = value + + def setFrequency(self, channel: int, value: float) -> None: + self._channels[channel - 1]["FREQ"] = value + diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py deleted file mode 100644 index 1b72146..0000000 --- a/lab_control/test/mock_sds1000xe_server.py +++ /dev/null @@ -1,79 +0,0 @@ -import socket -import threading -import atexit -import re - -IP = "0.0.0.0" -PORT = 5025 - -# Bind server socket when this module is included -_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -_serverSocket.bind((IP, PORT)) -_serverSocket.listen(1) - -# Close it when the program exits -def _cleanup(): - _serverSocket.close() -atexit.register(_cleanup) - -class MockSDS1000XEServer: - def __init__(self): - self._stopFlag = False - self._clientSocket = None - self._mainThread = threading.Thread(target=self._mainLoop) - self._mainThread.start() - - # Mock measured values - self._channels = [{"AMPL" : None} for i in range(0, 4)] - - def _mainLoop(self) -> None: - self._clientSocket, _ = _serverSocket.accept() - self._clientSocket.settimeout(0.1) - - try: - while not self._stopFlag: - try: - request = self._clientSocket.recv(4096).decode() - response = self._handleRequest(request) - if response is not None: - self._clientSocket.send(response.encode()) - except TimeoutError as e: - pass - finally: - self._clientSocket.close() - - def _handleRequest(self, request: str) -> str: - m = re.search(r"C(?P\d):(?P\w+)\?\s(?P\w+)", request) - if not m: - return None - - channelIndex = int(m.group("channel")) - 1 - opcode = m.group("opcode") - - if opcode == "PAVA": - arg = m.group("arg") - value = self._channels[channelIndex].get(arg) - - if value is None: - return None - else: - response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V" - return response - - def stop(self) -> None: - self._stopFlag = True - self._mainThread.join() - - def setAmplitude(self, channel: int, value: float) -> None: - self._channels[channel - 1]["AMPL"] = value - - def setPeakToPeak(self, channel: int, value: float) -> None: - self._channels[channel - 1]["PKPK"] = value - - def setRMS(self, channel: int, value: float) -> None: - self._channels[channel - 1]["RMS"] = value - - def setFrequency(self, channel: int, value: float) -> None: - self._channels[channel - 1]["FREQ"] = value - diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py index c7ec252..bf9c6c4 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_test.py @@ -1,19 +1,19 @@ import pytest from lab_control.sds1000xe import SDS1000XE -from lab_control.test.mock_sds1000xe_server import MockSDS1000XEServer +from lab_control.test.mock_sds1000xe_device import MockSDS1000XEDevice -MOCK_IP = "127.0.0.1" +MOCK_DEVICE_IP = "127.0.0.1" @pytest.fixture -def mockServer(): - s = MockSDS1000XEServer() - yield s - s.stop() +def mockDevice(): + d = MockSDS1000XEDevice() + yield d + d.stop() @pytest.fixture -def uut(mockServer): - return SDS1000XE(MOCK_IP) +def uut(mockDevice): + return SDS1000XE(MOCK_DEVICE_IP) def checkFloatMeasurement(testCases, setValue, measureValue): for channel, expectedValue in testCases: @@ -21,23 +21,23 @@ def checkFloatMeasurement(testCases, setValue, measureValue): measuredValue = measureValue(channel) assert measuredValue == expectedValue -def test_amplitudeMeasurement(uut, mockServer): +def test_amplitudeMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] - checkFloatMeasurement(testCases, mockServer.setAmplitude, uut.measureAmplitude) + checkFloatMeasurement(testCases, mockDevice.setAmplitude, uut.measureAmplitude) -def test_peakToPeakMeasurement(uut, mockServer): +def test_peakToPeakMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] - checkFloatMeasurement(testCases, mockServer.setPeakToPeak, uut.measurePeakToPeak) + checkFloatMeasurement(testCases, mockDevice.setPeakToPeak, uut.measurePeakToPeak) -def test_RMSMeasurement(uut, mockServer): +def test_RMSMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] - checkFloatMeasurement(testCases, mockServer.setRMS, uut.measureRMS) + checkFloatMeasurement(testCases, mockDevice.setRMS, uut.measureRMS) -def test_frequencyMeasurement(uut, mockServer): +def test_frequencyMeasurement(uut, mockDevice): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 93489.15)] - checkFloatMeasurement(testCases, mockServer.setFrequency, uut.measureFrequency) + checkFloatMeasurement(testCases, mockDevice.setFrequency, uut.measureFrequency) -def test_invalidChannel(uut, mockServer): +def test_invalidChannel(uut, mockDevice): # Channel is checked by the UUT before the request is sent testCases = [0, 5] -- cgit v1.2.3