import os
import pty
import termios
import threading
from typing import Callable, Optional


class VirtualSerialPort:
    """Pseudo-terminal that answers line-based requests on a background thread.

    A pty pair is opened on construction.  The slave side's device path is
    exposed through :meth:`getPortName` so a client can open it like a serial
    port; a worker thread reads lines from the master side, hands each one to
    the request handler and writes any response back to the master side.
    """

    def __init__(self, requestHandler: Callable[[str], Optional[str]]):
        """Open the pty pair and start the worker thread.

        Args:
            requestHandler: Called with each received line (decoded with the
                default codec and stripped of surrounding whitespace).  If it
                returns a string, that string is encoded and written back;
                if it returns ``None`` nothing is written.
        """
        self._master, self._slave = pty.openpty()
        # closefd=False: the master descriptor is owned by this object and is
        # closed explicitly in stop(), not by the file wrapper.
        self._masterFile = os.fdopen(
            self._master, mode="r+b", closefd=False, buffering=0
        )
        self._portName = os.ttyname(self._slave)
        self._requestHandler = requestHandler
        self._mainThread = threading.Thread(target=self._mainLoop)
        self._mainThread.start()

    def stop(self) -> None:
        """Shut the port down and wait for the worker thread to finish.

        The slave descriptor is closed first: that is what unblocks the
        worker's pending read on the master side (it fails with ``OSError``
        on Linux, or reports end-of-file on some other platforms).  The master
        side is only released after the thread has exited, so the worker never
        operates on a closed file object or on a descriptor number that has
        already been handed back to the OS.
        """
        os.close(self._slave)
        self._mainThread.join()
        self._masterFile.close()
        os.close(self._master)

    def _mainLoop(self) -> None:
        """Serve requests until the master side fails or reaches end-of-file."""
        while True:
            try:
                rawLine = self._masterFile.readline()
                if not rawLine:
                    # Empty read means end-of-file: the slave side is gone, so
                    # stop instead of feeding empty requests to the handler.
                    break
                request = rawLine.decode().strip()
                response = self._requestHandler(request)
                if response is not None:
                    self._masterFile.write(response.encode())
            except OSError:
                # Raised by the pty I/O once the slave side has been closed.
                break

    def getPortName(self) -> str:
        """Return the device path of the slave side of the pty."""
        return self._portName