summaryrefslogtreecommitdiffstats
path: root/lab_control/test/mock
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control/test/mock')
-rw-r--r--lab_control/test/mock/__init__.py0
-rw-r--r--lab_control/test/mock/mock_jds6600_device.py85
-rw-r--r--lab_control/test/mock/mock_lab.py71
-rw-r--r--lab_control/test/mock/mock_sds1000xe_device.py47
-rw-r--r--lab_control/test/mock/virtual_serial_port.py34
-rw-r--r--lab_control/test/mock/virtual_tcp_server.py44
6 files changed, 281 insertions, 0 deletions
diff --git a/lab_control/test/mock/__init__.py b/lab_control/test/mock/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lab_control/test/mock/__init__.py
diff --git a/lab_control/test/mock/mock_jds6600_device.py b/lab_control/test/mock/mock_jds6600_device.py
new file mode 100644
index 0000000..8b7b440
--- /dev/null
+++ b/lab_control/test/mock/mock_jds6600_device.py
@@ -0,0 +1,85 @@
+import re
+
+class MockJDS6600Device:
+ class ChannelState:
+ def __init__(self):
+ self.on = False
+ self.frequency = None
+ self.amplitude = None
+ self.function = None
+
+ def __init__(self):
+ self._channels = [MockJDS6600Device.ChannelState() for i in [1, 2]]
+ self._injectedFailureCounter = 0
+
+ def _handleRequest(self, request: str) -> str:
+ pattern = r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\."
+ m = re.search(pattern, request)
+
+ if not m:
+ return None
+
+ opcode = m.group("opcode")
+ function = int(m.group("function"))
+ args = m.group("args").split(",")
+
+ # channel on/off
+ if function == 20:
+ if opcode == "w":
+ self._channels[0].on = args[0] == "1"
+ self._channels[1].on = args[1] == "1"
+ return ":ok\r\n"
+ elif opcode == "r":
+ return f":r20={int(self._channels[0].on)},{int(self._channels[1].on)}.\r\n"
+
+ # channel frequency
+ elif function == 23 or function == 24:
+ ch = function - 23
+ if opcode == "w":
+ # Actual device takes a second argument for scaling, here we ignore it and always use 0 (Hz)
+ frequency = float(args[0]) / 100.0
+
+ if self._injectedFailureCounter > 0:
+ self._channels[ch].frequency = 0.0
+ self._injectedFailureCounter -= 1
+ else:
+ self._channels[ch].frequency = frequency
+ return ":ok\r\n"
+ elif opcode == "r":
+ frequency = self._channels[ch].frequency
+ return f":r{function}={int(frequency)},0.\r\n"
+
+ # channel amplitude
+ elif function == 25 or function == 26:
+ if opcode == "w":
+ ch = function - 25
+ amplitude = float(args[0]) / 1000.0
+ self._channels[ch].amplitude = amplitude
+ return ":ok\r\n"
+
+ # channel function shape
+ elif function == 21 or function == 22:
+ if opcode == "w":
+ ch = function - 21
+ shape = int(args[0])
+ self._channels[ch].function = shape
+ return ":ok\r\n"
+
+ # Unknown request format, no response
+ return None
+
+ def isOn(self, ch: int) -> bool:
+ return self._channels[ch - 1].on
+
+ def getFrequency(self, ch: int) -> float:
+ return self._channels[ch - 1].frequency
+
+ def getAmplitude(self, ch: int) -> float:
+ return self._channels[ch - 1].amplitude
+
+ def getFunction(self, ch: int) -> int:
+ return self._channels[ch - 1].function
+
+ def injectFailures(self, count: int) -> None:
+ self._injectedFailureCounter += count
+
diff --git a/lab_control/test/mock/mock_lab.py b/lab_control/test/mock/mock_lab.py
new file mode 100644
index 0000000..1016b92
--- /dev/null
+++ b/lab_control/test/mock/mock_lab.py
@@ -0,0 +1,71 @@
+from collections.abc import Callable
+
+from lab_control.function_generator import FunctionGenerator
+from lab_control.oscilloscope import Oscilloscope
+
+class MockLab(FunctionGenerator, Oscilloscope):
+ class FGChannelState:
+ def __init__(self):
+ self.on = False
+ self.frequency = None
+ self.amplitude = None
+ self.function = None
+
+ class OscChannelState:
+ def __init__(self):
+ self.testFunction = None
+ self.connectedChannel = None
+ self.voltsPerDiv = None
+
+ def __init__(self):
+ self.fgChannels = [MockLab.FGChannelState() for i in range(0, 2)]
+ self.oscChannels = [MockLab.OscChannelState() for i in range(0, 4)]
+
+ def setOn(self, channel: int) -> None:
+ self.fgChannels[channel - 1].on = True
+
+ def setOff(self, channel: int) -> None:
+ self.fgChannels[channel - 1].on = False
+
+ def setFrequency(self, channel: int, frequency: float) -> None:
+ self.fgChannels[channel - 1].frequency = frequency
+
+ def setAmplitude(self, channel: int, amplitude: float) -> None:
+ self.fgChannels[channel - 1].amplitude = amplitude
+
+ def setFunction(self, channel: int, function: int) -> None:
+ self.fgChannels[channel - 1].function = function
+
+ def measureAmplitude(self, channel: int) -> float:
+ fgChannel = self.oscChannels[channel - 1].connectedChannel
+ frequency = fgChannel.frequency if fgChannel.on else 0.0
+ amplitude = fgChannel.amplitude if fgChannel.on else 0.0
+
+ return self.oscChannels[channel - 1].testFunction(frequency) * amplitude
+
+ def measurePeakToPeak(self, channel: int) -> float:
+ pass
+
+ def measureRMS(self, channel: int) -> float:
+ pass
+
+ def measureFrequency(self, channel: int) -> float:
+ pass
+
+ def setVoltsPerDivision(self, channel: int, volts: float) -> None:
+ self.oscChannels[channel - 1].voltsPerDiv = volts
+
+ def getDivisionsDisplayed(self) -> int:
+ return 12
+
+ def setTestFunction(self, channel: int, f: Callable[[float], float]) -> None:
+ self.oscChannels[channel - 1].testFunction = f
+
+ def connectChannels(self, fg: int, osc: int) -> None:
+ self.oscChannels[osc - 1].connectedChannel = self.fgChannels[fg - 1]
+
+ def getFunctionGeneratorChannel(self, channel: int):
+ return self.fgChannels[channel - 1]
+
+ def getOscilloscopeChannel(self, channel: int):
+ return self.oscChannels[channel - 1]
diff --git a/lab_control/test/mock/mock_sds1000xe_device.py b/lab_control/test/mock/mock_sds1000xe_device.py
new file mode 100644
index 0000000..04ec07a
--- /dev/null
+++ b/lab_control/test/mock/mock_sds1000xe_device.py
@@ -0,0 +1,47 @@
+import re
+
+class MockSDS1000XEDevice:
+ def __init__(self):
+ # Mock internal values
+ self._channels = [{"AMPL" : None, "VDIV" : None} for i in range(0, 4)]
+
+ def _handleRequest(self, request: str) -> str:
+ m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request.strip())
+ if not m:
+ return None
+
+ channelIndex = int(m.group("channel")) - 1
+ opcode = m.group("opcode")
+
+ if opcode == "PAVA":
+ arg = m.group("arg")
+ value = self._channels[channelIndex].get(arg)
+ unit = "Hz" if arg == "FREQ" else "V"
+
+ if value is None:
+ return None
+ else:
+ response = f"C{m.group('channel')}:PAVA {arg},{value:.6E}{unit}"
+ return response
+ elif opcode == "VDIV":
+ arg = float(m.group("arg").rstrip("V"))
+ self._channels[channelIndex]["VDIV"] = arg
+ return None
+
+ return None
+
+ def setAmplitude(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["AMPL"] = value
+
+ def setPeakToPeak(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["PKPK"] = value
+
+ def setRMS(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["RMS"] = value
+
+ def setFrequency(self, channel: int, value: float) -> None:
+ self._channels[channel - 1]["FREQ"] = value
+
+ def getVoltsPerDivision(self, channel: int) -> float:
+ return self._channels[channel - 1]["VDIV"]
+
diff --git a/lab_control/test/mock/virtual_serial_port.py b/lab_control/test/mock/virtual_serial_port.py
new file mode 100644
index 0000000..f46e29c
--- /dev/null
+++ b/lab_control/test/mock/virtual_serial_port.py
@@ -0,0 +1,34 @@
+import os
+import pty
+import termios
+import threading
+
+class VirtualSerialPort:
+ def __init__(self, requestHandler):
+ self._master, self._slave = pty.openpty()
+ self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0)
+ self._portName = os.ttyname(self._slave)
+ self._requestHandler = requestHandler
+
+ self._mainThread = threading.Thread(target=self._mainLoop)
+ self._mainThread.start()
+
+ def stop(self) -> None:
+ self._masterFile.close()
+ os.close(self._master)
+ os.close(self._slave)
+ self._mainThread.join()
+
+ def _mainLoop(self) -> None:
+ while True:
+ try:
+ request = self._masterFile.readline().decode().strip()
+ response = self._requestHandler(request)
+
+ if response is not None:
+ self._masterFile.write(response.encode())
+ except OSError as e:
+ break
+
+ def getPortName(self) -> str:
+ return self._portName
diff --git a/lab_control/test/mock/virtual_tcp_server.py b/lab_control/test/mock/virtual_tcp_server.py
new file mode 100644
index 0000000..07a4345
--- /dev/null
+++ b/lab_control/test/mock/virtual_tcp_server.py
@@ -0,0 +1,44 @@
+import socket
+import threading
+import atexit
+
+IP = "0.0.0.0"
+PORT = 5025
+
+# Bind server socket when this module is included
+_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+_serverSocket.bind((IP, PORT))
+_serverSocket.listen(1)
+
+# Close it when the program exits
+def _cleanUp():
+ _serverSocket.close()
+atexit.register(_cleanUp)
+
+class VirtualTCPServer:
+ def __init__(self):
+ self._stopFlag = False
+ self._clientSocket = None
+ self._mainThread = threading.Thread(target=self._mainLoop)
+ self._mainThread.start()
+
+ def _mainLoop(self) -> None:
+ self._clientSocket, _ = _serverSocket.accept()
+ self._clientSocket.settimeout(0.1)
+
+ try:
+ while not self._stopFlag:
+ try:
+ request = self._clientSocket.recv(4096).decode()
+ response = self._handleRequest(request.strip())
+ if response is not None:
+ self._clientSocket.send(response.encode())
+ except TimeoutError as e:
+ pass
+ finally:
+ self._clientSocket.close()
+
+ def stop(self) -> None:
+ self._stopFlag = True
+ self._mainThread.join()