diff options
Diffstat (limited to 'lab_control/test/mock_sds1000xe_device.py')
-rw-r--r-- | lab_control/test/mock_sds1000xe_device.py | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/lab_control/test/mock_sds1000xe_device.py b/lab_control/test/mock_sds1000xe_device.py new file mode 100644 index 0000000..ce752e9 --- /dev/null +++ b/lab_control/test/mock_sds1000xe_device.py @@ -0,0 +1,79 @@ +import socket +import threading +import atexit +import re + +IP = "0.0.0.0" +PORT = 5025 + +# Bind server socket when this module is included +_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +_serverSocket.bind((IP, PORT)) +_serverSocket.listen(1) + +# Close it when the program exits +def _cleanUp(): + _serverSocket.close() +atexit.register(_cleanUp) + +class MockSDS1000XEDevice: + def __init__(self): + self._stopFlag = False + self._clientSocket = None + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + # Mock measured values + self._channels = [{"AMPL" : None} for i in range(0, 4)] + + def _mainLoop(self) -> None: + self._clientSocket, _ = _serverSocket.accept() + self._clientSocket.settimeout(0.1) + + try: + while not self._stopFlag: + try: + request = self._clientSocket.recv(4096).decode() + response = self._handleRequest(request) + if response is not None: + self._clientSocket.send(response.encode()) + except TimeoutError as e: + pass + finally: + self._clientSocket.close() + + def _handleRequest(self, request: str) -> str: + m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\?\s(?P<arg>\w+)", request) + if not m: + return None + + channelIndex = int(m.group("channel")) - 1 + opcode = m.group("opcode") + + if opcode == "PAVA": + arg = m.group("arg") + value = self._channels[channelIndex].get(arg) + + if value is None: + return None + else: + response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V" + return response + + def stop(self) -> None: + self._stopFlag = True + self._mainThread.join() + + def setAmplitude(self, channel: int, value: float) -> None: + self._channels[channel - 1]["AMPL"] = value + + def setPeakToPeak(self, channel: int, value: float) -> None: + self._channels[channel - 1]["PKPK"] = value + + def setRMS(self, channel: int, value: float) -> None: + self._channels[channel - 1]["RMS"] = value + + def setFrequency(self, channel: int, value: float) -> None: + self._channels[channel - 1]["FREQ"] = value + |