Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
B
bitia-cli
Manage
Activity
Members
Labels
Plan
Issues
0
Issue boards
Milestones
Wiki
Code
Merge requests
1
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
bitia
bitia-cli
Merge requests
!2
version 0.2.0
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
version 0.2.0
devel
into
main
Overview
0
Commits
27
Pipelines
12
Changes
12
Merged
dilawar
requested to merge
devel
into
main
2 years ago
Overview
0
Commits
27
Pipelines
12
Changes
12
Expand
Doesn't append '/api/v1' by itself anymore.
0
0
Merge request reports
Compare
main
version 11
db482951
2 years ago
version 10
48980057
2 years ago
version 9
c4aa5d5a
2 years ago
version 8
13d06d53
2 years ago
version 7
7397064b
2 years ago
version 6
2fbc934c
2 years ago
version 5
a6bce64d
2 years ago
version 4
80390649
2 years ago
version 3
60d6bb05
2 years ago
version 2
33eeb678
2 years ago
version 1
1cd95b1d
2 years ago
main (base)
and
version 3
latest version
9c1d0a18
27 commits,
2 years ago
version 11
db482951
26 commits,
2 years ago
version 10
48980057
25 commits,
2 years ago
version 9
c4aa5d5a
24 commits,
2 years ago
version 8
13d06d53
23 commits,
2 years ago
version 7
7397064b
22 commits,
2 years ago
version 6
2fbc934c
21 commits,
2 years ago
version 5
a6bce64d
20 commits,
2 years ago
version 4
80390649
19 commits,
2 years ago
version 3
60d6bb05
18 commits,
2 years ago
version 2
33eeb678
17 commits,
2 years ago
version 1
1cd95b1d
16 commits,
2 years ago
12 files
+
484
−
136
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
12
Search (e.g. *.vue) (Ctrl+P)
bitia/__main__.py
+
105
−
124
Options
"""
BiTIA command line interface to submit job to the BiTIA server.
(c) 2022-, Subconscious Compute
https://bitia.link
(c) 2022-, Subconscious Compute, https://subcom.tech
"""
import
validators
import
shutil
import
sys
import
typing
as
T
import
requests
import
zipfile
from
pathlib
import
Path
import
tempfile
import
bitia
from
bitia.checksumdir
import
dirhash
import
logging
from
rich.logging
import
RichHandler
import
os
import
functools
FORMAT
=
"
%(message)s
"
logging
.
basicConfig
(
level
=
"
INFO
"
,
format
=
FORMAT
,
datefmt
=
"
[%X]
"
,
handlers
=
[
RichHandler
()]
)
import
bitia.helper
as
bhelper
from
bitia.logger
import
logger
logger
=
logging
.
getLogger
(
"
bitia
"
)
import
bitia
import
bitia.session
as
bsession
import
bitia.config
as
bconfig
g_default_bitia_main_script_name
:
T
.
Final
[
str
]
=
"
__main__.bitia.sh
"
import
typer
@@ -32,126 +23,116 @@ app = typer.Typer()
def
version_callback
(
value
:
bool
):
"""
callback for version
"""
if
value
:
print
(
bitia
.
version
())
def
bitia_dir
()
->
Path
:
"""
CLI cache directory
"""
bdir
=
Path
(
tempfile
.
gettempdir
())
/
"
bitia
"
bdir
.
mkdir
(
parents
=
True
,
exist_ok
=
True
)
return
bdir
def
dir_info
(
user_dir
:
Path
)
->
dict
:
"""
Check if directory is in good condition.
"""
files
=
[
f
.
resolve
()
for
f
in
user_dir
.
glob
(
"
**/*
"
)
if
f
.
is_file
()]
size_in_mb
=
sum
(
f
.
stat
().
st_size
/
1024.0
/
1024.0
for
f
in
files
)
if
size_in_mb
>
20
:
logger
.
error
(
"
The size of pipeline is more than 20MB. Uploading this big pipeline is now allowed.
"
)
quit
(
-
1
)
if
size_in_mb
>
10
:
logger
.
warning
(
"
The size of pipeline is >10MB ({size_in_mb} MB).
"
"
You should try to reduce the size of the pipeline. TODO: See this link.
"
)
return
dict
(
size_in_mb
=
size_in_mb
,
num_files
=
len
(
files
),
files
=
files
)
def
prepare_archive
(
user_dir
:
Path
)
->
Path
:
"""
Prepare the file to upload. Store it in temp directory
"""
dinfo
=
dir_info
(
user_dir
)
dhash
=
dirhash
(
user_dir
)
logger
.
info
(
f
"
Preparing the zipfile pipeline from
{
user_dir
}
"
)
logger
.
info
(
f
"
size=
{
dinfo
[
'
size_in_mb
'
]
}
MB, total files=
{
dinfo
[
'
num_files
'
]
}
"
)
outfile
=
bitia_dir
()
/
"
pipelines
"
/
f
"
{
dhash
}
.zip
"
outfile
.
parent
.
mkdir
(
parents
=
True
,
exist_ok
=
True
)
assert
dinfo
[
"
files
"
],
f
"
No file found in
{
user_dir
}
"
with
zipfile
.
ZipFile
(
outfile
,
"
w
"
,
zipfile
.
ZIP_DEFLATED
)
as
zfile
:
for
entry
in
dinfo
[
"
files
"
]:
logger
.
info
(
f
"
Adding
{
entry
}
to zipfile
"
)
zfile
.
write
(
entry
)
# check the prepared zip file.
with
zipfile
.
ZipFile
(
outfile
)
as
zfile
:
assert
zfile
.
namelist
(),
"
Empty zipfile
"
# assert non-zero size of the zip file.
assert
outfile
.
is_file
(),
f
"
{
outfile
}
does not exists
"
return
outfile
def
create_pipeline_from_single_script
(
script
:
Path
)
->
Path
:
"""
Create a pipelinefile from a single script
"""
assert
script
.
is_file
(),
f
"
{
script
}
is not a file
"
pipeline_dir
=
Path
(
tempfile
.
mkdtemp
(
prefix
=
"
bitia_
"
))
pipeline_file
=
pipeline_dir
/
g_default_bitia_main_script_name
# move the script to this directory.
shutil
.
copy2
(
script
,
pipeline_dir
)
script_name
=
script
.
name
with
pipeline_file
.
open
(
'
w
'
,
newline
=
'
\n
'
)
as
outf
:
outf
.
write
(
f
"
#!/bin/sh
\n
chmod +x ./
{
script_name
}
\n
./
{
script_name
}
"
)
return
prepare_archive
(
pipeline_dir
)
def
create_pipeline_from_command
(
cmd
:
str
)
->
Path
:
"""
Create a pipeline from user input.
Returns
-------
The directory in which pipeline was created.
"""
pipeline_dir
=
Path
(
tempfile
.
mkdtemp
(
prefix
=
"
bitia_
"
))
pipeline_file
=
pipeline_dir
/
g_default_bitia_main_script_name
with
pipeline_file
.
open
(
'
w
'
,
newline
=
'
\n
'
)
as
outf
:
outf
.
write
(
f
"
#!/bin/sh
\n\n
{
cmd
}
"
)
logging
.
info
(
"
Wrote pipeline %s
"
,
pipeline_file
.
read_text
())
return
prepare_archive
(
pipeline_dir
)
def
submit_job
(
pipeline_zip
:
Path
,
server
:
str
):
"""
Submit job to the API and stream the output.
"""
session
=
requests
.
Session
()
numbytes
=
pipeline_zip
.
stat
().
st_size
assert
numbytes
>
0
logger
.
info
(
f
"
Submitting
{
pipeline_zip
}
(size=
{
numbytes
/
1024.0
:
.
2
f
}
KB) to the
{
server
}
"
def
session
(
func
):
"""
Load a session before and save the session after the function call
"""
@functools.wraps
(
func
)
def
wrapper
(
*
args
,
**
kwargs
):
bsession
.
load_session
()
retval
=
func
(
*
args
,
**
kwargs
)
bsession
.
save_session
()
return
retval
return
wrapper
@app.command
(
"
create-container
"
)
@session
def
create_remote_container
(
user_input
:
str
,
recreate
:
bool
=
False
):
"""
Run a pipeline
"""
pipeline_zip
,
_
=
bhelper
.
user_input_to_pipeline
(
user_input
)
res
=
bhelper
.
post_pipeline_task
(
pipeline_zip
,
endpoint
=
"
container/create
"
,
params
=
dict
(
recreate
=
"
true
"
if
recreate
else
"
false
"
),
server
=
bconfig
.
server
(),
stream
=
True
,
)
files
=
{
"
pipeline_zip
"
:
open
(
str
(
pipeline_zip
),
"
rb
"
)}
response
=
session
.
post
(
f
"
{
server
}
/api/v1/submit
"
,
files
=
files
,
data
=
dict
(
filename
=
pipeline_zip
),
stream
=
True
res
.
raise_for_status
()
for
line
in
res
.
iter_lines
():
print
(
line
.
decode
().
rstrip
())
@app.command
(
"
list-container
"
)
@session
def
list_remote_container
(
user_input
:
str
,
server
:
str
=
bconfig
.
g_server
)
->
T
.
List
[
str
]:
"""
List the remote server associated with the pipeline.
"""
_
,
pipeline_hash
=
bhelper
.
user_input_to_pipeline
(
user_input
)
logger
.
info
(
f
"
Finding container for
{
user_input
}
, sha256sum=
{
pipeline_hash
}
"
)
res
=
bhelper
.
get
(
endpoint
=
"
container/list
"
,
server
=
bconfig
.
server
(),
params
=
dict
(
pipeline_sha256
=
pipeline_hash
),
)
for
line
in
response
.
iter_lines
():
print
(
line
.
decode
())
res
.
raise_for_status
()
return
res
.
json
()[
"
containers
"
].
split
(
"
,
"
)
@app.command
(
"
logs
"
)
@session
def
stream_log
(
user_input
:
str
,
server
:
str
=
bconfig
.
g_server
):
"""
Stream logs for the most recent run of a given pipeline.
"""
_
,
pipeline_hash
=
bhelper
.
user_input_to_pipeline
(
user_input
)
logger
.
info
(
f
"
Finding container for
{
user_input
}
, sha256sum=
{
pipeline_hash
}
"
)
res
=
bhelper
.
get
(
endpoint
=
"
logs
"
,
params
=
dict
(
pipeline_sha256
=
pipeline_hash
),
server
=
bconfig
.
server
(),
stream
=
True
,
)
res
.
raise_for_status
()
for
line
in
res
.
iter_lines
():
print
(
line
.
decode
().
rstrip
())
@app.command
(
"
run
"
)
@app.command
(
"
submit
"
)
def
run_pipeline
(
user_input
:
str
,
server
:
str
=
"
https://public.bitia.link
"
):
@session
def
submit_pipeline
(
user_input
:
str
,
rerun
:
bool
=
False
):
"""
Submit your pipelin (url, directory, zip_file).
Prepare the user directory to send to the server. User can also provide link
to the pipeline to run.
"""
if
(
path
:
=
Path
(
user_input
)).
exists
():
if
path
.
is_dir
():
pipeline_zip
=
prepare_archive
(
path
)
elif
path
.
is_file
()
and
path
.
suffix
.
lower
()
==
"
.zip
"
:
pipeline_zip
=
path
elif
path
.
is_file
():
pipeline_zip
=
create_pipeline_from_single_script
(
path
)
else
:
raise
NotImplementedError
(
f
"
{
path
}
is not yet supported
"
)
elif
validators
.
url
(
user_input
):
logger
.
warning
(
"
Fetching pipeline from url is not supported
"
)
sys
.
exit
(
-
1
)
create_remote_container
(
user_input
,
recreate
=
rerun
)
containers
=
list_remote_container
(
user_input
)
logger
.
info
(
f
"
Got containers
{
containers
}
"
)
return
containers
@app.command
(
"
run
"
)
@session
def
run_pipeline
(
user_input
:
str
,
*
,
rerun
:
bool
=
False
):
"""
Run a pipeline
"""
create_remote_container
(
user_input
,
recreate
=
rerun
)
containers
=
list_remote_container
(
user_input
)
return
[
bhelper
.
log_container
(
container
,
server
=
bconfig
.
server
())
for
container
in
containers
]
@app.callback
()
def
main
(
verbose
:
bool
=
False
,
server
:
T
.
Optional
[
str
]
=
None
):
"""
Callback
"""
if
verbose
:
logger
.
info
(
"
--verbose mode enabled
"
)
if
server
is
not
None
:
bconfig
.
set_server
(
server
)
elif
os
.
environ
.
get
(
"
BITIA_SERVER
"
)
is
not
None
:
bconfig
.
set_server
(
os
.
environ
[
"
BITIA_SERVER
"
])
else
:
# generate a temporary pipeline and submit.
pipeline_zip
=
create_pipeline_from_command
(
user_input
)
logging
.
info
(
f
"
Created pipeline in
{
pipeline_zip
}
"
)
submit_job
(
pipeline_zip
,
server
)
pass
logger
.
info
(
f
"
Using server
{
bconfig
.
server
()
}
"
)
@app.command
()
Loading