Skip to content
Snippets Groups Projects
__main__.py 7.02 KiB
Newer Older
"""BiTIA command line interface to submit job to the BiTIA server.

dilawar's avatar
dilawar committed
https://bitia.link

(c) 2022-, Subconscious Compute, https://subcom.tech
dilawar's avatar
dilawar committed
import typing as T
import functools
dilawar's avatar
dilawar committed
from enum import Enum
import subprocess
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
dilawar's avatar
dilawar committed

from rich.progress import track
from pathlib import Path
dilawar's avatar
dilawar committed
import bitia.helper as bhelper
dilawar's avatar
dilawar committed
from bitia.logger import logger, cprint, set_logger_level, console
dilawar's avatar
dilawar committed
from bitia import version as bversion
import bitia.pipeline as bpipeline
dilawar's avatar
dilawar committed
import bitia.session as bsession
import bitia.config as bconfig
import typer

app = typer.Typer()

app_cli = FastAPI()
origins = [
    "http://localhost:3000",
    "http://localhost:8000",
]

app_cli.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
dilawar's avatar
dilawar committed
class VerbosityLevel(str, Enum):
    debug = "debug"
    info = "info"
    warning = "warning"


dilawar's avatar
dilawar committed
def version_callback(value: bool):
    """callback for version"""
    if value:
dilawar's avatar
dilawar committed
        print(version())


def session(func):
    """Load a session before and save the session after the function call"""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
dilawar's avatar
dilawar committed
        bsession.load_session()
        retval = func(*args, **kwargs)
dilawar's avatar
dilawar committed
        bsession.save_session()
        return retval

    return wrapper

@app_cli.get("/create-container")
dilawar's avatar
dilawar committed
@app.command("create-container")
@session
dilawar's avatar
dilawar committed
def create_remote_container(
dilawar's avatar
dilawar committed
    user_input,
    *,
    recreate: bool = False,
    output_lines: T.List[str] = [],
dilawar's avatar
dilawar committed
):
    """Create container for the pipeline. The container starts running
    immediately on the server. Use command `logs` to stream the output.
    """
dilawar's avatar
dilawar committed
    pipeline = bpipeline.user_input_to_pipeline(user_input)
dilawar's avatar
dilawar committed
    # for a command pipeline, always create a new container.
    if pipeline.is_a_command():
        recreate = True
dilawar's avatar
dilawar committed
    res = bhelper.post_pipeline_task(
dilawar's avatar
dilawar committed
        pipeline,
dilawar's avatar
dilawar committed
        endpoint="container/create",
        params=dict(recreate="true" if recreate else "false"),
dilawar's avatar
dilawar committed
        server=bconfig.get_server(),
dilawar's avatar
dilawar committed
        stream=True,
    )
    res.raise_for_status()
dilawar's avatar
dilawar committed
    _lines = (
        track(res.iter_lines(), description="BiTIA is setting up required infra...")
        if not bconfig.get_config("plain", default=False)
        else res.iter_lines()
    )
    for line in _lines:
dilawar's avatar
dilawar committed
        output_lines.append(line.decode().rstrip())
        logger.info(output_lines[-1])
dilawar's avatar
dilawar committed
    return res
dilawar's avatar
dilawar committed

@app_cli.get("/list-container")
dilawar's avatar
dilawar committed
@app.command("list-container")
@session
dilawar's avatar
dilawar committed
def list_containers(user_input):
dilawar's avatar
dilawar committed
    """List the remote server associated with the pipeline."""
snapdgn's avatar
snapdgn committed
    if not _list_remote_container(user_input):
        for container in _list_remote_container(user_input):
            cprint(container)
            return container
dilawar's avatar
dilawar committed


def _list_remote_container(user_input) -> T.List[str]:
dilawar's avatar
dilawar committed
    pipeline = bpipeline.user_input_to_pipeline(user_input)
    logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
dilawar's avatar
dilawar committed
    logger.info(
        f"Finding container for user input `{user_input}` with sha256sum={pipeline.checksum}"
    )
dilawar's avatar
dilawar committed
    res = bhelper.get(
        endpoint="container/list",
dilawar's avatar
dilawar committed
        server=bconfig.get_server(),
dilawar's avatar
dilawar committed
        params=dict(pipeline_sha256=pipeline.checksum),
dilawar's avatar
dilawar committed
    )
snapdgn's avatar
snapdgn committed
    if res.status_code != 200:
        return []
dilawar's avatar
dilawar committed
    return res.json()["containers"].split(",")
dilawar's avatar
dilawar committed

@app_cli.get("/artifacts")
@app.command("artifacts")
@session
def get_generated_artifacts(user_input):
snapdgn's avatar
snapdgn committed
    # check if the containers for the corresponding pipeline exists
    if not _list_remote_container(user_input):
        cprint(
                f"Artifacts for this pipeline: {user_input} doesn't exist, please run the pipeline using `bitia run` first..."
snapdgn's avatar
snapdgn committed
        )
        return "Artifacts for this pipeline doesn't exist, please run the pipeline using `bitia run` first..."
    pipeline = bpipeline.user_input_to_pipeline(user_input)
    res = bhelper.get(
        endpoint="artifacts",
        server=bconfig.get_server(),
        params=dict(pipeline_sha256=pipeline.checksum),
    )
    res.raise_for_status()
    path = res._content
    assert path is not None
    path = path.decode()[1:-1].rstrip()
snapdgn's avatar
snapdgn committed
    server = bconfig.get_server().rstrip("/")
    cprint("Directory is being served at: ")
snapdgn's avatar
snapdgn committed
    cprint(f"{server}/{path}")
    return f"Directory is being served at: {server}/{path}"
@app_cli.get("/logs")
dilawar's avatar
dilawar committed
@app.command("logs")
@session
dilawar's avatar
dilawar committed
def stream_log(user_input):
dilawar's avatar
dilawar committed
    """Stream logs for the most recent run of a given pipeline."""
dilawar's avatar
dilawar committed
    pipeline = bpipeline.user_input_to_pipeline(user_input)
dilawar's avatar
dilawar committed
    logger.info(
        f"Finding container for user input {user_input} with sha256sum={pipeline.checksum}"
    )
dilawar's avatar
dilawar committed
    res = bhelper.get(
        endpoint="logs",
dilawar's avatar
dilawar committed
        params=dict(pipeline_sha256=pipeline.checksum),
dilawar's avatar
dilawar committed
        server=bconfig.get_server(),
dilawar's avatar
dilawar committed
        stream=True,
    )
    res.raise_for_status()
    for line in res.iter_lines():
dilawar's avatar
dilawar committed
        cprint(line.decode().rstrip())
dilawar's avatar
dilawar committed

@app_cli.get("/submit")
@app.command("submit")
@session
dilawar's avatar
dilawar committed
def submit_pipeline(user_input, *, rerun: bool = False, output_lines: T.List[str]):
    """Submit your pipelin (url, directory, zip_file).

    Prepare the user directory to send to the server. User can also provide link
    to the pipeline to run.
    """
dilawar's avatar
dilawar committed
    res = create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
    logger.info("Remote container: %s", res)
dilawar's avatar
dilawar committed
    containers = _list_remote_container(user_input)
dilawar's avatar
dilawar committed
    cprint(f"{containers}")
    return containers

@app_cli.get("/run")
@app.command("run")
@session
dilawar's avatar
dilawar committed
def run_user_input(user_input, *, rerun: bool = False, output_lines: T.List[str] = []):
    """Run a pipeline"""
dilawar's avatar
dilawar committed
    create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
dilawar's avatar
dilawar committed
    containers = _list_remote_container(user_input)
    for container in containers:
        for _bl in bhelper.log_container(container, server=bconfig.get_server()):
            print(_bl.decode().rstrip())
@app_cli.get("/checksum")
dilawar's avatar
dilawar committed
@app.command("checksum")
def checksum(user_input):
    pipeline = bpipeline.user_input_to_pipeline(user_input)
dilawar's avatar
dilawar committed
    cprint(pipeline.checksum)
    return pipeline.checksum
@app.callback()
dilawar's avatar
dilawar committed
def main(
    verbose: VerbosityLevel = typer.Option(
        VerbosityLevel.warning, case_sensitive=False
    ),
dilawar's avatar
dilawar committed
    plain: bool = False,
dilawar's avatar
dilawar committed
    server: T.Optional[str] = None,
):
dilawar's avatar
dilawar committed
    bconfig.set_config("plain", plain)
    bconfig.set_config("verbosity", verbose.value)
dilawar's avatar
dilawar committed
    set_logger_level(verbose.value)

    if server is not None:
        bconfig.set_server(server)
    elif os.environ.get("BITIA_SERVER"):
        bconfig.set_server(os.environ["BITIA_SERVER"])
    else:
dilawar's avatar
dilawar committed
        # TODO: Read from default config file.
dilawar's avatar
dilawar committed
    cprint(f"Using server {bconfig.get_server()}")
@app_cli.get("/version")
@app.command()
def version():
    """version information"""
dilawar's avatar
dilawar committed
    cprint(bversion())
    return bversion()
@app.command()
def ui():
    """spin up a frontend server for the cli"""
    parent_dir = Path(__file__).parent.parent
    ui_path = parent_dir / 'new-ui'
    subprocess.run(['npm', 'run', 'dev'], cwd=ui_path)
if __name__ == "__main__":
    app()