Skip to content
Snippets Groups Projects

Hotfix: fix to assert error post installation

Merged dilawar requested to merge hotfix into main
Files
6
+ 14
14
@@ -13,7 +13,7 @@ from enum import Enum
from rich.progress import track
import bitia.helper as bhelper
from bitia.logger import logger, cprint, set_logger_level
from bitia.logger import logger, cprint, set_logger_level, console
from bitia import version as bversion
import bitia.pipeline as bpipeline
@@ -81,10 +81,13 @@ def create_remote_container(
@app.command("list-container")
@session
def list_remote_container(
user_input, server: str = bconfig.get_server()
) -> T.List[str]:
def list_containers(user_input):
"""List the remote server associated with the pipeline."""
for container in _list_remote_container(user_input):
cprint(container)
def _list_remote_container(user_input) -> T.List[str]:
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
@@ -94,9 +97,7 @@ def list_remote_container(
params=dict(pipeline_sha256=pipeline.checksum),
)
res.raise_for_status()
containers = res.json()["containers"].split(",")
cprint(containers)
return containers
return res.json()["containers"].split(",")
@app.command("logs")
@@ -113,7 +114,7 @@ def stream_log(user_input, server: str = bconfig.get_server()):
)
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
cprint(line.decode().rstrip())
@app.command("submit")
@@ -126,7 +127,7 @@ def submit_pipeline(user_input, *, rerun: bool = False, output_lines: T.List[str
"""
res = create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
logger.info("Remote container: %s", res)
containers = list_remote_container(user_input)
containers = _list_remote_container(user_input)
cprint(f"{containers}")
return containers
@@ -136,11 +137,10 @@ def submit_pipeline(user_input, *, rerun: bool = False, output_lines: T.List[str
def run_user_input(user_input, *, rerun: bool = False, output_lines: T.List[str] = []):
"""Run a pipeline"""
create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
containers = list_remote_container(user_input)
return [
bhelper.log_container(container, server=bconfig.get_server())
for container in containers
]
containers = _list_remote_container(user_input)
for container in containers:
for _bl in bhelper.log_container(container, server=bconfig.get_server()):
print(_bl.decode().rstrip())
@app.command("checksum")
Loading