aboutsummaryrefslogtreecommitdiffstats
path: root/solo-tool-project/src/solo_tool
diff options
context:
space:
mode:
authorEddy Pedroni <epedroni@pm.me>2026-01-01 17:57:27 +0100
committerEddy Pedroni <epedroni@pm.me>2026-01-01 17:57:27 +0100
commit8ea2b64ff798af913dcba64baace8d2536bf0b18 (patch)
treef85ea2f371055e67c629909df4897aec2f4bbad2 /solo-tool-project/src/solo_tool
parent88ce99d87889cdf953af611ef09d7a12b6d23747 (diff)
Add Android app wrapper around web interface
Diffstat (limited to 'solo-tool-project/src/solo_tool')
-rw-r--r--solo-tool-project/src/solo_tool/recorder.py75
-rw-r--r--solo-tool-project/src/solo_tool/session_manager.py88
-rw-r--r--solo-tool-project/src/solo_tool/solo_tool.py14
-rw-r--r--solo-tool-project/src/solo_tool/storage.py87
4 files changed, 186 insertions, 78 deletions
diff --git a/solo-tool-project/src/solo_tool/recorder.py b/solo-tool-project/src/solo_tool/recorder.py
new file mode 100644
index 0000000..fd2df02
--- /dev/null
+++ b/solo-tool-project/src/solo_tool/recorder.py
@@ -0,0 +1,75 @@
+import pyaudio as pa
+from pathlib import Path
+
+class Recording:
+ def __init__(self, frames: [], channels: int, samplingRate: int, sampleFormat: int):
+ self._frames = frames
+ self._channels = channels
+ self._samplingRate = samplingRate
+ self._sampleFormat = sampleFormat
+
+ def writeWav(self, file: Path) -> None:
+ import wave
+ with wave.open(str(file), "wb") as wf:
+ wf.setnchannels(self._channels)
+ wf.setsampwidth(self._sampleFormat)
+ wf.setframerate(self._samplingRate)
+ wf.writeframes(b''.join(self._frames))
+
+ def writeMp3(self, file: Path) -> None:
+ from pydub import AudioSegment
+ segment = AudioSegment(
+ data=b''.join(self._frames),
+ sample_width=self._sampleFormat,
+ frame_rate=self._samplingRate,
+ channels=self._channels
+ )
+ segment.export(str(file), format="mp3", bitrate="320k")
+
+class Recorder:
+ def __init__(self, bufferSize: int, samplingRate: int):
+ self._bufferSize = bufferSize
+ self._samplingRate = samplingRate
+ self._sampleFormat = pa.paInt16
+ self._channels = 2
+ self._pa = pa.PyAudio()
+ self._stream = None
+ self._frames = []
+
+ def __del__(self):
+ if self.recording:
+ self._stream.stop_stream()
+ self._stream.close()
+
+ def _callback(self, inData, frameCount, timeInfo, statusFlags):
+ if statusFlags != pa.paNoError:
+ print(f"Recorder callback got status {hex(statusFlags)}, aborting")
+ return (None, pa.paAbort)
+
+ self._frames.append(inData)
+ return (None, pa.paContinue)
+
+ def startRecording(self) -> None:
+ if self.recording:
+ return
+
+ self._frames.clear()
+ self._stream = self._pa.open(format=self._sampleFormat,
+ channels=self._channels,
+ rate=self._samplingRate,
+ frames_per_buffer=self._bufferSize,
+ input=True,
+ stream_callback=self._callback)
+
+ def stopRecording(self) -> Recording:
+ if not self.recording:
+ return None
+ self._stream.stop_stream()
+ self._stream.close()
+ self._stream = None
+ return Recording(self._frames, self._channels, self._samplingRate, self._pa.get_sample_size(self._sampleFormat))
+
+ @property
+ def recording(self) -> bool:
+ return self._stream is not None
+
diff --git a/solo-tool-project/src/solo_tool/session_manager.py b/solo-tool-project/src/solo_tool/session_manager.py
index 8624207..7d98106 100644
--- a/solo-tool-project/src/solo_tool/session_manager.py
+++ b/solo-tool-project/src/solo_tool/session_manager.py
@@ -1,12 +1,7 @@
-from typing import Protocol
-from abc import abstractmethod
-from . import SoloTool
-
from pathlib import Path
-from glob import glob
-import json
-import requests
-from os import getenv
+
+from . import SoloTool
+from .storage import FileSystemStorageBackend, FileBrowserStorageBackend
class SessionManager():
def __init__(self, sessionPath: str):
@@ -15,17 +10,17 @@ class SessionManager():
from re import search
match = search(r"^([a-z0-9]+://)", sessionPath)
if not match or match.group(0) == "file://":
- self._backend = _FileSystemBackend(sessionPath)
+ self._backend = FileSystemStorageBackend(sessionPath)
elif match.group(0) in ["http://", "https://"]:
- self._backend = _FileBrowserBackend(sessionPath)
+ self._backend = FileBrowserStorageBackend(sessionPath)
else:
raise ValueError(f"Unsupported session path: {sessionPath}")
def getSessions(self) -> list[str]:
- return self._backend.listIds()
+ return self._backend.listSessions()
def loadSession(self, id: str, player=None) -> SoloTool:
- session = self._backend.read(id)
+ session = self._backend.readSession(id)
st = SoloTool(player=player)
for i, entry in enumerate(session):
@@ -48,70 +43,7 @@ class SessionManager():
}
session.append(entry)
- self._backend.write(session, id)
-
-class _Backend(Protocol):
- @abstractmethod
- def listIds(self) -> list[str]:
- raise NotImplementedError
-
- @abstractmethod
- def read(self, id: str) -> dict:
- raise NotImplementedError
-
- @abstractmethod
- def write(self, session: dict, id: str) -> None:
- raise NotImplementedError
-
-class _FileSystemBackend(_Backend):
- def __init__(self, sessionPath: str):
- self._sessionPath = Path(sessionPath)
-
- def listIds(self) -> list[str]:
- return [Path(f).stem for f in glob(f"{self._sessionPath}/*.json")]
-
- def read(self, id: str) -> dict:
- with open(self._sessionPath / f"{id}.json", "r") as f:
- session = json.load(f)
- return session
-
- def write(self, session: dict, id: str) -> None:
- with open(self._sessionPath / f"{id}.json", "w") as f:
- json.dump(session, f)
-
-class _FileBrowserBackend(_Backend):
- def __init__(self, serverUrl: str):
- self._baseUrl = serverUrl
- self._username = getenv("ST_USER")
- self._password = getenv("ST_PASS")
- self._apiKey = self._getApiKey()
-
- def listIds(self) -> list[str]:
- url = f"{self._baseUrl}/api/resources"
- response = self._request("GET", url)
- return [item["name"][0:-5] for item in response.json()["items"] if item["extension"] == ".json"]
-
- def read(self, id: str) -> dict:
- url = f"{self._baseUrl}/api/raw/{id}.json"
- response = self._request("GET", url)
- return json.loads(response.content)
-
- def write(self, session: dict, id: str) -> None:
- url = f"{self._baseUrl}/api/resources/{id}.json"
- self._request("PUT", url, json=session)
-
- def _getApiKey(self) -> str:
- response = requests.post(f"{self._baseUrl}/api/login", json={"username":self._username, "password":self._password})
- return response.content
-
- def _request(self, verb: str, url: str, **kwargs):
- headers = {"X-Auth" : self._apiKey}
- response = requests.request(verb, url, headers=headers, **kwargs)
- if response.status_code == requests.codes.UNAUTHORIZED:
- # if unauthorized, the key might have expired
- self._apiKey = self._getApiKey()
- headers["X-Auth"] = self._apiKey
- response = requests.request(verb, url, headers=headers, **kwargs)
- response.raise_for_status()
- return response
+ self._backend.writeSession(session, id)
+ def saveRecording(self, recording: Path, destination: str) -> None:
+ self._backend.writeRecording(recording, destination)
diff --git a/solo-tool-project/src/solo_tool/solo_tool.py b/solo-tool-project/src/solo_tool/solo_tool.py
index e8474e6..c4acaf8 100644
--- a/solo-tool-project/src/solo_tool/solo_tool.py
+++ b/solo-tool-project/src/solo_tool/solo_tool.py
@@ -12,12 +12,14 @@ class SoloTool:
self._keyPoints = []
self._keyPoint = None
self._volumes = []
+ self._adHoc = False
def __del__(self):
del self._player
def _updateSong(self, index):
previousSong = self._song
+ self._adHoc = False
self._song = index
self._player.pause()
self._player.setCurrentSong(self._songs[index])
@@ -151,3 +153,15 @@ class SoloTool:
def registerRateCallback(self, callback):
self._notifier.registerCallback(Notifier.PLAYBACK_RATE_EVENT, callback)
+ def playAdHoc(self, file) -> None:
+ self._adHoc = True
+ self._player.setCurrentSong(file)
+
+ def backToNormal(self) -> None:
+ self._adHoc = False
+ self._player.setCurrentSong(self._songs[self._song])
+
+ @property
+ def playingAdHoc(self) -> bool:
+ return self._adHoc
+
diff --git a/solo-tool-project/src/solo_tool/storage.py b/solo-tool-project/src/solo_tool/storage.py
new file mode 100644
index 0000000..0c5577f
--- /dev/null
+++ b/solo-tool-project/src/solo_tool/storage.py
@@ -0,0 +1,87 @@
+from typing import Protocol
+from abc import abstractmethod
+
+from pathlib import Path
+from glob import glob
+import json
+import requests
+from os import getenv
+
+class StorageBackend(Protocol):
+ @abstractmethod
+ def listSessions(self) -> list[str]:
+ raise NotImplementedError
+
+ @abstractmethod
+ def readSession(self, id: str) -> dict:
+ raise NotImplementedError
+
+ @abstractmethod
+ def writeSession(self, session: dict, id: str) -> None:
+ raise NotImplementedError
+
+ @abstractmethod
+ def writeRecording(self, recording: Path, destination: str) -> None:
+ raise NotImplementedError
+
+class FileSystemStorageBackend(StorageBackend):
+ def __init__(self, storagePath: str):
+ self._storagePath = Path(storagePath)
+
+ def listSessions(self) -> list[str]:
+ #return [Path(f).stem for f in glob(f"{self._storagePath / "sessions"}/*.json")]
+ return [Path(f).stem for f in glob(str(self._storagePath / "sessions" / "*.json"))]
+
+ def readSession(self, id: str) -> dict:
+ with open(self._storagePath / "sessions" / f"{id}.json", "r") as f:
+ session = json.load(f)
+ return session
+
+ def writeSession(self, session: dict, id: str) -> None:
+ with open(self._storagePath / "sessions" / f"{id}.json", "w") as f:
+ json.dump(session, f)
+
+ def writeRecording(self, recording: Path, destination: str) -> None:
+ pass
+
+class FileBrowserStorageBackend(StorageBackend):
+ def __init__(self, serverUrl: str):
+ self._baseUrl = serverUrl
+ self._username = getenv("ST_USER")
+ self._password = getenv("ST_PASS")
+ self._apiKey = self._getApiKey()
+
+ def listSessions(self) -> list[str]:
+ url = f"{self._baseUrl}/api/resources/sessions"
+ response = self._request("GET", url)
+ return [item["name"][0:-5] for item in response.json()["items"] if item["extension"] == ".json"]
+
+ def readSession(self, id: str) -> dict:
+ url = f"{self._baseUrl}/api/raw/sessions/{id}.json"
+ response = self._request("GET", url)
+ return json.loads(response.content)
+
+ def writeSession(self, session: dict, id: str) -> None:
+ url = f"{self._baseUrl}/api/resources/sessions/{id}.json"
+ self._request("PUT", url, json=session)
+
+ def writeRecording(self, recording: Path, destination: str) -> None:
+ url = f"{self._baseUrl}/api/resources/recordings/{destination}"
+ with open(recording, "rb") as file:
+ self._request("POST", url, {"Content-Type" : "audio/mpeg"}, data=file)
+
+ def _getApiKey(self) -> str:
+ response = requests.post(f"{self._baseUrl}/api/login", json={"username":self._username, "password":self._password})
+ return response.content
+
+ def _request(self, verb: str, url: str, moreHeaders: dict={}, **kwargs):
+ headers = moreHeaders | {"X-Auth" : self._apiKey}
+ response = requests.request(verb, url, headers=headers, **kwargs)
+ if response.status_code == requests.codes.UNAUTHORIZED:
+ # if unauthorized, the key might have expired
+ self._apiKey = self._getApiKey()
+ headers["X-Auth"] = self._apiKey
+ response = requests.request(verb, url, headers=headers, **kwargs)
+ response.raise_for_status()
+ return response
+