Commit 26b5407e authored by dilawar

feat: adds request session support.

parent 205a3ebb
1 merge request: !2 version 0.2.0
@@ -4,38 +4,38 @@
"""
import validators
import functools
import shutil
import sys
import typing as T
import requests
import requests.utils
import pickle
import zipfile
from pathlib import Path
import tempfile
import logging
from rich.logging import RichHandler
FORMAT = "%(message)s"
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)
import bitia
from bitia.checksumdir import dirhash
logger = logging.getLogger("bitia")
BITIA_MAIN_SCRIPT_NAME: T.Final[str] = "__main__.bitia.sh"
DEFAULT_SERVER: T.Final[str] = "https://public.bitia.link/api/v1"
import typer
app = typer.Typer()
def version_callback(value: bool):
    if value:
        print(bitia.version())
def bitia_dir() -> Path:
"""CLI cache directory"""
bdir = Path(tempfile.gettempdir()) / "bitia"
@@ -43,18 +43,91 @@ def bitia_dir() -> Path:
    return bdir
def _save_session():
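    # Persist the shared requests session (cookies, headers, etc.) to a pickle file
    # inside the bitia cache directory.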
    global g_session
    with SESSION_PICKLE_FILE.open("wb") as fsession:
        pickle.dump(g_session, fsession)
def _load_session():
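    # Restore the previously pickled session, if any; fall back to the fresh session
    # when the file is missing or cannot be unpickled.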
    global g_session
    if not SESSION_PICKLE_FILE.is_file():
        return None
    with SESSION_PICKLE_FILE.open("rb") as fsession:
        try:
            g_session = pickle.load(fsession)
        except Exception:
            return None
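# A single requests session is shared by every command; it is created fresh here and
# persisted between CLI invocations via the pickle file below.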
g_session = requests.session()
SESSION_PICKLE_FILE = bitia_dir() / ".session.pickle"
def session(func):
"""Load a session before and save the session after the function call"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
_load_session()
retval = func(*args, **kwargs)
_save_session()
return retval
return wrapper
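# Usage sketch (hypothetical, not part of this commit): any typer command wrapped with
# @session talks to the server through the persisted g_session, e.g.
#
#   @app.command("status")
#   @session
#   def server_status(server: str = DEFAULT_SERVER):
#       print(g_session.get(f"{server}/status").status_code)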
@app.command("submit")
@session
def submit_pipeline(user_input: str, server: str = DEFAULT_SERVER):
"""Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
pipeline_zip = user_input_to_pipeline(user_input)
containers = submit_job(pipeline_zip, server=server)
return containers
@app.command("run")
@session
def run_pipeline(user_input: str, server: str = DEFAULT_SERVER):
"""Run a pipeline"""
pipeline_zip = user_input_to_pipeline(user_input)
containers = submit_job(pipeline_zip, server=server)
return _log_container(containers, server=server)
@app.command()
def version():
"""version information"""
print(bitia.version())
def _log_container(containers: T.List[T.Any], server: str):
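    # Stream the logs of every container returned by the server and print them as they arrive.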
    for container in containers:
        for line in _fetch_logs(container["Name"], server=server):
            print(line.decode().rstrip())
def _check_server_status(server: str) -> int:
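    # A simple liveness probe: fetch the API root and report the HTTP status code.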
    res = requests.get(server)
    return res.status_code
def dir_info(user_dir: Path) -> dict:
"""Check if directory is in good condition."""
files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
if size_in_mb > 20:
logger.error(
"The size of pipeline is more than 20MB. Uploading this big pipeline is now allowed."
)
quit(-1)
if size_in_mb > 10:
if size_in_mb > 25.0:
logger.warning(
"The size of pipeline is >10MB ({size_in_mb} MB)."
"The size of pipeline is >25MB ({size_in_mb} MB)."
" You should try to reduce the size of the pipeline. TODO: See this link."
)
return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)
@@ -64,9 +137,12 @@ def prepare_archive(user_dir: Path) -> Path:
"""Prepare the file to upload. Store it in temp directory"""
dinfo = dir_info(user_dir)
dhash = dirhash(user_dir)
outfile = bitia_dir() / "pipelines" / f"{dhash}.zip"
if outfile.is_file():
logger.info(f"Reusing the existing pipeline `{outfile}`.")
return outfile
logger.info(f"Preparing the zipfile pipeline from {user_dir}")
logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}")
outfile = bitia_dir() / "pipelines" / f"{dhash}.zip"
outfile.parent.mkdir(parents=True, exist_ok=True)
assert dinfo["files"], f"No file found in {user_dir}"
with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
@@ -87,11 +163,11 @@ def create_pipeline_from_single_script(script: Path) -> Path:
"""Create a pipelinefile from a single script"""
assert script.is_file(), f"{script} is not a file"
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / g_default_bitia_main_script_name
pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME
# move the script to this directory.
shutil.copy2(script, pipeline_dir)
script_name = script.name
with pipeline_file.open('w', newline='\n') as outf:
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
return prepare_archive(pipeline_dir)
@@ -104,37 +180,42 @@ def create_pipeline_from_command(cmd: str) -> Path:
    The directory in which the pipeline was created.
    """
    pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
    pipeline_file = pipeline_dir / BITIA_MAIN_SCRIPT_NAME
    with pipeline_file.open("w", newline="\n") as outf:
        outf.write(f"#!/bin/sh\n\n{cmd}")
    logging.info("Wrote pipeline %s", pipeline_file.read_text())
    return prepare_archive(pipeline_dir)
def submit_job(pipeline_zip: Path, *, server: str):
    """Submit the job to the API server and return its response."""
    numbytes = pipeline_zip.stat().st_size
    if (code := _check_server_status(server)) != 200:
        logger.warning(
            "%s may not be alive (status code: %s). Try another one?", server, code
        )
        sys.exit(1)
    assert numbytes > 0
    logger.info(
        "Submitting %s (size=%.2f KB) to the %s",
        pipeline_zip,
        numbytes / 1024.0,
        server,
    )
    # Submit the pipeline archive; the server replies with the containers it created.
    with pipeline_zip.open("rb") as f_pipeline:
        files = {"pipeline_zip": f_pipeline}
        response = g_session.post(
            f"{server}/submit",
            files=files,
            data=dict(filename=pipeline_zip),
        )
    return response.json()
@app.command("run")
@app.command("submit")
def run_pipeline(user_input: str, server: str = "https://public.bitia.link"):
"""Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
def user_input_to_pipeline(user_input: str) -> Path:
"""Create a pipeline file from user_input"""
    if (path := Path(user_input)).exists():
        if path.is_dir():
            pipeline_zip = prepare_archive(path)
@@ -151,14 +232,19 @@ def run_pipeline(user_input: str, server: str = "https://public.bitia.link"):
        # generate a temporary pipeline from the given command.
        pipeline_zip = create_pipeline_from_command(user_input)
        logging.info(f"Created pipeline in {pipeline_zip}")
    return pipeline_zip
def _fetch_logs(container: str, *, server):
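    # Ask the server to stream the logs of a container; callers iterate over the response lazily.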
logger.info(f"Fetching logs for container `{container}`")
return g_session.post(
f"{server}/logs",
json=dict(container=container, follow=True, tail=-1),
stream=True,
)
if __name__ == "__main__":
    _load_session()
    app()
    _save_session()
@@ -7,5 +7,12 @@ import subprocess
def test_sanity():
    assert bitia.version()
def test_run_simple():
    output = bitia.__main__.run_pipeline("ls -ltr /")
    print(output)

if __name__ == "__main__":
    test_sanity()
    test_run_simple()