summaryrefslogtreecommitdiffstats
path: root/lab_control/test/mock/mock_jds6600_device.py
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control/test/mock/mock_jds6600_device.py')
-rw-r--r--lab_control/test/mock/mock_jds6600_device.py85
1 files changed, 85 insertions, 0 deletions
diff --git a/lab_control/test/mock/mock_jds6600_device.py b/lab_control/test/mock/mock_jds6600_device.py
new file mode 100644
index 0000000..8b7b440
--- /dev/null
+++ b/lab_control/test/mock/mock_jds6600_device.py
@@ -0,0 +1,85 @@
+import re
+
+class MockJDS6600Device:
+ class ChannelState:
+ def __init__(self):
+ self.on = False
+ self.frequency = None
+ self.amplitude = None
+ self.function = None
+
+ def __init__(self):
+ self._channels = [MockJDS6600Device.ChannelState() for i in [1, 2]]
+ self._injectedFailureCounter = 0
+
+ def _handleRequest(self, request: str) -> str:
+ pattern = r":(?P<opcode>[wrab])(?P<function>\d+)=(?P<args>.*)\."
+ m = re.search(pattern, request)
+
+ if not m:
+ return None
+
+ opcode = m.group("opcode")
+ function = int(m.group("function"))
+ args = m.group("args").split(",")
+
+ # channel on/off
+ if function == 20:
+ if opcode == "w":
+ self._channels[0].on = args[0] == "1"
+ self._channels[1].on = args[1] == "1"
+ return ":ok\r\n"
+ elif opcode == "r":
+ return f":r20={int(self._channels[0].on)},{int(self._channels[1].on)}.\r\n"
+
+ # channel frequency
+ elif function == 23 or function == 24:
+ ch = function - 23
+ if opcode == "w":
+ # Actual device takes a second argument for scaling, here we ignore it and always use 0 (Hz)
+ frequency = float(args[0]) / 100.0
+
+ if self._injectedFailureCounter > 0:
+ self._channels[ch].frequency = 0.0
+ self._injectedFailureCounter -= 1
+ else:
+ self._channels[ch].frequency = frequency
+ return ":ok\r\n"
+ elif opcode == "r":
+ frequency = self._channels[ch].frequency
+ return f":r{function}={int(frequency)},0.\r\n"
+
+ # channel amplitude
+ elif function == 25 or function == 26:
+ if opcode == "w":
+ ch = function - 25
+ amplitude = float(args[0]) / 1000.0
+ self._channels[ch].amplitude = amplitude
+ return ":ok\r\n"
+
+ # channel function shape
+ elif function == 21 or function == 22:
+ if opcode == "w":
+ ch = function - 21
+ shape = int(args[0])
+ self._channels[ch].function = shape
+ return ":ok\r\n"
+
+ # Unknown request format, no response
+ return None
+
+ def isOn(self, ch: int) -> bool:
+ return self._channels[ch - 1].on
+
+ def getFrequency(self, ch: int) -> float:
+ return self._channels[ch - 1].frequency
+
+ def getAmplitude(self, ch: int) -> float:
+ return self._channels[ch - 1].amplitude
+
+ def getFunction(self, ch: int) -> int:
+ return self._channels[ch - 1].function
+
+ def injectFailures(self, count: int) -> None:
+ self._injectedFailureCounter += count
+