From 1e1d5aafd5018e462a93fa5c528fd36b0b0bbde1 Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Thu, 26 May 2022 12:12:41 +0200 Subject: Added amplitude measurement implementation --- lab_control/sds1000xe.py | 36 ++++++++++++++++++++-- lab_control/test/mock_sds1000xe_server.py | 50 +++++++++++++++++++++++-------- lab_control/test/sds1000xe_test.py | 19 ++++++++++-- 3 files changed, 87 insertions(+), 18 deletions(-) diff --git a/lab_control/sds1000xe.py b/lab_control/sds1000xe.py index f71e422..d86c1c7 100644 --- a/lab_control/sds1000xe.py +++ b/lab_control/sds1000xe.py @@ -1,9 +1,39 @@ import socket +import re +from lab_control.oscilloscope import Oscilloscope -class SDS1000XE: +class SDS1000XE(Oscilloscope): PORT = 5025 + TIMEOUT = 0.2 + + AVAILABLE_CHANNELS = range(1, 5) def __init__(self, ip): - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((ip, SDS1000XE.PORT)) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect((ip, SDS1000XE.PORT)) + self._socket.settimeout(SDS1000XE.TIMEOUT) + + def measureAmplitude(self, channel: int) -> float: + assert channel in SDS1000XE.AVAILABLE_CHANNELS + + query = f"C{channel}:PAVA? AMPL" + self._socket.sendall(query.encode()) + + try: + response = self._socket.recv(4096).decode() + m = re.search(r"C(?P\d):PAVA AMPL,(?P.+)V", response) + measurement = float(m.group("rawMeasurement")) + except (TimeoutError, AttributeError) as e: + measurement = None + + return measurement + + def measurePkToPk(self, channel: int) -> float: + pass + + def measureRMS(self, channel: int) -> float: + pass + + def measureFrequency(self, channel: int) -> float: + pass diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py index 5e222c7..fc6128e 100644 --- a/lab_control/test/mock_sds1000xe_server.py +++ b/lab_control/test/mock_sds1000xe_server.py @@ -1,48 +1,72 @@ import socket import threading import atexit +import re IP = "0.0.0.0" PORT = 5025 # Bind server socket when this module is included _serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) _serverSocket.bind((IP, PORT)) _serverSocket.listen(1) # Close it when the program exits -atexit.register(_serverSocket.close) +def _cleanup(): + _serverSocket.close() +atexit.register(_cleanup) class MockSDS1000XEServer: - MOTD = "Hello".encode() - + class ChannelValues: + def __init__(self): + self.amplitude = None + def __init__(self): self._stopFlag = False self._clientSocket = None self._mainThread = threading.Thread(target=self._mainLoop) self._mainThread.start() + # Mock measured values + self._channels = [MockSDS1000XEServer.ChannelValues() for i in range(0, 4)] + def _mainLoop(self) -> None: self._clientSocket, _ = _serverSocket.accept() self._clientSocket.settimeout(0.1) - self._clientSocket.send(MockSDS1000XEServer.MOTD) try: while not self._stopFlag: - request = self._clientSocket.recv(4096).decode() - if request != "": - print(f"** Received request: {request}") + try: + request = self._clientSocket.recv(4096).decode() + response = self._handleRequest(request) - self._clientSocket.send(response.encode()) - except TimeoutError as e: - pass + if response is not None: + self._clientSocket.send(response.encode()) + except TimeoutError as e: + pass finally: self._clientSocket.close() def _handleRequest(self, request: str) -> str: - return "NOP" + m = re.search(r"C(?P\d):(?P\w{4})\?\s(?P\w+)", request) + if not m: + return None + + channelIndex = int(m.group("channel")) - 1 + if m.group("opcode") == "PAVA": + if m.group("arg") == "AMPL": + if self._channels[channelIndex].amplitude is None: + return None + else: + value = self._channels[channelIndex].amplitude + response = f"C{m.group('channel')}:PAVA AMPL,{value:.6E}V" + return response - def stop(self): + def stop(self) -> None: self._stopFlag = True self._mainThread.join() - + + def setAmplitude(self, channel: int, ampl: float) -> None: + self._channels[channel - 1].amplitude = ampl + diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py index 87cdefd..88d5022 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_test.py @@ -1,5 +1,4 @@ import pytest -import time from lab_control.sds1000xe import SDS1000XE from lab_control.test.mock_sds1000xe_server import MockSDS1000XEServer @@ -17,5 +16,21 @@ def uut(mockServer): return SDS1000XE(MOCK_IP) def test_amplitudeMeasurement(uut, mockServer): - assert True + testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)] + + for t in testCases: + channel = t[0] + expectedAmplitude = t[1] + mockServer.setAmplitude(channel, expectedAmplitude) + + measuredAmplitude = uut.measureAmplitude(channel) + assert measuredAmplitude == expectedAmplitude + +def test_invalidChannel(uut, mockServer): + # Channel is checked by the UUT before the request is sent + testCases = [0, 5] + + for t in testCases: + with pytest.raises(AssertionError): + measuredAmplitude = uut.measureAmplitude(t) -- cgit v1.2.3