Scripting Bacalhau with Python

Bacalhau makes it easy to execute batch jobs via the CLI. But sometimes you need to do more than that: perhaps your script requires user input, or takes a large number of parameters. Either way, you probably want to execute your jobs in a repeatable manner.

This example demonstrates a simple Python script that orchestrates the execution of many jobs in a repeatable manner.

Prerequisites

To follow along you will need the Bacalhau client installed and available on your PATH (the script below shells out to the bacalhau CLI), plus a working Python 3 installation.

Executing Bacalhau Jobs with Python Scripts

To demonstrate this example, I will use the data generated by the Ethereum analysis example, which produced a list of content hashes. The script iterates over that list and executes one job per hash.

%%writefile hashes.txt
bafybeihvtzberlxrsz4lvzrzvpbanujmab3hr5okhxtbgv2zvonqos2l3i
bafybeifb25fgxrzu45lsc47gldttomycqcsao22xa2gtk2ijbsa5muzegq
bafybeig4wwwhs63ly6wbehwd7tydjjtnw425yvi2tlzt3aii3pfcj6hvoq
bafybeievpb5q372q3w5fsezflij3wlpx6thdliz5xowimunoqushn3cwka
bafybeih6te26iwf5kzzby2wqp67m7a5pmwilwzaciii3zipvhy64utikre
bafybeicjd4545xph6rcyoc74wvzxyaz2vftapap64iqsp5ky6nz3f5yndm
bafybeicgo3iofo3sw73wenc3nkdhi263yytjnds5cxjwvypwekbz4sk7ra
bafybeihvep5xsvxm44lngmmeysihsopcuvcr34an4idz45ixl5slsqzy3y
bafybeigmt2zwzrbzwb4q2kt2ihlv34ntjjwujftvabrftyccwzwdypama4
bafybeiciwui7sw3zqkvp4d55p4woq4xgjlstrp3mzxl66ab5ih5vmeozci
bafybeicpmotdsj2ambf666b2jkzp2gvg6tadr6acxqw2tmdlmsruuggbbu
bafybeigefo3esovbveavllgv5wiheu5w6cnfo72jxe6vmfweco5eq5sfty
bafybeigvajsumnfwuv7lp7yhr2sr5vrk3bmmuhhnaz53waa2jqv3kgkvsu
bafybeih2xg2n7ytlunvqxwqlqo5l3daykuykyvhgehoa2arot6dmorstmq
bafybeihnmq2ltuolnlthb757teihwvvw7wophoag2ihnva43afbeqdtgi4
bafybeibb34hzu6z2xgo6nhrplt3xntpnucthqlawe3pmzgxccppbxrpudy
bafybeigny33b4g6gf2hrqzzkfbroprqrimjl5gmb3mnsqu655pbbny6tou
bafybeifgqjvmzbtz427bne7af5tbndmvniabaex77us6l637gqtb2iwlwq
bafybeibryqj62l45pxjhdyvgdc44p3suhvt4xdqc5jpx474gpykxwgnw2e
bafybeidme3fkigdjaifkjfbwn76jk3fcqdogpzebtotce6ygphlujaecla
bafybeig7myc3eg3h2g5mk2co7ybte4qsuremflrjneer6xk3pghjwmcwbi
bafybeic3x2r5rrd3fdpdqeqax4bszcciwepvbpjl7xdv6mkwubyqizw5te
bafybeihxutvxg3bw7fbwohq4gvncrk3hngkisrtkp52cu7qu7tfcuvktnq
bafybeicumr67jkyarg5lspqi2w4zqopvgii5dgdbe5vtbbq53mbyftduxy
bafybeiecn2cdvefvdlczhz6i4afbkabf5pe5yqrcsgdvlw5smme2tw7em4
bafybeiaxh7dhg4krgkil5wqrv5kdsc3oewwy6ym4n3545ipmzqmxaxrqf4
bafybeiclcqfzinrmo3adr4lg7sf255faioxjfsolcdko3i4x7opx7xrqii
bafybeicjmeul7c2dxhmaudawum4ziwfgfkvbgthgtliggfut5tsc77dx7q
bafybeialziupik7csmhfxnhuss5vrw37kmte7rmboqovp4cpq5hj4insda
bafybeid7ecwdrw7pb3fnkokq5adybum6s5ok3yi2lw4m3edjpuy65zm4ji
bafybeibuxwnl5ogs4pwa32xriqhch24zbrw44rp22hrly4t6roh6rz7j4m
bafybeicxvy47jpvv3fi5umjatem5pxabfrbkzxiho7efu6mpidjpatte54
bafybeifynb4mpqrbsjbeqtxpbuf6y4frrtjrc4tm7cnmmui7gbjkckszrq
bafybeidcgnbhguyfaahkoqbyy2z525d3qfzdtbjuk4e75wkdbnkcafvjei
bafybeiefc67s6hpydnsqdgypbunroqwkij5j26sfmc7are7yxvg45uuh7i
bafybeiefwjy3o42ovkssnm7iihbog46k5grk3gobvvkzrqvof7p6xbgowi
bafybeihpydd3ivtza2ql5clatm5fy7ocych7t4czu46sbc6c2ykrbwk5uu
bafybeiet7222lqfmzogur3zlxqavlnd3lt3qryw5yi5rhuiqeqg4w7c3qu
bafybeihwomd4ygoydvj5kh24wfwk5kszmst5vz44zkl6yibjargttv7sly
bafybeidbjt2ckr4oooio3jsfk76r3bsaza5trjvt7u36slhha5ksoc5gv4
bafybeifyjrmopgtfmswq7b4pfscni46doy3g3z6vi5rrgpozc6duebpmuy
bafybeidsrowz46yt62zs64q2mhirlc3rsmctmi3tluorsts53vppdqjj7e
bafybeiggntql57bw24bw6hkp2yqd3qlyp5oxowo6q26wsshxopfdnzsxhq
bafybeidguz36u6wakx4e5ewuhslsfsjmk5eff5q7un2vpkrcu7cg5aaqf4
bafybeiaypwu2b45iunbqnfk2g7bku3nfqveuqp4vlmmwj7o7liyys42uai
bafybeicaahv7xvia7xojgiecljo2ddrvryzh2af7rb3qqbg5a257da5p2y
bafybeibgeiijr74rcliwal3e7tujybigzqr6jmtchqrcjdo75trm2ptb4e
bafybeiba3nrd43ylnedipuq2uoowd4blghpw2z7r4agondfinladcsxlku
bafybeif3semzitjbxg5lzwmnjmlsrvc7y5htekwqtnhmfi4wxywtj5lgoe
bafybeiedmsig5uj7rgarsjans2ad5kcb4w4g5iurbryqn62jy5qap4qq2a
bafybeidyz34bcd3k6nxl7jbjjgceg5eu3szbrbgusnyn7vfl7facpecsce
bafybeigmq5gch72q3qpk4nipssh7g7msk6jpzns2d6xmpusahkt2lu5m4y
bafybeicjzoypdmmdt6k54wzotr5xhpzwbgd3c4oqg6mj4qukgvxvdrvzye
bafybeien55egngdpfvrsxr2jmkewdyha72ju7qaaeiydz2f5rny7drgzta
Overwriting hashes.txt

Now let's look at the script itself. There's a fair bit of code, but at its core are three functions: one to submit a job, one to check a job's status, and one to download a job's results. The main function then wraps all three in a multiprocessing pool so the jobs execute in parallel. Feel free to copy this code and save it to your local machine as bacalhau.py; you can then execute it anywhere with python bacalhau.py.

%%writefile bacalhau.py
import json, glob, os, multiprocessing, shutil, subprocess, tempfile, time

# checkStatusOfJob checks the status of a Bacalhau job
def checkStatusOfJob(job_id: str) -> str:
    assert len(job_id) > 0
    p = subprocess.run(
        ["bacalhau", "list", "--output", "json", "--id-filter", job_id],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    r = parseJobStatus(p.stdout)
    if r == "":
        print("job status is empty! %s" % job_id)
    elif r == "Completed":
        print("job completed: %s" % job_id)
    else:
        print("job not completed: %s - %s" % (job_id, r))

    return r


# submitJob submits a job to the Bacalhau network
def submitJob(cid: str) -> str:
    assert len(cid) > 0
    p = subprocess.run(
        [
            "bacalhau",
            "docker",
            "run",
            "--id-only",
            "--wait=false",
            "--input-volumes",
            cid + ":/inputs/data.tar.gz",
            "ghcr.io/bacalhau-project/examples/blockchain-etl:0.0.6",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if p.returncode != 0:
        print("failed (%d) job: %s" % (p.returncode, p.stdout))
    job_id = p.stdout.strip()
    print("job submitted: %s" % job_id)

    return job_id


# getResultsFromJob gets the results from a Bacalhau job
def getResultsFromJob(job_id: str) -> str:
    assert len(job_id) > 0
    temp_dir = tempfile.mkdtemp()
    print("getting results for job: %s" % job_id)
    for i in range(0, 5):  # the get call can time out, so try up to 5 times
        p = subprocess.run(
            [
                "bacalhau",
                "get",
                "--output-dir",
                temp_dir,
                job_id,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if p.returncode == 0:
            break
        else:
            print("failed (exit %d) to get job: %s" % (p.returncode, p.stdout))

    return temp_dir


# parseJobStatus parses the status of a Bacalhau job
def parseJobStatus(result: str) -> str:
    if len(result) == 0:
        return ""
    r = json.loads(result)
    if len(r) > 0:
        # Report "Completed" if any node finished the job
        for _, v in r[0]["Status"]["JobState"]["Nodes"].items():
            state = v["Shards"]["0"]["State"]
            if state == "Completed":
                return state
        # Otherwise report the first non-cancelled state
        for _, v in r[0]["Status"]["JobState"]["Nodes"].items():
            state = v["Shards"]["0"]["State"]
            if state != "Cancelled":
                return state
        return "Error"
    return ""


# parseHashes splits lines from a text file into a list
def parseHashes(filename: str) -> list:
    assert os.path.exists(filename)
    with open(filename, "r") as f:
        hashes = f.read().splitlines()
    return hashes


def main(file: str, num_files: int = -1):
    # Use multiprocessing to work in parallel
    count = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=count) as pool:
        hashes = parseHashes(file)
        if num_files > 0:  # a non-positive num_files means "use every hash"
            hashes = hashes[:num_files]
        print("submitting %d jobs" % len(hashes))
        job_ids = pool.map(submitJob, hashes)
        assert len(job_ids) == len(hashes)

        print("waiting for jobs to complete...")
        while True:
            job_statuses = pool.map(checkStatusOfJob, job_ids)
            total_finished = sum(map(lambda x: x == "Completed", job_statuses))
            if total_finished >= len(job_ids):
                break
            print("%d/%d jobs completed" % (total_finished, len(job_ids)))
            time.sleep(2)

        print("all jobs completed, saving results...")
        results = pool.map(getResultsFromJob, job_ids)
        print("finished saving results")

    # Move the per-job CSV outputs into a single results directory
    shutil.rmtree("results", ignore_errors=True)
    os.makedirs("results", exist_ok=True)
    for r in results:
        path = os.path.join(r, "combined_results", "outputs", "*.csv")
        csv_files = glob.glob(path)
        for f in csv_files:
            print("moving %s to results" % f)
            shutil.move(f, "results")


if __name__ == "__main__":
    main("hashes.txt", 10)
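Before moving on, it's worth seeing what parseJobStatus actually consumes. The sketch below shows roughly the JSON shape it expects from bacalhau list; this is inferred purely from the key lookups in the function above, the real output may differ across Bacalhau versions, and the node ID is a made-up placeholder.

import json

from bacalhau import parseJobStatus  # the script above, saved as bacalhau.py

# A hypothetical single-job response; only the keys that
# parseJobStatus reads are included.
sample = [
    {
        "Status": {
            "JobState": {
                "Nodes": {
                    "QmExampleNodeID": {"Shards": {"0": {"State": "Completed"}}}
                }
            }
        }
    }
]

print(parseJobStatus(json.dumps(sample)))  # prints: Completed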

The script above has a few interesting features:

  • Change the number passed to main to control how many jobs are executed
  • Because jobs complete at different times, there's a loop that checks that all jobs have completed before downloading the results -- if you skip this you'll likely see an error when trying to download results that aren't ready yet
  • When downloading the results, the IPFS get often times out, so the call is wrapped in a retry loop (see the backoff sketch after this list for one way to make the retries gentler)
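If the fixed five-attempt loop in getResultsFromJob proves too aggressive, exponential backoff is one gentler option. Below is a minimal sketch of a generic helper; the names retry_with_backoff, max_attempts, and base_delay are illustrative, not part of Bacalhau.

import time


# A minimal retry helper with exponential backoff. Calls fn() until it
# returns True, sleeping base_delay * 2**attempt seconds between failures.
def retry_with_backoff(fn, max_attempts=5, base_delay=1.0) -> bool:
    for attempt in range(max_attempts):
        if fn():
            return True  # fn succeeded, stop retrying
        time.sleep(base_delay * (2 ** attempt))  # sleep 1s, 2s, 4s, ...
    return False  # all attempts failed


# Example usage with the bacalhau get call from the script above:
# retry_with_backoff(lambda: subprocess.run(
#     ["bacalhau", "get", "--output-dir", temp_dir, job_id]
# ).returncode == 0)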

Let's run it!

%%bash
python bacalhau.py
submitting 10 jobs
job submitted: dead536c-286a-4632-9105-d4fdf81b9863
job submitted: 42dabff1-4116-46df-9be7-5b2fc015a3fe
job submitted: 82f1f934-8acd-4e56-919f-f09928323b19
job submitted: 3cbc3334-d3a2-4980-8bad-4e4347814040
job submitted: 2d2c3b70-2739-49b0-b8af-05236a836630
job submitted: 7289b1ee-5863-4274-ae0f-4db0ac2dd3b3
job submitted: fb5ddaa5-d0ca-4c77-8bb3-a5af78a327f4
job submitted: c399b0c9-0f9c-4d74-afc0-f8cfcecc8d02
job submitted: e8d83d77-ea16-41fb-8c20-7e2e809a187b
job submitted: b6b49a8b-6145-4728-a16b-f3e657464e67
waiting for jobs to complete...
job not completed: 42dabff1-4116-46df-9be7-5b2fc015a3fe - Waiting
job not completed: fb5ddaa5-d0ca-4c77-8bb3-a5af78a327f4 - Waiting
job not completed: 82f1f934-8acd-4e56-919f-f09928323b19 - Waiting
job not completed: 7289b1ee-5863-4274-ae0f-4db0ac2dd3b3 - Waiting
job not completed: dead536c-286a-4632-9105-d4fdf81b9863 - Waiting
job not completed: 2d2c3b70-2739-49b0-b8af-05236a836630 - Waiting
job not completed: 3cbc3334-d3a2-4980-8bad-4e4347814040 - Waiting
job not completed: c399b0c9-0f9c-4d74-afc0-f8cfcecc8d02 - Waiting
job not completed: e8d83d77-ea16-41fb-8c20-7e2e809a187b - Waiting
job not completed: b6b49a8b-6145-4728-a16b-f3e657464e67 - Waiting
0/10 jobs completed
job not completed: 7289b1ee-5863-4274-ae0f-4db0ac2dd3b3 - Waiting
job not completed: c399b0c9-0f9c-4d74-afc0-f8cfcecc8d02 - Waiting
job not completed: fb5ddaa5-d0ca-4c77-8bb3-a5af78a327f4 - Waiting
job not completed: 2d2c3b70-2739-49b0-b8af-05236a836630 - Waiting
job not completed: 82f1f934-8acd-4e56-919f-f09928323b19 - Waiting
job not completed: dead536c-286a-4632-9105-d4fdf81b9863 - Waiting
job not completed: 3cbc3334-d3a2-4980-8bad-4e4347814040 - Waiting
job completed: 42dabff1-4116-46df-9be7-5b2fc015a3fe
job not completed: e8d83d77-ea16-41fb-8c20-7e2e809a187b - Waiting
job not completed: b6b49a8b-6145-4728-a16b-f3e657464e67 - Waiting
1/10 jobs completed
job completed: 42dabff1-4116-46df-9be7-5b2fc015a3fe
job completed: dead536c-286a-4632-9105-d4fdf81b9863
job completed: c399b0c9-0f9c-4d74-afc0-f8cfcecc8d02
job completed: 3cbc3334-d3a2-4980-8bad-4e4347814040
job completed: 82f1f934-8acd-4e56-919f-f09928323b19
job completed: 2d2c3b70-2739-49b0-b8af-05236a836630
job completed: fb5ddaa5-d0ca-4c77-8bb3-a5af78a327f4
job completed: 7289b1ee-5863-4274-ae0f-4db0ac2dd3b3
job completed: b6b49a8b-6145-4728-a16b-f3e657464e67
job completed: e8d83d77-ea16-41fb-8c20-7e2e809a187b
all jobs completed, saving results...
getting results for job: dead536c-286a-4632-9105-d4fdf81b9863
getting results for job: 82f1f934-8acd-4e56-919f-f09928323b19
getting results for job: 2d2c3b70-2739-49b0-b8af-05236a836630
getting results for job: 3cbc3334-d3a2-4980-8bad-4e4347814040
getting results for job: 42dabff1-4116-46df-9be7-5b2fc015a3fe
getting results for job: c399b0c9-0f9c-4d74-afc0-f8cfcecc8d02
getting results for job: 7289b1ee-5863-4274-ae0f-4db0ac2dd3b3
getting results for job: fb5ddaa5-d0ca-4c77-8bb3-a5af78a327f4
getting results for job: e8d83d77-ea16-41fb-8c20-7e2e809a187b
getting results for job: b6b49a8b-6145-4728-a16b-f3e657464e67
finished saving results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmpacgvy7wu/combined_results/outputs/transactions_00000000_00049999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmp05iwhtpp/combined_results/outputs/transactions_00050000_00099999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmp6t87xlzc/combined_results/outputs/transactions_00100000_00149999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmp75fer_gp/combined_results/outputs/transactions_00150000_00199999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmphpikbnbj/combined_results/outputs/transactions_00200000_00249999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmp8951a72p/combined_results/outputs/transactions_00250000_00299999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmp9baglzje/combined_results/outputs/transactions_00300000_00349999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmpa13amd3g/combined_results/outputs/transactions_00350000_00399999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmp4v6lqc80/combined_results/outputs/transactions_00400000_00449999.csv to results
moving /var/folders/kr/pl4p96k11b55hp5_p9l_t8kr0000gn/T/tmpqgm5ka1s/combined_results/outputs/transactions_00450000_00499999.csv to results

The results directory should now contain all of the combined results from the jobs we just executed. Here we're expecting to see CSV files:

%%bash
ls -l results
total 40
-rw-r--r-- 3 phil staff 55 Jan 12 13:48 transactions_00000000_00049999.csv
-rw-r--r-- 3 phil staff 387 Jan 12 13:49 transactions_00050000_00099999.csv
-rw-r--r-- 3 phil staff 388 Jan 12 13:50 transactions_00100000_00149999.csv
-rw-r--r-- 3 phil staff 426 Jan 12 13:48 transactions_00150000_00199999.csv
-rw-r--r-- 3 phil staff 393 Jan 12 13:49 transactions_00200000_00249999.csv
-rw-r--r-- 3 phil staff 384 Jan 12 13:48 transactions_00250000_00299999.csv
-rw-r--r-- 3 phil staff 421 Jan 12 13:48 transactions_00300000_00349999.csv
-rw-r--r-- 3 phil staff 390 Jan 12 13:48 transactions_00350000_00399999.csv
-rw-r--r-- 3 phil staff 347 Jan 12 13:48 transactions_00400000_00449999.csv
-rw-r--r-- 3 phil staff 386 Jan 12 13:48 transactions_00450000_00499999.csv

Success! We've now executed a batch of jobs in parallel using Python, which is a great way to run many jobs in a repeatable manner. You can adapt the script above to your own purposes.
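For example, once the CSV files are in results, you might combine them into a single file for analysis. Here's a minimal sketch using pandas (which the script above doesn't use, so install it separately), assuming all the CSVs share the same columns:

import glob

import pandas as pd  # assumed to be installed separately

# Read every per-job CSV from results/ and concatenate them,
# assuming they all share the same schema.
frames = [pd.read_csv(path) for path in sorted(glob.glob("results/*.csv"))]
combined = pd.concat(frames, ignore_index=True)
print("combined %d rows" % len(combined))
combined.to_csv("combined_transactions.csv", index=False)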

Next Steps

You might also be interested in the following examples: