1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
from typing import Protocol
from abc import abstractmethod
from . import SoloTool
from pathlib import Path
from glob import glob
import json
import requests
class SessionManager():
    """Load and save solo-tool sessions through a storage backend.

    The backend is selected from the form of ``sessionPath``:

    * no URL scheme, or ``file://`` -> local directory (``_FileSystemBackend``)
    * ``http://`` or ``https://``   -> remote server (``_FileBrowserBackend``)

    A stored session is a list with one entry per song; each entry is a dict
    with the keys ``"path"`` and ``"key_points"``.
    """

    def __init__(self, sessionPath: str):
        """Pick the backend matching ``sessionPath``.

        Raises:
            ValueError: if ``sessionPath`` starts with a URL scheme other than
                ``file://``, ``http://`` or ``https://``.
        """
        from re import search

        self._sessionPath = sessionPath
        match = search(r"^([a-z0-9]+://)", sessionPath)
        scheme = match.group(0) if match else None
        if scheme is None or scheme == "file://":
            # Hand the backend a plain filesystem path. Passing the scheme
            # through would make Path() treat "file:" as a directory name.
            # removeprefix is a no-op when no scheme is present.
            self._backend = _FileSystemBackend(sessionPath.removeprefix("file://"))
        elif scheme in ("http://", "https://"):
            self._backend = _FileBrowserBackend(sessionPath)
        else:
            raise ValueError(f"Unsupported session path: {sessionPath}")

    def getSessions(self) -> list[str]:
        """Return the ids of the sessions known to the backend."""
        return self._backend.listIds()

    def loadSession(self, id: str, player=None) -> SoloTool:
        """Create a SoloTool populated from the stored session ``id``.

        ``player`` is forwarded unchanged to the ``SoloTool`` constructor.
        """
        session = self._backend.read(id)
        st = SoloTool(player=player)
        for i, entry in enumerate(session):
            songPath = entry["path"]
            keyPoints = entry["key_points"]
            st.addSong(songPath)
            # NOTE(review): assumes addSong() stores the song at index i, so
            # the key points land on the song that was just added - confirm.
            st._keyPoints[i] = keyPoints
        return st

    def saveSession(self, soloTool: SoloTool, id: str) -> None:
        """Store the songs of ``soloTool`` and their key points as session ``id``."""
        session = [
            {"path": song, "key_points": soloTool._keyPoints[i]}
            for i, song in enumerate(soloTool.songs)
        ]
        self._backend.write(session, id)
class _Backend(Protocol):
    """Storage interface that SessionManager uses to persist sessions.

    A session is passed around as a list of entry dicts (SessionManager
    builds one dict per song with the keys ``"path"`` and ``"key_points"``).
    Every method here is abstract and raises NotImplementedError if called
    directly; concrete backends override all three.
    """

    @abstractmethod
    def listIds(self) -> list[str]:
        """Return the ids of all sessions available in this backend."""
        raise NotImplementedError

    @abstractmethod
    def read(self, id: str) -> list[dict]:
        """Return the session stored under ``id``."""
        raise NotImplementedError

    @abstractmethod
    def write(self, session: list[dict], id: str) -> None:
        """Store ``session`` under ``id``."""
        raise NotImplementedError
class _FileSystemBackend(_Backend):
    """Backend that keeps each session as ``<id>.json`` in a local directory."""

    def __init__(self, sessionPath: str):
        """Remember ``sessionPath`` as the directory holding the session files."""
        self._sessionPath = Path(sessionPath)

    def listIds(self) -> list[str]:
        """Return the stems of all ``*.json`` files in the session directory, sorted."""
        from glob import escape

        # Escape the directory part so that characters such as "[" or "*" in
        # the configured path are matched literally instead of as wildcards.
        pattern = f"{escape(str(self._sessionPath))}/*.json"
        # glob() returns entries in arbitrary order; sort for a stable listing.
        return sorted(Path(f).stem for f in glob(pattern))

    def read(self, id: str) -> list[dict]:
        """Parse and return the JSON content of ``<id>.json``."""
        # Explicit UTF-8: the default text encoding depends on the locale.
        with open(self._sessionPath / f"{id}.json", "r", encoding="utf-8") as f:
            return json.load(f)

    def write(self, session: list[dict], id: str) -> None:
        """Serialise ``session`` as JSON into ``<id>.json``, replacing any existing file."""
        with open(self._sessionPath / f"{id}.json", "w", encoding="utf-8") as f:
            json.dump(session, f)
class _FileBrowserBackend(_Backend):
    """Backend that keeps sessions as ``<id>.json`` files on an HTTP(S) server.

    NOTE(review): the endpoints used (``/api/login``, ``/api/resources``,
    ``/api/raw``) and the ``X-Auth`` header look like the File Browser REST
    API - confirm against the deployed server.
    """

    # Seconds to wait on the server. requests has no default timeout, so
    # without one an unresponsive server would block the caller forever.
    _TIMEOUT = 30

    def __init__(self, serverUrl: str):
        """Store the server URL and log in to obtain an API key.

        Raises:
            requests.HTTPError: if the login request is rejected.
        """
        # Drop trailing slashes so the URLs built below never contain "//api".
        self._baseUrl = serverUrl.rstrip("/")
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or the environment.
        self._username = "solo-tool"
        self._password = "mwC0ML8vLpJLPCLHKuxkiOxtIaE"
        self._apiKey = self._getApiKey()

    def listIds(self) -> list[str]:
        """Return the names, without ``.json``, of the JSON files listed by the server."""
        url = f"{self._baseUrl}/api/resources"
        response = self._request("GET", url)
        # Tolerate a missing or null "items" field (presumably possible for
        # an empty directory - verify against the server).
        items = response.json().get("items") or []
        return [
            item["name"].removesuffix(".json")
            for item in items
            if item["extension"] == ".json"
        ]

    def read(self, id: str) -> list[dict]:
        """Download ``<id>.json`` and return its parsed JSON content."""
        url = f"{self._baseUrl}/api/raw/{id}.json"
        response = self._request("GET", url)
        return json.loads(response.content)

    def write(self, session: list[dict], id: str) -> None:
        """Upload ``session`` as the JSON body of a PUT to ``<id>.json``."""
        url = f"{self._baseUrl}/api/resources/{id}.json"
        self._request("PUT", url, json=session)

    def _getApiKey(self) -> str:
        """Log in with the stored credentials and return the response body as the API key."""
        response = requests.post(
            f"{self._baseUrl}/api/login",
            json={"username": self._username, "password": self._password},
            timeout=self._TIMEOUT,
        )
        # Fail here on a rejected login rather than keeping the error body
        # around as if it were a valid key.
        response.raise_for_status()
        return response.text

    def _request(self, verb: str, url: str, **kwargs):
        """Send an authenticated request, logging in again once on HTTP 401.

        Extra keyword arguments are passed through to ``requests.request``.

        Raises:
            requests.HTTPError: if the final response has an error status.
        """
        kwargs.setdefault("timeout", self._TIMEOUT)
        headers = {"X-Auth": self._apiKey}
        response = requests.request(verb, url, headers=headers, **kwargs)
        if response.status_code == requests.codes.UNAUTHORIZED:
            # if unauthorized, the key might have expired
            self._apiKey = self._getApiKey()
            headers["X-Auth"] = self._apiKey
            response = requests.request(verb, url, headers=headers, **kwargs)
        response.raise_for_status()
        return response
|