diff options
Diffstat (limited to 'lab_control/test/mock')
-rw-r--r-- | lab_control/test/mock/__init__.py | 0 | ||||
-rw-r--r-- | lab_control/test/mock/mock_jds6600_device.py | 85 | ||||
-rw-r--r-- | lab_control/test/mock/mock_lab.py | 71 | ||||
-rw-r--r-- | lab_control/test/mock/mock_sds1000xe_device.py | 47 | ||||
-rw-r--r-- | lab_control/test/mock/virtual_serial_port.py | 34 | ||||
-rw-r--r-- | lab_control/test/mock/virtual_tcp_server.py | 44 |
6 files changed, 281 insertions, 0 deletions
diff --git a/lab_control/test/mock/__init__.py b/lab_control/test/mock/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lab_control/test/mock/__init__.py diff --git a/lab_control/test/mock/mock_jds6600_device.py b/lab_control/test/mock/mock_jds6600_device.py new file mode 100644 index 0000000..8b7b440 --- /dev/null +++ b/lab_control/test/mock/mock_jds6600_device.py @@ -0,0 +1,85 @@ +import re + +class MockJDS6600Device: + class ChannelState: + def __init__(self): + self.on = False + self.frequency = None + self.amplitude = None + self.function = None + + def __init__(self): + self._channels = [MockJDS6600Device.ChannelState() for i in [1, 2]] + self._injectedFailureCounter = 0 + + def _handleRequest(self, request: str) -> str: + pattern = r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\." + m = re.search(pattern, request) + + if not m: + return None + + opcode = m.group("opcode") + function = int(m.group("function")) + args = m.group("args").split(",") + + # channel on/off + if function == 20: + if opcode == "w": + self._channels[0].on = args[0] == "1" + self._channels[1].on = args[1] == "1" + return ":ok\r\n" + elif opcode == "r": + return f":r20={int(self._channels[0].on)},{int(self._channels[1].on)}.\r\n" + + # channel frequency + elif function == 23 or function == 24: + ch = function - 23 + if opcode == "w": + # Actual device takes a second argument for scaling, here we ignore it and always use 0 (Hz) + frequency = float(args[0]) / 100.0 + + if self._injectedFailureCounter > 0: + self._channels[ch].frequency = 0.0 + self._injectedFailureCounter -= 1 + else: + self._channels[ch].frequency = frequency + return ":ok\r\n" + elif opcode == "r": + frequency = self._channels[ch].frequency + return f":r{function}={int(frequency)},0.\r\n" + + # channel amplitude + elif function == 25 or function == 26: + if opcode == "w": + ch = function - 25 + amplitude = float(args[0]) / 1000.0 + self._channels[ch].amplitude = amplitude + return ":ok\r\n" + + # channel function shape + elif function == 21 or function == 22: + if opcode == "w": + ch = function - 21 + shape = int(args[0]) + self._channels[ch].function = shape + return ":ok\r\n" + + # Unknown request format, no response + return None + + def isOn(self, ch: int) -> bool: + return self._channels[ch - 1].on + + def getFrequency(self, ch: int) -> float: + return self._channels[ch - 1].frequency + + def getAmplitude(self, ch: int) -> float: + return self._channels[ch - 1].amplitude + + def getFunction(self, ch: int) -> int: + return self._channels[ch - 1].function + + def injectFailures(self, count: int) -> None: + self._injectedFailureCounter += count + diff --git a/lab_control/test/mock/mock_lab.py b/lab_control/test/mock/mock_lab.py new file mode 100644 index 0000000..1016b92 --- /dev/null +++ b/lab_control/test/mock/mock_lab.py @@ -0,0 +1,71 @@ +from collections.abc import Callable + +from lab_control.function_generator import FunctionGenerator +from lab_control.oscilloscope import Oscilloscope + +class MockLab(FunctionGenerator, Oscilloscope): + class FGChannelState: + def __init__(self): + self.on = False + self.frequency = None + self.amplitude = None + self.function = None + + class OscChannelState: + def __init__(self): + self.testFunction = None + self.connectedChannel = None + self.voltsPerDiv = None + + def __init__(self): + self.fgChannels = [MockLab.FGChannelState() for i in range(0, 2)] + self.oscChannels = [MockLab.OscChannelState() for i in range(0, 4)] + + def setOn(self, channel: int) -> None: + self.fgChannels[channel - 1].on = True + + def setOff(self, channel: int) -> None: + self.fgChannels[channel - 1].on = False + + def setFrequency(self, channel: int, frequency: float) -> None: + self.fgChannels[channel - 1].frequency = frequency + + def setAmplitude(self, channel: int, amplitude: float) -> None: + self.fgChannels[channel - 1].amplitude = amplitude + + def setFunction(self, channel: int, function: int) -> None: + self.fgChannels[channel - 1].function = function + + def measureAmplitude(self, channel: int) -> float: + fgChannel = self.oscChannels[channel - 1].connectedChannel + frequency = fgChannel.frequency if fgChannel.on else 0.0 + amplitude = fgChannel.amplitude if fgChannel.on else 0.0 + + return self.oscChannels[channel - 1].testFunction(frequency) * amplitude + + def measurePeakToPeak(self, channel: int) -> float: + pass + + def measureRMS(self, channel: int) -> float: + pass + + def measureFrequency(self, channel: int) -> float: + pass + + def setVoltsPerDivision(self, channel: int, volts: float) -> None: + self.oscChannels[channel - 1].voltsPerDiv = volts + + def getDivisionsDisplayed(self) -> int: + return 12 + + def setTestFunction(self, channel: int, f: Callable[[float], float]) -> None: + self.oscChannels[channel - 1].testFunction = f + + def connectChannels(self, fg: int, osc: int) -> None: + self.oscChannels[osc - 1].connectedChannel = self.fgChannels[fg - 1] + + def getFunctionGeneratorChannel(self, channel: int): + return self.fgChannels[channel - 1] + + def getOscilloscopeChannel(self, channel: int): + return self.oscChannels[channel - 1] diff --git a/lab_control/test/mock/mock_sds1000xe_device.py b/lab_control/test/mock/mock_sds1000xe_device.py new file mode 100644 index 0000000..04ec07a --- /dev/null +++ b/lab_control/test/mock/mock_sds1000xe_device.py @@ -0,0 +1,47 @@ +import re + +class MockSDS1000XEDevice: + def __init__(self): + # Mock internal values + self._channels = [{"AMPL" : None, "VDIV" : None} for i in range(0, 4)] + + def _handleRequest(self, request: str) -> str: + m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request.strip()) + if not m: + return None + + channelIndex = int(m.group("channel")) - 1 + opcode = m.group("opcode") + + if opcode == "PAVA": + arg = m.group("arg") + value = self._channels[channelIndex].get(arg) + unit = "Hz" if arg == "FREQ" else "V" + + if value is None: + return None + else: + response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}{unit}" + return response + elif opcode == "VDIV": + arg = float(m.group("arg").rstrip("V")) + self._channels[channelIndex]["VDIV"] = arg + return None + + return None + + def setAmplitude(self, channel: int, value: float) -> None: + self._channels[channel - 1]["AMPL"] = value + + def setPeakToPeak(self, channel: int, value: float) -> None: + self._channels[channel - 1]["PKPK"] = value + + def setRMS(self, channel: int, value: float) -> None: + self._channels[channel - 1]["RMS"] = value + + def setFrequency(self, channel: int, value: float) -> None: + self._channels[channel - 1]["FREQ"] = value + + def getVoltsPerDivision(self, channel: int) -> float: + return self._channels[channel - 1]["VDIV"] + diff --git a/lab_control/test/mock/virtual_serial_port.py b/lab_control/test/mock/virtual_serial_port.py new file mode 100644 index 0000000..f46e29c --- /dev/null +++ b/lab_control/test/mock/virtual_serial_port.py @@ -0,0 +1,34 @@ +import os +import pty +import termios +import threading + +class VirtualSerialPort: + def __init__(self, requestHandler): + self._master, self._slave = pty.openpty() + self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) + self._portName = os.ttyname(self._slave) + self._requestHandler = requestHandler + + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def stop(self) -> None: + self._masterFile.close() + os.close(self._master) + os.close(self._slave) + self._mainThread.join() + + def _mainLoop(self) -> None: + while True: + try: + request = self._masterFile.readline().decode().strip() + response = self._requestHandler(request) + + if response is not None: + self._masterFile.write(response.encode()) + except OSError as e: + break + + def getPortName(self) -> str: + return self._portName diff --git a/lab_control/test/mock/virtual_tcp_server.py b/lab_control/test/mock/virtual_tcp_server.py new file mode 100644 index 0000000..07a4345 --- /dev/null +++ b/lab_control/test/mock/virtual_tcp_server.py @@ -0,0 +1,44 @@ +import socket +import threading +import atexit + +IP = "0.0.0.0" +PORT = 5025 + +# Bind server socket when this module is included +_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +_serverSocket.bind((IP, PORT)) +_serverSocket.listen(1) + +# Close it when the program exits +def _cleanUp(): + _serverSocket.close() +atexit.register(_cleanUp) + +class VirtualTCPServer: + def __init__(self): + self._stopFlag = False + self._clientSocket = None + self._mainThread = threading.Thread(target=self._mainLoop) + self._mainThread.start() + + def _mainLoop(self) -> None: + self._clientSocket, _ = _serverSocket.accept() + self._clientSocket.settimeout(0.1) + + try: + while not self._stopFlag: + try: + request = self._clientSocket.recv(4096).decode() + response = self._handleRequest(request.strip()) + if response is not None: + self._clientSocket.send(response.encode()) + except TimeoutError as e: + pass + finally: + self._clientSocket.close() + + def stop(self) -> None: + self._stopFlag = True + self._mainThread.join() |