Scripting Bacalhau with Python

Bacalhau allows you to easily execute batch jobs via the CLI. But sometimes you need to do more than that. You might need to execute a script that requires user input, or you might need to execute a script that requires a lot of parameters. In any case, you probably want to execute your jobs in a repeatable manner.

This example demonstrates a simple Python script that is able to orchestrate the execution of lots of jobs in a repeatable manner.


Running a Python Script with Bacalhau


To get started, you need to install the Bacalhau client; see the Bacalhau documentation for installation instructions.
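Every step of the orchestration script shells out to the bacalhau command, so it can help to fail fast when the client is not installed, instead of hitting a FileNotFoundError inside a worker process. The helper below, require_cli (a hypothetical name, not part of Bacalhau), is a minimal sketch that looks the command up on PATH with Python's shutil.which:

```python
import shutil


def require_cli(name: str = "bacalhau") -> str:
    """Return the path of an executable found on PATH, or raise if it is missing.

    Hypothetical helper: call it once before submitting any jobs.
    """
    path = shutil.which(name)
    if path is None:
        raise RuntimeError(
            "%r was not found on PATH; install the Bacalhau client first" % name
        )
    return path
```

You might call require_cli() at the top of main(), before the process pool is created.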

Executing Bacalhau Jobs with Python Scripts

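To demonstrate this example, I will use the data generated from the Ethereum analysis example. That example produced a list of IPFS hashes (CIDs); I will iterate over them and execute one job for each hash.

Save the hashes to a file called hashes.txt, one hash per line. In a Jupyter notebook you can do this with a cell that starts with the %%writefile hashes.txt magic, followed by the list of hashes.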
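Because hashes.txt is a plain list with one hash per line, stray blank lines or trailing spaces easily sneak in. A slightly more defensive reader, sketched below under the hypothetical name read_hashes, strips whitespace, skips empty lines, and treats a missing or non-positive limit as "use every hash". That last point matters because slicing a list with [:-1] silently drops its final element.

```python
from pathlib import Path
from typing import List, Optional


def read_hashes(filename: str, limit: Optional[int] = None) -> List[str]:
    """Read one hash (CID) per line, ignoring blank lines and surrounding spaces.

    If limit is a positive integer, only the first `limit` hashes are returned;
    otherwise every hash in the file is returned.
    """
    lines = Path(filename).read_text().splitlines()
    hashes = [line.strip() for line in lines if line.strip()]
    if limit is not None and limit > 0:
        hashes = hashes[:limit]
    return hashes
```

For example, read_hashes("hashes.txt", 10) returns at most the first ten hashes in the file.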

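The following script does the orchestration: it submits one job per hash, waits for all of them to finish, and then downloads and collects the results. Save it to a file (for example, bacalhau.py) and run it anywhere you have Python 3 and the bacalhau CLI installed.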

```python
import json, glob, os, multiprocessing, shutil, subprocess, tempfile, time

# Hypothetical placeholder: replace with the Docker image from the Ethereum
# analysis example, which processes each input archive.
DOCKER_IMAGE = "<ethereum-etl-image>"


# checkStatusOfJob checks the status of a Bacalhau job
def checkStatusOfJob(job_id: str) -> str:
    assert len(job_id) > 0
    p = subprocess.run(
        ["bacalhau", "list", "--output", "json", "--id-filter", job_id],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    r = parseJobStatus(p.stdout)
    if r == "":
        print("job status is empty! %s" % job_id)
    elif r == "Completed":
        print("job completed: %s" % job_id)
    else:
        print("job not completed: %s - %s" % (job_id, r))

    return r


# submitJob submits a job to the Bacalhau network
def submitJob(cid: str) -> str:
    assert len(cid) > 0
    # Mount the CID at /inputs/data.tar.gz inside the container. --id-only
    # prints only the job ID and --wait=false returns without waiting for the
    # job; check `bacalhau docker run --help` for your CLI version's flags.
    p = subprocess.run(
        [
            "bacalhau", "docker", "run",
            "--id-only",
            "--wait=false",
            "--input",
            "ipfs://" + cid + ":/inputs/data.tar.gz",
            DOCKER_IMAGE,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if p.returncode != 0:
        print("failed (%d) job: %s" % (p.returncode, p.stderr))
    job_id = p.stdout.strip()
    print("job submitted: %s" % job_id)

    return job_id


# getResultsFromJob gets the results from a Bacalhau job
def getResultsFromJob(job_id: str) -> str:
    assert len(job_id) > 0
    temp_dir = tempfile.mkdtemp()
    print("getting results for job: %s" % job_id)
    for i in range(0, 5):  # try 5 times, since IPFS downloads often time out
        p = subprocess.run(
            ["bacalhau", "get", "--output-dir", temp_dir, job_id],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if p.returncode == 0:
            break
        print("failed (exit %d) to get job: %s" % (p.returncode, p.stderr))

    return temp_dir


# parseJobStatus parses the status of a Bacalhau job
def parseJobStatus(result: str) -> str:
    if len(result) == 0:
        return ""
    r = json.loads(result)
    if len(r) > 0:
        return r[0]["State"]["State"]
    return ""


# parseHashes splits lines from a text file into a list, skipping blank lines
def parseHashes(filename: str) -> list:
    assert os.path.exists(filename)
    with open(filename, "r") as f:
        hashes = [line.strip() for line in f if line.strip()]
    return hashes


def main(file: str, num_files: int = -1):
    # Use multiprocessing to work in parallel
    count = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=count) as pool:
        hashes = parseHashes(file)
        # A non-positive num_files means "use every hash"; slicing with
        # [:-1] would silently drop the last one.
        if num_files > 0:
            hashes = hashes[:num_files]
        print("submitting %d jobs" % len(hashes))
        job_ids = pool.map(submitJob, hashes)
        assert len(job_ids) == len(hashes)

        print("waiting for jobs to complete...")
        while True:
            job_statuses = pool.map(checkStatusOfJob, job_ids)
            total_finished = sum(map(lambda x: x == "Completed", job_statuses))
            if total_finished >= len(job_ids):
                break
            print("%d/%d jobs completed" % (total_finished, len(job_ids)))
            time.sleep(2)  # pause between status checks

        print("all jobs completed, saving results...")
        results = pool.map(getResultsFromJob, job_ids)
        print("finished saving results")

    # Do something with the results: collect every job's CSV files in ./results
    shutil.rmtree("results", ignore_errors=True)
    os.makedirs("results", exist_ok=True)
    for r in results:
        path = os.path.join(r, "outputs", "*.csv")
        csv_file = glob.glob(path)
        for f in csv_file:
            print("moving %s to results" % f)
            shutil.move(f, "results")


if __name__ == "__main__":
    main("hashes.txt", 10)
```
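The waiting loop in main() only exits once every job reports Completed, so a job that fails leaves the script polling forever. One way to guard against that is to treat any final state as done and to give up after a deadline. The sketch below assumes that Error and Cancelled are also terminal job states (check the states your Bacalhau version actually reports); wait_for_jobs and TERMINAL_STATES are hypothetical names, and get_status can be the script's checkStatusOfJob.

```python
import time
from typing import Callable, Dict, List

# Assumption: states in which a job no longer changes. "Completed" comes from
# the script above; "Error" and "Cancelled" are assumed state names.
TERMINAL_STATES = {"Completed", "Error", "Cancelled"}


def wait_for_jobs(
    job_ids: List[str],
    get_status: Callable[[str], str],
    poll_interval: float = 2.0,
    timeout: float = 600.0,
) -> Dict[str, str]:
    """Poll each unfinished job until all are terminal or the timeout expires.

    Returns a mapping from job ID to its last observed state and raises
    TimeoutError if some jobs are still unfinished at the deadline.
    """
    deadline = time.monotonic() + timeout
    states: Dict[str, str] = {}
    pending = list(job_ids)
    while True:
        # Only re-check jobs that have not reached a terminal state yet.
        for job_id in pending:
            states[job_id] = get_status(job_id)
        pending = [j for j in pending if states[j] not in TERMINAL_STATES]
        print("%d/%d jobs finished" % (len(job_ids) - len(pending), len(job_ids)))
        if not pending:
            return states
        if time.monotonic() >= deadline:
            raise TimeoutError("%d job(s) still unfinished" % len(pending))
        time.sleep(poll_interval)
```

For example, statuses = wait_for_jobs(job_ids, checkStatusOfJob, timeout=1800). Unlike main(), this sketch checks jobs one after another instead of through the process pool, which keeps it simple at the cost of slower polling when there are many jobs.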

This code has a few interesting features:

  • Change the second argument of the main() call (currently 10) to change how many jobs are executed
  • Because jobs complete at different times, there's a loop that checks that all jobs have completed before downloading the results; if you skip this step, you'll likely see an error when you try to download the results
  • Downloading results from IPFS often times out, so the download is wrapped in a loop that retries up to five times
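The retry loop around the download can also wait a little longer after each failure, which gives a slow IPFS fetch more time to recover. Here is a minimal sketch of retrying a command with exponential backoff; run_with_retries is a hypothetical helper, and the attempt count and delays are arbitrary defaults rather than values recommended by Bacalhau.

```python
import subprocess
import time
from typing import List


def run_with_retries(
    cmd: List[str], attempts: int = 5, base_delay: float = 1.0
) -> subprocess.CompletedProcess:
    """Run cmd until it exits with status 0, backing off exponentially.

    Sleeps base_delay, 2*base_delay, 4*base_delay, ... seconds between
    attempts and returns the last CompletedProcess, successful or not.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        p = subprocess.run(cmd, capture_output=True, text=True)
        if p.returncode == 0:
            return p
        print("attempt %d/%d failed (exit %d): %s"
              % (attempt + 1, attempts, p.returncode, p.stderr.strip()))
        if attempt < attempts - 1:
            time.sleep(base_delay * 2 ** attempt)
    return p
```

Assuming your CLI's bacalhau get accepts --output-dir, getResultsFromJob could call run_with_retries(["bacalhau", "get", "--output-dir", temp_dir, job_id]) in place of its hand-written loop.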

Let's run it!


Hopefully, the results directory now contains the combined results from the jobs we just executed. Here we're expecting to see CSV files:

```bash
ls -l results
```
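The script moves each job's CSV files into results as separate files. If you would rather work with a single table, a sketch like the following concatenates them with Python's csv module. It assumes every file shares the same header row, which is a guess about the ETL output rather than something this example guarantees; combine_csvs is a hypothetical name.

```python
import csv
import glob
import os
from typing import List


def combine_csvs(results_dir: str, combined_path: str) -> int:
    """Concatenate every CSV in results_dir into combined_path.

    Writes the shared header once, appends data rows in filename order, and
    returns the number of data rows written. Raises ValueError if a file's
    header differs from the first non-empty file's header.
    """
    paths = sorted(glob.glob(os.path.join(results_dir, "*.csv")))
    header: List[str] = []
    rows_written = 0
    with open(combined_path, "w", newline="") as out:
        writer = csv.writer(out)
        for path in paths:
            with open(path, newline="") as f:
                reader = csv.reader(f)
                file_header = next(reader, None)
                if file_header is None:
                    continue  # skip empty files
                if not header:
                    header = file_header
                    writer.writerow(header)
                elif file_header != header:
                    raise ValueError("header mismatch in %s" % path)
                for row in reader:
                    writer.writerow(row)
                    rows_written += 1
    return rows_written
```

For example, combine_csvs("results", "combined.csv") writes the merged table next to the results directory. Keep the output file outside results; otherwise a rerun would pick up the previous combined file as one of its inputs.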

Success! We've now executed a bunch of jobs in parallel using Python. This is a great way to execute lots of jobs in a repeatable manner. You can alter the file above for your purposes.
