aboutsummaryrefslogtreecommitdiffstats
path: root/solo-tool-project/src/solo_tool/session_manager.py
blob: 38dc82774e0bcbbbb2a0264dd9cc1071671b486d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from typing import Protocol
from abc import abstractmethod
from . import SoloTool

from pathlib import Path
from glob import glob
import json
import requests

class SessionManager():
    def __init__(self, sessionPath: str):
        self._sessionPath = sessionPath

        from re import search
        match = search(r"^([a-z0-9]+://)", sessionPath)
        if not match or match.group(0) == "file://":
            self._backend = _FileSystemBackend(sessionPath)
        elif match.group(0) in ["http://", "https://"]:
            self._backend = _FileBrowserBackend(sessionPath)
        else:
            raise ValueError(f"Unsupported session path: {sessionPath}")

    def getSessions(self) -> list[str]:
        return self._backend.listIds()

    def loadSession(self, id: str, player=None) -> SoloTool:
        session = self._backend.read(id)

        st = SoloTool(player=player)
        for i, entry in enumerate(session):
            songPath = entry["path"]
            keyPoints = entry["key_points"]

            st.addSong(songPath)
            st._keyPoints[i] = keyPoints
        
        return st

    def saveSession(self, soloTool: SoloTool, id: str) -> None:
        session = []

        for i, song in enumerate(soloTool.songs):
            entry = {
                "path": song,
                "key_points" : soloTool._keyPoints[i]
            }
            session.append(entry)

        self._backend.write(session, id)

class _Backend(Protocol):
    @abstractmethod
    def listIds(self) -> list[str]:
        raise NotImplementedError

    @abstractmethod
    def read(self, id: str) -> dict:
        raise NotImplementedError

    @abstractmethod
    def write(self, session: dict, id: str) -> None:
        raise NotImplementedError

class _FileSystemBackend(_Backend):
    def __init__(self, sessionPath: str):
        self._sessionPath = Path(sessionPath)

    def listIds(self) -> list[str]:
        return [Path(f).stem for f in glob(f"{self._sessionPath}/*.json")]

    def read(self, id: str) -> dict:
        with open(self._sessionPath / f"{id}.json", "r") as f:
            session = json.load(f)
        return session

    def write(self, session: dict, id: str) -> None:
        with open(self._sessionPath / f"{id}.json", "w") as f:
            json.dump(session, f)

class _FileBrowserBackend(_Backend):
    def __init__(self, serverUrl: str):
        self._baseUrl = serverUrl
        self._username = "solo-tool"
        self._password = "mwC0ML8vLpJLPCLHKuxkiOxtIaE"
        self._apiKey = self._getApiKey()

    def listIds(self) -> list[str]:
        url = f"{self._baseUrl}/api/resources"
        response = self._request("GET", url)
        return [item["name"][0:-5] for item in response.json()["items"] if item["extension"] == ".json"]

    def read(self, id: str) -> dict:
        url = f"{self._baseUrl}/api/raw/{id}.json"
        response = self._request("GET", url)
        return json.loads(response.content)

    def write(self, session: dict, id: str) -> None:
        url = f"{self._baseUrl}/api/resources/{id}.json"
        self._request("PUT", url, json=session)

    def _getApiKey(self) -> str:
        response = requests.post(f"{self._baseUrl}/api/login", json={"username":self._username, "password":self._password})
        return response.content

    def _request(self, verb: str, url: str, **kwargs):
        headers = {"X-Auth" : self._apiKey}
        response = requests.request(verb, url, headers=headers, **kwargs)
        if response.status_code == requests.codes.UNAUTHORIZED:
            # if unauthorized, the key might have expired
            self._apiKey = self._getApiKey()
            headers["X-Auth"] = self._apiKey
            response = requests.request(verb, url, headers=headers, **kwargs)
        response.raise_for_status()
        return response