Skip to content
Snippets Groups Projects

version 0.2.0

Merged dilawar requested to merge devel into main
Files
8
+ 16
12
@@ -12,7 +12,8 @@ import functools
import bitia.helper as bhelper
from bitia.logger import logger
import bitia
from bitia import version as bversion
import bitia.pipeline as bpipeline
import bitia.session as bsession
import bitia.config as bconfig
@@ -25,7 +26,7 @@ app = typer.Typer()
def version_callback(value: bool):
"""callback for version"""
if value:
print(bitia.version())
print(version())
def session(func):
@@ -45,9 +46,9 @@ def session(func):
@session
def create_remote_container(user_input: str, recreate: bool = False):
"""Run a pipeline"""
pipeline_zip, _ = bhelper.user_input_to_pipeline(user_input)
pipeline = bpipeline.user_input_to_pipeline(user_input)
res = bhelper.post_pipeline_task(
pipeline_zip,
pipeline.zipfile,
endpoint="container/create",
params=dict(recreate="true" if recreate else "false"),
server=bconfig.server(),
@@ -56,6 +57,7 @@ def create_remote_container(user_input: str, recreate: bool = False):
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
return res
@app.command("list-container")
@@ -64,12 +66,13 @@ def list_remote_container(
user_input: str, server: str = bconfig.g_server
) -> T.List[str]:
"""List the remote server associated with the pipeline."""
_, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="container/list",
server=bconfig.server(),
params=dict(pipeline_sha256=pipeline_hash),
params=dict(pipeline_sha256=pipeline.checksum),
)
res.raise_for_status()
return res.json()["containers"].split(",")
@@ -79,11 +82,11 @@ def list_remote_container(
@session
def stream_log(user_input: str, server: str = bconfig.g_server):
"""Stream logs for the most recent run of a given pipeline."""
_, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="logs",
params=dict(pipeline_sha256=pipeline_hash),
params=dict(pipeline_sha256=pipeline.checksum),
server=bconfig.server(),
stream=True,
)
@@ -100,7 +103,8 @@ def submit_pipeline(user_input: str, rerun: bool = False):
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
create_remote_container(user_input, recreate=rerun)
res = create_remote_container(user_input, recreate=rerun)
logger.info(f"Remote container: {res}")
containers = list_remote_container(user_input)
logger.info(f" Got containers {containers}")
return containers
@@ -138,7 +142,7 @@ def main(verbose: bool = False, server: T.Optional[str] = None):
@app.command()
def version():
"""version information"""
print(bitia.version())
print(bversion())
if __name__ == "__main__":
Loading