From d37252c9b6044ce540db54e8b480a85dfc1ec25d Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Sun, 5 Jun 2022 16:49:33 +0200 Subject: Moved mocks to separate module --- lab_control/test/frequency_response_test.py | 2 +- lab_control/test/jds6600_unittest.py | 2 +- lab_control/test/mock/__init__.py | 0 lab_control/test/mock/mock_jds6600_device.py | 85 ++++++++++++++++++++++++++ lab_control/test/mock/mock_lab.py | 71 +++++++++++++++++++++ lab_control/test/mock/mock_sds1000xe_device.py | 47 ++++++++++++++ lab_control/test/mock/virtual_serial_port.py | 34 +++++++++++ lab_control/test/mock/virtual_tcp_server.py | 44 +++++++++++++ lab_control/test/mock_jds6600_device.py | 85 -------------------------- lab_control/test/mock_lab.py | 71 --------------------- lab_control/test/mock_sds1000xe_device.py | 47 -------------- lab_control/test/sds1000xe_unittest.py | 2 +- lab_control/test/virtual_serial_port.py | 34 ----------- lab_control/test/virtual_tcp_server.py | 44 ------------- 14 files changed, 284 insertions(+), 284 deletions(-) create mode 100644 lab_control/test/mock/__init__.py create mode 100644 lab_control/test/mock/mock_jds6600_device.py create mode 100644 lab_control/test/mock/mock_lab.py create mode 100644 lab_control/test/mock/mock_sds1000xe_device.py create mode 100644 lab_control/test/mock/virtual_serial_port.py create mode 100644 lab_control/test/mock/virtual_tcp_server.py delete mode 100644 lab_control/test/mock_jds6600_device.py delete mode 100644 lab_control/test/mock_lab.py delete mode 100644 lab_control/test/mock_sds1000xe_device.py delete mode 100644 lab_control/test/virtual_serial_port.py delete mode 100644 lab_control/test/virtual_tcp_server.py (limited to 'lab_control') diff --git a/lab_control/test/frequency_response_test.py b/lab_control/test/frequency_response_test.py index d694887..eb7d0d0 100644 --- a/lab_control/test/frequency_response_test.py +++ b/lab_control/test/frequency_response_test.py @@ -1,6 +1,6 @@ import pytest -from lab_control.test.mock_lab import MockLab +from lab_control.test.mock.mock_lab import MockLab from lab_control.frequency_response import FrequencyResponseMeasurement from lab_control.measurement import getLinearRange from lab_control.function_generator import FunctionGenerator diff --git a/lab_control/test/jds6600_unittest.py b/lab_control/test/jds6600_unittest.py index c3b283b..476514b 100644 --- a/lab_control/test/jds6600_unittest.py +++ b/lab_control/test/jds6600_unittest.py @@ -1,7 +1,7 @@ import pytest from lab_control.jds6600 import JDS6600 -from lab_control.test.mock_jds6600_device import MockJDS6600Device +from lab_control.test.mock.mock_jds6600_device import MockJDS6600Device from lab_control.connection.direct_connection import DirectConnection as MockConnection @pytest.fixture diff --git a/lab_control/test/mock/__init__.py b/lab_control/test/mock/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lab_control/test/mock/mock_jds6600_device.py b/lab_control/test/mock/mock_jds6600_device.py new file mode 100644 index 0000000..8b7b440 --- /dev/null +++ b/lab_control/test/mock/mock_jds6600_device.py @@ -0,0 +1,85 @@ +import re + +class MockJDS6600Device: + class ChannelState: + def __init__(self): + self.on = False + self.frequency = None + self.amplitude = None + self.function = None + + def __init__(self): + self._channels = [MockJDS6600Device.ChannelState() for i in [1, 2]] + self._injectedFailureCounter = 0 + + def _handleRequest(self, request: str) -> str: + pattern = r":(?P[wrab])(?P\d+)=(?P.*)\." + m = re.search(pattern, request) + + if not m: + return None + + opcode = m.group("opcode") + function = int(m.group("function")) + args = m.group("args").split(",") + + # channel on/off + if function == 20: + if opcode == "w": + self._channels[0].on = args[0] == "1" + self._channels[1].on = args[1] == "1" + return ":ok\r\n" + elif opcode == "r": + return f":r20={int(self._channels[0].on)},{int(self._channels[1].on)}.\r\n" + + # channel frequency + elif function == 23 or function == 24: + ch = function - 23 + if opcode == "w": + # Actual device takes a second argument for scaling, here we ignore it and always use 0 (Hz) + frequency = float(args[0]) / 100.0 + + if self._injectedFailureCounter > 0: + self._channels[ch].frequency = 0.0 + self._injectedFailureCounter -= 1 + else: + self._channels[ch].frequency = frequency + return ":ok\r\n" + elif opcode == "r": + frequency = self._channels[ch].frequency + return f":r{function}={int(frequency)},0.\r\n" + + # channel amplitude + elif function == 25 or function == 26: + if opcode == "w": + ch = function - 25 + amplitude = float(args[0]) / 1000.0 + self._channels[ch].amplitude = amplitude + return ":ok\r\n" + + # channel function shape + elif function == 21 or function == 22: + if opcode == "w": + ch = function - 21 + shape = int(args[0]) + self._channels[ch].function = shape + return ":ok\r\n" + + # Unknown request format, no response + return None + + def isOn(self, ch: int) -> bool: + return self._channels[ch - 1].on + + def getFrequency(self, ch: int) -> float: + return self._channels[ch - 1].frequency + + def getAmplitude(self, ch: int) -> float: + return self._channels[ch - 1].amplitude + + def getFunction(self, ch: int) -> int: + return self._channels[ch - 1].function + + def injectFailures(self, count: int) -> None: + self._injectedFailureCounter += count + diff --git a/lab_control/test/mock/mock_lab.py b/lab_control/test/mock/mock_lab.py new file mode 100644 index 0000000..1016b92 --- /dev/null +++ b/lab_control/test/mock/mock_lab.py @@ -0,0 +1,71 @@ +from collections.abc import Callable + +from lab_control.function_generator import FunctionGenerator +from lab_control.oscilloscope import Oscilloscope + +class MockLab(FunctionGenerator, Oscilloscope): + class FGChannelState: + def __init__(self): + self.on = False + self.frequency = None + self.amplitude = None + self.function = None + + class OscChannelState: + def __init__(self): + self.testFunction = None + self.connectedChannel = None + self.voltsPerDiv = None + + def __init__(self): + self.fgChannels = [MockLab.FGChannelState() for i in range(0, 2)] + self.oscChannels = [MockLab.OscChannelState() for i in range(0, 4)] + + def setOn(self, channel: int) -> None: + self.fgChannels[channel - 1].on = True + + def setOff(self, channel: int) -> None: + self.fgChannels[channel - 1].on = False + + def setFrequency(self, channel: int, frequency: float) -> None: + self.fgChannels[channel - 1].frequency = frequency + + def setAmplitude(self, channel: int, amplitude: float) -> None: + self.fgChannels[channel - 1].amplitude = amplitude + + def setFunction(self, channel: int, function: int) -> None: + self.fgChannels[channel - 1].function = function + + def measureAmplitude(self, channel: int) -> float: + fgChannel = self.oscChannels[channel - 1].connectedChannel + frequency = fgChannel.frequency if fgChannel.on else 0.0 + amplitude = fgChannel.amplitude if fgChannel.on else 0.0 + + return self.oscChannels[channel - 1].testFunction(frequency) * amplitude + + def measurePeakToPeak(self, channel: int) -> float: + pass + + def measureRMS(self, channel: int) -> float: + pass + + def measureFrequency(self, channel: int) -> float: + pass + + def setVoltsPerDivision(self, channel: int, volts: float) -> None: + self.oscChannels[channel - 1].voltsPerDiv = volts + + def getDivisionsDisplayed(self) -> int: + return 12 + + def setTestFunction(self, channel: int, f: Callable[[float], float]) -> None: + self.oscChannels[channel - 1].testFunction = f + + def connectChannels(self, fg: int, osc: int) -> None: + self.oscChannels[osc - 1].connectedChannel = self.fgChannels[fg - 1] + + def getFunctionGeneratorChannel(self, channel: int): + return self.fgChannels[channel - 1] + + def getOscilloscopeChannel(self, channel: int): + return self.oscChannels[channel - 1] diff --git a/lab_control/test/mock/mock_sds1000xe_device.py b/lab_control/test/mock/mock_sds1000xe_device.py new file mode 100644 index 0000000..04ec07a --- /dev/null +++ b/lab_control/test/mock/mock_sds1000xe_device.py @@ -0,0 +1,47 @@ +import re + +class MockSDS1000XEDevice: + def __init__(self): + # Mock internal values + self._channels = [{"AMPL" : None, "VDIV" : None} for i in range(0, 4)] + + def _handleRequest(self, request: str) -> str: + m = re.search(r"C(?P\d):(?P\w+)\??\s(?P.+)", request.strip()) + if not m: + return None + + channelIndex = int(m.group("channel")) - 1 + opcode = m.group("opcode") + + if opcode == "PAVA": + arg = m.group("arg") + value = self._channels[channelIndex].get(arg) + unit = "Hz" if arg == "FREQ" else "V" + + if value is None: + return None + else: + response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}{unit}" + return response + elif opcode == "VDIV": + arg = float(m.group("arg").rstrip("V")) + self._channels[channelIndex]["VDIV"] = arg + return None + + return None + + def setAmplitude(self, channel: int, value: float) -> None: + self._channels[channel - 1]["AMPL"] = value + + def setPeakToPeak(self, channel: int, value: float) -> None: + self._channels[channel - 1]["PKPK"] = value + + def setRMS(self, channel: int, value: float) -> None: + self._channels[channel - 1]["RMS"] = value + + def setFrequency(self, channel: int, value: float) -> None: + self._channels[channel - 1]["FREQ"] = value + + def getVoltsPerDivision(self, channel: int) -> float: + return self._channels[channel - 1]["VDIV"] + diff --git a/lab_control/test/mock/virtual_serial_port.py b/lab_control/test/mock/virtual_serial_port.py new file mode 100644 index 0000000..f46e29c --- /dev/null +++ b/lab_control/test/mock/virtual_serial_port.py @@ -0,0 +1,34 @@ +import os +import pty +import termios +import threading + +class VirtualSerialPort: + def __init__(self, requestHandler): + self._master, self._slave = pty.openpty() + self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) + self._portName = os.ttyname(self._slave) + self._requestHandler = requestHandler + + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def stop(self) -> None: + self._masterFile.close() + os.close(self._master) + os.close(self._slave) + self._mainThread.join() + + def _mainLoop(self) -> None: + while True: + try: + request = self._masterFile.readline().decode().strip() + response = self._requestHandler(request) + + if response is not None: + self._masterFile.write(response.encode()) + except OSError as e: + break + + def getPortName(self) -> str: + return self._portName diff --git a/lab_control/test/mock/virtual_tcp_server.py b/lab_control/test/mock/virtual_tcp_server.py new file mode 100644 index 0000000..07a4345 --- /dev/null +++ b/lab_control/test/mock/virtual_tcp_server.py @@ -0,0 +1,44 @@ +import socket +import threading +import atexit + +IP = "0.0.0.0" +PORT = 5025 + +# Bind server socket when this module is included +_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +_serverSocket.bind((IP, PORT)) +_serverSocket.listen(1) + +# Close it when the program exits +def _cleanUp(): + _serverSocket.close() +atexit.register(_cleanUp) + +class VirtualTCPServer: + def __init__(self): + self._stopFlag = False + self._clientSocket = None + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def _mainLoop(self) -> None: + self._clientSocket, _ = _serverSocket.accept() + self._clientSocket.settimeout(0.1) + + try: + while not self._stopFlag: + try: + request = self._clientSocket.recv(4096).decode() + response = self._handleRequest(request.strip()) + if response is not None: + self._clientSocket.send(response.encode()) + except TimeoutError as e: + pass + finally: + self._clientSocket.close() + + def stop(self) -> None: + self._stopFlag = True + self._mainThread.join() diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py deleted file mode 100644 index 8b7b440..0000000 --- a/lab_control/test/mock_jds6600_device.py +++ /dev/null @@ -1,85 +0,0 @@ -import re - -class MockJDS6600Device: - class ChannelState: - def __init__(self): - self.on = False - self.frequency = None - self.amplitude = None - self.function = None - - def __init__(self): - self._channels = [MockJDS6600Device.ChannelState() for i in [1, 2]] - self._injectedFailureCounter = 0 - - def _handleRequest(self, request: str) -> str: - pattern = r":(?P[wrab])(?P\d+)=(?P.*)\." - m = re.search(pattern, request) - - if not m: - return None - - opcode = m.group("opcode") - function = int(m.group("function")) - args = m.group("args").split(",") - - # channel on/off - if function == 20: - if opcode == "w": - self._channels[0].on = args[0] == "1" - self._channels[1].on = args[1] == "1" - return ":ok\r\n" - elif opcode == "r": - return f":r20={int(self._channels[0].on)},{int(self._channels[1].on)}.\r\n" - - # channel frequency - elif function == 23 or function == 24: - ch = function - 23 - if opcode == "w": - # Actual device takes a second argument for scaling, here we ignore it and always use 0 (Hz) - frequency = float(args[0]) / 100.0 - - if self._injectedFailureCounter > 0: - self._channels[ch].frequency = 0.0 - self._injectedFailureCounter -= 1 - else: - self._channels[ch].frequency = frequency - return ":ok\r\n" - elif opcode == "r": - frequency = self._channels[ch].frequency - return f":r{function}={int(frequency)},0.\r\n" - - # channel amplitude - elif function == 25 or function == 26: - if opcode == "w": - ch = function - 25 - amplitude = float(args[0]) / 1000.0 - self._channels[ch].amplitude = amplitude - return ":ok\r\n" - - # channel function shape - elif function == 21 or function == 22: - if opcode == "w": - ch = function - 21 - shape = int(args[0]) - self._channels[ch].function = shape - return ":ok\r\n" - - # Unknown request format, no response - return None - - def isOn(self, ch: int) -> bool: - return self._channels[ch - 1].on - - def getFrequency(self, ch: int) -> float: - return self._channels[ch - 1].frequency - - def getAmplitude(self, ch: int) -> float: - return self._channels[ch - 1].amplitude - - def getFunction(self, ch: int) -> int: - return self._channels[ch - 1].function - - def injectFailures(self, count: int) -> None: - self._injectedFailureCounter += count - diff --git a/lab_control/test/mock_lab.py b/lab_control/test/mock_lab.py deleted file mode 100644 index 1016b92..0000000 --- a/lab_control/test/mock_lab.py +++ /dev/null @@ -1,71 +0,0 @@ -from collections.abc import Callable - -from lab_control.function_generator import FunctionGenerator -from lab_control.oscilloscope import Oscilloscope - -class MockLab(FunctionGenerator, Oscilloscope): - class FGChannelState: - def __init__(self): - self.on = False - self.frequency = None - self.amplitude = None - self.function = None - - class OscChannelState: - def __init__(self): - self.testFunction = None - self.connectedChannel = None - self.voltsPerDiv = None - - def __init__(self): - self.fgChannels = [MockLab.FGChannelState() for i in range(0, 2)] - self.oscChannels = [MockLab.OscChannelState() for i in range(0, 4)] - - def setOn(self, channel: int) -> None: - self.fgChannels[channel - 1].on = True - - def setOff(self, channel: int) -> None: - self.fgChannels[channel - 1].on = False - - def setFrequency(self, channel: int, frequency: float) -> None: - self.fgChannels[channel - 1].frequency = frequency - - def setAmplitude(self, channel: int, amplitude: float) -> None: - self.fgChannels[channel - 1].amplitude = amplitude - - def setFunction(self, channel: int, function: int) -> None: - self.fgChannels[channel - 1].function = function - - def measureAmplitude(self, channel: int) -> float: - fgChannel = self.oscChannels[channel - 1].connectedChannel - frequency = fgChannel.frequency if fgChannel.on else 0.0 - amplitude = fgChannel.amplitude if fgChannel.on else 0.0 - - return self.oscChannels[channel - 1].testFunction(frequency) * amplitude - - def measurePeakToPeak(self, channel: int) -> float: - pass - - def measureRMS(self, channel: int) -> float: - pass - - def measureFrequency(self, channel: int) -> float: - pass - - def setVoltsPerDivision(self, channel: int, volts: float) -> None: - self.oscChannels[channel - 1].voltsPerDiv = volts - - def getDivisionsDisplayed(self) -> int: - return 12 - - def setTestFunction(self, channel: int, f: Callable[[float], float]) -> None: - self.oscChannels[channel - 1].testFunction = f - - def connectChannels(self, fg: int, osc: int) -> None: - self.oscChannels[osc - 1].connectedChannel = self.fgChannels[fg - 1] - - def getFunctionGeneratorChannel(self, channel: int): - return self.fgChannels[channel - 1] - - def getOscilloscopeChannel(self, channel: int): - return self.oscChannels[channel - 1] diff --git a/lab_control/test/mock_sds1000xe_device.py b/lab_control/test/mock_sds1000xe_device.py deleted file mode 100644 index 04ec07a..0000000 --- a/lab_control/test/mock_sds1000xe_device.py +++ /dev/null @@ -1,47 +0,0 @@ -import re - -class MockSDS1000XEDevice: - def __init__(self): - # Mock internal values - self._channels = [{"AMPL" : None, "VDIV" : None} for i in range(0, 4)] - - def _handleRequest(self, request: str) -> str: - m = re.search(r"C(?P\d):(?P\w+)\??\s(?P.+)", request.strip()) - if not m: - return None - - channelIndex = int(m.group("channel")) - 1 - opcode = m.group("opcode") - - if opcode == "PAVA": - arg = m.group("arg") - value = self._channels[channelIndex].get(arg) - unit = "Hz" if arg == "FREQ" else "V" - - if value is None: - return None - else: - response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}{unit}" - return response - elif opcode == "VDIV": - arg = float(m.group("arg").rstrip("V")) - self._channels[channelIndex]["VDIV"] = arg - return None - - return None - - def setAmplitude(self, channel: int, value: float) -> None: - self._channels[channel - 1]["AMPL"] = value - - def setPeakToPeak(self, channel: int, value: float) -> None: - self._channels[channel - 1]["PKPK"] = value - - def setRMS(self, channel: int, value: float) -> None: - self._channels[channel - 1]["RMS"] = value - - def setFrequency(self, channel: int, value: float) -> None: - self._channels[channel - 1]["FREQ"] = value - - def getVoltsPerDivision(self, channel: int) -> float: - return self._channels[channel - 1]["VDIV"] - diff --git a/lab_control/test/sds1000xe_unittest.py b/lab_control/test/sds1000xe_unittest.py index ce87a6e..b05b901 100644 --- a/lab_control/test/sds1000xe_unittest.py +++ b/lab_control/test/sds1000xe_unittest.py @@ -2,7 +2,7 @@ import pytest from lab_control.sds1000xe import SDS1000XE from lab_control.connection.direct_connection import DirectConnection as MockConnection -from lab_control.test.mock_sds1000xe_device import MockSDS1000XEDevice +from lab_control.test.mock.mock_sds1000xe_device import MockSDS1000XEDevice @pytest.fixture def mockDevice(): diff --git a/lab_control/test/virtual_serial_port.py b/lab_control/test/virtual_serial_port.py deleted file mode 100644 index f46e29c..0000000 --- a/lab_control/test/virtual_serial_port.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -import pty -import termios -import threading - -class VirtualSerialPort: - def __init__(self, requestHandler): - self._master, self._slave = pty.openpty() - self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) - self._portName = os.ttyname(self._slave) - self._requestHandler = requestHandler - - self._mainThread = threading.Thread(target=self._mainLoop) - self._mainThread.start() - - def stop(self) -> None: - self._masterFile.close() - os.close(self._master) - os.close(self._slave) - self._mainThread.join() - - def _mainLoop(self) -> None: - while True: - try: - request = self._masterFile.readline().decode().strip() - response = self._requestHandler(request) - - if response is not None: - self._masterFile.write(response.encode()) - except OSError as e: - break - - def getPortName(self) -> str: - return self._portName diff --git a/lab_control/test/virtual_tcp_server.py b/lab_control/test/virtual_tcp_server.py deleted file mode 100644 index 07a4345..0000000 --- a/lab_control/test/virtual_tcp_server.py +++ /dev/null @@ -1,44 +0,0 @@ -import socket -import threading -import atexit - -IP = "0.0.0.0" -PORT = 5025 - -# Bind server socket when this module is included -_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -_serverSocket.bind((IP, PORT)) -_serverSocket.listen(1) - -# Close it when the program exits -def _cleanUp(): - _serverSocket.close() -atexit.register(_cleanUp) - -class VirtualTCPServer: - def __init__(self): - self._stopFlag = False - self._clientSocket = None - self._mainThread = threading.Thread(target=self._mainLoop) - self._mainThread.start() - - def _mainLoop(self) -> None: - self._clientSocket, _ = _serverSocket.accept() - self._clientSocket.settimeout(0.1) - - try: - while not self._stopFlag: - try: - request = self._clientSocket.recv(4096).decode() - response = self._handleRequest(request.strip()) - if response is not None: - self._clientSocket.send(response.encode()) - except TimeoutError as e: - pass - finally: - self._clientSocket.close() - - def stop(self) -> None: - self._stopFlag = True - self._mainThread.join() -- cgit v1.2.3