from io import StringIO
from pathlib import Path
import time
from typing import Optional
import os
import requests
import pandas as pd
from cs_kit.exceptions import APIException
class ComputeStudio:
    """
    Python client for the ComputeStudio webapp.

    - Run simulations
    - Update simulation metadata
    - Download your results

    .. code-block:: python

        client = ComputeStudio("PSLmodels", "TaxBrain")
        client.create()

    Learn how to get your API token from the
    `Authentication docs <https://docs.compute.studio/api/auth.html>`_. Once you have your token,
    you can save it in a file named ``.cs-api-token`` in the home directory of your
    computer. You can also set it as the ``CS_API_TOKEN`` environment variable or
    pass it directly to the ``ComputeStudio`` class.
    """

    # Base URL that every endpoint URL is built from.
    host = "https://compute.studio"

    def __init__(self, owner: str, title: str, api_token: Optional[str] = None):
        """
        Build the endpoint URLs and authorization header for one app.

        Parameters
        ----------
        owner : str
            Owner segment of the app URL (``PSLmodels`` in the class example).
        title : str
            Title segment of the app URL (``TaxBrain`` in the class example).
        api_token : str, optional
            API token. When omitted, it is resolved by :meth:`get_token`.

        Raises
        ------
        APIException
            If no API token can be found.
        """
        self.owner = owner
        self.title = title
        api_token = self.get_token(api_token)
        self.auth_header = {"Authorization": f"Token {api_token}"}
        self.sim_url = f"{self.host}/{owner}/{title}/api/v1/"
        self.inputs_url = f"{self.host}/{owner}/{title}/api/v1/inputs/"

    def create(
        self,
        adjustment: Optional[dict] = None,
        meta_parameters: Optional[dict] = None,
    ):
        """
        Create a simulation on Compute Studio.

        The inputs are posted to the server, then the ``edit/`` endpoint is
        polled every 3 seconds for as long as it reports ``PENDING``.

        Parameters
        ----------
        adjustment : dict
            Parameter values in the `ParamTools format <https://paramtools.dev/api/reference.html>`_.
            Defaults to an empty dict.
        meta_parameters : dict
            Meta parameters for the simulation in a ``key:value`` format.
            Defaults to an empty dict.

        Returns
        -------
        response : dict
            Response from the Compute Studio server. Use this to get the simulation ID and status.

        Raises
        ------
        APIException
            If the simulation cannot be created (status other than 201) or
            polling ends in anything other than a 200 response with status
            ``SUCCESS``.
        """
        adjustment = adjustment or {}
        meta_parameters = meta_parameters or {}
        resp = requests.post(
            self.sim_url,
            json={"adjustment": adjustment, "meta_parameters": meta_parameters},
            headers=self.auth_header,
        )
        if resp.status_code != 201:
            raise APIException(resp.json())

        model_pk = resp.json()["sim"]["model_pk"]
        edit_url = f"{self.sim_url}{model_pk}/edit/"

        # Poll until the server stops reporting PENDING or returns an error.
        while True:
            pollresp = requests.get(edit_url, headers=self.auth_header)
            polldata = pollresp.json()
            if pollresp.status_code != 200 or polldata["status"] != "PENDING":
                break
            time.sleep(3)

        if pollresp.status_code == 200 and polldata["status"] == "SUCCESS":
            simresp = requests.get(
                f"{self.sim_url}{model_pk}/remote/", headers=self.auth_header,
            )
            return simresp.json()
        raise APIException(polldata)

    def detail(
        self,
        model_pk: int,
        include_outputs: bool = False,
        wait: bool = True,
        polling_interval: int = 5,
        timeout: int = 600,
    ):
        """
        Get detail for a simulation.

        Parameters
        ----------
        model_pk : int
            ID for the simulation.
        include_outputs : bool
            Include outputs from the simulation in addition to the simulation metadata.
        wait : bool
            If ``True``, keep polling while the server answers with status 202
            (simulation not finished yet). If ``False``, return the body of the
            202 response right away.
        polling_interval : int
            Time in seconds to sleep between two status checks.
        timeout : int
            Time in seconds to wait for the simulation to finish.

        Returns
        -------
        response : dict
            Response from the Compute Studio server.

        Raises
        ------
        TimeoutError
            If more than ``timeout`` seconds have elapsed before a request.
        APIException
            If the server answers with a status other than 200 or 202.
        """
        if include_outputs:
            url = f"{self.sim_url}{model_pk}/"
        else:
            url = f"{self.sim_url}{model_pk}/remote/"

        start = time.time()
        while True:
            if (time.time() - start) > timeout:
                raise TimeoutError(f"Simulation not ready in under {timeout} seconds.")
            resp = requests.get(url, headers=self.auth_header)
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code != 202:
                raise APIException(resp.json())
            if not wait:
                return resp.json()
            # Still waiting on the simulation to finish. Sleep *before* the
            # next request; previously a bare ``continue`` skipped the sleep
            # and the loop polled the server without any pause.
            time.sleep(polling_interval)

    def results(self, model_pk: int, timeout: int = 600):
        """
        Retrieve and parse results into the appropriate data structure. Currently,
        CSV outputs are loaded into a pandas `DataFrame`. Other outputs are returned
        as is.

        Parameters
        ----------
        model_pk : int
            ID for the simulation.
        timeout : int
            Time in seconds to wait for the simulation to finish.

        Returns
        -------
        result : dict
            Dictionary of simulation outputs formatted as title:output.

        Raises
        ------
        TimeoutError, APIException
            Propagated from :meth:`detail`.
        """
        result = self.detail(model_pk, include_outputs=True, wait=True, timeout=timeout)
        res = {}
        for output in result["outputs"]["downloadable"]:
            if output["media_type"] == "CSV":
                res[output["title"]] = pd.read_csv(StringIO(output["data"]))
            else:
                res[output["title"]] = output["data"]
        return res

    def update(
        self,
        model_pk: int,
        title: Optional[str] = None,
        is_public: Optional[bool] = None,
        notify_on_completion: Optional[bool] = None,
    ):
        """
        Update meta data about a simulation.

        Only the arguments that are not ``None`` are sent to the server.

        .. code-block:: python

            cs.update(
                model_pk=123,
                title="hello world",
                is_public=True,
                notify_on_completion=True
            )

        Parameters
        ----------
        model_pk : int
            ID for the simulation.
        title : str
            Title of the simulation.
        is_public : bool
            Set whether simulation is public or private.
        notify_on_completion : bool
            Send an email notification when the simulation completes.

        Returns
        -------
        response : dict
            Response from the Compute Studio server.

        Raises
        ------
        APIException
            If the server answers with a status other than 200.
        """
        candidates = {
            "title": title,
            "is_public": is_public,
            "notify_on_completion": notify_on_completion,
        }
        # ``False`` is a meaningful value here, so filter on ``None`` only.
        sim_kwargs = {name: val for name, val in candidates.items() if val is not None}
        resp = requests.put(
            f"{self.sim_url}{model_pk}/", json=sim_kwargs, headers=self.auth_header,
        )
        if resp.status_code == 200:
            return resp.json()
        raise APIException(resp.json())

    def get_token(self, api_token):
        """
        Retrieve the API token.

        Lookup order: the ``api_token`` argument, then the ``CS_API_TOKEN``
        environment variable, then the ``.cs-api-token`` file in the user's
        home directory (surrounding whitespace stripped).

        Parameters
        ----------
        api_token : str or None
            Token passed explicitly by the caller.

        Returns
        -------
        token : str
            The first token found.

        Raises
        ------
        APIException
            If none of the three sources provides a token.
        """
        if api_token:
            return api_token

        env_token = os.environ.get("CS_API_TOKEN")
        if env_token is not None:
            return env_token

        token_file_path = Path.home() / ".cs-api-token"
        if token_file_path.exists():
            return token_file_path.read_text().strip()

        raise APIException(
            "API token not found. It can be passed as an argument to "
            "this class, as an environment variable at CS_API_TOKEN, "
            f"or read from {token_file_path}"
        )