Skip to content
Snippets Groups Projects
Commit 9c1d0a18 authored by dilawar's avatar dilawar :ant:
Browse files

refactor: Too many cleanups

parent db482951
No related branches found
Tags v0.1.2
1 merge request!2version 0.2.0
Pipeline #3761 failed with stages
in 2 minutes
......@@ -4,7 +4,7 @@ import logging
from rich.logging import RichHandler
import logging.handlers
log_level = os.environ.get("BITIA_LOGLEVEL", "NOTSET").upper()
log_level = os.environ.get("BITIA_LOGLEVEL", "WARNING").upper()
logging.basicConfig(
format="%(message)s",
level=log_level,
......
......@@ -8,9 +8,12 @@ https://bitia.link
import typing as T
import os
import functools
from enum import Enum
from rich.progress import track
import bitia.helper as bhelper
from bitia.logger import logger
from bitia.logger import logger, cprint, set_logger_level
from bitia import version as bversion
import bitia.pipeline as bpipeline
......@@ -23,6 +26,12 @@ import typer
app = typer.Typer()
class VerbosityLevel(str, Enum):
debug = "debug"
info = "info"
warning = "warning"
def version_callback(value: bool):
"""callback for version"""
if value:
......@@ -44,48 +53,62 @@ def session(func):
@app.command("create-container")
@session
def create_remote_container(user_input, recreate: bool = False):
"""Run a pipeline"""
def create_remote_container(
user_input, *, recreate: bool = False, output_lines: T.List[str] = []
):
"""Create container for the pipeline. The container starts running
immediately on the server. Use command `logs` to stream the output.
"""
pipeline = bpipeline.user_input_to_pipeline(user_input)
# for a command pipeline, always create a new container.
if pipeline.is_a_command():
recreate = True
res = bhelper.post_pipeline_task(
pipeline.zipfile,
pipeline,
endpoint="container/create",
params=dict(recreate="true" if recreate else "false"),
server=bconfig.server(),
server=bconfig.get_server(),
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
for line in track(
res.iter_lines(), description="BiTIA is setting up required infra..."
):
output_lines.append(line.decode().rstrip())
logger.info(output_lines[-1])
return res
@app.command("list-container")
@session
def list_remote_container(user_input, server: str = bconfig.g_server) -> T.List[str]:
def list_remote_container(
user_input, server: str = bconfig.get_server()
) -> T.List[str]:
"""List the remote server associated with the pipeline."""
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="container/list",
server=bconfig.server(),
server=bconfig.get_server(),
params=dict(pipeline_sha256=pipeline.checksum),
)
res.raise_for_status()
return res.json()["containers"].split(",")
containers = res.json()["containers"].split(",")
cprint(containers)
return containers
@app.command("logs")
@session
def stream_log(user_input, server: str = bconfig.g_server):
def stream_log(user_input, server: str = bconfig.get_server()):
"""Stream logs for the most recent run of a given pipeline."""
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="logs",
params=dict(pipeline_sha256=pipeline.checksum),
server=bconfig.server(),
server=bconfig.get_server(),
stream=True,
)
res.raise_for_status()
......@@ -95,27 +118,27 @@ def stream_log(user_input, server: str = bconfig.g_server):
@app.command("submit")
@session
def submit_pipeline(user_input, rerun: bool = False):
def submit_pipeline(user_input, *, rerun: bool = False, output_lines: T.List[str]):
"""Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
res = create_remote_container(user_input, recreate=rerun)
logger.info(f"Remote container: {res}")
res = create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
logger.info("Remote container: %s", res)
containers = list_remote_container(user_input)
logger.info(f" Got containers {containers}")
cprint(f"{containers}")
return containers
@app.command("run")
@session
def run_user_input(user_input, *, rerun: bool = False):
def run_user_input(user_input, *, rerun: bool = False, output_lines: T.List[str] = []):
"""Run a pipeline"""
create_remote_container(user_input, recreate=rerun)
create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
containers = list_remote_container(user_input)
return [
bhelper.log_container(container, server=bconfig.server())
bhelper.log_container(container, server=bconfig.get_server())
for container in containers
]
......@@ -123,30 +146,35 @@ def run_user_input(user_input, *, rerun: bool = False):
@app.command("checksum")
def checksum(user_input):
pipeline = bpipeline.user_input_to_pipeline(user_input)
print(pipeline.checksum)
cprint(pipeline.checksum)
@app.callback()
def main(verbose: bool = False, server: T.Optional[str] = None):
def main(
verbose: VerbosityLevel = typer.Option(
VerbosityLevel.warning, case_sensitive=False
),
server: T.Optional[str] = None,
):
"""
Callback
"""
if verbose:
logger.info("--verbose mode enabled")
set_logger_level(verbose.value)
if server is not None:
bconfig.set_server(server)
elif os.environ.get("BITIA_SERVER") is not None:
bconfig.set_server(os.environ["BITIA_SERVER"])
else:
# TODO: Read from default config file.
pass
logger.info(f"Using server {bconfig.server()}")
cprint(f"Using server {bconfig.get_server()}")
@app.command()
def version():
"""version information"""
print(bversion())
cprint(bversion())
if __name__ == "__main__":
......
import os
import typing as T
from pathlib import Path
import tempfile
......@@ -14,8 +15,10 @@ def bitia_dir() -> Path:
return bdir
def server() -> str:
def get_server(use_env: bool = True) -> str:
"""Server to use"""
if use_env and os.environ.get("BITIA_SERVER") is not None:
return os.environ["BITIA_SERVER"]
return g_server
......
......@@ -37,7 +37,7 @@ def _check_server_status(server: str) -> int:
def post_pipeline_task(
pipeline_zip: Path,
pipeline: Pipeline,
*,
endpoint: str,
server: str,
......@@ -45,16 +45,17 @@ def post_pipeline_task(
**kwargs,
):
"""Submit to the api for a given endpoint and pipeline file"""
numbytes = pipeline_zip.stat().st_size
if (code := _check_server_status(server)) != 200:
logger.warning(
"%s may not be alive (status code: %s). Try other one?", server, code
)
sys.exit(1)
pipeline_zip = pipeline.zipfile
numbytes = pipeline_zip.stat().st_size
assert numbytes > 0
logger.info(
"Submitting %s (size=%.2f KB) to the %s",
pipeline_zip,
pipeline.zipfile,
numbytes / 1024.0,
server,
)
......@@ -67,7 +68,7 @@ def post_pipeline_task(
f"{server}/{endpoint}",
files=files,
params=params,
data=dict(filename=pipeline_zip).update(**kwargs),
data=dict(pipeline=pipeline.data()).update(**kwargs),
stream=kwargs.get("stream", False),
)
......@@ -83,7 +84,7 @@ def post(
"""A generic post function."""
logger.info(f"Posting with data {kwargs}")
return bsession.post(
f"{server}/{endpoint}", json=kwargs, params=params, stream=stream
f"{server}/{endpoint}", json=kwargs, params=params, stream=stream, **kwargs
)
......@@ -98,7 +99,7 @@ def get(
"""A generic post function."""
logger.info(f"Posting with data {kwargs}")
return bsession.get(
f"{server}/{endpoint}", params=params, json=kwargs, stream=stream
f"{server}/{endpoint}", params=params, json=kwargs, stream=stream, **kwargs
)
......
from rich.logging import RichHandler
import logging
from rich.logging import RichHandler
from rich.console import Console
FORMAT = "%(message)s"
logging.basicConfig(
level="INFO",
format=FORMAT,
datefmt="[%X]", # handlers=[RichHandler()]
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)
logger = logging.getLogger("bitia")
console = Console()
def set_logger_level(level_name: str):
"""Set the global logging level"""
assert level_name in ["INFO", "DEBUG", "WARNING"]
lvl = logging.getLevelName(level_name)
logger.setLevel(lvl)
for handler in logger.handlers:
handler.setLevel(lvl)
def cprint(*args, **kwargs):
"""Forward to rich.console.print"""
console.print(*args, **kwargs)
......@@ -55,6 +55,13 @@ class Pipeline:
def checksum(self) -> str:
return self.hash
def data(self) -> T.Dict[str, T.Any]:
"""Return the pipeline key properties. This is often sent to the
server."""
return dict(
zip_filename=self.zipfile.name, checksum=self.checksum, size=self.size
)
@classmethod
def from_command(cls, cmd: str, *, comment: str = "") -> "Pipeline":
"""Create a pipeline from user command. The semantics is a bit different
......
......@@ -2,11 +2,10 @@ from bitia.__main__ import run_user_input
def test_cli_sanity():
nlines = 0
for line in run_user_input("ls -ltr /"):
nlines += 1
lines = []
for line in run_user_input("ls -ltr /", output_lines=lines):
print(11, line)
assert nlines > 10, "Too few lines"
assert len(lines) > 10, "Too few lines"
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment