import os import pty import termios import threading import re class MockJDS6600Device(): class ChannelState: def __init__(self): self.on = False self.frequency = None self.amplitude = None self.function = None def __init__(self): self._master, self._slave = pty.openpty() self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0) self._portName = os.ttyname(self._slave) self._channels = [MockJDS6600Device.ChannelState() for i in [1, 2]] self._injectedFailureCounter = 0 self._mainThread = threading.Thread(target=self._mainLoop) self._mainThread.start() def _mainLoop(self) -> None: while True: try: request = self._masterFile.readline().decode().strip() response = self._handleRequest(request) if response is not None: self._masterFile.write(response.encode()) except OSError as e: break def _handleRequest(self, request: str) -> str: pattern = r":(?P[wrab])(?P\d+)=(?P.*)\." m = re.search(pattern, request) if not m: return None opcode = m.group("opcode") function = int(m.group("function")) args = m.group("args").split(",") # channel on/off if function == 20: if opcode == "w": self._channels[0].on = args[0] == "1" self._channels[1].on = args[1] == "1" return ":ok\r\n" elif opcode == "r": return f":r20={int(self._channels[0].on)},{int(self._channels[1].on)}.\r\n" # channel frequency elif function == 23 or function == 24: ch = function - 23 if opcode == "w": # Actual device takes a second argument for scaling, here we ignore it and always use 0 (Hz) frequency = float(args[0]) / 100.0 if self._injectedFailureCounter > 0: self._channels[ch].frequency = 0.0 self._injectedFailureCounter -= 1 else: self._channels[ch].frequency = frequency return ":ok\r\n" elif opcode == "r": frequency = self._channels[ch].frequency return f":r{function}={int(frequency)},0.\r\n" # channel amplitude elif function == 25 or function == 26: if opcode == "w": ch = function - 25 amplitude = float(args[0]) / 1000.0 self._channels[ch].amplitude = amplitude return ":ok\r\n" # channel function shape elif function == 21 or function == 22: if opcode == "w": ch = function - 21 shape = int(args[0]) self._channels[ch].function = shape return ":ok\r\n" # Unknown request format, no response return None def stop(self) -> None: self._masterFile.close() os.close(self._master) os.close(self._slave) self._mainThread.join() def checkPortConfiguration(self) -> None: iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave) # JDS6600 configuration taken from manual assert ispeed == termios.B115200 assert ospeed == termios.B115200 assert (cflag & termios.CSIZE) == termios.CS8 assert (cflag & termios.CSTOPB) == 0 assert (cflag & (termios.PARENB | termios.PARODD)) == 0 def getPortName(self) -> str: return self._portName def isOn(self, ch: int) -> bool: return self._channels[ch - 1].on def getFrequency(self, ch: int) -> float: return self._channels[ch - 1].frequency def getAmplitude(self, ch: int) -> float: return self._channels[ch - 1].amplitude def getFunction(self, ch: int) -> int: return self._channels[ch - 1].function def injectFailures(self, count: int) -> None: self._injectedFailureCounter += count