diff options
Diffstat (limited to 'solo-tool-project/src')
| -rw-r--r-- | solo-tool-project/src/solo_tool/session_manager.py | 117 | 
1 files changed, 68 insertions, 49 deletions
| diff --git a/solo-tool-project/src/solo_tool/session_manager.py b/solo-tool-project/src/solo_tool/session_manager.py index 2e99937..38dc827 100644 --- a/solo-tool-project/src/solo_tool/session_manager.py +++ b/solo-tool-project/src/solo_tool/session_manager.py @@ -1,96 +1,115 @@ -from typing import BinaryIO, Protocol +from typing import Protocol  from abc import abstractmethod +from . import SoloTool + +from pathlib import Path  from glob import glob  import json -from pathlib import Path -from . import SoloTool  import requests -class SessionManager(Protocol): -    @abstractmethod -    def getSessions(self) -> list[str]: -        raise NotImplementedError +class SessionManager(): +    def __init__(self, sessionPath: str): +        self._sessionPath = sessionPath -    @abstractmethod -    def loadSession(self, key: str, player=None) -> SoloTool: -        raise NotImplementedError +        from re import search +        match = search(r"^([a-z0-9]+://)", sessionPath) +        if not match or match.group(0) == "file://": +            self._backend = _FileSystemBackend(sessionPath) +        elif match.group(0) in ["http://", "https://"]: +            self._backend = _FileBrowserBackend(sessionPath) +        else: +            raise ValueError(f"Unsupported session path: {sessionPath}") -    @abstractmethod -    def saveSession(self, soloTool: SoloTool, key: str) -> None: -        raise NotImplementedError +    def getSessions(self) -> list[str]: +        return self._backend.listIds() -    @staticmethod -    def _dict2st(session: dict, player) -> SoloTool: -        st = SoloTool(player=player) +    def loadSession(self, id: str, player=None) -> SoloTool: +        session = self._backend.read(id) +        st = SoloTool(player=player)          for i, entry in enumerate(session):              songPath = entry["path"]              keyPoints = entry["key_points"]              st.addSong(songPath)              st._keyPoints[i] = keyPoints - +                  return st -    @staticmethod -    def _st2dict(soloTool: SoloTool) -> dict: +    def saveSession(self, soloTool: SoloTool, id: str) -> None:          session = [] +          for i, song in enumerate(soloTool.songs):              entry = {                  "path": song,                  "key_points" : soloTool._keyPoints[i]              }              session.append(entry) -        return session -class _FileSystemSessionManager(SessionManager): +        self._backend.write(session, id) + +class _Backend(Protocol): +    @abstractmethod +    def listIds(self) -> list[str]: +        raise NotImplementedError + +    @abstractmethod +    def read(self, id: str) -> dict: +        raise NotImplementedError + +    @abstractmethod +    def write(self, session: dict, id: str) -> None: +        raise NotImplementedError + +class _FileSystemBackend(_Backend):      def __init__(self, sessionPath: str):          self._sessionPath = Path(sessionPath) -    def getSessions(self) -> list[str]: +    def listIds(self) -> list[str]:          return [Path(f).stem for f in glob(f"{self._sessionPath}/*.json")] -    def loadSession(self, key: str, player=None) -> SoloTool: -        with open(self._sessionPath / f"{key}.json", "r") as f: +    def read(self, id: str) -> dict: +        with open(self._sessionPath / f"{id}.json", "r") as f:              session = json.load(f) -        return SessionManager._dict2st(session, player) +        return session -    def saveSession(self, soloTool: SoloTool, key: str) -> None: -        session = SessionManager._st2dict(soloTool) -        with open(self._sessionPath / f"{key}.json", "w") as f: +    def write(self, session: dict, id: str) -> None: +        with open(self._sessionPath / f"{id}.json", "w") as f:              json.dump(session, f) -class _FileBrowserSessionManager(SessionManager): -    def __init__(self, sessionUrl: str): -        self._baseUrl = "https://files.0xf7.com" +class _FileBrowserBackend(_Backend): +    def __init__(self, serverUrl: str): +        self._baseUrl = serverUrl          self._username = "solo-tool"          self._password = "mwC0ML8vLpJLPCLHKuxkiOxtIaE"          self._apiKey = self._getApiKey() -    def getSessions(self) -> list[str]: -        url = f"{self._baseUrl}/api/resources/sessions" -        response = requests.get(url, headers={"X-Auth":self._apiKey}) -        response.raise_for_status() +    def listIds(self) -> list[str]: +        url = f"{self._baseUrl}/api/resources" +        response = self._request("GET", url)          return [item["name"][0:-5] for item in response.json()["items"] if item["extension"] == ".json"] -    def loadSession(self, key: str, player=None) -> SoloTool: -        url = f"{self._baseUrl}/api/raw/sessions/{key}.json" -        response = requests.get(url, headers={"X-Auth":self._apiKey}) -        response.raise_for_status() -        return SessionManager._dict2st(json.loads(response.content), player=player) +    def read(self, id: str) -> dict: +        url = f"{self._baseUrl}/api/raw/{id}.json" +        response = self._request("GET", url) +        return json.loads(response.content) -    def saveSession(self, soloTool: SoloTool, key: str) -> None: -        pass +    def write(self, session: dict, id: str) -> None: +        url = f"{self._baseUrl}/api/resources/{id}.json" +        self._request("PUT", url, json=session)      def _getApiKey(self) -> str:          response = requests.post(f"{self._baseUrl}/api/login", json={"username":self._username, "password":self._password})          return response.content -def getSessionManager(sessionPath: str) -> SessionManager: -    from re import search -    match = search(r"^([a-z0-9]+://)", sessionPath) -    if not match or match.group(0) == "file://": -        return _FileSystemSessionManager(sessionPath) -    elif match.group(0) in ["http://", "https://"]: -        return _FileBrowserSessionManager(sessionPath) +    def _request(self, verb: str, url: str, **kwargs): +        headers = {"X-Auth" : self._apiKey} +        response = requests.request(verb, url, headers=headers, **kwargs) +        if response.status_code == requests.codes.UNAUTHORIZED: +            # if unauthorized, the key might have expired +            self._apiKey = self._getApiKey() +            headers["X-Auth"] = self._apiKey +            response = requests.request(verb, url, headers=headers, **kwargs) +        response.raise_for_status() +        return response | 
