From cd009379aad7b8f4cb410f5c191705bd56f660b5 Mon Sep 17 00:00:00 2001 From: Dilawar Singh <dilawar@subcom.tech> Date: Thu, 6 Oct 2022 11:42:18 +0530 Subject: [PATCH] refactor: major refactor. --- bitia/__main__.py | 233 +++++++--------------------------------------- bitia/config.py | 13 +++ bitia/helper.py | 144 ++++++++++++++++++++++++++++ bitia/logger.py | 10 ++ bitia/session.py | 42 +++++++++ 5 files changed, 241 insertions(+), 201 deletions(-) create mode 100644 bitia/config.py create mode 100644 bitia/logger.py create mode 100644 bitia/session.py diff --git a/bitia/__main__.py b/bitia/__main__.py index 8ac9a68..877f61a 100644 --- a/bitia/__main__.py +++ b/bitia/__main__.py @@ -3,65 +3,25 @@ (c) 2022-, Subconscious Compute """ -import validators import functools -import shutil -import sys -import typing as T -import requests, requests.utils -import pickle -import zipfile -import tempfile -from pathlib import Path -import logging -from rich.logging import RichHandler - -FORMAT = "%(message)s" - -logging.basicConfig( - level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()] -) +import bitia.helper as bhelper +from bitia.logger import logger import bitia -from bitia.checksumdir import dirhash +import bitia.session as bsession +import bitia.config as bconfig -logger = logging.getLogger("bitia") - -BITIA_MAIN_SCRIPT_NAME: T.Final[str] = "__main__.bitia.sh" -DEFAULT_SERVER: T.Final[str] = "https://public.bitia.link/api/v1" import typer app = typer.Typer() -def bitia_dir() -> Path: - """CLI cache directory""" - bdir = Path(tempfile.gettempdir()) / "bitia" - bdir.mkdir(parents=True, exist_ok=True) - return bdir - - -def _save_session(): - global g_session - with SESSION_PICKLE_FILE.open("wb") as fsession: - pickle.dump(g_session, fsession) - - -def _load_session(): - global g_session - if not SESSION_PICKLE_FILE.is_file(): - return None - with SESSION_PICKLE_FILE.open("rb") as fsession: - try: - g_session = pickle.load(fsession) - except Exception as e: - return None - - -g_session = requests.session() -SESSION_PICKLE_FILE = bitia_dir() / ".session.pickle" +def version_callback(value: bool): + """callback for version""" + if value: + print(bitia.version()) def session(func): @@ -69,9 +29,9 @@ def session(func): @functools.wraps(func) def wrapper(*args, **kwargs): - _load_session() + bsession.load_session() retval = func(*args, **kwargs) - _save_session() + bsession.save_session() return retval return wrapper @@ -79,25 +39,40 @@ def session(func): @app.command("submit") @session -def submit_pipeline(user_input: str, rerun: bool = False, server: str = DEFAULT_SERVER): +def submit_pipeline( + user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER +): """Submit your pipelin (url, directory, zip_file). Prepare the user directory to send to the server. User can also provide link to the pipeline to run. """ - pipeline_zip = user_input_to_pipeline(user_input) - containers = submit_job(pipeline_zip, rerun=rerun, server=server) + pipeline_zip = bhelper.user_input_to_pipeline(user_input) + containers = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server) logger.info(f" Got containers {containers}") return containers @app.command("run") @session -def run_pipeline(user_input: str, rerun: bool = False, server: str = DEFAULT_SERVER): +def run_pipeline( + user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER +): + """Run a pipeline""" + pipeline_zip = bhelper.user_input_to_pipeline(user_input) + res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server) + return bhelper.log_container(res["container"], server=server) + + +@app.command("create") +@session +def create_remote_infra( + user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER +): """Run a pipeline""" - pipeline_zip = user_input_to_pipeline(user_input) - res = submit_job(pipeline_zip, rerun=rerun, server=server) - return _log_container(res["container"], server=server) + pipeline_zip = bhelper.user_input_to_pipeline(user_input) + res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server) + return bhelper.log_container(res["container"], server=server) @app.command() @@ -106,149 +81,5 @@ def version(): print(bitia.version()) -def _log_container(container: str, server: str): - assert ( - container is not None - ), "Failed to determine the container that is runnning the pipeline. There is probably a bug in server end." - logger.info(f"Fetching logs from {container}") - for line in _fetch_logs(container, server=server): - print(line.decode().rstrip()) - - -def _check_server_status(server: str) -> int: - res = requests.get(server) - return res.status_code - - -def version_callback(value: bool): - if value: - print(bitia.version()) - - -def dir_info(user_dir: Path) -> dict: - """Check if directory is in good condition.""" - files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()] - size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files) - if size_in_mb > 25.0: - logger.warning( - "The size of pipeline is >25MB ({size_in_mb} MB)." - " You should try to reduce the size of the pipeline. TODO: See this link." - ) - return dict(size_in_mb=size_in_mb, num_files=len(files), files=files) - - -def prepare_archive(user_dir: Path) -> Path: - """Prepare the file to upload. Store it in temp directory""" - dinfo = dir_info(user_dir) - dhash = dirhash(user_dir) - outfile = bitia_dir() / "pipelines" / f"{dhash}.zip" - if outfile.is_file(): - logger.info(f"Reusing the existing pipeline `{outfile}`.") - return outfile - logger.info(f"Preparing the zipfile pipeline from {user_dir}") - logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}") - outfile.parent.mkdir(parents=True, exist_ok=True) - assert dinfo["files"], f"No file found in {user_dir}" - with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile: - for entry in dinfo["files"]: - logger.info(f"Adding {entry} to zipfile") - zfile.write(entry) - - # check the prepared zip file. - with zipfile.ZipFile(outfile) as zfile: - assert zfile.namelist(), "Empty zipfile" - - # assert non-zero size of the zip file. - assert outfile.is_file(), f"{outfile} does not exists" - return outfile - - -def create_pipeline_from_single_script(script: Path) -> Path: - """Create a pipelinefile from a single script""" - assert script.is_file(), f"{script} is not a file" - pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) - pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME - # move the script to this directory. - shutil.copy2(script, pipeline_dir) - script_name = script.name - with pipeline_file.open("w", newline="\n") as outf: - outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}") - return prepare_archive(pipeline_dir) - - -def create_pipeline_from_command(cmd: str) -> Path: - """Create a pipeline from user input. - - Returns - ------- - The directory in which pipeline was created. - """ - pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) - pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME - with pipeline_file.open("w", newline="\n") as outf: - outf.write(f"#!/bin/sh\n\n{cmd}") - logger.info("Wrote pipeline %s", pipeline_file.read_text()) - return prepare_archive(pipeline_dir) - - -def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False): - """Submit job to the API and stream the output.""" - numbytes = pipeline_zip.stat().st_size - if (code := _check_server_status(server)) != 200: - logger.warning( - "%s may not be alive (status code: %s). Try other one?", server, code - ) - sys.exit(1) - assert numbytes > 0 - logger.info( - "Submitting %s (size=%.2f KB) to the %s", - pipeline_zip, - numbytes / 1024.0, - server, - ) - - # submit and print the output. - with pipeline_zip.open("rb") as f_pipeline: - files = {"pipeline_zip": f_pipeline} - response = g_session.post( - f"{server}/submit/?rerun={rerun}", - files=files, - data=dict(filename=pipeline_zip, rerun=rerun), - ) - return response.json() - - -def user_input_to_pipeline(user_input: str) -> Path: - """Create a pipeline file from user_input""" - if (path := Path(user_input)).exists(): - if path.is_dir(): - pipeline_zip = prepare_archive(path) - elif path.is_file() and path.suffix.lower() == ".zip": - pipeline_zip = path - elif path.is_file(): - pipeline_zip = create_pipeline_from_single_script(path) - else: - raise NotImplementedError(f"{path} is not yet supported") - elif validators.url(user_input): - logger.warning("Fetching pipeline from url is not supported") - sys.exit(-1) - else: - # generate a temporary pipeline and submit. - pipeline_zip = create_pipeline_from_command(user_input) - logging.info(f"Created pipeline in {pipeline_zip}") - return pipeline_zip - - -def _fetch_logs(container: str, *, server): - logger.info(f"Fetching logs for container `{container}`") - return g_session.post( - f"{server}/logs", - json=dict(container=container, follow=True), - stream=True, - ) - - if __name__ == "__main__": - _load_session() app() - _save_session() diff --git a/bitia/config.py b/bitia/config.py new file mode 100644 index 0000000..4afc627 --- /dev/null +++ b/bitia/config.py @@ -0,0 +1,13 @@ +import typing as T +from pathlib import Path +import tempfile + +BITIA_MAIN_SCRIPT_NAME: T.Final[str] = "__main__.bitia.sh" +DEFAULT_SERVER: T.Final[str] = "https://public.bitia.link/api/v1" + + +def bitia_dir() -> Path: + """CLI cache directory""" + bdir = Path(tempfile.gettempdir()) / "bitia" + bdir.mkdir(parents=True, exist_ok=True) + return bdir diff --git a/bitia/helper.py b/bitia/helper.py index 1bdd23f..6d468fc 100644 --- a/bitia/helper.py +++ b/bitia/helper.py @@ -1,2 +1,146 @@ +"""Helper module""" + __author__ = "Dilawar Singh" __email__ = "dilawar@subcom.tech" + +import sys +import zipfile +import shutil +from pathlib import Path +import tempfile + +import validators +import requests + +import bitia.config as bconfig +import bitia.session as bsession +from bitia.checksumdir import dirhash +from bitia.logger import logger + + +def log_container(container: str, server: str): + assert ( + container + ), "Failed to determine the container that is runnning the pipeline. There is probably a bug in server end." + logger.info(f"Fetching logs from {container}") + for line in bsession.fetch_logs(container, server=server): + print(line.decode().rstrip()) + + +def _check_server_status(server: str) -> int: + res = requests.get(server) + return res.status_code + + +def dir_info(user_dir: Path) -> dict: + """Check if directory is in good condition.""" + files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()] + size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files) + if size_in_mb > 25.0: + logger.warning( + "The size of pipeline is >25MB ({size_in_mb} MB)." + " You should try to reduce the size of the pipeline. TODO: See this link." + ) + return dict(size_in_mb=size_in_mb, num_files=len(files), files=files) + + +def prepare_archive(user_dir: Path) -> Path: + """Prepare the file to upload. Store it in temp directory""" + dinfo = dir_info(user_dir) + dhash = dirhash(user_dir) + outfile = bconfig.bitia_dir() / "pipelines" / f"{dhash}.zip" + if outfile.is_file(): + logger.info(f"Reusing the existing pipeline `{outfile}`.") + return outfile + logger.info(f"Preparing the zipfile pipeline from {user_dir}") + logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}") + outfile.parent.mkdir(parents=True, exist_ok=True) + assert dinfo["files"], f"No file found in {user_dir}" + with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile: + for entry in dinfo["files"]: + logger.info(f"Adding {entry} to zipfile") + zfile.write(entry) + + # check the prepared zip file. + with zipfile.ZipFile(outfile) as zfile: + assert zfile.namelist(), "Empty zipfile" + + # assert non-zero size of the zip file. + assert outfile.is_file(), f"{outfile} does not exists" + return outfile + + +def create_pipeline_from_single_script(script: Path) -> Path: + """Create a pipelinefile from a single script""" + assert script.is_file(), f"{script} is not a file" + pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) + pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME + # move the script to this directory. + shutil.copy2(script, pipeline_dir) + script_name = script.name + with pipeline_file.open("w", newline="\n") as outf: + outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}") + return prepare_archive(pipeline_dir) + + +def create_pipeline_from_command(cmd: str) -> Path: + """Create a pipeline from user input. + + Returns + ------- + The directory in which pipeline was created. + """ + pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) + pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME + with pipeline_file.open("w", newline="\n") as outf: + outf.write(f"#!/bin/sh\n\n{cmd}") + logger.info("Wrote pipeline %s", pipeline_file.read_text()) + return prepare_archive(pipeline_dir) + + +def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False): + """Submit job to the API and stream the output.""" + numbytes = pipeline_zip.stat().st_size + if (code := _check_server_status(server)) != 200: + logger.warning( + "%s may not be alive (status code: %s). Try other one?", server, code + ) + sys.exit(1) + assert numbytes > 0 + logger.info( + "Submitting %s (size=%.2f KB) to the %s", + pipeline_zip, + numbytes / 1024.0, + server, + ) + + # submit and print the output. + with pipeline_zip.open("rb") as f_pipeline: + files = {"pipeline_zip": f_pipeline} + response = bsession.post( + f"{server}/submit/?rerun={rerun}", + files=files, + data=dict(filename=pipeline_zip, rerun=rerun), + ) + return response.json() + + +def user_input_to_pipeline(user_input: str) -> Path: + """Create a pipeline file from user_input""" + if (path := Path(user_input)).exists(): + if path.is_dir(): + pipeline_zip = prepare_archive(path) + elif path.is_file() and path.suffix.lower() == ".zip": + pipeline_zip = path + elif path.is_file(): + pipeline_zip = create_pipeline_from_single_script(path) + else: + raise NotImplementedError(f"{path} is not yet supported") + elif validators.url(user_input): + logger.warning("Fetching pipeline from url is not supported") + sys.exit(-1) + else: + # generate a temporary pipeline and submit. + pipeline_zip = create_pipeline_from_command(user_input) + logger.info(f"Created pipeline in {pipeline_zip}") + return pipeline_zip diff --git a/bitia/logger.py b/bitia/logger.py new file mode 100644 index 0000000..30eadc1 --- /dev/null +++ b/bitia/logger.py @@ -0,0 +1,10 @@ +from rich.logging import RichHandler +import logging + +FORMAT = "%(message)s" + +logging.basicConfig( + level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()] +) + +logger = logging.getLogger("bitia") diff --git a/bitia/session.py b/bitia/session.py new file mode 100644 index 0000000..6851c52 --- /dev/null +++ b/bitia/session.py @@ -0,0 +1,42 @@ +"""API module. Uses session to avoid redundant connections""" +import requests +import pickle + +import bitia.config as bconfig +from bitia.logger import logger + +g_session = requests.session() +SESSION_PICKLE_FILE = bconfig.bitia_dir() / ".session.pickle" + + +def fetch_logs(container: str, *, server): + """Fetch logs from a container.""" + logger.info(f"Fetching logs for container `{container}`") + return g_session.post( + f"{server}/logs", + json=dict(container=container, follow=True), + stream=True, + ) + + +def save_session(): + """Save the current requests session as pickle""" + global g_session + with SESSION_PICKLE_FILE.open("wb") as fsession: + pickle.dump(g_session, fsession) + + +def load_session(): + """Load the pickled session.""" + global g_session + if not SESSION_PICKLE_FILE.is_file(): + return None + with SESSION_PICKLE_FILE.open("rb") as fsession: + try: + g_session = pickle.load(fsession) + except Exception: + return None + + +def post(*args, **kwargs): + return g_session.post(*args, **kwargs) -- GitLab