Skip to content
Snippets Groups Projects
Commit 7397064b authored by dilawar's avatar dilawar :ant:
Browse files

feat: major refactor

pipeline has its own class now.
parent 2fbc934c
No related branches found
No related tags found
1 merge request!2version 0.2.0
Pipeline #3756 failed with stages
in 1 minute and 59 seconds
......@@ -12,7 +12,8 @@ import functools
import bitia.helper as bhelper
from bitia.logger import logger
import bitia
from bitia import version as bversion
import bitia.pipeline as bpipeline
import bitia.session as bsession
import bitia.config as bconfig
......@@ -25,7 +26,7 @@ app = typer.Typer()
def version_callback(value: bool):
"""callback for version"""
if value:
print(bitia.version())
print(version())
def session(func):
......@@ -45,9 +46,9 @@ def session(func):
@session
def create_remote_container(user_input: str, recreate: bool = False):
"""Run a pipeline"""
pipeline_zip, _ = bhelper.user_input_to_pipeline(user_input)
pipeline = bpipeline.user_input_to_pipeline(user_input)
res = bhelper.post_pipeline_task(
pipeline_zip,
pipeline.zipfile,
endpoint="container/create",
params=dict(recreate="true" if recreate else "false"),
server=bconfig.server(),
......@@ -56,6 +57,7 @@ def create_remote_container(user_input: str, recreate: bool = False):
res.raise_for_status()
for line in res.iter_lines():
print(line.decode().rstrip())
return res
@app.command("list-container")
......@@ -64,12 +66,13 @@ def list_remote_container(
user_input: str, server: str = bconfig.g_server
) -> T.List[str]:
"""List the remote server associated with the pipeline."""
_, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="container/list",
server=bconfig.server(),
params=dict(pipeline_sha256=pipeline_hash),
params=dict(pipeline_sha256=pipeline.checksum),
)
res.raise_for_status()
return res.json()["containers"].split(",")
......@@ -79,11 +82,11 @@ def list_remote_container(
@session
def stream_log(user_input: str, server: str = bconfig.g_server):
"""Stream logs for the most recent run of a given pipeline."""
_, pipeline_hash = bhelper.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline_hash}")
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.info(f"Finding container for {user_input}, sha256sum={pipeline.checksum}")
res = bhelper.get(
endpoint="logs",
params=dict(pipeline_sha256=pipeline_hash),
params=dict(pipeline_sha256=pipeline.checksum),
server=bconfig.server(),
stream=True,
)
......@@ -100,7 +103,8 @@ def submit_pipeline(user_input: str, rerun: bool = False):
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
create_remote_container(user_input, recreate=rerun)
res = create_remote_container(user_input, recreate=rerun)
logger.info(f"Remote container: {res}")
containers = list_remote_container(user_input)
logger.info(f" Got containers {containers}")
return containers
......@@ -138,7 +142,7 @@ def main(verbose: bool = False, server: T.Optional[str] = None):
@app.command()
def version():
"""version information"""
print(bitia.version())
print(bversion())
if __name__ == "__main__":
......
from pathlib import Path
import hashlib
from bitia.logger import logger
def sha256sum(infile: Path) -> str:
"""Compute sha256sum of a file.
Credit
------
Thanks https://stackoverflow.com/a/44873382/1805129
"""
h = hashlib.sha256()
b = bytearray(128 * 1024)
mv = memoryview(b)
with infile.open("rb", buffering=0) as handler:
while n := handler.readinto(mv):
h.update(mv[:n])
return h.hexdigest()
def dir_info(user_dir: Path) -> dict:
"""Check if directory is in good condition."""
files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
if size_in_mb > 25.0:
logger.warning(
"The size of pipeline is >25MB ({size_in_mb} MB)."
" You should try to reduce the size of the pipeline. TODO: See this link."
)
return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)
......@@ -18,6 +18,7 @@ import requests
import bitia.config as bconfig
import bitia.session as bsession
from bitia.pipeline import Pipeline
from bitia.checksumdir import dirhash
from bitia.logger import logger
......@@ -35,79 +36,6 @@ def _check_server_status(server: str) -> int:
return res.status_code
def dir_info(user_dir: Path) -> dict:
"""Check if directory is in good condition."""
files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
if size_in_mb > 25.0:
logger.warning(
"The size of pipeline is >25MB ({size_in_mb} MB)."
" You should try to reduce the size of the pipeline. TODO: See this link."
)
return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)
def prepare_archive(user_dir: Path) -> Path:
"""Prepare the file to upload. Store it in temp directory"""
dinfo = dir_info(user_dir)
dhash = dirhash(user_dir)
outfile = bconfig.bitia_dir() / "pipelines" / f"{dhash}.zip"
if outfile.is_file():
logger.info(f"Reusing the existing pipeline `{outfile}`.")
return outfile
logger.info(f"Preparing the zipfile pipeline from {user_dir}")
logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}")
outfile.parent.mkdir(parents=True, exist_ok=True)
assert dinfo["files"], f"No file found in {user_dir}"
with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
for entry in dinfo["files"]:
logger.info(f"Adding {entry} to zipfile")
zfile.write(entry)
# check the prepared zip file.
with zipfile.ZipFile(outfile) as zfile:
assert zfile.namelist(), "Empty zipfile"
# assert non-zero size of the zip file.
assert outfile.is_file(), f"{outfile} does not exists"
return outfile
def create_pipeline_from_single_script(script: Path) -> Path:
"""Create a pipelinefile from a single script"""
assert script.is_file(), f"{script} is not a file"
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
# move the script to this directory.
shutil.copy2(script, pipeline_dir)
script_name = script.name
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
return prepare_archive(pipeline_dir)
def create_pipeline_from_command(
cmd: str, add_timestamp: bool = True, add_uuid1: bool = True
) -> Path:
"""Create a pipeline from user input.
Returns
-------
The directory in which pipeline was created.
"""
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
with pipeline_file.open("w", newline="\n") as outf:
outf.writeline("#!/bin/sh")
if add_timestamp:
outf.writeline(f"# timestamp={datetime.now().isoformat()}")
if add_uuid1:
outf.writeline(f"# uuid={uuid.uuid1()}")
outf.writeline(f"{cmd}")
logger.info("Wrote pipeline %s", pipeline_file.read_text())
return prepare_archive(pipeline_dir)
def post_pipeline_task(
pipeline_zip: Path,
*,
......@@ -202,41 +130,3 @@ def submit_job(
json=dict(filename=pipeline_zip, rerun=rerun),
)
return response.json()
def sha256sum(infile: Path) -> str:
"""Compute sha256sum of a file.
Credit
------
Thanks https://stackoverflow.com/a/44873382/1805129
"""
h = hashlib.sha256()
b = bytearray(128 * 1024)
mv = memoryview(b)
with infile.open("rb", buffering=0) as handler:
while n := handler.readinto(mv):
h.update(mv[:n])
return h.hexdigest()
def user_input_to_pipeline(user_input: str) -> T.Tuple[Path, str]:
"""Create a pipeline file from user_input"""
if (path := Path(user_input)).exists():
if path.is_dir():
pipeline_zip = prepare_archive(path)
elif path.is_file() and path.suffix.lower() == ".zip":
pipeline_zip = path
elif path.is_file():
pipeline_zip = create_pipeline_from_single_script(path)
else:
raise NotImplementedError(f"{path} is not yet supported")
elif validators.url(user_input):
logger.warning("Fetching pipeline from url is not supported")
sys.exit(-1)
else:
# generate a temporary pipeline and submit.
pipeline_zip = create_pipeline_from_command(user_input)
logger.info(f"Created pipeline in {pipeline_zip}")
return pipeline_zip, sha256sum(pipeline_zip)
"""Pipleline"""
import typing as T
import uuid
import shutil
import tempfile
import zipfile
from pathlib import Path
from datetime import datetime
import validators
import bitia.config as bconfig
from bitia.common import sha256sum, dir_info
from bitia.logger import logger
from bitia.checksumdir import dirhash
class Singleon(type):
_instances: T.Dict[T.Any, T.Any] = {}
def __call__(cls, zipfile, user_input=None, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleon, cls).__call__(
zipfile, user_input, **kwargs
)
else:
prev_instance = cls._instances[cls]
if prev_instance.user_input != user_input:
cls._instances[cls] = super(Singleon, cls).__call__(
zipfile, user_input, **kwargs
)
return cls._instances[cls]
class Pipeline(metaclass=Singleon):
def __init__(self, zipfile: Path, user_input=None, **kwargs):
self.zipfile: Path = zipfile
self.hash: str = sha256sum(zipfile)
self.user_input = user_input
def __eq__(self, other):
return self.checksum == other.checksum and self.size == other.size
@property
def size(self) -> int:
return self.zipfile.stat().st_size
@property
def checksum(self) -> str:
return self.hash
@classmethod
def from_command(
cls, cmd: str, add_timestamp: bool = True, add_uuid1: bool = True
) -> "Pipeline":
"""Create a pipeline from user input.
Returns
-------
The directory in which pipeline was created.
"""
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
lines: T.List[str] = []
lines.append("#!/bin/sh")
if add_timestamp:
lines.append(f"# timestamp={datetime.now().isoformat()}")
if add_uuid1:
lines.append(f"# uuid={uuid.uuid1()}")
lines.append(f"{cmd}")
pipeline_file.write_text("\n".join(lines))
logger.debug("Wrote pipeline %s", pipeline_file.read_text())
pipeline = Pipeline.from_directory(pipeline_dir)
pipeline.user_input = cmd
return pipeline
@classmethod
def from_script(cls, script: Path) -> "Pipeline":
"""Create a pipelinefile from a single script"""
assert script.is_file(), f"{script} is not a file"
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
# move the script to this directory.
shutil.copy2(script, pipeline_dir)
script_name = script.name
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
pipeline = Pipeline.from_directory(pipeline_dir)
pipeline.user_input = script
return pipeline
@classmethod
def from_directory(cls, user_dir: Path) -> "Pipeline":
"""Create pipeline from a directory"""
dinfo = dir_info(user_dir)
content_hash = dirhash(user_dir)
outfile = (
bconfig.bitia_dir() / "pipelines" / f"{content_hash}-{user_dir.name}.zip"
)
if not outfile.is_file():
logger.info(f"Preparing the zipfile pipeline from {user_dir}")
logger.info(
f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}"
)
outfile.parent.mkdir(parents=True, exist_ok=True)
assert dinfo["files"], f"No file found in {user_dir}"
with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
for entry in dinfo["files"]:
logger.info(f"Adding {entry} to zipfile")
zfile.write(entry)
# check the prepared zip file.
with zipfile.ZipFile(outfile) as zfile:
assert zfile.namelist(), "Empty zipfile"
else:
logger.info(f"Reusing the existing pipeline `{outfile}`.")
# assert non-zero size of the zip file.
assert outfile.is_file(), f"{outfile} does not exists"
return cls(outfile)
def user_input_to_pipeline(user_input: str) -> Pipeline:
"""Create a pipeline file from user_input"""
if (path := Path(user_input)).exists():
if path.is_dir():
return Pipeline.from_directory(path)
if path.is_file() and path.suffix.lower() == ".zip":
return Pipeline(path, str(path.resolve()))
return Pipeline.from_script(path)
if validators.url(user_input):
raise NotImplementedError("Fetching pipeline from url is not supported")
return Pipeline.from_command(user_input)
"""Test pipleline"""
from bitia.pipeline import Pipeline
def test_pipeline_sanity():
uinput = "ls -ltrh"
pipeline = Pipeline.from_command(uinput)
assert pipeline
assert pipeline.user_input == uinput
assert pipeline.size > 0
assert pipeline.checksum
def test_pipeline_singleton():
u1 = "ls -ltrh"
u2 = "ls -ltrh /"
p1 = Pipeline.from_command(u1)
p2 = Pipeline.from_command(u1)
p3 = Pipeline.from_command(u2)
p4 = Pipeline.from_command(u2)
assert p1 == p2
assert p3 == p4
assert p2 != p3
assert p1 != p4
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment