pelican-port-forwarding/api/pelican.py

50 lines
1.5 KiB
Python
Raw Permalink Normal View History

import json
2025-03-02 14:40:53 +00:00
from api import Api, ApiAuthType
from typing import Tuple
class PelicanApi(Api):
def __init__(self, base_url, auth_token, client_auth_token):
2025-03-02 14:40:53 +00:00
self.base_url = base_url
self.auth_token = auth_token
self.client_auth_token = client_auth_token
2025-03-02 14:40:53 +00:00
def apply_authentication(self, use_alt_auth: bool = False) -> Tuple[ApiAuthType, dict]:
if use_alt_auth:
return ApiAuthType.Header, {
"Authorization": "Bearer " + self.client_auth_token
}
2025-03-02 14:40:53 +00:00
return ApiAuthType.Header, {
"Authorization": "Bearer " + self.auth_token
}
def transform(self, data):
if "object" not in data:
print("No object in data")
print(json.dumps(data))
return data
2025-03-02 14:40:53 +00:00
type = data["object"]
if type == "list":
return [self.transform(x) for x in data["data"]]
if type in ["allocation", "server", "node", "stats"]:
2025-03-02 14:40:53 +00:00
return data["attributes"]
return data
def nodes(self):
return self._get("/application/nodes")
def servers(self):
return self._get("/application/servers")
def allocations(self, node_id: int):
return self._get("/application/nodes/" + str(node_id) + "/allocations")
def server_allocations(self, identifier):
return self._get("/client/servers/" + str(identifier) + "/network/allocations")
def resources(self, identifier):
return self._get("/client/servers/" + str(identifier) + "/resources")