diff options
Diffstat (limited to 'lab_control/test')
-rw-r--r-- | lab_control/test/mock_sds1000xe_device.py | 44 | ||||
-rw-r--r-- | lab_control/test/sds1000xe_unittest.py (renamed from lab_control/test/sds1000xe_test.py) | 19 | ||||
-rw-r--r-- | lab_control/test/virtual_tcp_server.py | 44 |
3 files changed, 54 insertions, 53 deletions
diff --git a/lab_control/test/mock_sds1000xe_device.py b/lab_control/test/mock_sds1000xe_device.py index 68c2471..04ec07a 100644 --- a/lab_control/test/mock_sds1000xe_device.py +++ b/lab_control/test/mock_sds1000xe_device.py @@ -1,50 +1,12 @@ -import socket -import threading -import atexit import re -IP = "0.0.0.0" -PORT = 5025 - -# Bind server socket when this module is included -_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -_serverSocket.bind((IP, PORT)) -_serverSocket.listen(1) - -# Close it when the program exits -def _cleanUp(): - _serverSocket.close() -atexit.register(_cleanUp) - class MockSDS1000XEDevice: def __init__(self): - self._stopFlag = False - self._clientSocket = None - self._mainThread = threading.Thread(target=self._mainLoop) - self._mainThread.start() - # Mock internal values self._channels = [{"AMPL" : None, "VDIV" : None} for i in range(0, 4)] - def _mainLoop(self) -> None: - self._clientSocket, _ = _serverSocket.accept() - self._clientSocket.settimeout(0.1) - - try: - while not self._stopFlag: - try: - request = self._clientSocket.recv(4096).decode() - response = self._handleRequest(request.strip()) - if response is not None: - self._clientSocket.send(response.encode()) - except TimeoutError as e: - pass - finally: - self._clientSocket.close() - def _handleRequest(self, request: str) -> str: - m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request) + m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request.strip()) if not m: return None @@ -66,9 +28,7 @@ class MockSDS1000XEDevice: self._channels[channelIndex]["VDIV"] = arg return None - def stop(self) -> None: - self._stopFlag = True - self._mainThread.join() + return None def setAmplitude(self, channel: int, value: float) -> None: self._channels[channel - 1]["AMPL"] = value diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_unittest.py index 3774d52..ce87a6e 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_unittest.py @@ -1,20 +1,20 @@ import pytest -import time from lab_control.sds1000xe import SDS1000XE +from lab_control.connection.direct_connection import DirectConnection as MockConnection from lab_control.test.mock_sds1000xe_device import MockSDS1000XEDevice -MOCK_DEVICE_IP = "127.0.0.1" - @pytest.fixture def mockDevice(): - d = MockSDS1000XEDevice() - yield d - d.stop() + return MockSDS1000XEDevice() + +@pytest.fixture +def mockConnection(mockDevice): + return MockConnection(mockDevice._handleRequest) @pytest.fixture -def uut(mockDevice): - return SDS1000XE(MOCK_DEVICE_IP) +def uut(mockConnection): + return SDS1000XE("", overrideConnection=mockConnection) def checkFloatMeasurement(testValues, setValue, measureValue): for channel in SDS1000XE.AVAILABLE_CHANNELS: @@ -57,8 +57,5 @@ def test_setVoltsPerDivision(uut, mockDevice): for value in testValues: uut.setVoltsPerDivision(channel, value) - - time.sleep(0.1) # Allow time for the mock to receive and process the request - assert mockDevice.getVoltsPerDivision(channel) == value diff --git a/lab_control/test/virtual_tcp_server.py b/lab_control/test/virtual_tcp_server.py new file mode 100644 index 0000000..07a4345 --- /dev/null +++ b/lab_control/test/virtual_tcp_server.py @@ -0,0 +1,44 @@ +import socket +import threading +import atexit + +IP = "0.0.0.0" +PORT = 5025 + +# Bind server socket when this module is included +_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +_serverSocket.bind((IP, PORT)) +_serverSocket.listen(1) + +# Close it when the program exits +def _cleanUp(): + _serverSocket.close() +atexit.register(_cleanUp) + +class VirtualTCPServer: + def __init__(self): + self._stopFlag = False + self._clientSocket = None + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def _mainLoop(self) -> None: + self._clientSocket, _ = _serverSocket.accept() + self._clientSocket.settimeout(0.1) + + try: + while not self._stopFlag: + try: + request = self._clientSocket.recv(4096).decode() + response = self._handleRequest(request.strip()) + if response is not None: + self._clientSocket.send(response.encode()) + except TimeoutError as e: + pass + finally: + self._clientSocket.close() + + def stop(self) -> None: + self._stopFlag = True + self._mainThread.join() |