summaryrefslogtreecommitdiffstats
path: root/lab_control/test/mock_sds1000xe_device.py
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control/test/mock_sds1000xe_device.py')
-rw-r--r--lab_control/test/mock_sds1000xe_device.py44
1 files changed, 2 insertions, 42 deletions
diff --git a/lab_control/test/mock_sds1000xe_device.py b/lab_control/test/mock_sds1000xe_device.py
index 68c2471..04ec07a 100644
--- a/lab_control/test/mock_sds1000xe_device.py
+++ b/lab_control/test/mock_sds1000xe_device.py
@@ -1,50 +1,12 @@
-import socket
-import threading
-import atexit
import re
-IP = "0.0.0.0"
-PORT = 5025
-
-# Bind server socket when this module is included
-_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-_serverSocket.bind((IP, PORT))
-_serverSocket.listen(1)
-
-# Close it when the program exits
-def _cleanUp():
- _serverSocket.close()
-atexit.register(_cleanUp)
-
class MockSDS1000XEDevice:
def __init__(self):
- self._stopFlag = False
- self._clientSocket = None
- self._mainThread = threading.Thread(target=self._mainLoop)
- self._mainThread.start()
-
# Mock internal values
self._channels = [{"AMPL" : None, "VDIV" : None} for i in range(0, 4)]
- def _mainLoop(self) -> None:
- self._clientSocket, _ = _serverSocket.accept()
- self._clientSocket.settimeout(0.1)
-
- try:
- while not self._stopFlag:
- try:
- request = self._clientSocket.recv(4096).decode()
- response = self._handleRequest(request.strip())
- if response is not None:
- self._clientSocket.send(response.encode())
- except TimeoutError as e:
- pass
- finally:
- self._clientSocket.close()
-
def _handleRequest(self, request: str) -> str:
- m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request)
+ m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request.strip())
if not m:
return None
@@ -66,9 +28,7 @@ class MockSDS1000XEDevice:
self._channels[channelIndex]["VDIV"] = arg
return None
- def stop(self) -> None:
- self._stopFlag = True
- self._mainThread.join()
+ return None
def setAmplitude(self, channel: int, value: float) -> None:
self._channels[channel - 1]["AMPL"] = value