From b17f87f6e9bc938448c5aeef6241a6d1639f17af Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Thu, 26 May 2022 16:41:02 +0200 Subject: Refactored, added peak-to-peak, RMS and frequency measurements --- lab_control/oscilloscope.py | 2 +- lab_control/sds1000xe.py | 28 ++++++++++++---------- lab_control/test/mock_sds1000xe_server.py | 39 ++++++++++++++++++------------- lab_control/test/sds1000xe_test.py | 23 +++++++++++++----- 4 files changed, 57 insertions(+), 35 deletions(-) (limited to 'lab_control') diff --git a/lab_control/oscilloscope.py b/lab_control/oscilloscope.py index a09f8a0..9f2d02c 100644 --- a/lab_control/oscilloscope.py +++ b/lab_control/oscilloscope.py @@ -5,7 +5,7 @@ class Oscilloscope: def measureAmplitude(self, channel: int) -> float: pass - def measurePkToPk(self, channel: int) -> float: + def measurePeakToPeak(self, channel: int) -> float: pass def measureRMS(self, channel: int) -> float: diff --git a/lab_control/sds1000xe.py b/lab_control/sds1000xe.py index d86c1c7..c0654e0 100644 --- a/lab_control/sds1000xe.py +++ b/lab_control/sds1000xe.py @@ -14,26 +14,30 @@ class SDS1000XE(Oscilloscope): self._socket.settimeout(SDS1000XE.TIMEOUT) def measureAmplitude(self, channel: int) -> float: + return self._measure(channel, "AMPL") + + def measurePeakToPeak(self, channel: int) -> float: + return self._measure(channel, "PKPK") + + def measureRMS(self, channel: int) -> float: + return self._measure(channel, "RMS") + + def measureFrequency(self, channel: int) -> float: + return self._measure(channel, "FREQ") + + def _measure(self, channel: int, code: str) -> float: assert channel in SDS1000XE.AVAILABLE_CHANNELS - query = f"C{channel}:PAVA? AMPL" + query = f"C{channel}:PAVA? {code}" self._socket.sendall(query.encode()) try: + # TODO add code to regex response = self._socket.recv(4096).decode() - m = re.search(r"C(?P\d):PAVA AMPL,(?P.+)V", response) + m = re.search(r"C(?P\d):PAVA .+,(?P.+)V", response) measurement = float(m.group("rawMeasurement")) - except (TimeoutError, AttributeError) as e: + except TimeoutError as e: measurement = None return measurement - def measurePkToPk(self, channel: int) -> float: - pass - - def measureRMS(self, channel: int) -> float: - pass - - def measureFrequency(self, channel: int) -> float: - pass - diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py index fc6128e..f5a2b1a 100644 --- a/lab_control/test/mock_sds1000xe_server.py +++ b/lab_control/test/mock_sds1000xe_server.py @@ -18,10 +18,6 @@ def _cleanup(): atexit.register(_cleanup) class MockSDS1000XEServer: - class ChannelValues: - def __init__(self): - self.amplitude = None - def __init__(self): self._stopFlag = False self._clientSocket = None @@ -29,7 +25,7 @@ class MockSDS1000XEServer: self._mainThread.start() # Mock measured values - self._channels = [MockSDS1000XEServer.ChannelValues() for i in range(0, 4)] + self._channels = [{"AMPL" : None} for i in range(0, 4)] def _mainLoop(self) -> None: self._clientSocket, _ = _serverSocket.accept() @@ -39,7 +35,6 @@ class MockSDS1000XEServer: while not self._stopFlag: try: request = self._clientSocket.recv(4096).decode() - response = self._handleRequest(request) if response is not None: self._clientSocket.send(response.encode()) @@ -54,19 +49,31 @@ class MockSDS1000XEServer: return None channelIndex = int(m.group("channel")) - 1 - if m.group("opcode") == "PAVA": - if m.group("arg") == "AMPL": - if self._channels[channelIndex].amplitude is None: - return None - else: - value = self._channels[channelIndex].amplitude - response = f"C{m.group('channel')}:PAVA AMPL,{value:.6E}V" - return response + opcode = m.group("opcode") + + if opcode == "PAVA": + arg = m.group("arg") + value = self._channels[channelIndex].get(arg) + + if value is None: + return None + else: + response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V" + return response def stop(self) -> None: self._stopFlag = True self._mainThread.join() - def setAmplitude(self, channel: int, ampl: float) -> None: - self._channels[channel - 1].amplitude = ampl + def setAmplitude(self, channel: int, value: float) -> None: + self._channels[channel - 1]["AMPL"] = value + + def setPeakToPeak(self, channel: int, value: float) -> None: + self._channels[channel - 1]["PKPK"] = value + + def setRMS(self, channel: int, value: float) -> None: + self._channels[channel - 1]["RMS"] = value + + def setFrequency(self, channel: int, value: float) -> None: + self._channels[channel - 1]["FREQ"] = value diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py index 88d5022..ea48c87 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_test.py @@ -15,16 +15,27 @@ def mockServer(): def uut(mockServer): return SDS1000XE(MOCK_IP) +def checkFloatMeasurement(testCases, setValue, measureValue): + for channel, expectedValue in testCases: + setValue(channel, expectedValue) + measuredValue = measureValue(channel) + assert measuredValue == expectedValue + def test_amplitudeMeasurement(uut, mockServer): testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setAmplitude, uut.measureAmplitude) - for t in testCases: - channel = t[0] - expectedAmplitude = t[1] - mockServer.setAmplitude(channel, expectedAmplitude) +def test_peakToPeakMeasurement(uut, mockServer): + testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setPeakToPeak, uut.measurePeakToPeak) - measuredAmplitude = uut.measureAmplitude(channel) - assert measuredAmplitude == expectedAmplitude +def test_RMSMeasurement(uut, mockServer): + testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setRMS, uut.measureRMS) + +def test_FrequencyMeasurement(uut, mockServer): + testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + checkFloatMeasurement(testCases, mockServer.setFrequency, uut.measureFrequency) def test_invalidChannel(uut, mockServer): # Channel is checked by the UUT before the request is sent -- cgit v1.2.3