Commit d13dd449 authored by dilawar

temp:

parent 2632adea
1 merge request: !2 version 0.2.0
Pipeline #3705 failed with stages in 1 minute and 41 seconds
"""BiTIA command line interface to submit job to the BiTIA server. """BiTIA command line interface to submit job to the BiTIA server.
(c) 2022-, Subconscious Compute https://bitia.link
(c) 2022-, Subconscious Compute, https://subcom.tech
""" """
import typing as T
import functools import functools
import bitia.helper as bhelper import bitia.helper as bhelper
...@@ -37,6 +40,61 @@ def session(func): ...@@ -37,6 +40,61 @@ def session(func):
return wrapper return wrapper
@app.command("create-container")
@session
def create_remote_container(
user_input: str, recreate: bool = False, server: str = bconfig.DEFAULT_SERVER
):
"""Run a pipeline"""
pipeline_zip, _ = bhelper.user_input_to_pipeline(user_input)
res = bhelper.post_pipeline_task(
pipeline_zip,
server=server,
endpoint="container/create",
params=dict(recreate="true" if recreate else "false"),
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
@app.command("list-container")
@session
def list_remote_container(
user_input: str, server: str = bconfig.DEFAULT_SERVER
) -> T.List[str]:
"""List the remote server associated with the pipeline."""
_, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
res = bhelper.get(
endpoint="container/list",
params=dict(pipeline_sha256=pipeline_hash),
server=server,
)
res.raise_for_status()
_json = res.json()
print(_json)
return _json["containers"]
@app.command("logs")
@session
def stream_log(user_input: str, server: str = bconfig.DEFAULT_SERVER) -> T.List[str]:
"""Stream logs for the most recent run of a given pipeline."""
_, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
res = bhelper.get(
endpoint="logs",
params=dict(pipeline_sha256=pipeline_hash),
server=server,
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
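
All three new commands follow the same pattern: hit an endpoint with the pipeline's sha256 as a query parameter, then stream the response line by line. Note that `recreate` is sent as the strings "true"/"false" because requests would otherwise render a Python bool as "True"/"False" in the query string. A minimal sketch of the pattern, using httpbin.org as a stand-in server:

    import requests

    # Stand-in server; any endpoint that streams a response works the same.
    res = requests.get(
        "https://httpbin.org/get",
        params=dict(recreate="true"),  # explicit "true"/"false", not a bool
        stream=True,
    )
    res.raise_for_status()
    for line in res.iter_lines():
        # iter_lines() yields bytes; decode and strip the trailing newline.
        print(line.decode().rstrip())
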
@app.command("submit") @app.command("submit")
@session @session
def submit_pipeline( def submit_pipeline(
...@@ -47,10 +105,8 @@ def submit_pipeline( ...@@ -47,10 +105,8 @@ def submit_pipeline(
Prepare the user directory to send to the server. User can also provide link Prepare the user directory to send to the server. User can also provide link
to the pipeline to run. to the pipeline to run.
""" """
pipeline_zip = bhelper.user_input_to_pipeline(user_input) create_remote_container(user_input, recreate=rerun)
containers = bhelper.post_pipeline_task( containers = list_remote_container(user_input)
pipeline_zip, rerun=rerun, server=server, endpoint=f"submit/?rerun={rerun}"
)
logger.info(f" Got containers {containers}") logger.info(f" Got containers {containers}")
return containers return containers
...@@ -58,32 +114,12 @@ def submit_pipeline( ...@@ -58,32 +114,12 @@ def submit_pipeline(
@app.command("run") @app.command("run")
@session @session
def run_pipeline( def run_pipeline(
user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER user_input: str, *, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER
):
"""Run a pipeline"""
pipeline_zip = bhelper.user_input_to_pipeline(user_input)
res = bhelper.post_pipeline_task(
pipeline_zip, endpoint=f"submit?rerun={rerun}", server=server
)
return bhelper.log_container(res["container"], server=server)
@app.command("create")
@session
def create_remote_infra(
user_input: str, recreate: bool = False, server: str = bconfig.DEFAULT_SERVER
): ):
"""Run a pipeline""" """Run a pipeline"""
pipeline_zip = bhelper.user_input_to_pipeline(user_input) create_remote_container(user_input, recreate=rerun, server=server)
res = bhelper.post_pipeline_task( containers = list_remote_container(user_input, server=server)
pipeline_zip, return [bhelper.log_container(container, server=server) for container in containers]
server=server,
endpoint=f"container/create?recreate={recreate}",
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
 @app.command()
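
With the new commands in place, `submit` and `run` become thin compositions of them, so the create/list/stream behavior lives in one place. Written out long-hand (a sketch assuming the functions above), the `run` flow is:

    def run_long_hand(user_input, rerun=False, server=bconfig.DEFAULT_SERVER):
        # 1. (re)create the container for this pipeline,
        create_remote_container(user_input, recreate=rerun, server=server)
        # 2. look up every container attached to the pipeline's sha256,
        containers = list_remote_container(user_input, server=server)
        # 3. stream logs from each one.
        return [bhelper.log_container(c, server=server) for c in containers]
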
@@ -3,11 +3,13 @@
 __author__ = "Dilawar Singh"
 __email__ = "dilawar@subcom.tech"
+import hashlib
 import sys
 import zipfile
 import shutil
 from pathlib import Path
 import tempfile
+import typing as T
 import validators
 import requests
@@ -22,7 +24,6 @@ def log_container(container: str, server: str):
     assert (
         container
     ), "Failed to determine the container that is running the pipeline. There is probably a bug at the server end."
-    logger.info(f"Fetching logs from {container}")
     for line in bsession.fetch_logs(container, server=server):
         print(line.decode().rstrip())
@@ -98,7 +99,14 @@ def create_pipeline_from_command(cmd: str) -> Path:
     return prepare_archive(pipeline_dir)
-def post_pipeline_task(pipeline_zip: Path, *, endpoint: str, server: str, **kwargs):
+def post_pipeline_task(
+    pipeline_zip: Path,
+    *,
+    endpoint: str,
+    server: str,
+    params: T.Dict[str, str] = {},
+    **kwargs,
+):
     """Submit to the api for a given endpoint and pipeline file"""
     numbytes = pipeline_zip.stat().st_size
     if (code := _check_server_status(server)) != 200:
@@ -121,12 +129,45 @@ def post_pipeline_task(pipeline_zip: Path, *, endpoint: str, server: str, **kwar
     return bsession.post(
         f"{server}/{endpoint}",
         files=files,
+        params=params,
         data=dict(filename=pipeline_zip).update(**kwargs),
         stream=kwargs.get("stream", False),
     )
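
One caveat when reading the unchanged `data=` line above: `dict(filename=pipeline_zip).update(**kwargs)` evaluates to `None`, because `dict.update` mutates in place and returns nothing, so only `files=` and the new `params=` actually reach the server. A sketch of the multipart upload with the metadata attached (httpbin.org and the field names are stand-ins):

    from pathlib import Path
    import requests

    pipeline_zip = Path("pipeline.zip")  # hypothetical archive
    with pipeline_zip.open("rb") as fh:
        res = requests.post(
            "https://httpbin.org/post",            # stand-in server
            files={"pipeline_zip": fh},            # the archive itself
            data={"filename": pipeline_zip.name},  # form metadata
            params={"recreate": "false"},          # query string
        )
    res.raise_for_status()
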
-def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False):
+def post(
+    endpoint: str,
+    *,
+    server: str,
+    stream: bool = False,
+    params: T.Dict[str, str] = {},
+    **kwargs,
+):
+    """A generic post function."""
+    logger.info(f"Posting with data {kwargs}")
+    return bsession.post(
+        f"{server}/{endpoint}", json=kwargs, params=params, stream=stream
+    )
+
+
+def get(
+    endpoint: str,
+    *,
+    server: str,
+    stream: bool = False,
+    params: T.Dict[str, str] = {},
+    **kwargs,
+):
+    """A generic get function."""
+    logger.info(f"Fetching with params {params}")
+    return bsession.get(
+        f"{server}/{endpoint}", params=params, json=kwargs, stream=stream
+    )
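
The two generic wrappers keep query parameters (`params`) separate from the JSON body (everything in `**kwargs`). The mutable default `params={}` is shared across calls, which is safe only as long as no caller mutates it. Hypothetical calls, mirroring how the CLI uses them:

    # GET with a query string only:
    res = get(
        "container/list",
        server=bconfig.DEFAULT_SERVER,
        params=dict(pipeline_sha256="<hex digest>"),
    )
    # POST with both a query string and a JSON body:
    res = post(
        "container/create",
        server=bconfig.DEFAULT_SERVER,
        params=dict(recreate="false"),
        note="extra keyword arguments land in the JSON body",  # hypothetical field
    )
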
+
+
+def submit_job(
+    pipeline_zip: Path, *, server: str, rerun: bool = False, params: dict = {}
+):
     """Submit job to the API and stream the output."""
     numbytes = pipeline_zip.stat().st_size
     if (code := _check_server_status(server)) != 200:
@@ -148,12 +189,30 @@ def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False):
     response = bsession.post(
         f"{server}/submit/?rerun={rerun}",
         files=files,
-        data=dict(filename=pipeline_zip, rerun=rerun),
+        params=params,
+        json=dict(filename=pipeline_zip, rerun=rerun),
     )
     return response.json()
-def user_input_to_pipeline(user_input: str) -> Path:
+def sha256sum(infile: Path) -> str:
+    """Compute sha256sum of a file.
+
+    Credit
+    ------
+    Thanks https://stackoverflow.com/a/44873382/1805129
+    """
+    h = hashlib.sha256()
+    b = bytearray(128 * 1024)
+    mv = memoryview(b)
+    with infile.open("rb", buffering=0) as handler:
+        while n := handler.readinto(mv):
+            h.update(mv[:n])
+    return h.hexdigest()
+
+
+def user_input_to_pipeline(user_input: str) -> tuple[Path, str]:
     """Create a pipeline file from user_input"""
     if (path := Path(user_input)).exists():
         if path.is_dir():
@@ -171,4 +230,4 @@ def user_input_to_pipeline(user_input: str) -> Path:
     # generate a temporary pipeline and submit.
     pipeline_zip = create_pipeline_from_command(user_input)
     logger.info(f"Created pipeline in {pipeline_zip}")
-    return pipeline_zip
+    return pipeline_zip, sha256sum(pipeline_zip)
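
Every caller now unpacks a `(zip, digest)` pair, and the digest doubles as the `pipeline_sha256` query parameter used by `list-container` and `logs`. `sha256sum` streams the file through the hash in 128 KiB chunks, so a large archive never has to fit in memory. A sketch of the new call shape, assuming the helpers above (the command string is arbitrary):

    pipeline_zip, pipeline_hash = user_input_to_pipeline("ls -la")
    assert len(pipeline_hash) == 64  # hex-encoded SHA-256
    res = get(
        "container/list",
        server=bconfig.DEFAULT_SERVER,
        params=dict(pipeline_sha256=pipeline_hash),
    )
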
@@ -5,7 +5,7 @@ import pickle
 import bitia.config as bconfig
 from bitia.logger import logger
-g_session = requests.session()
+g_session = requests.Session()
 SESSION_PICKLE_FILE = bconfig.bitia_dir() / ".session.pickle"
@@ -13,8 +13,7 @@ def fetch_logs(container: str, *, server):
     """Fetch logs from a container."""
     logger.info(f"Fetching logs for container `{container}`")
     return g_session.post(
-        f"{server}/logs",
-        json=dict(container=container, follow=True),
+        f"{server}/logs/?container={container}",
         stream=True,
     )
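
The logs request moves the container id from the JSON body into the URL itself. Since the id is interpolated without escaping, it should strictly be URL-encoded; a sketch of an equivalent call that lets requests do the encoding (same endpoint, assuming `server` and `container` are in scope):

    return g_session.post(
        f"{server}/logs/",
        params=dict(container=container),  # requests percent-encodes this
        stream=True,
    )
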
@@ -26,17 +25,26 @@ def save_session():
         pickle.dump(g_session, fsession)
-def load_session():
+def load_session(force_new: bool = False):
     """Load the pickled session."""
     global g_session
+    if force_new:
+        g_session = requests.Session()
+        return
     if not SESSION_PICKLE_FILE.is_file():
-        return None
+        g_session = requests.Session()
+        return
     with SESSION_PICKLE_FILE.open("rb") as fsession:
         try:
+            logger.info(f"Loading session from {fsession.name}")
             g_session = pickle.load(fsession)
         except Exception:
-            return None
+            g_session = requests.Session()
 def post(*args, **kwargs):
     return g_session.post(*args, **kwargs)
+
+
+def get(*args, **kwargs):
+    return g_session.get(*args, **kwargs)
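
Because `requests.Session` objects pickle cleanly, the module can persist cookies and connection settings across CLI invocations, and `load_session` now always leaves `g_session` in a usable state instead of returning `None`. A round-trip sketch of what the pickle file carries:

    import pickle
    import requests

    session = requests.Session()
    session.headers["X-Example"] = "demo"  # hypothetical state to persist
    blob = pickle.dumps(session)           # what save_session() writes
    restored = pickle.loads(blob)          # what load_session() reads back
    assert restored.headers["X-Example"] == "demo"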