From 26b5407e2d73f8fab1913743f83be981641143cf Mon Sep 17 00:00:00 2001 From: Dilawar Singh <dilawar@subcom.tech> Date: Wed, 5 Oct 2022 08:41:51 +0530 Subject: [PATCH] feat: adds request session support. --- bitia/__main__.py | 174 ++++++++++++++++++++++++++++++++----------- tests/test_sanity.py | 9 ++- 2 files changed, 138 insertions(+), 45 deletions(-) diff --git a/bitia/__main__.py b/bitia/__main__.py index dbe9006..5862c29 100644 --- a/bitia/__main__.py +++ b/bitia/__main__.py @@ -4,38 +4,38 @@ """ import validators +import functools import shutil import sys import typing as T -import requests +import requests, requests.utils +import pickle import zipfile -from pathlib import Path import tempfile -import bitia -from bitia.checksumdir import dirhash - +from pathlib import Path import logging + from rich.logging import RichHandler FORMAT = "%(message)s" + logging.basicConfig( level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()] ) +import bitia +from bitia.checksumdir import dirhash + logger = logging.getLogger("bitia") -g_default_bitia_main_script_name: T.Final[str] = "__main__.bitia.sh" +BITIA_MAIN_SCRIPT_NAME: T.Final[str] = "__main__.bitia.sh" +DEFAULT_SERVER: T.Final[str] = "https://public.bitia.link/api/v1" import typer app = typer.Typer() -def version_callback(value: bool): - if value: - print(bitia.version()) - - def bitia_dir() -> Path: """CLI cache directory""" bdir = Path(tempfile.gettempdir()) / "bitia" @@ -43,18 +43,91 @@ def bitia_dir() -> Path: return bdir +def _save_session(): + global g_session + with SESSION_PICKLE_FILE.open("wb") as fsession: + pickle.dump(g_session, fsession) + + +def _load_session(): + global g_session + if not SESSION_PICKLE_FILE.is_file(): + return None + with SESSION_PICKLE_FILE.open("rb") as fsession: + try: + g_session = pickle.load(fsession) + except Exception as e: + return None + + +g_session = requests.session() +SESSION_PICKLE_FILE = bitia_dir() / ".session.pickle" + + +def session(func): + """Load a session before and save the session after the function call""" + + @functools.wraps(func) + def wrapper(*args, **kwargs): + _load_session() + retval = func(*args, **kwargs) + _save_session() + return retval + + return wrapper + + +@app.command("submit") +@session +def submit_pipeline(user_input: str, server: str = DEFAULT_SERVER): + """Submit your pipelin (url, directory, zip_file). + + Prepare the user directory to send to the server. User can also provide link + to the pipeline to run. + """ + pipeline_zip = user_input_to_pipeline(user_input) + containers = submit_job(pipeline_zip, server=server) + return containers + + +@app.command("run") +@session +def run_pipeline(user_input: str, server: str = DEFAULT_SERVER): + """Run a pipeline""" + pipeline_zip = user_input_to_pipeline(user_input) + containers = submit_job(pipeline_zip, server=server) + return _log_container(containers, server=server) + + +@app.command() +def version(): + """version information""" + print(bitia.version()) + + +def _log_container(containers: T.List[T.Any], server: str): + for container in containers: + for line in _fetch_logs(container["Name"], server=server): + print(line.decode().rstrip()) + + +def _check_server_status(server: str) -> int: + res = requests.get(server) + return res.status_code + + +def version_callback(value: bool): + if value: + print(bitia.version()) + + def dir_info(user_dir: Path) -> dict: """Check if directory is in good condition.""" files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()] size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files) - if size_in_mb > 20: - logger.error( - "The size of pipeline is more than 20MB. Uploading this big pipeline is now allowed." - ) - quit(-1) - if size_in_mb > 10: + if size_in_mb > 25.0: logger.warning( - "The size of pipeline is >10MB ({size_in_mb} MB)." + "The size of pipeline is >25MB ({size_in_mb} MB)." " You should try to reduce the size of the pipeline. TODO: See this link." ) return dict(size_in_mb=size_in_mb, num_files=len(files), files=files) @@ -64,9 +137,12 @@ def prepare_archive(user_dir: Path) -> Path: """Prepare the file to upload. Store it in temp directory""" dinfo = dir_info(user_dir) dhash = dirhash(user_dir) + outfile = bitia_dir() / "pipelines" / f"{dhash}.zip" + if outfile.is_file(): + logger.info(f"Reusing the existing pipeline `{outfile}`.") + return outfile logger.info(f"Preparing the zipfile pipeline from {user_dir}") logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}") - outfile = bitia_dir() / "pipelines" / f"{dhash}.zip" outfile.parent.mkdir(parents=True, exist_ok=True) assert dinfo["files"], f"No file found in {user_dir}" with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile: @@ -87,11 +163,11 @@ def create_pipeline_from_single_script(script: Path) -> Path: """Create a pipelinefile from a single script""" assert script.is_file(), f"{script} is not a file" pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) - pipeline_file = pipeline_dir / g_default_bitia_main_script_name + pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME # move the script to this directory. shutil.copy2(script, pipeline_dir) script_name = script.name - with pipeline_file.open('w', newline='\n') as outf: + with pipeline_file.open("w", newline="\n") as outf: outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}") return prepare_archive(pipeline_dir) @@ -104,37 +180,42 @@ def create_pipeline_from_command(cmd: str) -> Path: The directory in which pipeline was created. """ pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) - pipeline_file = pipeline_dir / g_default_bitia_main_script_name - with pipeline_file.open('w', newline='\n') as outf: + pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME + with pipeline_file.open("w", newline="\n") as outf: outf.write(f"#!/bin/sh\n\n{cmd}") logging.info("Wrote pipeline %s", pipeline_file.read_text()) return prepare_archive(pipeline_dir) -def submit_job(pipeline_zip: Path, server: str): +def submit_job(pipeline_zip: Path, *, server: str): """Submit job to the API and stream the output.""" - session = requests.Session() numbytes = pipeline_zip.stat().st_size + if (code := _check_server_status(server)) != 200: + logger.warning( + "%s may not be alive (status code: %s). Try other one?", server, code + ) + sys.exit(1) assert numbytes > 0 logger.info( - f"Submitting {pipeline_zip} (size={numbytes/1024.0:.2f} KB) to the {server}" - ) - files = {"pipeline_zip": open(str(pipeline_zip), "rb")} - response = session.post( - f"{server}/api/v1/submit", files=files, data=dict(filename=pipeline_zip), stream=True + "Submitting %s (size=%.2f KB) to the %s", + pipeline_zip, + numbytes / 1024.0, + server, ) - for line in response.iter_lines(): - print(line.decode()) + # submit and print the output. + with pipeline_zip.open("rb") as f_pipeline: + files = {"pipeline_zip": f_pipeline} + response = g_session.post( + f"{server}/submit", + files=files, + data=dict(filename=pipeline_zip), + ) + return response.json() -@app.command("run") -@app.command("submit") -def run_pipeline(user_input: str, server: str = "https://public.bitia.link"): - """Submit your pipelin (url, directory, zip_file). - Prepare the user directory to send to the server. User can also provide link - to the pipeline to run. - """ +def user_input_to_pipeline(user_input: str) -> Path: + """Create a pipeline file from user_input""" if (path := Path(user_input)).exists(): if path.is_dir(): pipeline_zip = prepare_archive(path) @@ -151,14 +232,19 @@ def run_pipeline(user_input: str, server: str = "https://public.bitia.link"): # generate a temporary pipeline and submit. pipeline_zip = create_pipeline_from_command(user_input) logging.info(f"Created pipeline in {pipeline_zip}") - submit_job(pipeline_zip, server) + return pipeline_zip -@app.command() -def version(): - """version information""" - print(bitia.version()) +def _fetch_logs(container: str, *, server): + logger.info(f"Fetching logs for container `{container}`") + return g_session.post( + f"{server}/logs", + json=dict(container=container, follow=True, tail=-1), + stream=True, + ) if __name__ == "__main__": + _load_session() app() + _save_session() diff --git a/tests/test_sanity.py b/tests/test_sanity.py index 6049d4c..a5fe11d 100644 --- a/tests/test_sanity.py +++ b/tests/test_sanity.py @@ -7,5 +7,12 @@ import subprocess def test_sanity(): assert bitia.version() + def test_run_simple(): - bitia.__main__.run_pipeline("ls -ltr /") + output = bitia.__main__.run_pipeline("ls -ltr /") + print(output) + + +if __name__ == "__main__": + test_sanity() + test_run_simple() -- GitLab