This directory contains examples relating to data engineering workloads. The goal is to provide a range of examples that show you how to work with Bacalhau in a variety of use cases.
Converting from CSV to Parquet or Avro reduces file size and allows for faster read and write speeds. With Bacalhau, you can convert your CSV files stored on IPFS or on the web without the need to download files and install dependencies locally.
In this example tutorial, we will convert a CSV file from a URL to Parquet format and save the converted Parquet file to IPFS.
To get started, you need to install the Bacalhau client; see more information here.
Let's download the transactions.csv file. You can use the CSV files from here:
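A minimal sketch of the download step (the URL below is a placeholder; substitute the actual location of the file):

```bash
# Download the CSV locally; replace the URL with the real location of transactions.csv
wget -O transactions.csv "https://example.com/path/to/transactions.csv"

# Quick sanity check on the first few rows
head -n 5 transactions.csv
```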
Write the converter.py Python script, which serves as a CSV converter to Avro or Parquet formats:
You can find out more information about converter.py here.
In our case:
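For illustration, the converter takes an input path, an output path, and a target format (the argument order matches the Bacalhau command used later in this guide); a hypothetical local run might look like:

```bash
# Convert a local CSV to Parquet: <input csv> <output file> <parquet|avro>
python3 converter.py transactions.csv transactions.parquet parquet
```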
You can skip this section entirely and directly go to Running on Bacalhau
To build your own Docker container, create a Dockerfile, which contains instructions to build your image.
See more information on how to containerize your script/app here
We will run the docker build command to build the container:
Before running the command, replace:
hub-user with your Docker Hub username. If you don't have a Docker Hub account, follow these instructions to create one, and use the username of the account you created.
repo-name with the name of the container; you can name it anything you want.
tag: this is not required, but you can use the latest tag.
In our case:
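For example, a build command with placeholder values might look like this (the image used later in this guide is jsacex/csv-to-arrow-or-parquet):

```bash
# Build the image; replace hub-user with your Docker Hub username
docker build -t hub-user/csv-to-arrow-or-parquet:latest .
```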
Next, upload the image to the registry. This can be done by using the Docker Hub username, repo name or tag.
In our case:
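For example, a push with the same placeholder values:

```bash
# Push the image to Docker Hub; replace hub-user with your username
docker push hub-user/csv-to-arrow-or-parquet:latest
```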
With the command below, we are mounting the CSV file for transactions from IPFS:
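Assembled from the breakdown that follows, the command looks roughly like this:

```bash
bacalhau docker run \
  -i ipfs://QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W \
  jsacex/csv-to-arrow-or-parquet \
  -- python3 src/converter.py ../inputs/transactions.csv /outputs/transactions.parquet parquet
```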
Let's look closely at the command above:
bacalhau docker run: call to Bacalhau
-i ipfs://QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W: CIDs to use on the job. Mounts them at '/inputs' in the execution.
jsacex/csv-to-arrow-or-parquet: the name and the tag of the Docker image we are using
../inputs/transactions.csv: path to the input dataset
/outputs/transactions.parquet parquet: path to the output and the output format
python3 src/converter.py: execute the script
When a job is submitted, Bacalhau prints out the related job_id. We store that in an environment variable so that we can reuse it later on.
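One way to capture the id at submission time, assuming your CLI version supports the --id-only flag:

```bash
# Submit the job and keep only the job id
export JOB_ID=$(bacalhau docker run --id-only \
  -i ipfs://QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W \
  jsacex/csv-to-arrow-or-parquet \
  -- python3 src/converter.py ../inputs/transactions.csv /outputs/transactions.parquet parquet)

echo "Submitted job: $JOB_ID"
```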
The same job can be presented in the declarative format. In this case, the description will look like this:
The job description should be saved in .yaml format, e.g. convertcsv.yaml, and then run with the command:
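A rough sketch of what such a spec and run command can look like; exact field names vary between Bacalhau versions, so treat the YAML as illustrative:

```bash
cat > convertcsv.yaml <<'EOF'
Name: Convert CSV to Parquet
Type: batch
Count: 1
Tasks:
  - Name: main
    Engine:
      Type: docker
      Params:
        Image: jsacex/csv-to-arrow-or-parquet
        Parameters:
          - python3
          - src/converter.py
          - ../inputs/transactions.csv
          - /outputs/transactions.parquet
          - parquet
    InputSources:
      - Target: /inputs
        Source:
          Type: ipfs
          Params:
            CID: QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W
EOF

# Submit the declarative job
bacalhau job run convertcsv.yaml
```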
Job status: You can check the status of the job using bacalhau job list.
When it says Published or Completed, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau job describe.
Job download: You can download your job results directly by using bacalhau job get. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory (results) and downloaded our job output to be stored in that directory.
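A sketch of the commands described above, assuming JOB_ID holds the id printed at submission and that your CLI version supports the --output-dir flag:

```bash
# Check job status
bacalhau job list

# Inspect the job in detail
bacalhau job describe $JOB_ID

# Download the results into a local directory
mkdir -p results
bacalhau job get $JOB_ID --output-dir results
```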
To view the file, run the following command:
Alternatively, you can do this:
If you have questions or need support or guidance, please reach out to the Bacalhau team via Slack (#general channel).
In this example tutorial, we will show you how to use Bacalhau to process images on a Landsat dataset.
Bacalhau has the unique capability of operating at a massive scale in a distributed environment. This is made possible because data is naturally sharded across the IPFS network amongst many providers. We can take advantage of this to process images in parallel.
To get started, you need to install the Bacalhau client; see more information here.
To submit a workload to Bacalhau, we will use the bacalhau docker run command. This command allows you to pass an input data volume with a -i ipfs://CID:path argument just like Docker, except the left-hand side of the argument is a content identifier (CID). This results in Bacalhau mounting a data volume inside the container. By default, Bacalhau mounts the input volume at the path /inputs inside the container.
Bacalhau also mounts a data volume to store output data. The bacalhau docker run command creates an output data volume mounted at /outputs. This is a convenient location to store the results of your job.
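Putting these pieces together, and reconstructing from the breakdown that follows, the command looks roughly like this:

```bash
bacalhau docker run \
  -i src=s3://landsat-image-processing/*,dst=/input_images,opt=region=us-east-1 \
  --entrypoint mogrify \
  dpokidov/imagemagick:7.1.0-47-ubuntu \
  -- -resize 100x100 -quality 100 -path /outputs '/input_images/*.jpg'
```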
Let's look closely at the command above:
bacalhau docker run: call to Bacalhau
-i src=s3://landsat-image-processing/*,dst=/input_images,opt=region=us-east-1: specifies the input data, which is stored in S3
--entrypoint mogrify: overrides the default ENTRYPOINT of the image, indicating that the mogrify utility from the ImageMagick package will be used instead of the default entry
dpokidov/imagemagick:7.1.0-47-ubuntu: the name and the tag of the Docker image we are using
-- -resize 100x100 -quality 100 -path /outputs '/input_images/*.jpg': these arguments are passed to mogrify and specify operations on the images: resizing to 100x100 pixels, setting quality to 100, and saving the results to the /outputs folder
When a job is submitted, Bacalhau prints out the related job_id. We store that in an environment variable so that we can reuse it later on.
The same job can be presented in the declarative format. In this case, the description will look like this:
The job description should be saved in .yaml format, e.g. image.yaml, and then run with the command:
Job status: You can check the status of the job using bacalhau job list:
When it says Completed, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau job describe:
Job download: You can download your job results directly by using bacalhau job get. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
To view the images, open the results/outputs/ folder:
If you have questions or need support or guidance, please reach out to the Bacalhau team via Slack (#general channel).
The Surface Ocean CO₂ Atlas (SOCAT) contains measurements of the fugacity of CO₂ in seawater around the globe. But to calculate how much carbon the ocean is taking up from the atmosphere, these measurements need to be converted to the partial pressure of CO₂. We will convert the units by combining measurements of the surface temperature and fugacity. Python libraries (xarray, pandas, numpy) and the pyseaflux package facilitate this process.
In this example tutorial, our focus will be on running the oceanography dataset with Bacalhau, where we will investigate the data and convert the workload. This will enable the execution on the Bacalhau network, allowing us to leverage its distributed storage and compute resources.
To get started, you need to install the Bacalhau client; see more information here.
For the purposes of this example, we will use the SOCATv2022 dataset in the "Gridded" format from the SOCAT website and long-term global sea surface temperature data from NOAA; information about that dataset can be found here.
Next, let's write the requirements.txt. This file will also be used by the Dockerfile to install the dependencies.
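A minimal sketch based on the libraries named above; the original example may pin versions or include additional packages:

```bash
cat > requirements.txt <<'EOF'
xarray
pandas
numpy
pyseaflux
EOF
```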
We can see that the dataset contains latitude-longitude coordinates, the date, and a series of seawater measurements. Below is a plot of the average sea surface temperature (SST) between 2010 and 2020, where data have been collected by buoys and vessels.
To convert the data from fugacity of CO₂ (fCO₂) to partial pressure of CO₂ (pCO₂), we will combine the measurements of the surface temperature and fugacity. The conversion is performed by the pyseaflux package.
Let's create a new file called main.py and paste the following script in it:
This code loads and processes SST and SOCAT data, combines them, computes pCO2, and saves the results for further use.
The simplest way to upload the data to IPFS is to use a third-party service to "pin" data to the IPFS network, to ensure that the data exists and is available. To do this you need an account with a pinning service like NFT.storage or Pinata. Once registered you can use their UI or API or SDKs to upload files.
This resulted in the IPFS CID of bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y.
We will create a Dockerfile and add the desired configuration to the file. These commands specify how the image will be built, and what extra requirements will be included.
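A minimal sketch of such a Dockerfile, assuming a Python base image and the requirements.txt and main.py files created above:

```bash
cat > Dockerfile <<'EOF'
FROM python:3.10-slim

WORKDIR /project

# Install the Python dependencies first so the layer is cached
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the analysis script into the image
COPY main.py .

CMD ["python", "main.py"]
EOF
```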
We will run the docker build command to build the container:
Before running the command, replace:
hub-user with your Docker Hub username. If you don't have a Docker Hub account, follow these instructions to create one, and use the username of the account you created.
repo-name with the name of the container; you can name it anything you want.
tag: this is not required, but you can use the latest tag.
Now you can push this repository to the registry designated by its name or tag.
For more information about working with custom containers, see the custom containers example.
Now that we have the data in IPFS and the Docker image pushed, the next step is to run a job using the bacalhau docker run command.
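Reconstructed from the breakdown below, the command looks roughly like this:

```bash
bacalhau docker run \
  --input ipfs://bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y \
  ghcr.io/bacalhau-project/examples/socat:0.0.11 \
  -- python main.py
```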
Let's look closely at the command above:
bacalhau docker run: call to Bacalhau
--input ipfs://bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y: CIDs to use on the job. Mounts them at '/inputs' in the execution.
ghcr.io/bacalhau-project/examples/socat:0.0.11: the name and the tag of the image we are using
python main.py: execute the script
When a job is submitted, Bacalhau prints out the related job_id. We store that in an environment variable so that we can reuse it later on.
The same job can be presented in the declarative format. In this case, the description will look like this:
The job description should be saved in .yaml format, e.g. ocean.yaml, and then run with the command:
Job status: You can check the status of the job using bacalhau job list.
When it says Published or Completed, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau job describe.
Job download: You can download your job results directly by using bacalhau job get. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory (results) and downloaded our job output to be stored in that directory.
To view the file, run the following command:
If you have questions or need support or guidance, please reach out to the Bacalhau team via Slack (#general channel).
Use Bacalhau to process logs remotely across clouds and land the results on BigQuery.
This example demonstrates how to build a sophisticated distributed log processing pipeline using Bacalhau, Google BigQuery, and DuckDB. You'll learn how to process and analyze logs across distributed nodes, with progressively more advanced techniques for handling, sanitizing, and aggregating log data.
The combination of Bacalhau and BigQuery offers several key advantages:
Process logs directly where they are generated, eliminating the need for centralized collection
Scale processing across multiple nodes in different cloud providers
Leverage BigQuery's powerful analytics capabilities for processed data
Implement privacy-conscious data handling and efficient aggregation strategies
Through this example, you'll evolve from basic log collection to implementing a production-ready system with privacy protection and smart aggregation. Whether you're handling application logs, system metrics, or security events, this pipeline provides a robust foundation for distributed log analytics.
The Bacalhau CLI installed
A Google Cloud Project with BigQuery enabled
Service account credentials with BigQuery access
A running Bacalhau cluster with nodes across different cloud providers
Follow the Bacalhau cluster setup instructions and ensure nodes are properly configured across your cloud providers (AWS, GCP, Azure). You can see more about setting up your nodes in the Bacalhau documentation.
Your nodes must have at least the following configured to use the jobs below unchanged (in addition to anything else you may require).
DuckDB Processing: Process and analyze data using DuckDB's SQL capabilities
BigQuery Integration: Store processed results in Google BigQuery for further analysis
Make sure you have configured your config file to have the correct BigQuery project and dataset. You can do this by copying the config.yaml.example file to config.yaml and editing the values to match your BigQuery project and dataset.
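For example, starting from the provided template:

```bash
cp config.yaml.example config.yaml
# Edit config.yaml and set your BigQuery project id and dataset name
```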
Ensure your Google Cloud service account has these roles:
BigQuery Data Editor
BigQuery Job User
Have your service account key file (JSON format) ready
Configure your BigQuery settings:
Create a dataset for log analytics
Note your project ID and dataset name
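If you prefer the command line, the dataset can be created with the bq CLI; the project and dataset names below are placeholders:

```bash
# Create a dataset for log analytics (replace the project and dataset names)
bq mk --dataset --location=US my-project-id:log_analytics
```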
We have provided some utility scripts to help you set up your BigQuery project and tables. You can run the following commands to set up your project and tables if you haven't already:
Interactive setup for your BigQuery project and tables. It will walk you through creating the necessary BigQuery resources.
Creates sample tables in BigQuery for testing.
Checks the permissions of the service account specified in log_uploader_credentials.json to ensure it has the necessary permissions to write to BigQuery tables from the Bacalhau nodes.
Confirms your BigQuery project and dataset, and creates the tables if they don't exist with the correct schema. This will also zero out the tables if they already exist, so be careful! (Useful for debugging)
Distributes the credentials to /bacalhau_data on all nodes in a Bacalhau network.
Ensures the service account specified in log_uploader_credentials.json has the necessary permissions to write to BigQuery tables from the Bacalhau nodes.
One more thing to set up is the log faker on the nodes. This will generate logs for you to work with. You can run the following command to start the log faker:
Give it a couple of minutes to start up and then you can start processing the logs.
Let's walk through each stage of the demo, seeing how we can progressively improve our data processing pipeline!
Let's start by looking at the raw logs.
That will print out the logs to stdout, which we can then read from the job.
After running the job, you will see a job id, something like this:
When you run the describe command, you will see the details of the job, including the log output.
Now let's upload the raw logs to BigQuery. This is the simplest approach - just get the data there:
This will upload the Python script to all the nodes, which in turn will upload the raw logs from all nodes to BigQuery. When you check BigQuery, you'll see:
Millions of rows uploaded (depends on how many nodes you have and how long you let it run)
Each log line as raw text
No structure or parsing
To query the logs, you can use the following SQL:
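For example, a query along these lines (with placeholder project and dataset names, and assuming the raw lines land in the message column of the log_results table described in the appendix) shows the most recent rows:

```bash
bq query --use_legacy_sql=false '
SELECT timestamp, nodeName, message
FROM `my-project-id.log_analytics.log_results`
ORDER BY timestamp DESC
LIMIT 20'
```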
Now let's do something more advanced, by parsing those logs into structured data before upload:
Your logs are now parsed into fields like:
IP Address
Timestamp
HTTP Method
Endpoint
Status Code
Response Size
To query the logs, you can use the following SQL:
Now let's handle the data responsibly by sanitizing PII (like IP addresses):
This:
Zeros out the last octet of IPv4 addresses
Zeros out the last 64 bits of IPv6 addresses
Maintains data utility while ensuring compliance
Again, to query the logs, you can use the following SQL:
Notice that the IP addresses are now sanitized.
Finally, let's be smart about what we upload:
This creates two streams:
Aggregated normal logs:
Grouped in 5-minute windows
Counts by status code
Average response sizes
Total requests per endpoint
Real-time emergency events:
Critical errors
Security alerts
System failures
To query the logs, you can use the following SQL:
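For example, against the log_aggregates table described in the appendix (project and dataset names are placeholders):

```bash
bq query --use_legacy_sql=false '
SELECT time_window, nodeName, log_count
FROM `my-project-id.log_analytics.log_aggregates`
ORDER BY time_window DESC
LIMIT 20'
```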
Now that you've seen the power of distributed processing with Bacalhau:
Try processing your own log files
Experiment with different aggregation windows
Add your own privacy-preserving transformations
Scale to even more nodes!
Remember: The real power comes from processing data where it lives, rather than centralizing everything first. Happy distributed processing! 🚀
log_results (Main Table):
project_id: STRING - Project identifier
region: STRING - Deployment region
nodeName: STRING - Node name
timestamp: TIMESTAMP - Event time
version: STRING - Log version
message: STRING - Log content
sync_time: TIMESTAMP - Upload time
remote_log_id: STRING - Original log ID
hostname: STRING - Source host
public_ip: STRING - Sanitized public IP
private_ip: STRING - Internal IP
alert_level: STRING - Event severity
provider: STRING - Cloud provider

log_aggregates (5-minute windows):
project_id: STRING - Project identifier
region: STRING - Deployment region
nodeName: STRING - Node name
provider: STRING - Cloud provider
hostname: STRING - Source host
time_window: TIMESTAMP - Aggregation window
log_count: INT64 - Events in window
messages: ARRAY - Event details

emergency_logs (Critical Events):
project_id: STRING - Project identifier
region: STRING - Deployment region
nodeName: STRING - Node name
provider: STRING - Cloud provider
hostname: STRING - Source host
timestamp: TIMESTAMP - Event time
version: STRING - Log version
message: STRING - Alert details
remote_log_id: STRING - Original log ID
alert_level: STRING - Always "EMERGENCY"
public_ip: STRING - Sanitized public IP
private_ip: STRING - Internal IP
Mature blockchains are difficult to analyze because of their size. Ethereum-ETL is a tool that makes it easy to extract information from an Ethereum node, but it's not easy to get working in a batch manner. It takes approximately 1 week for an Ethereum node to download the entire chain (even more in my experience) and importing and exporting data from the Ethereum node is slow.
For this example, we ran an Ethereum node for a week and allowed it to synchronize. We then ran ethereum-etl to extract the information and pinned it on Filecoin. This means that we can now access the data without having to run another Ethereum node.
But there's still a lot of data and these types of analyses typically need repeating or refining. So it makes absolute sense to use a decentralized network like Bacalhau to process the data in a scalable way.
In this tutorial example, we will run Ethereum-ETL tool on Bacalhau to extract data from an Ethereum node.
To get started, you need to install the Bacalhau client; see more information here.
First let's download one of the IPFS files and inspect it locally:
You can see the full list of IPFS CIDs in the appendix at the bottom of the page.
If you don't already have the Pandas library, let's install it:
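A quick way to do that:

```bash
pip install pandas
```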
The following code inspects the daily trading volume of Ethereum for a single chunk (100,000 blocks) of data.
This is all good, but we can do better. We can use the Bacalhau client to download the data from IPFS and then run the analysis on the data in the cloud. This means that we can analyze the entire Ethereum blockchain without having to download it locally.
To run jobs on the Bacalhau network you need to package your code. In this example, I will package the code as a Docker image.
But before we do that, we need to develop the code that will perform the analysis. The code below is a simple script to parse the incoming data and produce a CSV file with the daily trading volume of Ethereum.
Next, let's make sure the file works as expected:
And finally, package the code inside a Docker image to make the process reproducible. Here I'm passing the Bacalhau default /inputs and /outputs directories. The /inputs directory is where the data will be read from and the /outputs directory is where the results will be saved to.
We've already pushed the container, but for posterity, the following command pushes this container to GHCR.
To run our analysis on the Ethereum blockchain, we will use the bacalhau docker run command.
The job has been submitted and Bacalhau has printed out the related job id. We store that in an environment variable so that we can reuse it later on.
Bacalhau also mounts a data volume to store output data. The bacalhau docker run command creates an output data volume mounted at /outputs. This is a convenient location to store the results of your job.
The job description should be saved in .yaml format, e.g. blockchain.yaml, and then run with the command:
Job status: You can check the status of the job using bacalhau job list.
When it says Published or Completed, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau job describe.
Job download: You can download your job results directly by using bacalhau job get. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory (results) and downloaded our job output to be stored in that directory.
To view the file, run the following command:
To view the images, we will use glob to return all file paths that match a specific pattern.
Ok, so that works. Let's scale this up! We can run the same analysis on the entire Ethereum blockchain (up to the point where I have uploaded the Ethereum data). To do this, we need to run the analysis on each of the chunks of data that we have stored on IPFS. We can do this by running the same job on each of the chunks.
See the appendix for the hashes.txt file.
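A sketch of that loop, assuming hashes.txt contains one CID per line; the image name is a placeholder for the container built earlier:

```bash
# Set IMAGE to the analysis container you built and pushed above (placeholder value)
IMAGE=ghcr.io/your-org/ethereum-analysis:latest

# Submit one job per data chunk and record the job ids
rm -f job_ids.txt
while read -r cid; do
  bacalhau docker run --id-only -i "ipfs://${cid}:/inputs" "$IMAGE" >> job_ids.txt
done < hashes.txt
```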
Now take a look at the job IDs. You can use these to check the status of the jobs and download the results:
You might want to double-check that the jobs ran ok by doing a bacalhau job list.
Wait until all of these jobs have been completed, then download all the results and merge them into a single directory. This might take a while, so this is a good time to treat yourself to a nice Dark Mild. There have also been some issues in the past communicating with IPFS, so if you get an error, try again.
To view the images, we will use glob to return all file paths that match a specific pattern.
That's it! There are several years of Ethereum transaction volume data.
The following is a list of IPFS CIDs for the Ethereum data that we used in this tutorial. You can use these CIDs to download the rest of the chain if you so desire. The CIDs are ordered by block number and increase 50,000 blocks at a time:
DuckDB is a relational table-oriented database management system that supports SQL queries for producing analytical results. It also comes with various features that are useful for data analytics.
DuckDB is suited for the following use cases:
Processing and storing tabular datasets, e.g. from CSV or Parquet files
Interactive data analysis, e.g. joining and aggregating multiple large tables
Concurrent large changes to multiple large tables, e.g. appending rows, adding/removing/updating columns
Large result set transfer to client
In this example tutorial, we will show how to use DuckDB with Bacalhau. The advantage of using DuckDB with Bacalhau is that you don't need to install DuckDB locally, and there is no need to download the datasets since they are already available on the server you are looking to run the query on.
To get started, you need to install the Bacalhau client; see more information here.
We will be using a Bacalhau cluster with the standard setup.
We will also need to have a server with some data to run the query on. In this example, we will use a server with the Yellow Taxi Trips dataset.
If you do not already have this data on your server, you can download it using the scripts in the prep_data directory. The command to download the data is ./prep_data/run_download_jobs.sh, and you must have the /bacalhau_data directory on your server.
To submit a job, run the following Bacalhau command:
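Reconstructed from the breakdown that follows, the command looks roughly like this:

```bash
bacalhau docker run \
  -e QUERY="select 1" \
  docker.io/bacalhauproject/duckdb:latest
```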
This is a simple query that will return a single row with a single column - but the query will be executed in DuckDB, on a remote server.
Let's look closely at the command above:
bacalhau docker run: call to Bacalhau
-e QUERY="select 1": the query to execute
docker.io/bacalhauproject/duckdb:latest: the name and the tag of the Docker image we are using
When a job is submitted, Bacalhau runs the query in DuckDB, and returns the results to the client.
After we run it, when we describe the job, we can see the following in standard output:
What if you didn't want to run everything on the command line? You can use a YAML file to define the job. In simple_query.sql, we have a simple query that will return the number of rows in the dataset.
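A sketch of what simple_query.sql could contain; the path assumes the Parquet file used later in this guide, and the real file in the example may differ:

```bash
# Keep the query on a single line (see the note about templating below)
echo "SELECT COUNT(*) AS row_count FROM read_parquet('/bacalhau_data/yellow_tripdata_2020-02.parquet');" > simple_query.sql
```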
To run this query, we can use the following YAML file:
Though this looks like a lot of code, it is actually quite simple. The Tasks section defines the task to run, and the InputSources section defines the input dataset. The Publisher section defines where the results will be published, and the Resources section defines the resources required for the job.
All the work is done in the environment variables, which are passed to the Docker image, and handed to DuckDB to execute the query.
To run this query, we can use the following command:
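Assembled from the breakdown that follows:

```bash
bacalhau job run duckdb_query_job.yaml \
  --template-vars="filename=/bacalhau_data/yellow_tripdata_2020-02.parquet" \
  --template-vars="QUERY=$(cat simple_query.sql)"
```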
This breaks down into the following steps:
bacalhau job run: call to Bacalhau
duckdb_query_job.yaml: the YAML file we are using
--template-vars="filename=/bacalhau_data/yellow_tripdata_2020-02.parquet": the file to read
--template-vars="QUERY=$(cat simple_query.sql)": the query to execute
When we run this, we get back the following simple output:
Let's say we want to run a more complex query. In window_query_simple.sql, we have a query that will return the average number of rides per 5 minute interval.
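A sketch of such a query, written on a single line (see the note below) and assuming the standard tpep_pickup_datetime column of the Yellow Taxi dataset:

```bash
echo "SELECT AVG(rides) AS avg_rides_per_5min FROM (SELECT time_bucket(INTERVAL '5 minutes', tpep_pickup_datetime) AS window_start, COUNT(*) AS rides FROM read_parquet('/bacalhau_data/yellow_tripdata_2020-02.parquet') GROUP BY window_start) t;" > window_query_simple.sql
```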
When we run this, we get back the following output:
The SQL file needs to be written on a single line; otherwise, the line breaks will cause issues with the templating. We're working on improving this!
With this structure, you can now run virtually any query you want on remote servers, without ever having to download the data. Welcome to compute over data by Bacalhau!
If you get stuck or have questions:
The bacalhau docker run command allows passing an input data volume with the --input or -i ipfs://CID:path argument just like Docker, except the left-hand side of the argument is a content identifier (CID). This results in Bacalhau mounting a data volume inside the container. By default, Bacalhau mounts the input volume at the path /inputs inside the container.
The same job can be presented in the declarative format. In this case, the description will look like this:
If you have questions or need support or guidance, please reach out to the Bacalhau team via Slack (#general channel).
Check out the Bacalhau documentation
Open an issue in our GitHub repository
Join our Slack community
Many data engineering workloads consist of embarrassingly parallel workloads where you want to run a simple execution on a large number of files. In this example tutorial, we will run a simple video filter on a large number of video files.
To get started, you need to install the Bacalhau client; see more information here.
The simplest way to upload the data to IPFS is to use a third-party service to "pin" data to the IPFS network, to ensure that the data exists and is available. To do this you need an account with a pinning service like NFT.storage or Pinata. Once registered you can use their UI or API or SDKs to upload files.
This resulted in the IPFS CID of Qmd9CBYpdgCLuCKRtKRRggu24H72ZUrGax5A9EYvrbC72j.
To submit a workload to Bacalhau, we will use the bacalhau docker run command. The command allows one to pass an input data volume with a -i ipfs://CID:path argument just like Docker, except the left-hand side of the argument is a content identifier (CID). This results in Bacalhau mounting a data volume inside the container. By default, Bacalhau mounts the input volume at the path /inputs inside the container.
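Reconstructed from the breakdown that follows, the command looks roughly like this:

```bash
bacalhau docker run \
  -i ipfs://Qmd9CBYpdgCLuCKRtKRRggu24H72ZUrGax5A9EYvrbC72j \
  linuxserver/ffmpeg \
  -- bash -c 'find /inputs -iname "*.mp4" -printf "%f\n" | xargs -I{} ffmpeg -y -i /inputs/{} -vf "scale=-1:72,setsar=1:1" /outputs/scaled_{}'
```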
Let's look closely at the command above:
bacalhau docker run: call to Bacalhau
-i ipfs://Qmd9CBYpdgCLuCKRtKRRggu24H72ZUrGax5A9EYvrbC72j: CIDs to use on the job. Mounts them at '/inputs' in the execution.
linuxserver/ffmpeg: the name of the Docker image we are using to resize the videos
-- bash -c 'find /inputs -iname "*.mp4" -printf "%f\n" | xargs -I{} ffmpeg -y -i /inputs/{} -vf "scale=-1:72,setsar=1:1" /outputs/scaled_{}': the command that will be executed inside the container. It uses find to locate all files with the extension ".mp4" within /inputs and then uses ffmpeg to resize each found file to 72 pixels in height, saving the results in the /outputs folder.
When a job is submitted, Bacalhau prints out the related job_id. We store that in an environment variable so that we can reuse it later on.
Bacalhau overwrites the default entrypoint, so we must run the full command after the -- argument. In this line you will list all of the mp4 files in the /inputs directory and execute ffmpeg against each instance.
The same job can be presented in the declarative format. In this case, the description will look like this:
The job description should be saved in .yaml format, e.g. video.yaml, and then run with the command:
Job status: You can check the status of the job using bacalhau job list.
When it says Published or Completed, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau job describe.
Job download: You can download your job results directly by using bacalhau job get. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory (results) and downloaded our job output to be stored in that directory.
To view the results, open the results/outputs/ folder.
If you have questions or need support or guidance, please reach out to the Bacalhau team via Slack (#general channel).