From a0783e9f9698b3370c96eaab574d433e25372bc7 Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Sun, 29 May 2022 19:20:09 +0200 Subject: Added JDS6600 initial implementation, test device doesn't work --- lab_control/test/mock_sds1000xe_server.py | 79 ------------------------------- 1 file changed, 79 deletions(-) delete mode 100644 lab_control/test/mock_sds1000xe_server.py (limited to 'lab_control/test/mock_sds1000xe_server.py') diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py deleted file mode 100644 index 1b72146..0000000 --- a/lab_control/test/mock_sds1000xe_server.py +++ /dev/null @@ -1,79 +0,0 @@ -import socket -import threading -import atexit -import re - -IP = "0.0.0.0" -PORT = 5025 - -# Bind server socket when this module is included -_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -_serverSocket.bind((IP, PORT)) -_serverSocket.listen(1) - -# Close it when the program exits -def _cleanup(): - _serverSocket.close() -atexit.register(_cleanup) - -class MockSDS1000XEServer: - def __init__(self): - self._stopFlag = False - self._clientSocket = None - self._mainThread = threading.Thread(target=self._mainLoop) - self._mainThread.start() - - # Mock measured values - self._channels = [{"AMPL" : None} for i in range(0, 4)] - - def _mainLoop(self) -> None: - self._clientSocket, _ = _serverSocket.accept() - self._clientSocket.settimeout(0.1) - - try: - while not self._stopFlag: - try: - request = self._clientSocket.recv(4096).decode() - response = self._handleRequest(request) - if response is not None: - self._clientSocket.send(response.encode()) - except TimeoutError as e: - pass - finally: - self._clientSocket.close() - - def _handleRequest(self, request: str) -> str: - m = re.search(r"C(?P\d):(?P\w+)\?\s(?P\w+)", request) - if not m: - return None - - channelIndex = int(m.group("channel")) - 1 - opcode = m.group("opcode") - - if opcode == "PAVA": - arg = m.group("arg") - value = self._channels[channelIndex].get(arg) - - if value is None: - return None - else: - response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V" - return response - - def stop(self) -> None: - self._stopFlag = True - self._mainThread.join() - - def setAmplitude(self, channel: int, value: float) -> None: - self._channels[channel - 1]["AMPL"] = value - - def setPeakToPeak(self, channel: int, value: float) -> None: - self._channels[channel - 1]["PKPK"] = value - - def setRMS(self, channel: int, value: float) -> None: - self._channels[channel - 1]["RMS"] = value - - def setFrequency(self, channel: int, value: float) -> None: - self._channels[channel - 1]["FREQ"] = value - -- cgit v1.2.3