summaryrefslogtreecommitdiffstats
path: root/lab_control
diff options
context:
space:
mode:
authorEddy Pedroni <eddy@0xf7.com>2022-05-26 16:41:02 +0200
committerEddy Pedroni <eddy@0xf7.com>2022-05-26 16:41:02 +0200
commitb17f87f6e9bc938448c5aeef6241a6d1639f17af (patch)
treeec08ebd97d3ba3e4a4003ce69bcc844cd8201085 /lab_control
parent1e1d5aafd5018e462a93fa5c528fd36b0b0bbde1 (diff)
Refactored, added peak-to-peak, RMS and frequency measurements
Diffstat (limited to 'lab_control')
-rw-r--r--lab_control/oscilloscope.py2
-rw-r--r--lab_control/sds1000xe.py28
-rw-r--r--lab_control/test/mock_sds1000xe_server.py39
-rw-r--r--lab_control/test/sds1000xe_test.py23
4 files changed, 57 insertions, 35 deletions
diff --git a/lab_control/oscilloscope.py b/lab_control/oscilloscope.py
index a09f8a0..9f2d02c 100644
--- a/lab_control/oscilloscope.py
+++ b/lab_control/oscilloscope.py
@@ -5,7 +5,7 @@ class Oscilloscope:
def measureAmplitude(self, channel: int) -> float:
pass
- def measurePkToPk(self, channel: int) -> float:
+ def measurePeakToPeak(self, channel: int) -> float:
pass
def measureRMS(self, channel: int) -> float:
diff --git a/lab_control/sds1000xe.py b/lab_control/sds1000xe.py
index d86c1c7..c0654e0 100644
--- a/lab_control/sds1000xe.py
+++ b/lab_control/sds1000xe.py
@@ -14,26 +14,30 @@ class SDS1000XE(Oscilloscope):
self._socket.settimeout(SDS1000XE.TIMEOUT)
def measureAmplitude(self, channel: int) -> float:
+ return self._measure(channel, "AMPL")
+
+ def measurePeakToPeak(self, channel: int) -> float:
+ return self._measure(channel, "PKPK")
+
+ def measureRMS(self, channel: int) -> float:
+ return self._measure(channel, "RMS")
+
+ def measureFrequency(self, channel: int) -> float:
+ return self._measure(channel, "FREQ")
+
+ def _measure(self, channel: int, code: str) -> float:
assert channel in SDS1000XE.AVAILABLE_CHANNELS
- query = f"C{channel}:PAVA? AMPL"
+ query = f"C{channel}:PAVA? {code}"
self._socket.sendall(query.encode())
try:
+ # TODO add code to regex
response = self._socket.recv(4096).decode()
- m = re.search(r"C(?P<responseChannel>\d):PAVA AMPL,(?P<rawMeasurement>.+)V", response)
+ m = re.search(r"C(?P<responseChannel>\d):PAVA .+,(?P<rawMeasurement>.+)V", response)
measurement = float(m.group("rawMeasurement"))
- except (TimeoutError, AttributeError) as e:
+ except TimeoutError as e:
measurement = None
return measurement
- def measurePkToPk(self, channel: int) -> float:
- pass
-
- def measureRMS(self, channel: int) -> float:
- pass
-
- def measureFrequency(self, channel: int) -> float:
- pass
-
diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py
index fc6128e..f5a2b1a 100644
--- a/lab_control/test/mock_sds1000xe_server.py
+++ b/lab_control/test/mock_sds1000xe_server.py
@@ -18,10 +18,6 @@ def _cleanup():
atexit.register(_cleanup)
class MockSDS1000XEServer:
- class ChannelValues:
- def __init__(self):
- self.amplitude = None
-
def __init__(self):
self._stopFlag = False
self._clientSocket = None
@@ -29,7 +25,7 @@ class MockSDS1000XEServer:
self._mainThread.start()
# Mock measured values
- self._channels = [MockSDS1000XEServer.ChannelValues() for i in range(0, 4)]
+ self._channels = [{"AMPL" : None} for i in range(0, 4)]
def _mainLoop(self) -> None:
self._clientSocket, _ = _serverSocket.accept()
@@ -39,7 +35,6 @@ class MockSDS1000XEServer:
while not self._stopFlag:
try:
request = self._clientSocket.recv(4096).decode()
-
response = self._handleRequest(request)
if response is not None:
self._clientSocket.send(response.encode())
@@ -54,19 +49,31 @@ class MockSDS1000XEServer:
return None
channelIndex = int(m.group("channel")) - 1
- if m.group("opcode") == "PAVA":
- if m.group("arg") == "AMPL":
- if self._channels[channelIndex].amplitude is None:
- return None
- else:
- value = self._channels[channelIndex].amplitude
- response = f"C{m.group('channel')}:PAVA AMPL,{value:.6E}V"
- return response
+ opcode = m.group("opcode")
+
+ if opcode == "PAVA":
+ arg = m.group("arg")
+ value = self._channels[channelIndex].get(arg)
+
+ if value is None:
+ return None
+ else:
+ response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V"
+ return response
def stop(self) -> None:
self._stopFlag = True
self._mainThread.join()
- def setAmplitude(self, channel: int, ampl: float) -> None:
- self._channels[channel - 1].amplitude = ampl
+ def setAmplitude(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["AMPL"] = value
+
+ def setPeakToPeak(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["PKPK"] = value
+
+ def setRMS(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["RMS"] = value
+
+ def setFrequency(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["FREQ"] = value
diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py
index 88d5022..ea48c87 100644
--- a/lab_control/test/sds1000xe_test.py
+++ b/lab_control/test/sds1000xe_test.py
@@ -15,16 +15,27 @@ def mockServer():
def uut(mockServer):
return SDS1000XE(MOCK_IP)
+def checkFloatMeasurement(testCases, setValue, measureValue):
+ for channel, expectedValue in testCases:
+ setValue(channel, expectedValue)
+ measuredValue = measureValue(channel)
+ assert measuredValue == expectedValue
+
def test_amplitudeMeasurement(uut, mockServer):
testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
+ checkFloatMeasurement(testCases, mockServer.setAmplitude, uut.measureAmplitude)
- for t in testCases:
- channel = t[0]
- expectedAmplitude = t[1]
- mockServer.setAmplitude(channel, expectedAmplitude)
+def test_peakToPeakMeasurement(uut, mockServer):
+ testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
+ checkFloatMeasurement(testCases, mockServer.setPeakToPeak, uut.measurePeakToPeak)
- measuredAmplitude = uut.measureAmplitude(channel)
- assert measuredAmplitude == expectedAmplitude
+def test_RMSMeasurement(uut, mockServer):
+ testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
+ checkFloatMeasurement(testCases, mockServer.setRMS, uut.measureRMS)
+
+def test_FrequencyMeasurement(uut, mockServer):
+ testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
+ checkFloatMeasurement(testCases, mockServer.setFrequency, uut.measureFrequency)
def test_invalidChannel(uut, mockServer):
# Channel is checked by the UUT before the request is sent