diff options
| -rw-r--r-- | lab_control/connection/direct_connection.py | 2 | ||||
| -rw-r--r-- | lab_control/connection/tcp_connection.py | 22 | ||||
| -rw-r--r-- | lab_control/sds1000xe.py | 29 | ||||
| -rw-r--r-- | lab_control/test/mock_sds1000xe_device.py | 44 | ||||
| -rw-r--r-- | lab_control/test/sds1000xe_unittest.py (renamed from lab_control/test/sds1000xe_test.py) | 19 | ||||
| -rw-r--r-- | lab_control/test/virtual_tcp_server.py | 44 | 
6 files changed, 91 insertions, 69 deletions
| diff --git a/lab_control/connection/direct_connection.py b/lab_control/connection/direct_connection.py index 56a5cf3..8df1e42 100644 --- a/lab_control/connection/direct_connection.py +++ b/lab_control/connection/direct_connection.py @@ -10,7 +10,7 @@ class DirectConnection:      def close(self) -> None:          self.open = False -    def send(self, request: str) -> str: +    def send(self, request: str, responseExpected=True) -> str:          return self.requestHandler(request)      def checkConfiguration(self) -> None: diff --git a/lab_control/connection/tcp_connection.py b/lab_control/connection/tcp_connection.py new file mode 100644 index 0000000..6af710c --- /dev/null +++ b/lab_control/connection/tcp_connection.py @@ -0,0 +1,22 @@ +import socket + +class TCPConnection: +    def __init__(self, ip: str, port: int): +        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +        self._socket.connect((address, port)) + +    def configure(self, config: dict) -> None: +        self._socket.settimeout(parameters["timeout"]) + +    def send(self, request, responseExpected=True): +        self._socket.sendall(request.encode()) + +        if responseExpected: +            try: +                response = self._socket.recv(4096).decode() +            except TimeoutError: +                response = None +            return response +         +    def checkConfiguration(self) -> None: +        pass diff --git a/lab_control/sds1000xe.py b/lab_control/sds1000xe.py index 61fa86d..caa6a9c 100644 --- a/lab_control/sds1000xe.py +++ b/lab_control/sds1000xe.py @@ -1,11 +1,10 @@  """  Implements partial support for Siglent SDS1000X-E series oscilloscopes.  """ - -import socket  import re  from lab_control.oscilloscope import Oscilloscope +from lab_control.connection.tcp_connection import TCPConnection  def _checkChannel(channel):      assert channel in SDS1000XE.AVAILABLE_CHANNELS, "SDS1000X-E: Invalid channel {channel}" @@ -19,24 +18,25 @@ class SDS1000XE(Oscilloscope):      TIMEOUT = 5.0      AVAILABLE_CHANNELS = range(1, 5) -    def __init__(self, address): +    def __init__(self, address, overrideConnection=None):          super().__init__() -        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -        self._socket.connect((address, SDS1000XE.PORT)) -        self._socket.settimeout(SDS1000XE.TIMEOUT) +        if overrideConnection is not None: +            self._connection = overrideConnection +        else: +            self._connection = TCPConnection(address) + +        self._connection.configure({"timeout" : SDS1000XE.TIMEOUT})      def _measure(self, channel: int, code: str) -> float:          _checkChannel(channel) -        pattern = r"C(?P<responseChannel>\d):PAVA .+,(?P<rawMeasurement>[\d.E+-]+)\w+" -        query = f"C{channel}:PAVA? {code}\r\n" -        self._socket.sendall(query.encode()) +        request = f"C{channel}:PAVA? {code}\r\n" +        response = self._connection.send(request) -        try: -            # TODO add code to regex -            response = self._socket.recv(4096).decode() +        if response is not None: +            pattern = r"C(?P<responseChannel>\d):PAVA .+,(?P<rawMeasurement>[\d.E+-]+)\w+"              matches = re.search(pattern, response)              measurement = float(matches.group("rawMeasurement")) -        except TimeoutError: +        else:              measurement = None          return measurement @@ -56,8 +56,7 @@ class SDS1000XE(Oscilloscope):      def setVoltsPerDivision(self, channel: int, volts: float) -> None:          _checkChannel(channel)          query = f"C{channel}:VDIV {volts:.2E}V\r\n" -        self._socket.sendall(query.encode()) -        # no response expected +        self._connection.send(query, responseExpected=False)      def getDivisionsDisplayed(self) -> int:          return 8 diff --git a/lab_control/test/mock_sds1000xe_device.py b/lab_control/test/mock_sds1000xe_device.py index 68c2471..04ec07a 100644 --- a/lab_control/test/mock_sds1000xe_device.py +++ b/lab_control/test/mock_sds1000xe_device.py @@ -1,50 +1,12 @@ -import socket -import threading -import atexit  import re -IP = "0.0.0.0" -PORT = 5025 - -# Bind server socket when this module is included -_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -_serverSocket.bind((IP, PORT)) -_serverSocket.listen(1) - -# Close it when the program exits -def _cleanUp(): -    _serverSocket.close() -atexit.register(_cleanUp) -  class MockSDS1000XEDevice:      def __init__(self): -        self._stopFlag = False -        self._clientSocket = None -        self._mainThread = threading.Thread(target=self._mainLoop) -        self._mainThread.start() -          # Mock internal values          self._channels = [{"AMPL" : None, "VDIV" : None} for i in range(0, 4)] -    def _mainLoop(self) -> None: -        self._clientSocket, _ = _serverSocket.accept() -        self._clientSocket.settimeout(0.1) - -        try: -            while not self._stopFlag: -                try: -                    request = self._clientSocket.recv(4096).decode() -                    response = self._handleRequest(request.strip()) -                    if response is not None: -                        self._clientSocket.send(response.encode()) -                except TimeoutError as e: -                    pass -        finally: -            self._clientSocket.close() -          def _handleRequest(self, request: str) -> str: -        m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request) +        m = re.search(r"C(?P<channel>\d):(?P<opcode>\w+)\??\s(?P<arg>.+)", request.strip())          if not m:              return None @@ -66,9 +28,7 @@ class MockSDS1000XEDevice:              self._channels[channelIndex]["VDIV"] = arg              return None -    def stop(self) -> None: -        self._stopFlag = True -        self._mainThread.join() +        return None      def setAmplitude(self, channel: int, value: float) -> None:          self._channels[channel - 1]["AMPL"] = value diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_unittest.py index 3774d52..ce87a6e 100644 --- a/lab_control/test/sds1000xe_test.py +++ b/lab_control/test/sds1000xe_unittest.py @@ -1,20 +1,20 @@  import pytest -import time  from lab_control.sds1000xe import SDS1000XE +from lab_control.connection.direct_connection import DirectConnection as MockConnection  from lab_control.test.mock_sds1000xe_device import MockSDS1000XEDevice -MOCK_DEVICE_IP = "127.0.0.1" -  @pytest.fixture  def mockDevice(): -    d = MockSDS1000XEDevice() -    yield d -    d.stop() +    return MockSDS1000XEDevice() + +@pytest.fixture +def mockConnection(mockDevice): +    return MockConnection(mockDevice._handleRequest)  @pytest.fixture -def uut(mockDevice): -    return SDS1000XE(MOCK_DEVICE_IP) +def uut(mockConnection): +    return SDS1000XE("", overrideConnection=mockConnection)  def checkFloatMeasurement(testValues, setValue, measureValue):      for channel in SDS1000XE.AVAILABLE_CHANNELS: @@ -57,8 +57,5 @@ def test_setVoltsPerDivision(uut, mockDevice):          for value in testValues:              uut.setVoltsPerDivision(channel, value) - -            time.sleep(0.1) # Allow time for the mock to receive and process the request -              assert mockDevice.getVoltsPerDivision(channel) == value diff --git a/lab_control/test/virtual_tcp_server.py b/lab_control/test/virtual_tcp_server.py new file mode 100644 index 0000000..07a4345 --- /dev/null +++ b/lab_control/test/virtual_tcp_server.py @@ -0,0 +1,44 @@ +import socket +import threading +import atexit + +IP = "0.0.0.0" +PORT = 5025 + +# Bind server socket when this module is included +_serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +_serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +_serverSocket.bind((IP, PORT)) +_serverSocket.listen(1) + +# Close it when the program exits +def _cleanUp(): +    _serverSocket.close() +atexit.register(_cleanUp) + +class VirtualTCPServer: +    def __init__(self): +        self._stopFlag = False +        self._clientSocket = None +        self._mainThread = threading.Thread(target=self._mainLoop) +        self._mainThread.start() + +    def _mainLoop(self) -> None: +        self._clientSocket, _ = _serverSocket.accept() +        self._clientSocket.settimeout(0.1) + +        try: +            while not self._stopFlag: +                try: +                    request = self._clientSocket.recv(4096).decode() +                    response = self._handleRequest(request.strip()) +                    if response is not None: +                        self._clientSocket.send(response.encode()) +                except TimeoutError as e: +                    pass +        finally: +            self._clientSocket.close() +     +    def stop(self) -> None: +        self._stopFlag = True +        self._mainThread.join() | 
