
Compare revisions

Changes are shown as if the source revision were being merged into the target revision.

Target project: bitia/bitia-cli
Commits on Source (42)
@@ -22,3 +22,14 @@ deploy:
script:
- python3 -m pip install poetry --upgrade
- bash .ci/release.sh
build:windows:
stage: build
tags:
- windows
before_script:
- choco.exe install -y python
script:
- python -m pip install .
- python -m bitia --help
- python -m bitia run "ls -ltrh /"
@@ -6,12 +6,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.2.0] -
### Added
- dev: Session support. The same connection is reused across multiple requests.
- Support for the `create`, `logs`, and `submit` endpoints.
- Support for the `BITIA_SERVER` environment variable.
- Setting the log level from the command line.
### Fixed
## [0.1.3] - 2022-09-29
## Added
### Added
- Use the public API by default
- Supports Python3.8+
## Fixed
### Fixed
- Removed 'submit' command. It will be added later.
## [0.1.2] - 2022-09-29
......
@@ -19,19 +19,14 @@ lint:
test: lint build
$(POETRY) run pytest tests bitia
bash tests/test_cli.sh
fix:
$(POETRY) run black bitia
$(POETRY) run black tests
gr runner gitlab-runner:
gitlab-runner exec docker build
gitlab-runner exec docker test
gitlab-runner exec docker deploy
test_pipeline: install
$(POETRY) run bitia_runner run tests/test_20220727
gitlab-runner exec docker build --env BITIA_SERVER=$$BITIA_SERVER
gitlab-runner exec docker deploy --env BITIA_SERVER=$$BITIA_SERVER
release:
rm -rf dist/*.whl
@@ -41,5 +36,5 @@ doc html:
cd docs && make html
.PHONY : copr fix test install lint build \
all check test_pipeline \
all check \
runner gitlab-runner image image_upload
@@ -9,6 +9,24 @@ python -m pip install bitia --user
python -m bitia --help
```
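To smoke-test the install, run a one-liner pipeline (this mirrors the smoke test in CI). Setting `BITIA_SERVER` is optional; the value shown is the default public instance:
```sh
export BITIA_SERVER=https://public.bitia.link/api/v1
python -m bitia run "ls -ltrh /"
```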
## Configuration
The configuration file is searched for in the following order; the first match wins.
### Unix like systems
1. `./bitia.toml`
2. `~/.bitia.toml`
3. `$HOME/.config/bitia.toml`
4. `/etc/bitia.toml`
### Windows
1. `bitia.toml`
2. `%APPDATA%\bitia.toml`
3. `%PROGRAMDATA%\bitia.toml`
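A minimal `bitia.toml` sketch (an assumption: only the `server` key is known, mirroring the in-memory default in `bitia/config.py`; the CLI still carries a TODO for actually reading this file):
```toml
# Assumed key; the value matches the default server in bitia/config.py.
server = "https://public.bitia.link/api/v1"
```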
## BiTIA runner
If you are self-hosting the BiTIA server, you need `bitia-runner` as well. See
......
@@ -4,7 +4,7 @@ import logging
from rich.logging import RichHandler
import logging.handlers
log_level = os.environ.get("BITIA_LOGLEVEL", "NOTSET").upper()
log_level = os.environ.get("BITIA_LOGLEVEL", "WARNING").upper()
logging.basicConfig(
format="%(message)s",
level=log_level,
......
"""BiTIA command line interface to submit job to the BiTIA server.
(c) 2022-, Subconscious Compute
https://bitia.link
(c) 2022-, Subconscious Compute, https://subcom.tech
"""
import validators
import shutil
import sys
import typing as T
import requests
import zipfile
from pathlib import Path
import tempfile
import bitia
from bitia.checksumdir import dirhash
import os
import functools
from enum import Enum
import logging
from rich.logging import RichHandler
from rich.progress import track
FORMAT = "%(message)s"
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)
import bitia.helper as bhelper
from bitia.logger import logger, cprint, set_logger_level, console
logger = logging.getLogger("bitia")
from bitia import version as bversion
import bitia.pipeline as bpipeline
import bitia.session as bsession
import bitia.config as bconfig
g_default_bitia_main_script_name: T.Final[str] = "__main__.bitia.sh"
import typer
app = typer.Typer()
class VerbosityLevel(str, Enum):
debug = "debug"
info = "info"
warning = "warning"
def version_callback(value: bool):
"""callback for version"""
if value:
print(bitia.version())
def bitia_dir() -> Path:
"""CLI cache directory"""
bdir = Path(tempfile.gettempdir()) / "bitia"
bdir.mkdir(parents=True, exist_ok=True)
return bdir
def dir_info(user_dir: Path) -> dict:
"""Check if directory is in good condition."""
files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
if size_in_mb > 20:
logger.error(
f"The pipeline is {size_in_mb:.2f} MB; uploading pipelines larger than 20 MB is not allowed."
)
quit(-1)
if size_in_mb > 10:
logger.warning(
f"The size of the pipeline is >10 MB ({size_in_mb:.2f} MB)."
" You should try to reduce the size of the pipeline. TODO: See this link."
)
return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)
def prepare_archive(user_dir: Path) -> Path:
"""Prepare the file to upload. Store it in temp directory"""
dinfo = dir_info(user_dir)
dhash = dirhash(user_dir)
logger.info(f"Preparing the zipfile pipeline from {user_dir}")
logger.info(f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}")
outfile = bitia_dir() / "pipelines" / f"{dhash}.zip"
outfile.parent.mkdir(parents=True, exist_ok=True)
assert dinfo["files"], f"No file found in {user_dir}"
with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
for entry in dinfo["files"]:
logger.info(f"Adding {entry} to zipfile")
zfile.write(entry)
# check the prepared zip file.
with zipfile.ZipFile(outfile) as zfile:
assert zfile.namelist(), "Empty zipfile"
# assert non-zero size of the zip file.
assert outfile.is_file(), f"{outfile} does not exist"
return outfile
def create_pipeline_from_single_script(script: Path) -> Path:
"""Create a pipelinefile from a single script"""
assert script.is_file(), f"{script} is not a file"
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / g_default_bitia_main_script_name
# move the script to this directory.
shutil.copy2(script, pipeline_dir)
script_name = script.name
with pipeline_file.open('w', newline='\n') as outf:
outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
return prepare_archive(pipeline_dir)
def create_pipeline_from_command(cmd: str) -> Path:
"""Create a pipeline from user input.
Returns
-------
The directory in which the pipeline was created.
"""
print(version())
def session(func):
"""Load a session before and save the session after the function call"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
bsession.load_session()
retval = func(*args, **kwargs)
bsession.save_session()
return retval
return wrapper
@app.command("create-container")
@session
def create_remote_container(
user_input,
*,
recreate: bool = False,
output_lines: T.List[str] = [],
):
"""Create container for the pipeline. The container starts running
immediately on the server. Use command `logs` to stream the output.
"""
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / g_default_bitia_main_script_name
with pipeline_file.open('w', newline='\n') as outf:
outf.write(f"#!/bin/sh\n\n{cmd}")
logging.info("Wrote pipeline %s", pipeline_file.read_text())
return prepare_archive(pipeline_dir)
def submit_job(pipeline_zip: Path, server: str):
"""Submit job to the API and stream the output."""
session = requests.Session()
numbytes = pipeline_zip.stat().st_size
assert numbytes > 0
pipeline = bpipeline.user_input_to_pipeline(user_input)
# for a command pipeline, always create a new container.
if pipeline.is_a_command():
recreate = True
res = bhelper.post_pipeline_task(
pipeline,
endpoint="container/create",
params=dict(recreate="true" if recreate else "false"),
server=bconfig.get_server(),
stream=True,
)
res.raise_for_status()
_lines = (
track(res.iter_lines(), description="BiTIA is setting up required infra...")
if not bconfig.get_config("plain", default=False)
else res.iter_lines()
)
for line in _lines:
output_lines.append(line.decode().rstrip())
logger.info(output_lines[-1])
return res
@app.command("list-container")
@session
def list_containers(user_input):
"""List the remote server associated with the pipeline."""
for container in _list_remote_container(user_input):
cprint(container)
def _list_remote_container(user_input) -> T.List[str]:
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.debug(f"sha256 of `{pipeline.zipfile}` is `{pipeline.checksum}`")
logger.info(
f"Submitting {pipeline_zip} (size={numbytes/1024.0:.2f} KB) to the {server}"
f"Finding container for user input `{user_input}` with sha256sum={pipeline.checksum}"
)
files = {"pipeline_zip": open(str(pipeline_zip), "rb")}
response = session.post(
f"{server}/api/v1/submit", files=files, data=dict(filename=pipeline_zip), stream=True
res = bhelper.get(
endpoint="container/list",
server=bconfig.get_server(),
params=dict(pipeline_sha256=pipeline.checksum),
)
for line in response.iter_lines():
print(line.decode())
res.raise_for_status()
return res.json()["containers"].split(",")
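# Assumed response shape, inferred from the parsing above:
#   {"containers": "<id1>,<id2>,..."}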
@app.command("logs")
@session
def stream_log(user_input):
"""Stream logs for the most recent run of a given pipeline."""
pipeline = bpipeline.user_input_to_pipeline(user_input)
logger.info(
f"Finding container for user input {user_input} with sha256sum={pipeline.checksum}"
)
res = bhelper.get(
endpoint="logs",
params=dict(pipeline_sha256=pipeline.checksum),
server=bconfig.get_server(),
stream=True,
)
res.raise_for_status()
for line in res.iter_lines():
cprint(line.decode().rstrip())
@app.command("run")
@app.command("submit")
def run_pipeline(user_input: str, server: str = "https://public.bitia.link"):
@session
def submit_pipeline(user_input, *, rerun: bool = False, output_lines: T.List[str]):
"""Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
if (path := Path(user_input)).exists():
if path.is_dir():
pipeline_zip = prepare_archive(path)
elif path.is_file() and path.suffix.lower() == ".zip":
pipeline_zip = path
elif path.is_file():
pipeline_zip = create_pipeline_from_single_script(path)
else:
raise NotImplementedError(f"{path} is not yet supported")
elif validators.url(user_input):
logger.warning("Fetching pipeline from url is not supported")
sys.exit(-1)
res = create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
logger.info("Remote container: %s", res)
containers = _list_remote_container(user_input)
cprint(f"{containers}")
return containers
@app.command("run")
@session
def run_user_input(user_input, *, rerun: bool = False, output_lines: T.List[str] = []):
"""Run a pipeline"""
create_remote_container(user_input, recreate=rerun, output_lines=output_lines)
containers = _list_remote_container(user_input)
for container in containers:
for _bl in bhelper.log_container(container, server=bconfig.get_server()):
print(_bl.decode().rstrip())
@app.command("checksum")
def checksum(user_input):
pipeline = bpipeline.user_input_to_pipeline(user_input)
cprint(pipeline.checksum)
@app.callback()
def main(
verbose: VerbosityLevel = typer.Option(
VerbosityLevel.warning, case_sensitive=False
),
plain: bool = False,
server: T.Optional[str] = None,
):
"""
Root callback: configure verbosity, plain output, and the server to use.
"""
bconfig.set_config("plain", plain)
bconfig.set_config("verbosity", verbose.value)
set_logger_level(verbose.value)
if server is not None:
bconfig.set_server(server)
elif os.environ.get("BITIA_SERVER"):
bconfig.set_server(os.environ["BITIA_SERVER"])
else:
# generate a temporary pipeline and submit.
pipeline_zip = create_pipeline_from_command(user_input)
logging.info(f"Created pipeline in {pipeline_zip}")
submit_job(pipeline_zip, server)
# TODO: Read from default config file.
pass
cprint(f"Using server {bconfig.get_server()}")
@app.command()
def version():
"""version information"""
print(bitia.version())
cprint(bversion())
if __name__ == "__main__":
......
@@ -25,7 +25,7 @@ HASH_FUNCS = {
def dirhash(
dirname: Path,
hashfunc: str = "md5",
hashfunc: str = "sha256",
excluded_files: T.List[Path] = [],
ignore_hidden: bool = False,
followlinks: bool = False,
@@ -55,13 +55,11 @@ def dirhash(
return _reduce_hash(hashvalues, hash_func)
def filehash(filepath: Path, hashfunc: str = "md5"):
def filehash(filepath: Path, hashfunc: str = "sha256"):
"""Compute checksum of a file"""
hasher = HASH_FUNCS[hashfunc]()
blocksize = 64 * 1024
if not filepath.is_file():
return hasher.hexdigest()
assert filepath.is_file(), f"{filepath} is not a file"
with filepath.open("rb") as fp:
while True:
data = fp.read(blocksize)
......
from pathlib import Path
from enum import Enum
from bitia.logger import logger
from bitia.checksumdir import filehash
def sha256sum(infile: Path) -> str:
"""Compute sha256sum of a file."""
return filehash(infile, "sha256")
def dir_info(user_dir: Path) -> dict:
"""Check if directory is in good condition."""
files = [f.resolve() for f in user_dir.glob("**/*") if f.is_file()]
size_in_mb = sum(f.stat().st_size / 1024.0 / 1024.0 for f in files)
if size_in_mb > 25.0:
logger.warning(
f"The size of the pipeline is >25 MB ({size_in_mb:.2f} MB)."
" You should try to reduce the size of the pipeline. TODO: See this link."
)
return dict(size_in_mb=size_in_mb, num_files=len(files), files=files)
import os
import typing as T
from pathlib import Path
import tempfile
import validators
BITIA_MAIN_SCRIPT_NAME: T.Final[str] = "__main__.bitia.sh"
_config = {"server": "https://public.bitia.link/api/v1"}
def set_config(key, val):
global _config
_config[key] = val
def get_config(key, *, strict: bool = True, default=None):
return _config[key] if strict else _config.get(key, default)
def bitia_dir() -> Path:
"""CLI cache directory"""
bdir = Path(tempfile.gettempdir()) / "bitia"
bdir.mkdir(parents=True, exist_ok=True)
return bdir
def get_server(use_env: bool = True) -> str:
"""Server to use"""
if use_env and os.environ.get("BITIA_SERVER") is not None:
return os.environ["BITIA_SERVER"]
return get_config("server")
def set_server(server: str):
"""set bitia server"""
assert validators.url(server), f"{server} is not a valid url"
set_config("server", server)
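# Server resolution order (from get_server() above and the CLI callback in
# bitia/__main__.py): an explicit --server option wins, then the BITIA_SERVER
# environment variable, then the default "https://public.bitia.link/api/v1".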
"""Helper module"""
__author__ = "Dilawar Singh"
__email__ = "dilawar@subcom.tech"
import sys
from pathlib import Path
import typing as T
import requests
import bitia.session as bsession
from bitia.pipeline import Pipeline
from bitia.logger import logger
def log_container(container: str, server: str, *, timestamps: bool = False):
assert (
container
), "Failed to determine the container that is runnning the pipeline. There is probably a bug in server end."
for line in bsession.fetch_logs(container, server=server, timestamps=timestamps):
yield line
def _check_server_status(server: str) -> int:
res = requests.get(server)
return res.status_code
def post_pipeline_task(
pipeline: Pipeline,
*,
endpoint: str,
server: str,
params: T.Dict[str, str] = {},
**kwargs,
):
"""Submit to the api for a given endpoint and pipeline file"""
if (code := _check_server_status(server)) != 200:
logger.warning(
"%s may not be alive (status code: %s). Try other one?", server, code
)
sys.exit(1)
pipeline_zip = pipeline.zipfile
numbytes = pipeline_zip.stat().st_size
assert numbytes > 0
logger.info(
"Submitting %s (size=%.2f KB) to the %s",
pipeline.zipfile,
numbytes / 1024.0,
server,
)
# submit and print the output.
endpoint = endpoint.strip("/")
with pipeline_zip.open("rb") as f_pipeline:
files = {"pipeline_zip": f_pipeline}
return bsession.post(
f"{server}/{endpoint}",
files=files,
params=params,
# dict.update() returns None; build the merged dict directly instead.
data=dict(pipeline=pipeline.data(), **kwargs),
stream=kwargs.get("stream", False),
)
def post(
endpoint: str,
*,
server: str,
stream: bool = False,
params: T.Dict[str, str] = {},
**kwargs,
):
"""A generic post function."""
logger.info(f"Posting with data {kwargs}")
return bsession.post(
f"{server}/{endpoint}", json=kwargs, params=params, stream=stream, **kwargs
)
def get(
endpoint: str,
*,
server: str,
stream: bool = False,
params: T.Dict[str, str] = {},
**kwargs,
):
"""A generic post function."""
logger.info(f"Posting with data {kwargs}")
return bsession.get(
f"{server}/{endpoint}", params=params, json=kwargs, stream=stream, **kwargs
)
def submit_job(
pipeline_zip: Path, *, server: str, rerun: bool = False, params: dict = {}
):
"""Submit job to the API and stream the output."""
numbytes = pipeline_zip.stat().st_size
if (code := _check_server_status(server)) != 200:
logger.warning(
"%s may not be alive (status code: %s). Try other one?", server, code
)
sys.exit(1)
assert numbytes > 0
logger.info(
"Submitting %s (size=%.2f KB) to the %s",
pipeline_zip,
numbytes / 1024.0,
server,
)
# submit and print the output.
with pipeline_zip.open("rb") as f_pipeline:
files = {"pipeline_zip": f_pipeline}
response = bsession.post(
f"{server}/submit/?rerun={rerun}",
files=files,
params=params,
json=dict(filename=str(pipeline_zip), rerun=rerun),  # Path is not JSON-serializable
)
return response.json()
import logging
from rich.logging import RichHandler
from rich.console import Console
FORMAT = "%(message)s"
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)
logger = logging.getLogger("bitia")
console = Console()
def set_logger_level(level_name: str):
"""Set the global logging level"""
assert level_name.upper() in ["INFO", "DEBUG", "WARNING"]
lvl = logging.getLevelName(level_name.upper())
logger.setLevel(lvl)
for handler in logger.handlers:
handler.setLevel(lvl)
def cprint(*args, **kwargs):
"""Forward to rich.console.print"""
console.print(*args, **kwargs)
"""Pipleline"""
import typing as T
import shutil
import tempfile
import zipfile
from enum import Enum, auto
import json
from pathlib import Path
# from datetime import datetime
import validators
import bitia.config as bconfig
from bitia.common import sha256sum, dir_info
from bitia.logger import logger
from bitia.checksumdir import dirhash
class UserInputType(Enum):
"""Type of user input"""
STR_COMMAND = auto()
FILE_SCRIPT = auto()
FILE_ZIP = auto()
DIR_SCRIPT = auto()
UNKNOWN = auto()
class Pipeline:
def __init__(self, zipfile: Path, user_input=None, **kwargs):
self.zipfile: Path = zipfile
self.hash: str = sha256sum(zipfile)
self.user_input = user_input
self.type: UserInputType = UserInputType.UNKNOWN
def __eq__(self, other):
return self.checksum == other.checksum and self.size == other.size
def __repr__(self) -> str:
return json.dumps(
dict(file=str(self.zipfile), checksum=self.checksum, size=self.size),
sort_keys=True,
indent=2,
)
def is_a_command(self) -> bool:
return self.type == UserInputType.STR_COMMAND
@property
def size(self) -> int:
return self.zipfile.stat().st_size
@property
def checksum(self) -> str:
return self.hash
def data(self) -> T.Dict[str, T.Any]:
"""Return the pipeline key properties. This is often sent to the
server."""
return dict(
zip_filename=self.zipfile.name, checksum=self.checksum, size=self.size
)
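# Example payload (illustrative values; keys come from data() above):
#   {"zip_filename": "<dirhash>.zip", "checksum": "<sha256 of the zip>", "size": 2048}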
@classmethod
def from_command(cls, cmd: str, *, comment: str = "") -> "Pipeline":
"""Create a pipeline from user command. The semantics is a bit different
for this.
"""
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
lines: T.List[str] = []
lines.append("#!/bin/sh")
assert "\n" not in comment, "An eol char is found in comment"
lines.append(f"# {comment}")
lines.append(f"{cmd}")
pipeline_file.write_text("\n".join(lines))
logger.debug("Wrote pipeline %s", pipeline_file.read_text())
pipeline = Pipeline.from_directory(pipeline_dir)
pipeline.user_input = cmd
return pipeline
@classmethod
def from_script(cls, script: Path) -> "Pipeline":
"""Create a pipelinefile from a single script"""
assert script.is_file(), f"{script} is not a file"
pipeline_dir = Path(tempfile.mkdtemp(prefix="bitia_"))
pipeline_file = pipeline_dir / bconfig.BITIA_MAIN_SCRIPT_NAME
# move the script to this directory.
shutil.copy2(script, pipeline_dir)
script_name = script.name
with pipeline_file.open("w", newline="\n") as outf:
outf.write(f"#!/bin/sh\nchmod +x ./{script_name}\n./{script_name}")
pipeline = Pipeline.from_directory(pipeline_dir)
pipeline.user_input = script
return pipeline
@classmethod
def from_directory(cls, user_dir: Path) -> "Pipeline":
"""Create pipeline from a directory"""
dinfo = dir_info(user_dir)
content_hash = dirhash(user_dir)
outfile = bconfig.bitia_dir() / "pipelines" / f"{content_hash}.zip"
if not outfile.is_file():
logger.info(f"Preparing the zipfile pipeline from {user_dir}")
logger.info(
f" size={dinfo['size_in_mb']} MB, total files={dinfo['num_files']}"
)
outfile.parent.mkdir(parents=True, exist_ok=True)
assert dinfo["files"], f"No file found in {user_dir}"
with zipfile.ZipFile(outfile, "w", zipfile.ZIP_DEFLATED) as zfile:
for entry in dinfo["files"]:
logger.info(f"Adding {entry} to zipfile")
zfile.write(entry)
# check the prepared zip file.
with zipfile.ZipFile(outfile) as zfile:
assert zfile.namelist(), "Empty zipfile"
else:
logger.info(f"Reusing the existing pipeline `{outfile}`.")
# assert non-zero size of the zip file.
assert outfile.is_file(), f"{outfile} does not exist"
return cls(outfile)
def user_input_to_pipeline(user_input: str) -> Pipeline:
"""Create a pipeline file from user_input"""
if (path := Path(user_input)).exists():
if path.is_dir():
pipeline = Pipeline.from_directory(path)
pipeline.type = UserInputType.DIR_SCRIPT
return pipeline
if path.is_file() and path.suffix.lower() == ".zip":
pipeline = Pipeline(path, str(path.resolve()))
pipeline.type = UserInputType.FILE_ZIP
return pipeline
pipeline = Pipeline.from_script(path)
pipeline.type = UserInputType.FILE_SCRIPT
return pipeline
if validators.url(user_input):
raise NotImplementedError("Fetching pipeline from url is not supported")
pipeline = Pipeline.from_command(user_input)
pipeline.type = UserInputType.STR_COMMAND
return pipeline
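# Dispatch summary (a sketch of user_input_to_pipeline):
#   "ls -l"           -> UserInputType.STR_COMMAND (wrapped in __main__.bitia.sh)
#   "script.sh"       -> UserInputType.FILE_SCRIPT
#   "pipeline.zip"    -> UserInputType.FILE_ZIP
#   "./pipeline_dir/" -> UserInputType.DIR_SCRIPT
#   "https://..."     -> NotImplementedError (URLs are not supported yet)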
"""API module. Uses session to avoid redundant connections"""
import requests
import pickle
import bitia.config as bconfig
from bitia.logger import logger
g_session = requests.Session()
SESSION_PICKLE_FILE = bconfig.bitia_dir() / ".session.pickle"
def fetch_logs(container: str, *, server, timestamps: bool = True):
"""Fetch logs from a container."""
logger.info(f"Fetching logs for container `{container}`")
return get(
f"{server}/container/logs",
params=dict(container=container, timestamps=timestamps),
stream=True,
)
def save_session():
"""Save the current requests session as pickle"""
global g_session
with SESSION_PICKLE_FILE.open("wb") as fsession:
pickle.dump(g_session, fsession)
def load_session(force_new: bool = False):
"""Load the pickled session."""
global g_session
if force_new:
g_session = requests.Session()
return
if not SESSION_PICKLE_FILE.is_file():
g_session = requests.Session()
return
with SESSION_PICKLE_FILE.open("rb") as fsession:
try:
logger.info(f"Loading session from {fsession.name}")
g_session = pickle.load(fsession)
except Exception:
g_session = requests.Session()
def post(*args, **kwargs):
return g_session.post(*args, **kwargs)
def get(*args, **kwargs):
return g_session.get(*args, **kwargs)
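# Usage sketch: the @session decorator in bitia/__main__.py calls load_session()
# before and save_session() after each command, so consecutive CLI invocations
# reuse the pickled requests.Session and its connection pool.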
[tool.poetry]
name = "bitia"
version = "0.1.3"
version = "0.2.0b1"
description = "BioInformatics Tool for Infrastructure Automation (BiTIA) CLI utility."
authors = ["Dilawar Singh <dilawar@subcom.tech>"]
readme = "README.md"
......
#!/bin/bash
set -e
poetry run bitia run "ls -ltrh /"
import tempfile
from pathlib import Path
from bitia.common import sha256sum
_common_hashes = {
"1": "6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b",
"123": "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3",
"abcdxyz": "bc5c610196f15a7e0d21a3af77b0e1740a4415a44d8a7ad4905d877825574bf9",
}
def test_filehash():
with tempfile.TemporaryDirectory() as tempdir:
f1 = Path(tempdir) / "a.txt"
f2 = Path(tempdir) / "b.txt"
for _c, _h in _common_hashes.items():
f1.write_text(_c)
assert f1.read_text() == _c
print(f"Computing hash of {f1}, content: {_c}")
assert sha256sum(f1) == _h
t1 = "ls -ltrh /"
f1.write_text(t1)
f2.write_text(t1)
assert sha256sum(f1) == sha256sum(f2)
t2 = "ls -ltrh"
f2.write_text(t2)
assert sha256sum(f1) != sha256sum(f2)
f1.write_text(t2)
assert sha256sum(f1) == sha256sum(f2)
"""Test pipleline"""
from bitia.pipeline import Pipeline
def test_pipeline_sanity():
uinput = "ls -l"
pipeline = Pipeline.from_command(uinput)
assert pipeline
assert pipeline.user_input == uinput
assert pipeline.size > 0
assert pipeline.checksum
def test_pipeline_singleton():
u1 = "ls -ltrh"
u2 = "ls -ltrh /"
p1 = Pipeline.from_command(u1)
p2 = Pipeline.from_command(u1)
assert p1 == p2
p3 = Pipeline.from_command(u2)
p4 = Pipeline.from_command(u2)
assert p3 == p4
assert p2 != p3
assert p1 != p4
from difflib import SequenceMatcher
import bitia
import bitia.config as bconfig
import bitia.__main__
import sys
import subprocess
def test_sanity():
assert bitia.version()
def test_sanity(capsys):
version = bitia.version()
assert len(version) >= 3, version
def test_run_simple():
bitia.__main__.run_pipeline("ls -ltr /")
def almost_equal(s1, s2, threshold=0.9) -> bool:
return SequenceMatcher(a=s1, b=s2).ratio() > threshold
def test_run_repeat(capsys):
bconfig.set_config("plain", True)
bitia.__main__.run_user_input("ls -ltr /", rerun=False)
l1 = capsys.readouterr().out # reset the internal buffer.
bitia.__main__.run_user_input("ls -ltr /", rerun=False)
l2 = capsys.readouterr().out
assert almost_equal(l1, l2, 0.95)
def test_run_simple(capsys):
# set the plain-text mode.
bconfig.set_config("plain", True)
bitia.__main__.run_user_input("ls -ltr /")
captured = capsys.readouterr() # reset the internal buffer.
l1 = captured.out
bitia.__main__.run_user_input("ls -ltr /", rerun=True)
captured = capsys.readouterr()
l2 = captured.out
assert almost_equal(l1, l2, 0.88)