From b17f87f6e9bc938448c5aeef6241a6d1639f17af Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Thu, 26 May 2022 16:41:02 +0200 Subject: Refactored, added peak-to-peak, RMS and frequency measurements --- lab_control/test/mock_sds1000xe_server.py | 39 ++++++++++++++++++------------- lab_control/test/sds1000xe_test.py | 23 +++++++++++++----- 2 files changed, 40 insertions(+), 22 deletions(-) (limited to 'lab_control/test') diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py index fc6128e..f5a2b1a 100644 --- a/lab_control/test/mock_sds1000xe_server.py +++ b/lab_control/test/mock_sds1000xe_server.py @@ -18,10 +18,6 @@ def _cleanup(): atexit.register(_cleanup) class MockSDS1000XEServer: - class ChannelValues: - def __init__(self): - self.amplitude = None - def __init__(self): self._stopFlag = False self._clientSocket = None @@ -29,7 +25,7 @@ class MockSDS1000XEServer: self._mainThread.start() # Mock measured values - self._channels = [MockSDS1000XEServer.ChannelValues() for i in range(0, 4)] + self._channels = [{"AMPL" : None} for i in range(0, 4)] def _mainLoop(self) -> None: self._clientSocket, _ = _serverSocket.accept() @@ -39,7 +35,6 @@ class MockSDS1000XEServer: while not self._stopFlag: try: request = self._clientSocket.recv(4096).decode() - response = self._handleRequest(request) if response is not None: self._clientSocket.send(response.encode()) @@ -54,19 +49,31 @@ class MockSDS1000XEServer: return None channelIndex = int(m.group("channel")) - 1 - if m.group("opcode") == "PAVA": - if m.group("arg") == "AMPL": - if self._channels[channelIndex].amplitude is None: - return None - else: - value = self._channels[channelIndex].amplitude - response = f"C{m.group('channel')}:PAVA AMPL,{value:.6E}V" - return response + opcode = m.group("opcode") + + if opcode == "PAVA": + arg = m.group("arg") + value = self._channels[channelIndex].get(arg) + + if value is None: + return None + else: + response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V" + return response def stop(self) -> None: self._stopFlag = True self._mainThread.join() - def setAmplitude(self, channel: int, ampl: float) -> None: - self._channels[channel - 1].amplitude = ampl + def setAmplitude(self, channel: int, value: float) -> None: + self._channels[channel - 1]["AMPL"] = value + + def setPeakToPeak(self, channel: int, value: float) -> None: + self._channels[channel - 1]["PKPK"] = value + + def setRMS(self, channel: int, value: float) -> None: + self._channels[channel - 1]["RMS"] = value + + def setFrequency(self, channel: int, value: float) -> None: + self._channels[channel - 1]["FREQ"] = value diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py index 88d5022..ea48c87 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_test.py @@ -15,16 +15,27 @@ def mockServer(): def uut(mockServer): return SDS1000XE(MOCK_IP) +def checkFloatMeasurement(testCases, setValue, measureValue): + for channel, expectedValue in testCases: + setValue(channel, expectedValue) + measuredValue = measureValue(channel) + assert measuredValue == expectedValue + def test_amplitudeMeasurement(uut, mockServer): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setAmplitude, uut.measureAmplitude) - for t in testCases: - channel = t[0] - expectedAmplitude = t[1] - mockServer.setAmplitude(channel, expectedAmplitude) +def test_peakToPeakMeasurement(uut, mockServer): + testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setPeakToPeak, uut.measurePeakToPeak) - measuredAmplitude = uut.measureAmplitude(channel) - assert measuredAmplitude == expectedAmplitude +def test_RMSMeasurement(uut, mockServer): + testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setRMS, uut.measureRMS) + +def test_FrequencyMeasurement(uut, mockServer): + testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setFrequency, uut.measureFrequency) def test_invalidChannel(uut, mockServer): # Channel is checked by the UUT before the request is sent -- cgit v1.2.3