Newer
Older
"""BiTIA command line interface to submit job to the BiTIA server.
https://bitia.link
(c) 2022-, Subconscious Compute, https://subcom.tech
import subprocess
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from bitia.logger import logger, cprint, set_logger_level, console
from bitia import version as bversion
import bitia.pipeline as bpipeline
import bitia.session as bsession
import bitia.config as bconfig
app_cli = FastAPI()
origins = [
"http://localhost:3000",
"http://localhost:8000",
]
app_cli.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class VerbosityLevel(str, Enum):
debug = "debug"
info = "info"
warning = "warning"
def version_callback(value: bool):
"""callback for version"""
if value:
def session(func):
"""Load a session before and save the session after the function call"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
@app_cli.get("/create-container")
user_input,
*,
recreate: bool = False,
output_lines: T.List[str] = [],
):
"""Create container for the pipeline. The container starts running
immediately on the server. Use command `logs` to stream the output.
"""
# for a command pipeline, always create a new container.
if pipeline.is_a_command():
recreate = True
endpoint="container/create",
params=dict(recreate="true" if recreate else "false"),
_lines = (
track(res.iter_lines(), description="BiTIA is setting up required infra...")
if not bconfig.get_config("plain", default=False)
else res.iter_lines()
)
for line in _lines:
output_lines.append(line.decode().rstrip())
logger.info(output_lines[-1])
@app_cli.get("/list-container")
if not _list_remote_container(user_input):
for container in _list_remote_container(user_input):
cprint(container)
def _list_remote_container(user_input) -> T.List[str]:
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
logger.info(
f"Finding container for user input `{user_input}` with sha256sum={pipeline.checksum}"
)
@app.command("artifacts")
@session
def get_generated_artifacts(user_input):
# check if the containers for the corresponding pipeline exists
if not _list_remote_container(user_input):
cprint(
f"Artifacts for this pipeline: {user_input} doesn't exist, please run the pipeline using `bitia run` first..."
return "Artifacts for this pipeline doesn't exist, please run the pipeline using `bitia run` first..."
pipeline = bpipeline.user_input_to_pipeline(user_input)
res = bhelper.get(
endpoint="artifacts",
server=bconfig.get_server(),
params=dict(pipeline_sha256=pipeline.checksum),
)
res.raise_for_status()
path = res._content
assert path is not None
path = path.decode()[1:-1].rstrip()
cprint("Directory is being served at: ")
return f"Directory is being served at: {server}/{path}"
logger.info(
f"Finding container for user input {user_input} with sha256sum={pipeline.checksum}"
)
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
def submit_pipeline(user_input, *, rerun: bool = False, output_lines: T.List[str]):
"""Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
res = create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
logger.info("Remote container: %s", res)
def run_user_input(user_input, *, rerun: bool = False, output_lines: T.List[str] = []):
create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
containers = _list_remote_container(user_input)
for container in containers:
for _bl in bhelper.log_container(container, server=bconfig.get_server()):
print(_bl.decode().rstrip())
@app.command("checksum")
def checksum(user_input):
pipeline = bpipeline.user_input_to_pipeline(user_input)
def main(
verbose: VerbosityLevel = typer.Option(
VerbosityLevel.warning, case_sensitive=False
),
bconfig.set_config("plain", plain)
bconfig.set_config("verbosity", verbose.value)
if server is not None:
bconfig.set_server(server)
elif os.environ.get("BITIA_SERVER"):
bconfig.set_server(os.environ["BITIA_SERVER"])
else:
@app.command()
def version():
"""version information"""
@app.command()
def ui():
"""spin up a frontend server for the cli"""
parent_dir = Path(__file__).parent.parent
ui_path = parent_dir / 'new-ui'
subprocess.run(['npm', 'run', 'dev'], cwd=ui_path)