From d13dd449a7fe9a2d88075c8f3c3b8dc87d4f32eb Mon Sep 17 00:00:00 2001
From: Dilawar Singh <dilawar@subcom.tech>
Date: Thu, 6 Oct 2022 16:54:38 +0530
Subject: [PATCH] cli: add create-container, list-container and logs commands

---
 bitia/__main__.py | 94 ++++++++++++++++++++++++++++++++---------------
 bitia/helper.py   | 71 ++++++++++++++++++++++++++++++++---
 bitia/session.py  | 20 +++++++---
 3 files changed, 144 insertions(+), 41 deletions(-)

diff --git a/bitia/__main__.py b/bitia/__main__.py
index 6422b1e..9402df0 100644
--- a/bitia/__main__.py
+++ b/bitia/__main__.py
@@ -1,8 +1,11 @@
 """BiTIA command line interface to submit jobs to the BiTIA server.
 
-(c) 2022-, Subconscious Compute
+https://bitia.link
+
+(c) 2022-, Subconscious Compute, https://subcom.tech
 """
 
+import typing as T
 import functools
 
 import bitia.helper as bhelper
@@ -37,6 +40,61 @@ def session(func):
     return wrapper
 
 
+@app.command("create-container")
+@session
+def create_remote_container(
+    user_input: str, recreate: bool = False, server: str = bconfig.DEFAULT_SERVER
+):
+    """Create (or recreate) the remote container for a pipeline."""
+    pipeline_zip, _ = bhelper.user_input_to_pipeline(user_input)
+    res = bhelper.post_pipeline_task(
+        pipeline_zip,
+        server=server,
+        endpoint="container/create",
+        params=dict(recreate="true" if recreate else "false"),
+        stream=True,
+    )
+    res.raise_for_status()
+    for line in res.iter_lines():
+        print(line.decode().rstrip())
+
+
+@app.command("list-container")
+@session
+def list_remote_container(
+    user_input: str, server: str = bconfig.DEFAULT_SERVER
+) -> T.List[str]:
+    """List the remote containers associated with the pipeline."""
+    _, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
+    logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
+    res = bhelper.get(
+        endpoint="container/list",
+        params=dict(pipeline_sha256=pipeline_hash),
+        server=server,
+    )
+    res.raise_for_status()
+    _json = res.json()
+    print(_json)
+    return _json["containers"]
+
+
+@app.command("logs")
+@session
+def stream_log(user_input: str, server: str = bconfig.DEFAULT_SERVER) -> None:
+    """Stream logs for the most recent run of a given pipeline."""
+    _, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
+    logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
+    res = bhelper.get(
+        endpoint="logs",
+        params=dict(pipeline_sha256=pipeline_hash),
+        server=server,
+        stream=True,
+    )
+    res.raise_for_status()
+    for line in res.iter_lines():
+        print(line.decode().rstrip())
+
+
 @app.command("submit")
 @session
 def submit_pipeline(
@@ -47,10 +105,8 @@ def submit_pipeline(
     Prepare the user directory to send to the server. The user can also provide
     a link to the pipeline to run.
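 
     Example (illustrative; assumes the installed console script is named
     `bitia`, and the directory/command shown are placeholders):
 
         bitia submit ./my-pipeline-dir
         bitia submit "fastqc reads.fastq"
 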
""" - pipeline_zip = bhelper.user_input_to_pipeline(user_input) - containers = bhelper.post_pipeline_task( - pipeline_zip, rerun=rerun, server=server, endpoint=f"submit/?rerun={rerun}" - ) + create_remote_container(user_input, recreate=rerun) + containers = list_remote_container(user_input) logger.info(f" Got containers {containers}") return containers @@ -58,32 +114,12 @@ def submit_pipeline( @app.command("run") @session def run_pipeline( - user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER -): - """Run a pipeline""" - pipeline_zip = bhelper.user_input_to_pipeline(user_input) - res = bhelper.post_pipeline_task( - pipeline_zip, endpoint=f"submit?rerun={rerun}", server=server - ) - return bhelper.log_container(res["container"], server=server) - - -@app.command("create") -@session -def create_remote_infra( - user_input: str, recreate: bool = False, server: str = bconfig.DEFAULT_SERVER + user_input: str, *, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER ): """Run a pipeline""" - pipeline_zip = bhelper.user_input_to_pipeline(user_input) - res = bhelper.post_pipeline_task( - pipeline_zip, - server=server, - endpoint=f"container/create?recreate={recreate}", - stream=True, - ) - res.raise_for_status() - for line in res.iter_lines(): - print(line.decode().rstrip()) + create_remote_container(user_input, recreate=rerun, server=server) + containers = list_remote_container(user_input, server=server) + return [bhelper.log_container(container, server=server) for container in containers] @app.command() diff --git a/bitia/helper.py b/bitia/helper.py index 7facd6b..52cbee8 100644 --- a/bitia/helper.py +++ b/bitia/helper.py @@ -3,11 +3,13 @@ __author__ = "Dilawar Singh" __email__ = "dilawar@subcom.tech" +import hashlib import sys import zipfile import shutil from pathlib import Path import tempfile +import typing as T import validators import requests @@ -22,7 +24,6 @@ def log_container(container: str, server: str): assert ( container ), "Failed to determine the container that is runnning the pipeline. There is probably a bug in server end." 
- logger.info(f"Fetching logs from {container}") for line in bsession.fetch_logs(container, server=server): print(line.decode().rstrip()) @@ -98,7 +99,14 @@ def create_pipeline_from_command(cmd: str) -> Path: return prepare_archive(pipeline_dir) -def post_pipeline_task(pipeline_zip: Path, *, endpoint: str, server: str, **kwargs): +def post_pipeline_task( + pipeline_zip: Path, + *, + endpoint: str, + server: str, + params: T.Dict[str, str] = {}, + **kwargs, +): """Submit to the api for a given endpoint and pipeline file""" numbytes = pipeline_zip.stat().st_size if (code := _check_server_status(server)) != 200: @@ -121,12 +129,45 @@ def post_pipeline_task(pipeline_zip: Path, *, endpoint: str, server: str, **kwar return bsession.post( f"{server}/{endpoint}", files=files, + params=params, data=dict(filename=pipeline_zip).update(**kwargs), stream=kwargs.get("stream", False), ) -def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False): +def post( + endpoint: str, + *, + server: str, + stream: bool = False, + params: T.Dict[str, str] = {}, + **kwargs, +): + """A generic post function.""" + logger.info(f"Posting with data {kwargs}") + return bsession.post( + f"{server}/{endpoint}", json=kwargs, params=params, stream=stream + ) + + +def get( + endpoint: str, + *, + server: str, + stream: bool = False, + params: T.Dict[str, str] = {}, + **kwargs, +): + """A generic post function.""" + logger.info(f"Posting with data {kwargs}") + return bsession.get( + f"{server}/{endpoint}", params=params, json=kwargs, stream=stream + ) + + +def submit_job( + pipeline_zip: Path, *, server: str, rerun: bool = False, params: dict = {} +): """Submit job to the API and stream the output.""" numbytes = pipeline_zip.stat().st_size if (code := _check_server_status(server)) != 200: @@ -148,12 +189,30 @@ def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False): response = bsession.post( f"{server}/submit/?rerun={rerun}", files=files, - data=dict(filename=pipeline_zip, rerun=rerun), + params=params, + json=dict(filename=pipeline_zip, rerun=rerun), ) return response.json() -def user_input_to_pipeline(user_input: str) -> Path: +def sha256sum(infile: Path) -> str: + """Compute sha256sum of a file. + + Credit + ------ + Thanks https://stackoverflow.com/a/44873382/1805129 + + """ + h = hashlib.sha256() + b = bytearray(128 * 1024) + mv = memoryview(b) + with infile.open("rb", buffering=0) as handler: + while n := handler.readinto(mv): + h.update(mv[:n]) + return h.hexdigest() + + +def user_input_to_pipeline(user_input: str) -> tuple[Path, str]: """Create a pipeline file from user_input""" if (path := Path(user_input)).exists(): if path.is_dir(): @@ -171,4 +230,4 @@ def user_input_to_pipeline(user_input: str) -> Path: # generate a temporary pipeline and submit. 
     pipeline_zip = create_pipeline_from_command(user_input)
     logger.info(f"Created pipeline in {pipeline_zip}")
-    return pipeline_zip
+    return pipeline_zip, sha256sum(pipeline_zip)
diff --git a/bitia/session.py b/bitia/session.py
index 6851c52..3dd0c28 100644
--- a/bitia/session.py
+++ b/bitia/session.py
@@ -5,7 +5,7 @@ import pickle
 
 import bitia.config as bconfig
 from bitia.logger import logger
 
-g_session = requests.session()
+g_session = requests.Session()
 
 SESSION_PICKLE_FILE = bconfig.bitia_dir() / ".session.pickle"
 
@@ -13,8 +13,7 @@ def fetch_logs(container: str, *, server):
     """Fetch logs from a container."""
     logger.info(f"Fetching logs for container `{container}`")
     return g_session.post(
-        f"{server}/logs",
-        json=dict(container=container, follow=True),
+        f"{server}/logs/?container={container}",
         stream=True,
     )
 
@@ -26,17 +25,26 @@ def save_session():
         pickle.dump(g_session, fsession)
 
 
-def load_session():
+def load_session(force_new: bool = False):
     """Load the pickled session."""
     global g_session
+    if force_new:
+        g_session = requests.Session()
+        return
     if not SESSION_PICKLE_FILE.is_file():
-        return None
+        g_session = requests.Session()
+        return
     with SESSION_PICKLE_FILE.open("rb") as fsession:
         try:
+            logger.info(f"Loading session from {fsession.name}")
             g_session = pickle.load(fsession)
         except Exception:
-            return None
+            g_session = requests.Session()
 
 
 def post(*args, **kwargs):
     return g_session.post(*args, **kwargs)
+
+
+def get(*args, **kwargs):
+    return g_session.get(*args, **kwargs)
-- 
GitLab
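
Usage sketch (not part of the patch): assuming a reachable BiTIA server and
that `echo hello` is a valid pipeline command, the new helpers compose like
this; `digest` is the sha256 returned by `user_input_to_pipeline`:

    import bitia.config as bconfig
    import bitia.helper as bhelper

    # Zip the pipeline (here generated from a bare shell command) and hash it.
    pipeline_zip, digest = bhelper.user_input_to_pipeline("echo hello")

    # Ask the server which containers belong to this pipeline.
    res = bhelper.get(
        endpoint="container/list",
        params=dict(pipeline_sha256=digest),
        server=bconfig.DEFAULT_SERVER,
    )
    res.raise_for_status()
    print(res.json()["containers"])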