
version 0.2.0

Merged: dilawar requested to merge devel into main

Files changed: 17 (+143 −126)
"""BiTIA command line interface to submit job to the BiTIA server.
(c) 2022-, Subconscious Compute
https://bitia.link
(c) 2022-, Subconscious Compute, https://subcom.tech
"""
import functools
import logging
import os
import shutil
import sys
import tempfile
import typing as T
import zipfile
from enum import Enum
from pathlib import Path

import requests
import typer
import validators
from rich.logging import RichHandler
from rich.progress import track

import bitia
import bitia.config as bconfig
import bitia.helper as bhelper
import bitia.pipeline as bpipeline
import bitia.session as bsession
from bitia import version as bversion
from bitia.checksumdir import dirhash
from bitia.logger import logger, cprint, set_logger_level

FORMAT = "%(message)s"
logging.basicConfig(
    level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)

g_default_bitia_main_script_name: T.Final[str] = "__main__.bitia.sh"

app = typer.Typer()

class VerbosityLevel(str, Enum):
    debug = "debug"
    info = "info"
    warning = "warning"


def version_callback(value: bool):
    """callback for version"""
    if value:
        print(bitia.version())


def bitia_dir() -> Path:
    """CLI cache directory"""
    bdir = Path(tempfile.gettempdir()) / "bitia"
    bdir.mkdir(parents=True, exist_ok=True)
    return bdir

def dir_info(user_dir: Path) -> dict:
    """Check if the directory is in good condition."""
    files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
    size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
    if size_in_mb > 20:
        logger.error(
            "The size of the pipeline is more than 20 MB. Uploading such a large pipeline is not allowed."
        )
        quit(-1)
    if size_in_mb > 10:
        logger.warning(
            f"The size of the pipeline is >10 MB ({size_in_mb:.2f} MB)."
            " You should try to reduce the size of the pipeline. TODO: See this link."
        )
    return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)

def prepare_archive(user_dir: Path) -> Path:
    """Prepare the file to upload. Store it in a temporary directory."""
    dinfo = dir_info(user_dir)
    dhash = dirhash(user_dir)
    logger.info(f"Preparing the zipfile pipeline from {user_dir}")
    logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}")
    outfile = bitia_dir() / "pipelines" / f"{dhash}.zip"
    outfile.parent.mkdir(parents=True, exist_ok=True)
    assert dinfo["files"], f"No file found in {user_dir}"
    with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
        for entry in dinfo["files"]:
            logger.info(f"Adding {entry} to zipfile")
            zfile.write(entry)
    # Check the prepared zip file: it must exist and must not be empty.
    with zipfile.ZipFile(outfile) as zfile:
        assert zfile.namelist(), "Empty zipfile"
    assert outfile.is_file(), f"{outfile} does not exist"
    return outfile

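# A minimal sketch of how the archive produced by `prepare_archive` can be
# inspected locally before submission (illustrative only; the directory name
# is hypothetical):
#
#   archive = prepare_archive(Path("./my_pipeline"))
#   with zipfile.ZipFile(archive) as zf:
#       for name in zf.namelist():
#           print(name, zf.getinfo(name).file_size, "bytes")
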
def create_pipeline_from_single_script(script: Path) -> Path:
    """Create a pipeline file from a single script."""
    assert script.is_file(), f"{script} is not a file"
    pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
    pipeline_file = pipeline_dir / g_default_bitia_main_script_name
    # Copy the script into the pipeline directory.
    shutil.copy2(script, pipeline_dir)
    script_name = script.name
    with pipeline_file.open("w", newline="\n") as outf:
        outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
    return prepare_archive(pipeline_dir)

def create_pipeline_from_command(cmd: str) -> Path:
    """Create a pipeline from a user-provided command.

    Returns
    -------
    Path of the zipped pipeline.
    """
    pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
    pipeline_file = pipeline_dir / g_default_bitia_main_script_name
    with pipeline_file.open("w", newline="\n") as outf:
        outf.write(f"#!/bin/sh\n\n{cmd}")
    logging.info("Wrote pipeline %s", pipeline_file.read_text())
    return prepare_archive(pipeline_dir)

def session(func):
    """Load a session before and save the session after the function call"""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bsession.load_session()
        retval = func(*args, **kwargs)
        bsession.save_session()
        return retval

    return wrapper

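# For example (illustrative only, `example_cmd` is hypothetical), a command
# defined as
#
#   @app.command("example")
#   @session
#   def example_cmd(user_input): ...
#
# will call bsession.load_session() before and bsession.save_session() after
# the command body runs, exactly as the commands registered below do.
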
@app.command("create-container")
@session
def create_remote_container(
user_input, *, recreate: bool = False, output_lines: T.List[str] = []
):
"""Create container for the pipeline. The container starts running
immediately on the server. Use command `logs` to stream the output.
"""
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / g_default_bitia_main_script_name
with pipeline_file.open('w', newline='\n') as outf:
outf.write(f"#!/bin/sh\n\n{cmd}")
logging.info("Wrote pipeline %s", pipeline_file.read_text())
return prepare_archive(pipeline_dir)
def submit_job(pipeline_zip: Path, server: str):
"""Submit job to the API and stream the output."""
session = requests.Session()
numbytes = pipeline_zip.stat().st_size
assert numbytes > 0
logger.info(
f"Submitting {pipeline_zip} (size={numbytes/1024.0:.2f} KB) to the {server}"
pipeline = bpipeline.user_input_to_pipeline(user_input)
# for a command pipeline, always create a new container.
if pipeline.is_a_command():
recreate = True
res = bhelper.post_pipeline_task(
pipeline,
endpoint="container/create",
params=dict(recreate="true" if recreate else "false"),
server=bconfig.get_server(),
stream=True,
)
files = {"pipeline_zip": open(str(pipeline_zip), "rb")}
response = session.post(
f"{server}/api/v1/submit", files=files, data=dict(filename=pipeline_zip), stream=True
res.raise_for_status()
for line in track(
res.iter_lines(), description="BiTIA is setting up required infra..."
):
output_lines.append(line.decode().rstrip())
logger.info(output_lines[-1])
return res
@app.command("list-container")
@session
def list_remote_container(
user_input, server: str = bconfig.get_server()
) -> T.List[str]:
"""List the remote server associated with the pipeline."""
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="container/list",
server=bconfig.get_server(),
params=dict(pipeline_sha256=pipeline.checksum),
)
for line in response.iter_lines():
print(line.decode())
res.raise_for_status()
containers = res.json()["containers"].split(",")
cprint(containers)
return containers
@app.command("logs")
@session
def stream_log(user_input, server: str = bconfig.get_server()):
"""Stream logs for the most recent run of a given pipeline."""
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="logs",
params=dict(pipeline_sha256=pipeline.checksum),
server=bconfig.get_server(),
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
@app.command("run")
@app.command("submit")
def run_pipeline(user_input: str, server: str = "https://public.bitia.link"):
@session
def submit_pipeline(user_input, *, rerun: bool = False, output_lines: T.List[str]):
"""Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
if (path := Path(user_input)).exists():
if path.is_dir():
pipeline_zip = prepare_archive(path)
elif path.is_file() and path.suffix.lower() == ".zip":
pipeline_zip = path
elif path.is_file():
pipeline_zip = create_pipeline_from_single_script(path)
else:
raise NotImplementedError(f"{path} is not yet supported")
elif validators.url(user_input):
logger.warning("Fetching pipeline from url is not supported")
sys.exit(-1)
res = create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
logger.info("Remote container: %s", res)
containers = list_remote_container(user_input)
cprint(f"{containers}")
return containers
@app.command("run")
@session
def run_user_input(user_input, *, rerun: bool = False, output_lines: T.List[str] = []):
"""Run a pipeline"""
create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
containers = list_remote_container(user_input)
return [
bhelper.log_container(container, server=bconfig.get_server())
for container in containers
]
@app.command("checksum")
def checksum(user_input):
pipeline = bpipeline.user_input_to_pipeline(user_input)
cprint(pipeline.checksum)
@app.callback()
def main(
    verbose: VerbosityLevel = typer.Option(
        VerbosityLevel.warning, case_sensitive=False
    ),
    server: T.Optional[str] = None,
):
    """Global options for all sub-commands: verbosity and server selection."""
    set_logger_level(verbose.value)
    if server is not None:
        bconfig.set_server(server)
    elif os.environ.get("BITIA_SERVER") is not None:
        bconfig.set_server(os.environ["BITIA_SERVER"])
    else:
        # TODO: Read from default config file.
        pass
    cprint(f"Using server {bconfig.get_server()}")

@app.command()
def version():
    """Print version information."""
    cprint(bversion())

if __name__ == "__main__":
    app()