summaryrefslogtreecommitdiffstats
path: root/lab_control/sds1000xe.py
diff options
context:
space:
mode:
Diffstat (limited to 'lab_control/sds1000xe.py')
-rw-r--r--lab_control/sds1000xe.py29
1 files changed, 14 insertions, 15 deletions
diff --git a/lab_control/sds1000xe.py b/lab_control/sds1000xe.py
index 61fa86d..caa6a9c 100644
--- a/lab_control/sds1000xe.py
+++ b/lab_control/sds1000xe.py
@@ -1,11 +1,10 @@
"""
Implements partial support for Siglent SDS1000X-E series oscilloscopes.
"""
-
-import socket
import re
from lab_control.oscilloscope import Oscilloscope
+from lab_control.connection.tcp_connection import TCPConnection
def _checkChannel(channel):
assert channel in SDS1000XE.AVAILABLE_CHANNELS, "SDS1000X-E: Invalid channel {channel}"
@@ -19,24 +18,25 @@ class SDS1000XE(Oscilloscope):
TIMEOUT = 5.0
AVAILABLE_CHANNELS = range(1, 5)
- def __init__(self, address):
+ def __init__(self, address, overrideConnection=None):
super().__init__()
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect((address, SDS1000XE.PORT))
- self._socket.settimeout(SDS1000XE.TIMEOUT)
+ if overrideConnection is not None:
+ self._connection = overrideConnection
+ else:
+ self._connection = TCPConnection(address)
+
+ self._connection.configure({"timeout" : SDS1000XE.TIMEOUT})
def _measure(self, channel: int, code: str) -> float:
_checkChannel(channel)
- pattern = r"C(?P<responseChannel>\d):PAVA .+,(?P<rawMeasurement>[\d.E+-]+)\w+"
- query = f"C{channel}:PAVA? {code}\r\n"
- self._socket.sendall(query.encode())
+ request = f"C{channel}:PAVA? {code}\r\n"
+ response = self._connection.send(request)
- try:
- # TODO add code to regex
- response = self._socket.recv(4096).decode()
+ if response is not None:
+ pattern = r"C(?P<responseChannel>\d):PAVA .+,(?P<rawMeasurement>[\d.E+-]+)\w+"
matches = re.search(pattern, response)
measurement = float(matches.group("rawMeasurement"))
- except TimeoutError:
+ else:
measurement = None
return measurement
@@ -56,8 +56,7 @@ class SDS1000XE(Oscilloscope):
def setVoltsPerDivision(self, channel: int, volts: float) -> None:
_checkChannel(channel)
query = f"C{channel}:VDIV {volts:.2E}V\r\n"
- self._socket.sendall(query.encode())
- # no response expected
+ self._connection.send(query, responseExpected=False)
def getDivisionsDisplayed(self) -> int:
return 8