summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEddy Pedroni <eddy@0xf7.com>2022-05-29 19:20:09 +0200
committerEddy Pedroni <eddy@0xf7.com>2022-05-29 19:20:09 +0200
commita0783e9f9698b3370c96eaab574d433e25372bc7 (patch)
tree265540f0beb122edff0688293a968067e14c545c
parent8d4621a92e58b1466010f2ad3aeada5e04a453b2 (diff)
Added JDS6600 initial implementation, test device doesn't work
-rw-r--r--lab_control/jds6600.py28
-rw-r--r--lab_control/test/jds6600_test.py33
-rw-r--r--lab_control/test/mock_jds6600_device.py70
-rw-r--r--lab_control/test/mock_sds1000xe_device.py (renamed from lab_control/test/mock_sds1000xe_server.py)6
-rw-r--r--lab_control/test/sds1000xe_test.py34
5 files changed, 151 insertions, 20 deletions
diff --git a/lab_control/jds6600.py b/lab_control/jds6600.py
new file mode 100644
index 0000000..eda8306
--- /dev/null
+++ b/lab_control/jds6600.py
@@ -0,0 +1,28 @@
+import serial
+
+from lab_control.function_generator import FunctionGenerator
+
+class JDS6600(FunctionGenerator):
+ def __init__(self, portName):
+ self._port = serial.Serial(portName)
+ self._port.baudrate = 115200
+ self._port.bytesize = serial.EIGHTBITS
+ self._port.stopbits = serial.STOPBITS_ONE
+ self._port.parity = serial.PARITY_NONE
+
+ def _sendRequest(self, opcode: str, args: str) -> str:
+ request = f":{opcode}={args}.\r\n"
+ print(f"Script writing request to port: {request}")
+ self._port.write(request.encode())
+ print(f"Script waiting for response")
+ responseRaw = self._port.readline()
+ return responseRaw.decode().strip()
+
+ def setOn(self, channel: int) -> None:
+ args = ",".join(["1" if i == channel else "0" for i in [1, 2]])
+ response = self._sendRequest("w20", args)
+ print(f"Script got response {response}")
+ # TODO figure out error handling
+
+ def setOff(self, channel: int) -> None:
+ pass
diff --git a/lab_control/test/jds6600_test.py b/lab_control/test/jds6600_test.py
new file mode 100644
index 0000000..45f841a
--- /dev/null
+++ b/lab_control/test/jds6600_test.py
@@ -0,0 +1,33 @@
+import pytest
+
+from lab_control.jds6600 import JDS6600
+from lab_control.test.mock_jds6600_device import MockJDS6600Device
+
+AVAILABLE_CHANNELS = [1, 2]
+
+@pytest.fixture
+def mockDevice():
+ d = MockJDS6600Device()
+ yield d
+ d.stop()
+
+@pytest.fixture
+def uut(mockDevice):
+ uut = JDS6600(mockDevice.getPortName())
+ yield uut
+ print("Cleaning up UUT")
+
+def tes_serialConfiguration(mockDevice):
+ with pytest.raises(AssertionError):
+ mockDevice.checkPortConfiguration()
+
+ uut = JDS6600(mockDevice.getPortName())
+ mockDevice.checkPortConfiguration()
+
+def test_channelOnAndOff(uut, mockDevice):
+ for ch in AVAILABLE_CHANNELS:
+ assert not mockDevice.isOn(ch)
+ #uut.setOn(ch)
+ #assert mockDevice.isOn(ch)
+ #uut.setOff(ch)
+ #assert not mockDevice.isOn(ch)
diff --git a/lab_control/test/mock_jds6600_device.py b/lab_control/test/mock_jds6600_device.py
new file mode 100644
index 0000000..22c48be
--- /dev/null
+++ b/lab_control/test/mock_jds6600_device.py
@@ -0,0 +1,70 @@
+import serial
+import os
+import pty
+import tty
+import termios
+import atexit
+import threading
+
+# Open virtual serial port
+
+# Close it when the program exits
+def _cleanUp() -> None:
+ pass
+atexit.register(_cleanUp)
+
+class MockJDS6600Device():
+ def __init__(self):
+ self._master, self._slave = pty.openpty()
+ self._masterFile = os.fdopen(self._master, mode="r+b", closefd=False, buffering=0)
+
+ self._portName = os.ttyname(self._slave)
+ self._channels = [False, False]
+
+ self._mainThread = threading.Thread(target=self._mainLoop)
+ self._mainThread.start()
+
+ def _mainLoop(self) -> None:
+ while True:
+ try:
+ print("Device reading request....")
+ request = self._masterFile.readline().decode().strip()
+ response = self._handleRequest(request)
+ print("Device sending response")
+ self._masterFile.write(response.encode())
+ except OSError as e:
+ print("Exception caught, breaking out of loop")
+ break
+ print("Main loop returning")
+
+ def _handleRequest(self, request: str) -> str:
+ print(f"Device received request: {request}")
+ return ":ok\r\n"
+
+ def stop(self) -> None:
+ print("STOP")
+ self._masterFile.close()
+
+ os.close(self._master)
+ print("master closed")
+ os.close(self._slave)
+ print("slave closed")
+ self._mainThread.join()
+ print("stop returning")
+
+ def checkPortConfiguration(self) -> None:
+ iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self._slave)
+
+ # JDS6600 configuration taken from manual
+ assert ispeed == termios.B115200
+ assert ospeed == termios.B115200
+ assert (cflag & termios.CSIZE) == termios.CS8
+ assert (cflag & termios.CSTOPB) == 0
+ assert (cflag & (termios.PARENB | termios.PARODD)) == 0
+
+ def getPortName(self) -> str:
+ return self._portName
+
+ def isOn(self, ch: int) -> bool:
+ return self._channels[ch - 1]
+
diff --git a/lab_control/test/mock_sds1000xe_server.py b/lab_control/test/mock_sds1000xe_device.py
index 1b72146..ce752e9 100644
--- a/lab_control/test/mock_sds1000xe_server.py
+++ b/lab_control/test/mock_sds1000xe_device.py
@@ -13,11 +13,11 @@ _serverSocket.bind((IP, PORT))
_serverSocket.listen(1)
# Close it when the program exits
-def _cleanup():
+def _cleanUp():
_serverSocket.close()
-atexit.register(_cleanup)
+atexit.register(_cleanUp)
-class MockSDS1000XEServer:
+class MockSDS1000XEDevice:
def __init__(self):
self._stopFlag = False
self._clientSocket = None
diff --git a/lab_control/test/sds1000xe_test.py b/lab_control/test/sds1000xe_test.py
index c7ec252..bf9c6c4 100644
--- a/lab_control/test/sds1000xe_test.py
+++ b/lab_control/test/sds1000xe_test.py
@@ -1,19 +1,19 @@
import pytest
from lab_control.sds1000xe import SDS1000XE
-from lab_control.test.mock_sds1000xe_server import MockSDS1000XEServer
+from lab_control.test.mock_sds1000xe_device import MockSDS1000XEDevice
-MOCK_IP = "127.0.0.1"
+MOCK_DEVICE_IP = "127.0.0.1"
@pytest.fixture
-def mockServer():
- s = MockSDS1000XEServer()
- yield s
- s.stop()
+def mockDevice():
+ d = MockSDS1000XEDevice()
+ yield d
+ d.stop()
@pytest.fixture
-def uut(mockServer):
- return SDS1000XE(MOCK_IP)
+def uut(mockDevice):
+ return SDS1000XE(MOCK_DEVICE_IP)
def checkFloatMeasurement(testCases, setValue, measureValue):
for channel, expectedValue in testCases:
@@ -21,23 +21,23 @@ def checkFloatMeasurement(testCases, setValue, measureValue):
measuredValue = measureValue(channel)
assert measuredValue == expectedValue
-def test_amplitudeMeasurement(uut, mockServer):
+def test_amplitudeMeasurement(uut, mockDevice):
testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
- checkFloatMeasurement(testCases, mockServer.setAmplitude, uut.measureAmplitude)
+ checkFloatMeasurement(testCases, mockDevice.setAmplitude, uut.measureAmplitude)
-def test_peakToPeakMeasurement(uut, mockServer):
+def test_peakToPeakMeasurement(uut, mockDevice):
testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
- checkFloatMeasurement(testCases, mockServer.setPeakToPeak, uut.measurePeakToPeak)
+ checkFloatMeasurement(testCases, mockDevice.setPeakToPeak, uut.measurePeakToPeak)
-def test_RMSMeasurement(uut, mockServer):
+def test_RMSMeasurement(uut, mockDevice):
testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 10.1)]
- checkFloatMeasurement(testCases, mockServer.setRMS, uut.measureRMS)
+ checkFloatMeasurement(testCases, mockDevice.setRMS, uut.measureRMS)
-def test_frequencyMeasurement(uut, mockServer):
+def test_frequencyMeasurement(uut, mockDevice):
testCases = [(1, 16.23987), (2, 0.0), (3, -0.0164), (4, 93489.15)]
- checkFloatMeasurement(testCases, mockServer.setFrequency, uut.measureFrequency)
+ checkFloatMeasurement(testCases, mockDevice.setFrequency, uut.measureFrequency)
-def test_invalidChannel(uut, mockServer):
+def test_invalidChannel(uut, mockDevice):
# Channel is checked by the UUT before the request is sent
testCases = [0, 5]