summaryrefslogtreecommitdiffstats
path: root/lab_control/test/mock_sds1000xe_device.py
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control/test/mock_sds1000xe_device.py')
-rw-r--r--lab_control/test/mock_sds1000xe_device.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/lab_control/test/mock_sds1000xe_device.py b/lab_control/test/mock_sds1000xe_device.py
new file mode 100644
index 0000000..ce752e9
--- /dev/null
+++ b/lab_control/test/mock_sds1000xe_device.py
@@ -0,0 +1,79 @@
+import socket
+import threading
+import atexit
+import re
+
+IP = "0.0.0.0"
+PORT = 5025
+
+# Bind server socket when this module is included
+_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+_serverSocket.bind((IP, PORT))
+_serverSocket.listen(1)
+
+# Close it when the program exits
+def _cleanUp():
+ _serverSocket.close()
+atexit.register(_cleanUp)
+
+class MockSDS1000XEDevice:
+ def __init__(self):
+ self._stopFlag = False
+ self._clientSocket = None
+ self._mainThread = threading.Thread(target=self._mainLoop)
+ self._mainThread.start()
+
+ # Mock measured values
+ self._channels = [{"AMPL" : None} for i in range(0, 4)]
+
+ def _mainLoop(self) -> None:
+ self._clientSocket, _ = _serverSocket.accept()
+ self._clientSocket.settimeout(0.1)
+
+ try:
+ while not self._stopFlag:
+ try:
+ request = self._clientSocket.recv(4096).decode()
+ response = self._handleRequest(request)
+ if response is not None:
+ self._clientSocket.send(response.encode())
+ except TimeoutError as e:
+ pass
+ finally:
+ self._clientSocket.close()
+
+ def _handleRequest(self, request: str) -> str:
+ m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\?\s(?P<arg>\w+)", request)
+ if not m:
+ return None
+
+ channelIndex = int(m.group("channel")) - 1
+ opcode = m.group("opcode")
+
+ if opcode == "PAVA":
+ arg = m.group("arg")
+ value = self._channels[channelIndex].get(arg)
+
+ if value is None:
+ return None
+ else:
+ response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V"
+ return response
+
+ def stop(self) -> None:
+ self._stopFlag = True
+ self._mainThread.join()
+
+ def setAmplitude(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["AMPL"] = value
+
+ def setPeakToPeak(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["PKPK"] = value
+
+ def setRMS(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["RMS"] = value
+
+ def setFrequency(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["FREQ"] = value
+