summaryrefslogtreecommitdiffstats
path: root/lab_control/test/mock_sds1000xe_server.py
diff options
context:
space:
mode:
authorEddy Pedroni <eddy@0xf7.com>2022-05-29 19:20:09 +0200
committerEddy Pedroni <eddy@0xf7.com>2022-05-29 19:20:09 +0200
commita0783e9f9698b3370c96eaab574d433e25372bc7 (patch)
tree265540f0beb122edff0688293a968067e14c545c /lab_control/test/mock_sds1000xe_server.py
parent8d4621a92e58b1466010f2ad3aeada5e04a453b2 (diff)
Added JDS6600 initial implementation, test device doesn't work
Diffstat (limited to 'lab_control/test/mock_sds1000xe_server.py')
-rw-r--r--lab_control/test/mock_sds1000xe_server.py79
1 files changed, 0 insertions, 79 deletions
diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py
deleted file mode 100644
index 1b72146..0000000
--- a/lab_control/test/mock_sds1000xe_server.py
+++ /dev/null
@@ -1,79 +0,0 @@
-import socket
-import threading
-import atexit
-import re
-
-IP = "0.0.0.0"
-PORT = 5025
-
-# Bind server socket when this module is included
-_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-_serverSocket.bind((IP, PORT))
-_serverSocket.listen(1)
-
-# Close it when the program exits
-def _cleanup():
- _serverSocket.close()
-atexit.register(_cleanup)
-
-class MockSDS1000XEServer:
- def __init__(self):
- self._stopFlag = False
- self._clientSocket = None
- self._mainThread = threading.Thread(target=self._mainLoop)
- self._mainThread.start()
-
- # Mock measured values
- self._channels = [{"AMPL" : None} for i in range(0, 4)]
-
- def _mainLoop(self) -> None:
- self._clientSocket, _ = _serverSocket.accept()
- self._clientSocket.settimeout(0.1)
-
- try:
- while not self._stopFlag:
- try:
- request = self._clientSocket.recv(4096).decode()
- response = self._handleRequest(request)
- if response is not None:
- self._clientSocket.send(response.encode())
- except TimeoutError as e:
- pass
- finally:
- self._clientSocket.close()
-
- def _handleRequest(self, request: str) -> str:
- m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\?\s(?P<arg>\w+)", request)
- if not m:
- return None
-
- channelIndex = int(m.group("channel")) - 1
- opcode = m.group("opcode")
-
- if opcode == "PAVA":
- arg = m.group("arg")
- value = self._channels[channelIndex].get(arg)
-
- if value is None:
- return None
- else:
- response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}V"
- return response
-
- def stop(self) -> None:
- self._stopFlag = True
- self._mainThread.join()
-
- def setAmplitude(self, channel: int, value: float) -> None:
- self._channels[channel - 1]["AMPL"] = value
-
- def setPeakToPeak(self, channel: int, value: float) -> None:
- self._channels[channel - 1]["PKPK"] = value
-
- def setRMS(self, channel: int, value: float) -> None:
- self._channels[channel - 1]["RMS"] = value
-
- def setFrequency(self, channel: int, value: float) -> None:
- self._channels[channel - 1]["FREQ"] = value
-