diff --git a/bitia/__main__.py b/bitia/__main__.py
index 877f61a8c1477c6d8d1110c8c2d04edb61eb3fd0..6422b1ea114324329384827a5ddc35537dd00004 100644
--- a/bitia/__main__.py
+++ b/bitia/__main__.py
@@ -48,7 +48,9 @@ def submit_pipeline(
     to the pipeline to run.
     """
     pipeline_zip = bhelper.user_input_to_pipeline(user_input)
-    containers = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
+    containers = bhelper.post_pipeline_task(
+        pipeline_zip, rerun=rerun, server=server, endpoint=f"submit/?rerun={rerun}"
+    )
     logger.info(f" Got containers {containers}")
     return containers
 
@@ -60,19 +62,28 @@
 ):
     """Run a pipeline"""
     pipeline_zip = bhelper.user_input_to_pipeline(user_input)
-    res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
+    res = bhelper.post_pipeline_task(
+        pipeline_zip, endpoint=f"submit?rerun={rerun}", server=server
+    )
     return bhelper.log_container(res["container"], server=server)
 
 
 @app.command("create")
 @session
 def create_remote_infra(
-    user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER
+    user_input: str, recreate: bool = False, server: str = bconfig.DEFAULT_SERVER
 ):
-    """Run a pipeline"""
+    """Create (or recreate) the remote container infrastructure for a pipeline"""
     pipeline_zip = bhelper.user_input_to_pipeline(user_input)
-    res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
-    return bhelper.log_container(res["container"], server=server)
+    res = bhelper.post_pipeline_task(
+        pipeline_zip,
+        server=server,
+        endpoint=f"container/create?recreate={recreate}",
+        stream=True,
+    )
+    res.raise_for_status()
+    for line in res.iter_lines():
+        print(line.decode().rstrip())
 
 
 @app.command()
diff --git a/bitia/helper.py b/bitia/helper.py
index 6d468fcf51d2ea29c3a9ae00fdb62736c124a065..7facd6bc59d84da70edaeadd31c03c3b4f7be8e7 100644
--- a/bitia/helper.py
+++ b/bitia/helper.py
@@ -98,6 +98,34 @@ def create_pipeline_from_command(cmd: str) -> Path:
     return prepare_archive(pipeline_dir)
 
 
+def post_pipeline_task(pipeline_zip: Path, *, endpoint: str, server: str, **kwargs):
+    """Submit to the api for a given endpoint and pipeline file"""
+    numbytes = pipeline_zip.stat().st_size
+    if (code := _check_server_status(server)) != 200:
+        logger.warning(
+            "%s may not be alive (status code: %s). Try other one?", server, code
+        )
+        sys.exit(1)
+    assert numbytes > 0
+    logger.info(
+        "Submitting %s (size=%.2f KB) to the %s",
+        pipeline_zip,
+        numbytes / 1024.0,
+        server,
+    )
+
+    # submit and print the output.
+    endpoint = endpoint.strip("/")
+    with pipeline_zip.open("rb") as f_pipeline:
+        files = {"pipeline_zip": f_pipeline}
+        return bsession.post(
+            f"{server}/{endpoint}",
+            files=files,
+            data=dict(filename=pipeline_zip, **kwargs),
+            stream=kwargs.get("stream", False),
+        )
+
+
 def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False):
     """Submit job to the API and stream the output."""
     numbytes = pipeline_zip.stat().st_size
diff --git a/bitia/logger.py b/bitia/logger.py
index 30eadc12b51e4ff87aa7c94316865c98c4116ab0..e2c9bf8d0f62a2c0fa5aeed9169adfc7c88e452e 100644
--- a/bitia/logger.py
+++ b/bitia/logger.py
@@ -4,7 +4,9 @@ import logging
 
 FORMAT = "%(message)s"
 logging.basicConfig(
-    level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
+    level="INFO",
+    format=FORMAT,
+    datefmt="[%X]",  # handlers=[RichHandler()]
 )
 
 logger = logging.getLogger("bitia")