Skip to content
Snippets Groups Projects
Commit 2632adea authored by dilawar's avatar dilawar :ant:
Browse files

chore: `create` is working

server requires `container/create` endpoint.
parent cd009379
No related branches found
No related tags found
1 merge request!2version 0.2.0
Pipeline #3703 failed with stages
in 1 minute and 51 seconds
......@@ -48,7 +48,9 @@ def submit_pipeline(
to the pipeline to run.
"""
pipeline_zip = bhelper.user_input_to_pipeline(user_input)
containers = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
containers = bhelper.post_pipeline_task(
pipeline_zip, rerun=rerun, server=server, endpoint=f"submit/?rerun={rerun}"
)
logger.info(f" Got containers {containers}")
return containers
......@@ -60,19 +62,28 @@ def run_pipeline(
):
"""Run a pipeline"""
pipeline_zip = bhelper.user_input_to_pipeline(user_input)
res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
res = bhelper.post_pipeline_task(
pipeline_zip, endpoint=f"submit?rerun={rerun}", server=server
)
return bhelper.log_container(res["container"], server=server)
@app.command("create")
@session
def create_remote_infra(
user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER
user_input: str, recreate: bool = False, server: str = bconfig.DEFAULT_SERVER
):
"""Run a pipeline"""
pipeline_zip = bhelper.user_input_to_pipeline(user_input)
res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
return bhelper.log_container(res["container"], server=server)
res = bhelper.post_pipeline_task(
pipeline_zip,
server=server,
endpoint=f"container/create?recreate={recreate}",
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
@app.command()
......
......@@ -98,6 +98,34 @@ def create_pipeline_from_command(cmd: str) -> Path:
return prepare_archive(pipeline_dir)
def post_pipeline_task(pipeline_zip: Path, *, endpoint: str, server: str, **kwargs):
"""Submit to the api for a given endpoint and pipeline file"""
numbytes = pipeline_zip.stat().st_size
if (code := _check_server_status(server)) != 200:
logger.warning(
"%s may not be alive (status code: %s). Try other one?", server, code
)
sys.exit(1)
assert numbytes > 0
logger.info(
"Submitting %s (size=%.2f KB) to the %s",
pipeline_zip,
numbytes / 1024.0,
server,
)
# submit and print the output.
endpoint = endpoint.strip("/")
with pipeline_zip.open("rb") as f_pipeline:
files = {"pipeline_zip": f_pipeline}
return bsession.post(
f"{server}/{endpoint}",
files=files,
data=dict(filename=pipeline_zip).update(**kwargs),
stream=kwargs.get("stream", False),
)
def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False):
"""Submit job to the API and stream the output."""
numbytes = pipeline_zip.stat().st_size
......
......@@ -4,7 +4,9 @@ import logging
FORMAT = "%(message)s"
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
level="INFO",
format=FORMAT,
datefmt="[%X]", # handlers=[RichHandler()]
)
logger = logging.getLogger("bitia")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment