Skip to content
Snippets Groups Projects
Commit cd009379 authored by dilawar's avatar dilawar :ant:
Browse files

refactor: major refactor.

parent 1313c1b1
No related branches found
No related tags found
1 merge request!2version 0.2.0
Pipeline #3702 failed with stages
in 1 minute and 54 seconds
......@@ -3,65 +3,25 @@
(c) 2022-, Subconscious Compute
"""
import validators
import functools
import shutil
import sys
import typing as T
import requests, requests.utils
import pickle
import zipfile
import tempfile
from pathlib import Path
import logging
from rich.logging import RichHandler
FORMAT = "%(message)s"
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)
import bitia.helper as bhelper
from bitia.logger import logger
import bitia
from bitia.checksumdir import dirhash
import bitia.session as bsession
import bitia.config as bconfig
logger = logging.getLogger("bitia")
BITIA_MAIN_SCRIPT_NAME: T.Final[str] = "__main__.bitia.sh"
DEFAULT_SERVER: T.Final[str] = "https://public.bitia.link/api/v1"
import typer
app = typer.Typer()
def bitia_dir() -> Path:
"""CLI cache directory"""
bdir = Path(tempfile.gettempdir()) / "bitia"
bdir.mkdir(parents=True, exist_ok=True)
return bdir
def _save_session():
global g_session
with SESSION_PICKLE_FILE.open("wb") as fsession:
pickle.dump(g_session, fsession)
def _load_session():
global g_session
if not SESSION_PICKLE_FILE.is_file():
return None
with SESSION_PICKLE_FILE.open("rb") as fsession:
try:
g_session = pickle.load(fsession)
except Exception as e:
return None
g_session = requests.session()
SESSION_PICKLE_FILE = bitia_dir() / ".session.pickle"
def version_callback(value: bool):
"""callback for version"""
if value:
print(bitia.version())
def session(func):
......@@ -69,9 +29,9 @@ def session(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
_load_session()
bsession.load_session()
retval = func(*args, **kwargs)
_save_session()
bsession.save_session()
return retval
return wrapper
......@@ -79,25 +39,40 @@ def session(func):
@app.command("submit")
@session
def submit_pipeline(user_input: str, rerun: bool = False, server: str = DEFAULT_SERVER):
def submit_pipeline(
user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER
):
"""Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
pipeline_zip = user_input_to_pipeline(user_input)
containers = submit_job(pipeline_zip, rerun=rerun, server=server)
pipeline_zip = bhelper.user_input_to_pipeline(user_input)
containers = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
logger.info(f" Got containers {containers}")
return containers
@app.command("run")
@session
def run_pipeline(user_input: str, rerun: bool = False, server: str = DEFAULT_SERVER):
def run_pipeline(
user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER
):
"""Run a pipeline"""
pipeline_zip = bhelper.user_input_to_pipeline(user_input)
res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
return bhelper.log_container(res["container"], server=server)
@app.command("create")
@session
def create_remote_infra(
user_input: str, rerun: bool = False, server: str = bconfig.DEFAULT_SERVER
):
"""Run a pipeline"""
pipeline_zip = user_input_to_pipeline(user_input)
res = submit_job(pipeline_zip, rerun=rerun, server=server)
return _log_container(res["container"], server=server)
pipeline_zip = bhelper.user_input_to_pipeline(user_input)
res = bhelper.submit_job(pipeline_zip, rerun=rerun, server=server)
return bhelper.log_container(res["container"], server=server)
@app.command()
......@@ -106,149 +81,5 @@ def version():
print(bitia.version())
def _log_container(container: str, server: str):
assert (
container is not None
), "Failed to determine the container that is runnning the pipeline. There is probably a bug in server end."
logger.info(f"Fetching logs from {container}")
for line in _fetch_logs(container, server=server):
print(line.decode().rstrip())
def _check_server_status(server: str) -> int:
res = requests.get(server)
return res.status_code
def version_callback(value: bool):
if value:
print(bitia.version())
def dir_info(user_dir: Path) -> dict:
"""Check if directory is in good condition."""
files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
if size_in_mb > 25.0:
logger.warning(
"The size of pipeline is >25MB ({size_in_mb} MB)."
" You should try to reduce the size of the pipeline. TODO: See this link."
)
return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)
def prepare_archive(user_dir: Path) -> Path:
"""Prepare the file to upload. Store it in temp directory"""
dinfo = dir_info(user_dir)
dhash = dirhash(user_dir)
outfile = bitia_dir() / "pipelines" / f"{dhash}.zip"
if outfile.is_file():
logger.info(f"Reusing the existing pipeline `{outfile}`.")
return outfile
logger.info(f"Preparing the zipfile pipeline from {user_dir}")
logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}")
outfile.parent.mkdir(parents=True, exist_ok=True)
assert dinfo["files"], f"No file found in {user_dir}"
with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
for entry in dinfo["files"]:
logger.info(f"Adding {entry} to zipfile")
zfile.write(entry)
# check the prepared zip file.
with zipfile.ZipFile(outfile) as zfile:
assert zfile.namelist(), "Empty zipfile"
# assert non-zero size of the zip file.
assert outfile.is_file(), f"{outfile} does not exists"
return outfile
def create_pipeline_from_single_script(script: Path) -> Path:
"""Create a pipelinefile from a single script"""
assert script.is_file(), f"{script} is not a file"
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME
# move the script to this directory.
shutil.copy2(script, pipeline_dir)
script_name = script.name
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
return prepare_archive(pipeline_dir)
def create_pipeline_from_command(cmd: str) -> Path:
"""Create a pipeline from user input.
Returns
-------
The directory in which pipeline was created.
"""
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\n\n{cmd}")
logger.info("Wrote pipeline %s", pipeline_file.read_text())
return prepare_archive(pipeline_dir)
def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False):
"""Submit job to the API and stream the output."""
numbytes = pipeline_zip.stat().st_size
if (code := _check_server_status(server)) != 200:
logger.warning(
"%s may not be alive (status code: %s). Try other one?", server, code
)
sys.exit(1)
assert numbytes > 0
logger.info(
"Submitting %s (size=%.2f KB) to the %s",
pipeline_zip,
numbytes / 1024.0,
server,
)
# submit and print the output.
with pipeline_zip.open("rb") as f_pipeline:
files = {"pipeline_zip": f_pipeline}
response = g_session.post(
f"{server}/submit/?rerun={rerun}",
files=files,
data=dict(filename=pipeline_zip, rerun=rerun),
)
return response.json()
def user_input_to_pipeline(user_input: str) -> Path:
"""Create a pipeline file from user_input"""
if (path := Path(user_input)).exists():
if path.is_dir():
pipeline_zip = prepare_archive(path)
elif path.is_file() and path.suffix.lower() == ".zip":
pipeline_zip = path
elif path.is_file():
pipeline_zip = create_pipeline_from_single_script(path)
else:
raise NotImplementedError(f"{path} is not yet supported")
elif validators.url(user_input):
logger.warning("Fetching pipeline from url is not supported")
sys.exit(-1)
else:
# generate a temporary pipeline and submit.
pipeline_zip = create_pipeline_from_command(user_input)
logging.info(f"Created pipeline in {pipeline_zip}")
return pipeline_zip
def _fetch_logs(container: str, *, server):
logger.info(f"Fetching logs for container `{container}`")
return g_session.post(
f"{server}/logs",
json=dict(container=container, follow=True),
stream=True,
)
if __name__ == "__main__":
_load_session()
app()
_save_session()
import typing as T
from pathlib import Path
import tempfile
BITIA_MAIN_SCRIPT_NAME: T.Final[str] = "__main__.bitia.sh"
DEFAULT_SERVER: T.Final[str] = "https://public.bitia.link/api/v1"
def bitia_dir() -> Path:
"""CLI cache directory"""
bdir = Path(tempfile.gettempdir()) / "bitia"
bdir.mkdir(parents=True, exist_ok=True)
return bdir
"""Helper module"""
__author__ = "Dilawar Singh"
__email__ = "dilawar@subcom.tech"
import sys
import zipfile
import shutil
from pathlib import Path
import tempfile
import validators
import requests
import bitia.config as bconfig
import bitia.session as bsession
from bitia.checksumdir import dirhash
from bitia.logger import logger
def log_container(container: str, server: str):
assert (
container
), "Failed to determine the container that is runnning the pipeline. There is probably a bug in server end."
logger.info(f"Fetching logs from {container}")
for line in bsession.fetch_logs(container, server=server):
print(line.decode().rstrip())
def _check_server_status(server: str) -> int:
res = requests.get(server)
return res.status_code
def dir_info(user_dir: Path) -> dict:
"""Check if directory is in good condition."""
files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
if size_in_mb > 25.0:
logger.warning(
"The size of pipeline is >25MB ({size_in_mb} MB)."
" You should try to reduce the size of the pipeline. TODO: See this link."
)
return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)
def prepare_archive(user_dir: Path) -> Path:
"""Prepare the file to upload. Store it in temp directory"""
dinfo = dir_info(user_dir)
dhash = dirhash(user_dir)
outfile = bconfig.bitia_dir() / "pipelines" / f"{dhash}.zip"
if outfile.is_file():
logger.info(f"Reusing the existing pipeline `{outfile}`.")
return outfile
logger.info(f"Preparing the zipfile pipeline from {user_dir}")
logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}")
outfile.parent.mkdir(parents=True, exist_ok=True)
assert dinfo["files"], f"No file found in {user_dir}"
with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
for entry in dinfo["files"]:
logger.info(f"Adding {entry} to zipfile")
zfile.write(entry)
# check the prepared zip file.
with zipfile.ZipFile(outfile) as zfile:
assert zfile.namelist(), "Empty zipfile"
# assert non-zero size of the zip file.
assert outfile.is_file(), f"{outfile} does not exists"
return outfile
def create_pipeline_from_single_script(script: Path) -> Path:
"""Create a pipelinefile from a single script"""
assert script.is_file(), f"{script} is not a file"
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
# move the script to this directory.
shutil.copy2(script, pipeline_dir)
script_name = script.name
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
return prepare_archive(pipeline_dir)
def create_pipeline_from_command(cmd: str) -> Path:
"""Create a pipeline from user input.
Returns
-------
The directory in which pipeline was created.
"""
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\n\n{cmd}")
logger.info("Wrote pipeline %s", pipeline_file.read_text())
return prepare_archive(pipeline_dir)
def submit_job(pipeline_zip: Path, *, server: str, rerun: bool = False):
"""Submit job to the API and stream the output."""
numbytes = pipeline_zip.stat().st_size
if (code := _check_server_status(server)) != 200:
logger.warning(
"%s may not be alive (status code: %s). Try other one?", server, code
)
sys.exit(1)
assert numbytes > 0
logger.info(
"Submitting %s (size=%.2f KB) to the %s",
pipeline_zip,
numbytes / 1024.0,
server,
)
# submit and print the output.
with pipeline_zip.open("rb") as f_pipeline:
files = {"pipeline_zip": f_pipeline}
response = bsession.post(
f"{server}/submit/?rerun={rerun}",
files=files,
data=dict(filename=pipeline_zip, rerun=rerun),
)
return response.json()
def user_input_to_pipeline(user_input: str) -> Path:
"""Create a pipeline file from user_input"""
if (path := Path(user_input)).exists():
if path.is_dir():
pipeline_zip = prepare_archive(path)
elif path.is_file() and path.suffix.lower() == ".zip":
pipeline_zip = path
elif path.is_file():
pipeline_zip = create_pipeline_from_single_script(path)
else:
raise NotImplementedError(f"{path} is not yet supported")
elif validators.url(user_input):
logger.warning("Fetching pipeline from url is not supported")
sys.exit(-1)
else:
# generate a temporary pipeline and submit.
pipeline_zip = create_pipeline_from_command(user_input)
logger.info(f"Created pipeline in {pipeline_zip}")
return pipeline_zip
from rich.logging import RichHandler
import logging
FORMAT = "%(message)s"
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)
logger = logging.getLogger("bitia")
"""API module. Uses session to avoid redundant connections"""
import requests
import pickle
import bitia.config as bconfig
from bitia.logger import logger
g_session = requests.session()
SESSION_PICKLE_FILE = bconfig.bitia_dir() / ".session.pickle"
def fetch_logs(container: str, *, server):
"""Fetch logs from a container."""
logger.info(f"Fetching logs for container `{container}`")
return g_session.post(
f"{server}/logs",
json=dict(container=container, follow=True),
stream=True,
)
def save_session():
"""Save the current requests session as pickle"""
global g_session
with SESSION_PICKLE_FILE.open("wb") as fsession:
pickle.dump(g_session, fsession)
def load_session():
"""Load the pickled session."""
global g_session
if not SESSION_PICKLE_FILE.is_file():
return None
with SESSION_PICKLE_FILE.open("rb") as fsession:
try:
g_session = pickle.load(fsession)
except Exception:
return None
def post(*args, **kwargs):
return g_session.post(*args, **kwargs)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment