diff --git a/bitia/__main__.py b/bitia/__main__.py index ca99960fa210749b5338c9a8d17e32b1463d97d9..16f88530bacfb05e85498add1cb469050d1e54a6 100644 --- a/bitia/__main__.py +++ b/bitia/__main__.py @@ -12,7 +12,8 @@ import functools import bitia.helper as bhelper from bitia.logger import logger -import bitia +from bitia import version as bversion +import bitia.pipeline as bpipeline import bitia.session as bsession import bitia.config as bconfig @@ -25,7 +26,7 @@ app = typer.Typer() def version_callback(value: bool): """callback for version""" if value: - print(bitia.version()) + print(version()) def session(func): @@ -45,9 +46,9 @@ def session(func): @session def create_remote_container(user_input: str, recreate: bool = False): """Run a pipeline""" - pipeline_zip, _ = bhelper.user_input_to_pipeline(user_input) + pipeline = bpipeline.user_input_to_pipeline(user_input) res = bhelper.post_pipeline_task( - pipeline_zip, + pipeline.zipfile, endpoint="container/create", params=dict(recreate="true" if recreate else "false"), server=bconfig.server(), @@ -56,6 +57,7 @@ def create_remote_container(user_input: str, recreate: bool = False): res.raise_for_status() for line in res.iter_lines(): print(line.decode().rstrip()) + return res @app.command("list-container") @@ -64,12 +66,13 @@ def list_remote_container( user_input: str, server: str = bconfig.g_server ) -> T.List[str]: """List the remote server associated with the pipeline.""" - _, pipeline_hash = bhelper.user_input_to_pipeline(user_input) - logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}") + pipeline = bpipeline.user_input_to_pipeline(user_input) + logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`") + logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}") res = bhelper.get( endpoint="container/list", server=bconfig.server(), - params=dict(pipeline_sha256=pipeline_hash), + params=dict(pipeline_sha256=pipeline.checksum), ) res.raise_for_status() return res.json()["containers"].split(",") @@ -79,11 +82,11 @@ def list_remote_container( @session def stream_log(user_input: str, server: str = bconfig.g_server): """Stream logs for the most recent run of a given pipeline.""" - _, pipeline_hash = bhelper.user_input_to_pipeline(user_input) - logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}") + pipeline = bpipeline.user_input_to_pipeline(user_input) + logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}") res = bhelper.get( endpoint="logs", - params=dict(pipeline_sha256=pipeline_hash), + params=dict(pipeline_sha256=pipeline.checksum), server=bconfig.server(), stream=True, ) @@ -100,7 +103,8 @@ def submit_pipeline(user_input: str, rerun: bool = False): Prepare the user directory to send to the server. User can also provide link to the pipeline to run. """ - create_remote_container(user_input, recreate=rerun) + res = create_remote_container(user_input, recreate=rerun) + logger.info(f"Remote container: {res}") containers = list_remote_container(user_input) logger.info(f" Got containers {containers}") return containers @@ -138,7 +142,7 @@ def main(verbose: bool = False, server: T.Optional[str] = None): @app.command() def version(): """version information""" - print(bitia.version()) + print(bversion()) if __name__ == "__main__": diff --git a/bitia/common.py b/bitia/common.py new file mode 100644 index 0000000000000000000000000000000000000000..eed395adce92535c165b2a2f6f672f20d7a54401 --- /dev/null +++ b/bitia/common.py @@ -0,0 +1,33 @@ +from pathlib import Path +import hashlib + +from bitia.logger import logger + + +def sha256sum(infile: Path) -> str: + """Compute sha256sum of a file. + + Credit + ------ + Thanks https://stackoverflow.com/a/44873382/1805129 + + """ + h = hashlib.sha256() + b = bytearray(128 * 1024) + mv = memoryview(b) + with infile.open("rb", buffering=0) as handler: + while n := handler.readinto(mv): + h.update(mv[:n]) + return h.hexdigest() + + +def dir_info(user_dir: Path) -> dict: + """Check if directory is in good condition.""" + files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()] + size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files) + if size_in_mb > 25.0: + logger.warning( + "The size of pipeline is >25MB ({size_in_mb} MB)." + " You should try to reduce the size of the pipeline. TODO: See this link." + ) + return dict(size_in_mb=size_in_mb, num_files=len(files), files=files) diff --git a/bitia/helper.py b/bitia/helper.py index 7785111b8a843b4561f3924203247c4af58dad0e..cbf2165775197fa96e2a61d6102e40b0698f4bd7 100644 --- a/bitia/helper.py +++ b/bitia/helper.py @@ -18,6 +18,7 @@ import requests import bitia.config as bconfig import bitia.session as bsession +from bitia.pipeline import Pipeline from bitia.checksumdir import dirhash from bitia.logger import logger @@ -35,79 +36,6 @@ def _check_server_status(server: str) -> int: return res.status_code -def dir_info(user_dir: Path) -> dict: - """Check if directory is in good condition.""" - files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()] - size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files) - if size_in_mb > 25.0: - logger.warning( - "The size of pipeline is >25MB ({size_in_mb} MB)." - " You should try to reduce the size of the pipeline. TODO: See this link." - ) - return dict(size_in_mb=size_in_mb, num_files=len(files), files=files) - - -def prepare_archive(user_dir: Path) -> Path: - """Prepare the file to upload. Store it in temp directory""" - dinfo = dir_info(user_dir) - dhash = dirhash(user_dir) - outfile = bconfig.bitia_dir() / "pipelines" / f"{dhash}.zip" - if outfile.is_file(): - logger.info(f"Reusing the existing pipeline `{outfile}`.") - return outfile - logger.info(f"Preparing the zipfile pipeline from {user_dir}") - logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}") - outfile.parent.mkdir(parents=True, exist_ok=True) - assert dinfo["files"], f"No file found in {user_dir}" - with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile: - for entry in dinfo["files"]: - logger.info(f"Adding {entry} to zipfile") - zfile.write(entry) - - # check the prepared zip file. - with zipfile.ZipFile(outfile) as zfile: - assert zfile.namelist(), "Empty zipfile" - - # assert non-zero size of the zip file. - assert outfile.is_file(), f"{outfile} does not exists" - return outfile - - -def create_pipeline_from_single_script(script: Path) -> Path: - """Create a pipelinefile from a single script""" - assert script.is_file(), f"{script} is not a file" - pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) - pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME - # move the script to this directory. - shutil.copy2(script, pipeline_dir) - script_name = script.name - with pipeline_file.open("w", newline="\n") as outf: - outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}") - return prepare_archive(pipeline_dir) - - -def create_pipeline_from_command( - cmd: str, add_timestamp: bool = True, add_uuid1: bool = True -) -> Path: - """Create a pipeline from user input. - - Returns - ------- - The directory in which pipeline was created. - """ - pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) - pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME - with pipeline_file.open("w", newline="\n") as outf: - outf.writeline("#!/bin/sh") - if add_timestamp: - outf.writeline(f"# timestamp={datetime.now().isoformat()}") - if add_uuid1: - outf.writeline(f"# uuid={uuid.uuid1()}") - outf.writeline(f"{cmd}") - logger.info("Wrote pipeline %s", pipeline_file.read_text()) - return prepare_archive(pipeline_dir) - - def post_pipeline_task( pipeline_zip: Path, *, @@ -202,41 +130,3 @@ def submit_job( json=dict(filename=pipeline_zip, rerun=rerun), ) return response.json() - - -def sha256sum(infile: Path) -> str: - """Compute sha256sum of a file. - - Credit - ------ - Thanks https://stackoverflow.com/a/44873382/1805129 - - """ - h = hashlib.sha256() - b = bytearray(128 * 1024) - mv = memoryview(b) - with infile.open("rb", buffering=0) as handler: - while n := handler.readinto(mv): - h.update(mv[:n]) - return h.hexdigest() - - -def user_input_to_pipeline(user_input: str) -> T.Tuple[Path, str]: - """Create a pipeline file from user_input""" - if (path := Path(user_input)).exists(): - if path.is_dir(): - pipeline_zip = prepare_archive(path) - elif path.is_file() and path.suffix.lower() == ".zip": - pipeline_zip = path - elif path.is_file(): - pipeline_zip = create_pipeline_from_single_script(path) - else: - raise NotImplementedError(f"{path} is not yet supported") - elif validators.url(user_input): - logger.warning("Fetching pipeline from url is not supported") - sys.exit(-1) - else: - # generate a temporary pipeline and submit. - pipeline_zip = create_pipeline_from_command(user_input) - logger.info(f"Created pipeline in {pipeline_zip}") - return pipeline_zip, sha256sum(pipeline_zip) diff --git a/bitia/pipeline.py b/bitia/pipeline.py new file mode 100644 index 0000000000000000000000000000000000000000..fa2671a132e23b6d0cf5934809863685798a2f7d --- /dev/null +++ b/bitia/pipeline.py @@ -0,0 +1,133 @@ +"""Pipleline""" + +import typing as T +import uuid +import shutil +import tempfile +import zipfile +from pathlib import Path +from datetime import datetime + +import validators +import bitia.config as bconfig +from bitia.common import sha256sum, dir_info +from bitia.logger import logger +from bitia.checksumdir import dirhash + + +class Singleon(type): + _instances: T.Dict[T.Any, T.Any] = {} + + def __call__(cls, zipfile, user_input=None, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(Singleon, cls).__call__( + zipfile, user_input, **kwargs + ) + else: + prev_instance = cls._instances[cls] + if prev_instance.user_input != user_input: + cls._instances[cls] = super(Singleon, cls).__call__( + zipfile, user_input, **kwargs + ) + return cls._instances[cls] + + +class Pipeline(metaclass=Singleon): + def __init__(self, zipfile: Path, user_input=None, **kwargs): + self.zipfile: Path = zipfile + self.hash: str = sha256sum(zipfile) + self.user_input = user_input + + def __eq__(self, other): + return self.checksum == other.checksum and self.size == other.size + + @property + def size(self) -> int: + return self.zipfile.stat().st_size + + @property + def checksum(self) -> str: + return self.hash + + @classmethod + def from_command( + cls, cmd: str, add_timestamp: bool = True, add_uuid1: bool = True + ) -> "Pipeline": + """Create a pipeline from user input. + + Returns + ------- + The directory in which pipeline was created. + """ + pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) + pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME + lines: T.List[str] = [] + lines.append("#!/bin/sh") + if add_timestamp: + lines.append(f"# timestamp={datetime.now().isoformat()}") + if add_uuid1: + lines.append(f"# uuid={uuid.uuid1()}") + lines.append(f"{cmd}") + pipeline_file.write_text("\n".join(lines)) + logger.debug("Wrote pipeline %s", pipeline_file.read_text()) + pipeline = Pipeline.from_directory(pipeline_dir) + pipeline.user_input = cmd + return pipeline + + @classmethod + def from_script(cls, script: Path) -> "Pipeline": + """Create a pipelinefile from a single script""" + assert script.is_file(), f"{script} is not a file" + pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_")) + pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME + # move the script to this directory. + shutil.copy2(script, pipeline_dir) + script_name = script.name + with pipeline_file.open("w", newline="\n") as outf: + outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}") + pipeline = Pipeline.from_directory(pipeline_dir) + pipeline.user_input = script + return pipeline + + @classmethod + def from_directory(cls, user_dir: Path) -> "Pipeline": + """Create pipeline from a directory""" + dinfo = dir_info(user_dir) + content_hash = dirhash(user_dir) + outfile = ( + bconfig.bitia_dir() / "pipelines" / f"{content_hash}-{user_dir.name}.zip" + ) + if not outfile.is_file(): + logger.info(f"Preparing the zipfile pipeline from {user_dir}") + logger.info( + f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}" + ) + outfile.parent.mkdir(parents=True, exist_ok=True) + assert dinfo["files"], f"No file found in {user_dir}" + with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile: + for entry in dinfo["files"]: + logger.info(f"Adding {entry} to zipfile") + zfile.write(entry) + + # check the prepared zip file. + with zipfile.ZipFile(outfile) as zfile: + assert zfile.namelist(), "Empty zipfile" + else: + logger.info(f"Reusing the existing pipeline `{outfile}`.") + + # assert non-zero size of the zip file. + assert outfile.is_file(), f"{outfile} does not exists" + return cls(outfile) + + +def user_input_to_pipeline(user_input: str) -> Pipeline: + """Create a pipeline file from user_input""" + if (path := Path(user_input)).exists(): + if path.is_dir(): + return Pipeline.from_directory(path) + if path.is_file() and path.suffix.lower() == ".zip": + return Pipeline(path, str(path.resolve())) + return Pipeline.from_script(path) + if validators.url(user_input): + raise NotImplementedError("Fetching pipeline from url is not supported") + return Pipeline.from_command(user_input) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000000000000000000000000000000000000..3ac169f586122c4192e03b2b1fbeb60ab03f536d --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,25 @@ +"""Test pipleline""" + +from bitia.pipeline import Pipeline + + +def test_pipeline_sanity(): + uinput = "ls -ltrh" + pipeline = Pipeline.from_command(uinput) + assert pipeline + assert pipeline.user_input == uinput + assert pipeline.size > 0 + assert pipeline.checksum + + +def test_pipeline_singleton(): + u1 = "ls -ltrh" + u2 = "ls -ltrh /" + p1 = Pipeline.from_command(u1) + p2 = Pipeline.from_command(u1) + p3 = Pipeline.from_command(u2) + p4 = Pipeline.from_command(u2) + assert p1 == p2 + assert p3 == p4 + assert p2 != p3 + assert p1 != p4