""" Implements partial support for Siglent SDS1000X-E series oscilloscopes. """ import socket import re from lab_control.oscilloscope import Oscilloscope def _checkChannel(channel): assert channel in SDS1000XE.AVAILABLE_CHANNELS, "SDS1000X-E: Invalid channel {channel}" class SDS1000XE(Oscilloscope): """ Instances of this class connect to the SDS1000X-E IP and port and offer an API to control the device. """ PORT = 5025 TIMEOUT = 5.0 AVAILABLE_CHANNELS = range(1, 5) def __init__(self, address): super().__init__() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.connect((address, SDS1000XE.PORT)) self._socket.settimeout(SDS1000XE.TIMEOUT) def _measure(self, channel: int, code: str) -> float: _checkChannel(channel) pattern = r"C(?P\d):PAVA .+,(?P[\d.E+-]+)\w+" query = f"C{channel}:PAVA? {code}\r\n" self._socket.sendall(query.encode()) try: # TODO add code to regex response = self._socket.recv(4096).decode() matches = re.search(pattern, response) measurement = float(matches.group("rawMeasurement")) except TimeoutError: measurement = None return measurement def measureAmplitude(self, channel: int) -> float: return self._measure(channel, "AMPL") def measurePeakToPeak(self, channel: int) -> float: return self._measure(channel, "PKPK") def measureRMS(self, channel: int) -> float: return self._measure(channel, "RMS") def measureFrequency(self, channel: int) -> float: return self._measure(channel, "FREQ") def setVoltsPerDivision(self, channel: int, volts: float) -> None: _checkChannel(channel) query = f"C{channel}:VDIV {volts:.2E}V\r\n" self._socket.sendall(query.encode()) # no response expected def getDivisionsDisplayed(self) -> int: return 8