summaryrefslogtreecommitdiffstats
path: root/lab_control
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control')
-rw-r--r--lab_control/sds1000xe.py36
-rw-r--r--lab_control/test/mock_sds1000xe_server.py50
-rw-r--r--lab_control/test/sds1000xe_test.py19
3 files changed, 87 insertions, 18 deletions
diff --git a/lab_control/sds1000xe.py b/lab_control/sds1000xe.py
index f71e422..d86c1c7 100644
--- a/lab_control/sds1000xe.py
+++ b/lab_control/sds1000xe.py
@@ -1,9 +1,39 @@
import socket
+import re
+from lab_control.oscilloscope import Oscilloscope
-class SDS1000XE:
+class SDS1000XE(Oscilloscope):
PORT = 5025
+ TIMEOUT = 0.2
+
+ AVAILABLE_CHANNELS = range(1, 5)
def __init__(self, ip):
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.connect((ip, SDS1000XE.PORT))
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect((ip, SDS1000XE.PORT))
+ self._socket.settimeout(SDS1000XE.TIMEOUT)
+
+ def measureAmplitude(self, channel: int) -> float:
+ assert channel in SDS1000XE.AVAILABLE_CHANNELS
+
+ query = f"C{channel}:PAVA? AMPL"
+ self._socket.sendall(query.encode())
+
+ try:
+ response = self._socket.recv(4096).decode()
+ m = re.search(r"C(?P<responseChannel>\d):PAVA AMPL,(?P<rawMeasurement>.+)V", response)
+ measurement = float(m.group("rawMeasurement"))
+ except (TimeoutError, AttributeError) as e:
+ measurement = None
+
+ return measurement
+
+ def measurePkToPk(self, channel: int) -> float:
+ pass
+
+ def measureRMS(self, channel: int) -> float:
+ pass
+
+ def measureFrequency(self, channel: int) -> float:
+ pass
diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py
index 5e222c7..fc6128e 100644
--- a/lab_control/test/mock_sds1000xe_server.py
+++ b/lab_control/test/mock_sds1000xe_server.py
@@ -1,48 +1,72 @@
import socket
import threading
import atexit
+import re
IP = "0.0.0.0"
PORT = 5025
# Bind server socket when this module is included
_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_serverSocket.bind((IP, PORT))
_serverSocket.listen(1)
# Close it when the program exits
-atexit.register(_serverSocket.close)
+def _cleanup():
+ _serverSocket.close()
+atexit.register(_cleanup)
class MockSDS1000XEServer:
- MOTD = "Hello".encode()
-
+ class ChannelValues:
+ def __init__(self):
+ self.amplitude = None
+
def __init__(self):
self._stopFlag = False
self._clientSocket = None
self._mainThread = threading.Thread(target=self._mainLoop)
self._mainThread.start()
+ # Mock measured values
+ self._channels = [MockSDS1000XEServer.ChannelValues() for i in range(0, 4)]
+
def _mainLoop(self) -> None:
self._clientSocket, _ = _serverSocket.accept()
self._clientSocket.settimeout(0.1)
- self._clientSocket.send(MockSDS1000XEServer.MOTD)
try:
while not self._stopFlag:
- request = self._clientSocket.recv(4096).decode()
- if request != "":
- print(f"** Received request: {request}")
+ try:
+ request = self._clientSocket.recv(4096).decode()
+
response = self._handleRequest(request)
- self._clientSocket.send(response.encode())
- except TimeoutError as e:
- pass
+ if response is not None:
+ self._clientSocket.send(response.encode())
+ except TimeoutError as e:
+ pass
finally:
self._clientSocket.close()
def _handleRequest(self, request: str) -> str:
- return "NOP"
+ m = re.search(r"C(?P<channel>\d):(?P<opcode>\w{4})\?\s(?P<arg>\w+)", request)
+ if not m:
+ return None
+
+ channelIndex = int(m.group("channel")) - 1
+ if m.group("opcode") == "PAVA":
+ if m.group("arg") == "AMPL":
+ if self._channels[channelIndex].amplitude is None:
+ return None
+ else:
+ value = self._channels[channelIndex].amplitude
+ response = f"C{m.group('channel')}:PAVA AMPL,{value:.6E}V"
+ return response
- def stop(self):
+ def stop(self) -> None:
self._stopFlag = True
self._mainThread.join()
-
+
+ def setAmplitude(self, channel: int, ampl: float) -> None:
+ self._channels[channel - 1].amplitude = ampl
+
diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py
index 87cdefd..88d5022 100644
--- a/lab_control/test/sds1000xe_test.py
+++ b/lab_control/test/sds1000xe_test.py
@@ -1,5 +1,4 @@
import pytest
-import time
from lab_control.sds1000xe import SDS1000XE
from lab_control.test.mock_sds1000xe_server import MockSDS1000XEServer
@@ -17,5 +16,21 @@ def uut(mockServer):
return SDS1000XE(MOCK_IP)
def test_amplitudeMeasurement(uut, mockServer):
- assert True
+ testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
+
+ for t in testCases:
+ channel = t[0]
+ expectedAmplitude = t[1]
+ mockServer.setAmplitude(channel, expectedAmplitude)
+
+ measuredAmplitude = uut.measureAmplitude(channel)
+ assert measuredAmplitude == expectedAmplitude
+
+def test_invalidChannel(uut, mockServer):
+ # Channel is checked by the UUT before the request is sent
+ testCases = [0, 5]
+
+ for t in testCases:
+ with pytest.raises(AssertionError):
+ measuredAmplitude = uut.measureAmplitude(t)