From e6f589732fb73b66bfaeeb5ce4ed3eed0ce3d4e8 Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Thu, 26 May 2022 08:36:50 +0200 Subject: Improved mock oscilloscope server --- lab_control/sds1000xe.py | 7 ++-- lab_control/test/mock_sds1000xe_server.py | 53 +++++++++++++++++-------------- lab_control/test/sds1000xe_test.py | 15 ++++----- 3 files changed, 40 insertions(+), 35 deletions(-) diff --git a/lab_control/sds1000xe.py b/lab_control/sds1000xe.py index fff4f24..f71e422 100644 --- a/lab_control/sds1000xe.py +++ b/lab_control/sds1000xe.py @@ -1,6 +1,9 @@ import socket class SDS1000XE: - def __init__(self, ip, port=5025): + PORT = 5025 + + def __init__(self, ip): self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((ip, port)) + self.socket.connect((ip, SDS1000XE.PORT)) + diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_server.py index 7da90c9..5e222c7 100644 --- a/lab_control/test/mock_sds1000xe_server.py +++ b/lab_control/test/mock_sds1000xe_server.py @@ -1,43 +1,48 @@ import socket import threading +import atexit -class MockSDS1000XEServer: - IP = "0.0.0.0" - PORT = 5025 +IP = "0.0.0.0" +PORT = 5025 + +# Bind server socket when this module is included +_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.bind((IP, PORT)) +_serverSocket.listen(1) + +# Close it when the program exits +atexit.register(_serverSocket.close) +class MockSDS1000XEServer: MOTD = "Hello".encode() def __init__(self): - self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.serverSocket.bind((MockSDS1000XEServer.IP, MockSDS1000XEServer.PORT)) - self.serverSocket.listen(1) - print("server socket is bound") - - self.main = threading.Thread(target=self._mainLoop) - self.main.start() + self._stopFlag = False + self._clientSocket = None + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() def _mainLoop(self) -> None: - clientSocket, _ = self.serverSocket.accept() - clientSocket.settimeout(0.1) - clientSocket.send(MockSDS1000XEServer.MOTD) + self._clientSocket, _ = _serverSocket.accept() + self._clientSocket.settimeout(0.1) + self._clientSocket.send(MockSDS1000XEServer.MOTD) try: - while True: - request = clientSocket.recv(4096).decode() + while not self._stopFlag: + request = self._clientSocket.recv(4096).decode() if request != "": print(f"** Received request: {request}") response = self._handleRequest(request) - clientSocket.send(response.encode()) - except Exception as e: - print("Exception caught in main server loop") - print(e) + self._clientSocket.send(response.encode()) + except TimeoutError as e: + pass + finally: + self._clientSocket.close() def _handleRequest(self, request: str) -> str: return "NOP" def stop(self): - print("Closing server socket") - self.serverSocket.close() - print("joining main") - self.main.join() - print("stopped") + self._stopFlag = True + self._mainThread.join() + diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py index 7cbee70..87cdefd 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_test.py @@ -1,9 +1,10 @@ import pytest +import time from lab_control.sds1000xe import SDS1000XE -from lab_control.mock_sds1000xe_server import MockSDS1000XEServer +from lab_control.test.mock_sds1000xe_server import MockSDS1000XEServer -testIP = "127.0.0.1" +MOCK_IP = "127.0.0.1" @pytest.fixture def mockServer(): @@ -13,12 +14,8 @@ def mockServer(): @pytest.fixture def uut(mockServer): - return SDS1000XE(testIP) - -def test_defaults(): - pass - -def test_differentPort(uut, mockServer): - assert False + return SDS1000XE(MOCK_IP) +def test_amplitudeMeasurement(uut, mockServer): + assert True -- cgit v1.2.3