From 8ea2b64ff798af913dcba64baace8d2536bf0b18 Mon Sep 17 00:00:00 2001 From: Eddy Pedroni Date: Thu, 1 Jan 2026 17:57:27 +0100 Subject: Add Android app wrapper around web interface --- solo-tool-project/pyproject.toml | 4 +- solo-tool-project/src/solo_tool/recorder.py | 75 ++++++++++++++++++ solo-tool-project/src/solo_tool/session_manager.py | 88 +++------------------- solo-tool-project/src/solo_tool/solo_tool.py | 14 ++++ solo-tool-project/src/solo_tool/storage.py | 87 +++++++++++++++++++++ solo-tool-project/test/session_manager_unittest.py | 2 +- 6 files changed, 190 insertions(+), 80 deletions(-) create mode 100644 solo-tool-project/src/solo_tool/recorder.py create mode 100644 solo-tool-project/src/solo_tool/storage.py (limited to 'solo-tool-project') diff --git a/solo-tool-project/pyproject.toml b/solo-tool-project/pyproject.toml index 841ee46..40d09d8 100644 --- a/solo-tool-project/pyproject.toml +++ b/solo-tool-project/pyproject.toml @@ -14,7 +14,9 @@ dependencies = [ "python-rtmidi", "sip", "mido", - "python-mpv" + "python-mpv", + "pyaudio", + "pydub-ng" ] [project.optional-dependencies] diff --git a/solo-tool-project/src/solo_tool/recorder.py b/solo-tool-project/src/solo_tool/recorder.py new file mode 100644 index 0000000..fd2df02 --- /dev/null +++ b/solo-tool-project/src/solo_tool/recorder.py @@ -0,0 +1,75 @@ +import pyaudio as pa +from pathlib import Path + +class Recording: + def __init__(self, frames: [], channels: int, samplingRate: int, sampleFormat: int): + self._frames = frames + self._channels = channels + self._samplingRate = samplingRate + self._sampleFormat = sampleFormat + + def writeWav(self, file: Path) -> None: + import wave + with wave.open(str(file), "wb") as wf: + wf.setnchannels(self._channels) + wf.setsampwidth(self._sampleFormat) + wf.setframerate(self._samplingRate) + wf.writeframes(b''.join(self._frames)) + + def writeMp3(self, file: Path) -> None: + from pydub import AudioSegment + segment = AudioSegment( + data=b''.join(self._frames), + sample_width=self._sampleFormat, + frame_rate=self._samplingRate, + channels=self._channels + ) + segment.export(str(file), format="mp3", bitrate="320k") + +class Recorder: + def __init__(self, bufferSize: int, samplingRate: int): + self._bufferSize = bufferSize + self._samplingRate = samplingRate + self._sampleFormat = pa.paInt16 + self._channels = 2 + self._pa = pa.PyAudio() + self._stream = None + self._frames = [] + + def __del__(self): + if self.recording: + self._stream.stop_stream() + self._stream.close() + + def _callback(self, inData, frameCount, timeInfo, statusFlags): + if statusFlags != pa.paNoError: + print(f"Recorder callback got status {hex(statusFlags)}, aborting") + return (None, pa.paAbort) + + self._frames.append(inData) + return (None, pa.paContinue) + + def startRecording(self) -> None: + if self.recording: + return + + self._frames.clear() + self._stream = self._pa.open(format=self._sampleFormat, + channels=self._channels, + rate=self._samplingRate, + frames_per_buffer=self._bufferSize, + input=True, + stream_callback=self._callback) + + def stopRecording(self) -> Recording: + if not self.recording: + return None + self._stream.stop_stream() + self._stream.close() + self._stream = None + return Recording(self._frames, self._channels, self._samplingRate, self._pa.get_sample_size(self._sampleFormat)) + + @property + def recording(self) -> bool: + return self._stream is not None + diff --git a/solo-tool-project/src/solo_tool/session_manager.py b/solo-tool-project/src/solo_tool/session_manager.py index 8624207..7d98106 100644 --- a/solo-tool-project/src/solo_tool/session_manager.py +++ b/solo-tool-project/src/solo_tool/session_manager.py @@ -1,12 +1,7 @@ -from typing import Protocol -from abc import abstractmethod -from . import SoloTool - from pathlib import Path -from glob import glob -import json -import requests -from os import getenv + +from . import SoloTool +from .storage import FileSystemStorageBackend, FileBrowserStorageBackend class SessionManager(): def __init__(self, sessionPath: str): @@ -15,17 +10,17 @@ class SessionManager(): from re import search match = search(r"^([a-z0-9]+://)", sessionPath) if not match or match.group(0) == "file://": - self._backend = _FileSystemBackend(sessionPath) + self._backend = FileSystemStorageBackend(sessionPath) elif match.group(0) in ["http://", "https://"]: - self._backend = _FileBrowserBackend(sessionPath) + self._backend = FileBrowserStorageBackend(sessionPath) else: raise ValueError(f"Unsupported session path: {sessionPath}") def getSessions(self) -> list[str]: - return self._backend.listIds() + return self._backend.listSessions() def loadSession(self, id: str, player=None) -> SoloTool: - session = self._backend.read(id) + session = self._backend.readSession(id) st = SoloTool(player=player) for i, entry in enumerate(session): @@ -48,70 +43,7 @@ class SessionManager(): } session.append(entry) - self._backend.write(session, id) - -class _Backend(Protocol): - @abstractmethod - def listIds(self) -> list[str]: - raise NotImplementedError - - @abstractmethod - def read(self, id: str) -> dict: - raise NotImplementedError - - @abstractmethod - def write(self, session: dict, id: str) -> None: - raise NotImplementedError - -class _FileSystemBackend(_Backend): - def __init__(self, sessionPath: str): - self._sessionPath = Path(sessionPath) - - def listIds(self) -> list[str]: - return [Path(f).stem for f in glob(f"{self._sessionPath}/*.json")] - - def read(self, id: str) -> dict: - with open(self._sessionPath / f"{id}.json", "r") as f: - session = json.load(f) - return session - - def write(self, session: dict, id: str) -> None: - with open(self._sessionPath / f"{id}.json", "w") as f: - json.dump(session, f) - -class _FileBrowserBackend(_Backend): - def __init__(self, serverUrl: str): - self._baseUrl = serverUrl - self._username = getenv("ST_USER") - self._password = getenv("ST_PASS") - self._apiKey = self._getApiKey() - - def listIds(self) -> list[str]: - url = f"{self._baseUrl}/api/resources" - response = self._request("GET", url) - return [item["name"][0:-5] for item in response.json()["items"] if item["extension"] == ".json"] - - def read(self, id: str) -> dict: - url = f"{self._baseUrl}/api/raw/{id}.json" - response = self._request("GET", url) - return json.loads(response.content) - - def write(self, session: dict, id: str) -> None: - url = f"{self._baseUrl}/api/resources/{id}.json" - self._request("PUT", url, json=session) - - def _getApiKey(self) -> str: - response = requests.post(f"{self._baseUrl}/api/login", json={"username":self._username, "password":self._password}) - return response.content - - def _request(self, verb: str, url: str, **kwargs): - headers = {"X-Auth" : self._apiKey} - response = requests.request(verb, url, headers=headers, **kwargs) - if response.status_code == requests.codes.UNAUTHORIZED: - # if unauthorized, the key might have expired - self._apiKey = self._getApiKey() - headers["X-Auth"] = self._apiKey - response = requests.request(verb, url, headers=headers, **kwargs) - response.raise_for_status() - return response + self._backend.writeSession(session, id) + def saveRecording(self, recording: Path, destination: str) -> None: + self._backend.writeRecording(recording, destination) diff --git a/solo-tool-project/src/solo_tool/solo_tool.py b/solo-tool-project/src/solo_tool/solo_tool.py index e8474e6..c4acaf8 100644 --- a/solo-tool-project/src/solo_tool/solo_tool.py +++ b/solo-tool-project/src/solo_tool/solo_tool.py @@ -12,12 +12,14 @@ class SoloTool: self._keyPoints = [] self._keyPoint = None self._volumes = [] + self._adHoc = False def __del__(self): del self._player def _updateSong(self, index): previousSong = self._song + self._adHoc = False self._song = index self._player.pause() self._player.setCurrentSong(self._songs[index]) @@ -151,3 +153,15 @@ class SoloTool: def registerRateCallback(self, callback): self._notifier.registerCallback(Notifier.PLAYBACK_RATE_EVENT, callback) + def playAdHoc(self, file) -> None: + self._adHoc = True + self._player.setCurrentSong(file) + + def backToNormal(self) -> None: + self._adHoc = False + self._player.setCurrentSong(self._songs[self._song]) + + @property + def playingAdHoc(self) -> bool: + return self._adHoc + diff --git a/solo-tool-project/src/solo_tool/storage.py b/solo-tool-project/src/solo_tool/storage.py new file mode 100644 index 0000000..0c5577f --- /dev/null +++ b/solo-tool-project/src/solo_tool/storage.py @@ -0,0 +1,87 @@ +from typing import Protocol +from abc import abstractmethod + +from pathlib import Path +from glob import glob +import json +import requests +from os import getenv + +class StorageBackend(Protocol): + @abstractmethod + def listSessions(self) -> list[str]: + raise NotImplementedError + + @abstractmethod + def readSession(self, id: str) -> dict: + raise NotImplementedError + + @abstractmethod + def writeSession(self, session: dict, id: str) -> None: + raise NotImplementedError + + @abstractmethod + def writeRecording(self, recording: Path, destination: str) -> None: + raise NotImplementedError + +class FileSystemStorageBackend(StorageBackend): + def __init__(self, storagePath: str): + self._storagePath = Path(storagePath) + + def listSessions(self) -> list[str]: + #return [Path(f).stem for f in glob(f"{self._storagePath / "sessions"}/*.json")] + return [Path(f).stem for f in glob(str(self._storagePath / "sessions" / "*.json"))] + + def readSession(self, id: str) -> dict: + with open(self._storagePath / "sessions" / f"{id}.json", "r") as f: + session = json.load(f) + return session + + def writeSession(self, session: dict, id: str) -> None: + with open(self._storagePath / "sessions" / f"{id}.json", "w") as f: + json.dump(session, f) + + def writeRecording(self, recording: Path, destination: str) -> None: + pass + +class FileBrowserStorageBackend(StorageBackend): + def __init__(self, serverUrl: str): + self._baseUrl = serverUrl + self._username = getenv("ST_USER") + self._password = getenv("ST_PASS") + self._apiKey = self._getApiKey() + + def listSessions(self) -> list[str]: + url = f"{self._baseUrl}/api/resources/sessions" + response = self._request("GET", url) + return [item["name"][0:-5] for item in response.json()["items"] if item["extension"] == ".json"] + + def readSession(self, id: str) -> dict: + url = f"{self._baseUrl}/api/raw/sessions/{id}.json" + response = self._request("GET", url) + return json.loads(response.content) + + def writeSession(self, session: dict, id: str) -> None: + url = f"{self._baseUrl}/api/resources/sessions/{id}.json" + self._request("PUT", url, json=session) + + def writeRecording(self, recording: Path, destination: str) -> None: + url = f"{self._baseUrl}/api/resources/recordings/{destination}" + with open(recording, "rb") as file: + self._request("POST", url, {"Content-Type" : "audio/mpeg"}, data=file) + + def _getApiKey(self) -> str: + response = requests.post(f"{self._baseUrl}/api/login", json={"username":self._username, "password":self._password}) + return response.content + + def _request(self, verb: str, url: str, moreHeaders: dict={}, **kwargs): + headers = moreHeaders | {"X-Auth" : self._apiKey} + response = requests.request(verb, url, headers=headers, **kwargs) + if response.status_code == requests.codes.UNAUTHORIZED: + # if unauthorized, the key might have expired + self._apiKey = self._getApiKey() + headers["X-Auth"] = self._apiKey + response = requests.request(verb, url, headers=headers, **kwargs) + response.raise_for_status() + return response + diff --git a/solo-tool-project/test/session_manager_unittest.py b/solo-tool-project/test/session_manager_unittest.py index 5786b23..ace1ccb 100644 --- a/solo-tool-project/test/session_manager_unittest.py +++ b/solo-tool-project/test/session_manager_unittest.py @@ -25,7 +25,7 @@ def testSessionFile(sessionPath, testSongs): @pytest.fixture def sessionManager(sessionPath): - return SessionManager(str(sessionPath)) + return SessionManager(str(sessionPath.parent)) def test_loadSession(sessionManager, mockPlayer, testSessionFile): sessions = sessionManager.getSessions() -- cgit v1.2.3