"""Mock SDS1000X-E device answering ``C<n>:PAVA? <param>`` queries over TCP.

Importing this module binds and listens on ``(IP, PORT)``; every
``MockSDS1000XEDevice`` instance then serves one client accepted from that
shared listening socket on a background thread.
"""

import socket
import threading
import atexit
import re
from typing import Optional

IP = "0.0.0.0"
PORT = 5025

# Seconds a blocking socket call may wait before the stop flag is re-checked
_POLL_INTERVAL = 0.1

# Number of mock channels; requests address them as C1..C<_CHANNEL_COUNT>
_CHANNEL_COUNT = 4

# Query syntax: C<channel digit>:<opcode>?<whitespace><argument>
_REQUEST_PATTERN = re.compile(
    r"C(?P<channel>\d):(?P<opcode>\w+)\?\s(?P<arg>\w+)"
)

# Bind server socket when this module is imported
_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_serverSocket.bind((IP, PORT))
_serverSocket.listen(1)
# A finite accept() timeout lets a serving thread notice stop() even when no
# client ever connects.
_serverSocket.settimeout(_POLL_INTERVAL)


# Close it when the program exits
def _cleanUp():
    _serverSocket.close()


atexit.register(_cleanUp)


class MockSDS1000XEDevice:
    """Serve mock measurement values to a single TCP client.

    Values are configured per channel with the ``set*`` methods and returned
    in response to ``C<n>:PAVA? <param>`` requests. Call ``stop()`` to end
    the background thread.
    """

    def __init__(self):
        self._stopFlag = False
        self._clientSocket = None

        # Mock measured values; must exist before the serving thread starts,
        # because that thread reads them while handling requests.
        self._channels = [{"AMPL": None} for _ in range(_CHANNEL_COUNT)]

        self._mainThread = threading.Thread(target=self._mainLoop)
        self._mainThread.start()

    def _acceptClient(self) -> Optional[socket.socket]:
        """Wait for a client; return None if stop() is requested first."""
        while not self._stopFlag:
            try:
                client, _ = _serverSocket.accept()
            except socket.timeout:
                continue
            return client
        return None

    def _mainLoop(self) -> None:
        """Accept one client and answer its requests until stopped.

        The loop ends when ``stop()`` is called or the client closes the
        connection; the client socket is always closed on exit.
        """
        client = self._acceptClient()
        if client is None:
            return

        self._clientSocket = client
        client.settimeout(_POLL_INTERVAL)
        try:
            while not self._stopFlag:
                try:
                    data = client.recv(4096)
                    if not data:
                        # Empty read means the peer closed the connection;
                        # without this check the loop would spin forever.
                        break
                    response = self._handleRequest(data.decode())
                    if response is not None:
                        client.sendall(response.encode())
                except socket.timeout:
                    # No traffic within the poll interval; re-check the flag.
                    continue
        finally:
            client.close()

    def _handleRequest(self, request: str) -> Optional[str]:
        """Build the reply for one request string.

        Args:
            request: Text received from the client.

        Returns:
            ``"C<n>:PAVA <param>,<value><unit>"`` when the request is a PAVA
            query for a known channel whose parameter has a value set;
            ``None`` for anything else (unparseable text, unknown opcode,
            channel out of range, value not set).
        """
        m = _REQUEST_PATTERN.search(request)
        if not m:
            return None

        channelIndex = int(m.group("channel")) - 1
        # Reject C0 (which would wrap to the last channel) and channels
        # beyond the mock's range (which would raise IndexError).
        if not 0 <= channelIndex < len(self._channels):
            return None

        if m.group("opcode") != "PAVA":
            return None

        arg = m.group("arg")
        value = self._channels[channelIndex].get(arg)
        if value is None:
            return None

        unit = "Hz" if arg == "FREQ" else "V"
        return f"C{m.group('channel')}:PAVA {arg},{value:.6E}{unit}"

    def stop(self) -> None:
        """Signal the serving thread to finish and wait for it to exit."""
        self._stopFlag = True
        self._mainThread.join()

    def setAmplitude(self, channel: int, value: float) -> None:
        """Set the value reported for ``AMPL`` on the 1-based *channel*."""
        self._channels[channel - 1]["AMPL"] = value

    def setPeakToPeak(self, channel: int, value: float) -> None:
        """Set the value reported for ``PKPK`` on the 1-based *channel*."""
        self._channels[channel - 1]["PKPK"] = value

    def setRMS(self, channel: int, value: float) -> None:
        """Set the value reported for ``RMS`` on the 1-based *channel*."""
        self._channels[channel - 1]["RMS"] = value

    def setFrequency(self, channel: int, value: float) -> None:
        """Set the value reported for ``FREQ`` on the 1-based *channel*."""
        self._channels[channel - 1]["FREQ"] = value