Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
This directory contains examples relating to data engineering workloads. The goal is to provide a range of examples that show you how to work with Bacalhau in a variety of use cases.
Ethereum Blockchain Analysis with Ethereum-ETL and Bacalhau
Mature blockchains are difficult to analyze because of their size. Ethereum-ETL is a tool that makes it easy to extract information from an Ethereum node, but it's not easy to get working in a batch manner. It takes approximately 1 week for an Ethereum node to download the entire chain (even more in my experience) and importing and exporting data from the Ethereum node is slow.
For this example, we ran an Ethereum node for a week and allowed it to synchronize. We then ran ethereum-etl to extract the information and pinned it on Filecoin. This means that we can both now access the data without having to run another Ethereum node.
But there's still a lot of data and these types of analyses typically need repeating or refining. So it makes absolute sense to use a decentralized network like Bacalhau to process the data in a scalable way.
Running Ethereum-etl tool on Bacalhau to extract Ethereum node.
To get started, you need to install the Bacalhau client, see more information here
First let's download one of the IPFS files and inspect it locally. You can see the full list of IPFS CIDs in the appendix.
The following code inspects the daily trading volume of Ethereum for a single chunk (100,000 blocks) of data.
This is all good, but we can do better. We can use the Bacalhau client to download the data from IPFS and then run the analysis on the data in the cloud. This means that we can analyze the entire Ethereum blockchain without having to download it locally.
To run jobs on the Bacalhau network you need to package your code. In this example, I will package the code as a Docker image.
But before we do that, we need to develop the code that will perform the analysis. The code below is a simple script to parse the incoming data and produce a CSV file with the daily trading volume of Ethereum.
Next, let's make sure the file works as expected...
And finally, package the code inside a Docker image to make the process reproducible. Here I'm passing the Bacalhau default /inputs
and /outputs
directories. The /inputs
directory is where the data will be read from and the /outputs
directory is where the results will be saved to.
We've already pushed the container, but for posterity, the following command pushes this container to GHCR.
To run our analysis on the Ethereum blockchain, we will use the bacalhau docker run
command.
The job has been submitted and Bacalhau has printed out the related job id. We store that in an environment variable so that we can reuse it later on.
The bacalhau docker run
command allows to pass input data volume with a -i ipfs://CID:path
argument just like Docker, except the left-hand side of the argument is a content identifier (CID). This results in Bacalhau mounting a data volume inside the container. By default, Bacalhau mounts the input volume at the path /inputs
inside the container.
Bacalhau also mounts a data volume to store output data. The bacalhau docker run
command creates an output data volume mounted at /outputs
. This is a convenient location to store the results of your job.
Job status: You can check the status of the job using bacalhau list
.
When it says Published
or Completed
, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau describe
.
Job download: You can download your job results directly by using bacalhau get
. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
After the download has finished you should see the following contents in the results directory.
To view the file, run the following command:
To view the images, we will use glob to return all file paths that match a specific pattern.
Ok, so that works. Let's scale this up! We can run the same analysis on the entire Ethereum blockchain (up to the point where I have uploaded the Ethereum data). To do this, we need to run the analysis on each of the chunks of data that we have stored on IPFS. We can do this by running the same job on each of the chunks.
See the appendix for the hashes.txt
file.
Now take a look at the job id's. You can use these to check the status of the jobs and download the results. You might want to double-check that the jobs ran ok by doing a bacalhau list
.
Wait until all of these jobs have been completed:
And then download all the results and merge them into a single directory. This might take a while, so this is a good time to treat yourself to a nice Dark Mild. There's also been some issues in the past communicating with IPFS, so if you get an error, try again.
To view the images, we will use glob to return all file paths that match a specific pattern.
That's it! There are several years of Ethereum transaction volume data.
The following list is a list of IPFS CID's for the Ethereum data that we used in this tutorial. You can use these CID's to download the rest of the chain if you so desire. The CIDs are ordered by block number and they increase 50,000 blocks at a time. Here's a list of ordered CIDs:
In the course of writing this example, I had to set up an Ethereum node. It was a slow and painful process so I thought I would share the steps I took to make it easier for others.
Geth supports Ubuntu by default, so use that when creating a VM. Use Ubuntu 22.04 LTS.
Mount the disk:
Run as a new user:
Check they are running:
Watch the logs:
Prysm will need to finish synchronizing before geth will start synchronizing.
In Prysm you will see lots of log messages saying: Synced new block
, and in Geth you will see: Syncing beacon headers downloaded=11,920,384 left=4,054,753 eta=2m25.903s
. This tells you how long it will take to sync the beacons. Once that's done, get will start synchronizing the blocks.
Bring up the Ethereum javascript console with:
Once the block sync has started, eth.syncing
will return values. Before it starts, this value will be false
.
Note that by default, geth will perform a fast sync, without downloading the full blocks. The syncmode=full
flag forces geth to do a full sync. If we didn't do this, then we wouldn't be able to back up the data properly.
Tar and compress the directories to make them easier to upload:
Export your Web3.storage JWT API key as an environment variable called TOKEN
:
Converting from CSV to parquet or avro reduces the size of the file and allows for faster read and write speeds. With Bacalhau, you can convert your CSV files stored on ipfs or on the web without the need to download files and install dependencies locally.
In this example tutorial we will convert a CSV file from a URL to parquet format and save the converted parquet file to IPFS
Converting CSV stored in public storage with Bacalhau
To get started, you need to install the Bacalhau client, see more information here
Installing dependencies
Run the following commands:
Viewing the parquet file
:::info You can skip this section entirely and directly go to running on Bacalhau :::
To build your own docker container, create a Dockerfile
, which contains instructions to build your image.
:::info See more information on how to containerize your script/apphere :::
We will run docker build
command to build the container;
Before running the command replace;
hub-user with your docker hub username, If you don’t have a docker hub account follow these instructions to create docker account, and use the username of the account you created
repo-name with the name of the container, you can name it anything you want
tag this is not required but you can use the latest tag
In our case:
Next, upload the image to the registry. This can be done by using the Docker hub username, repo name or tag.
In our case:
To submit a job, we are going to either mount the script from an IPFS or from an URL.
With the command below, we are gmounting the CSV file for transactions from IPFS
Let's look closely at the command above:
bacalhau docker run
: call to bacalhau
-i ipfs://QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W
: CIDs to use on the job. Mounts them at '/inputs' in the execution.
jsacex/csv-to-arrow-or-parque
: the name and the tag of the docker image we are using
../inputs/movies.csv
: path to input dataset
../outputs/movies.parquet parquet
: path to the output
python3 src/converter.py
: execute the script
To mount the CSV file from a URL
Let's look closely at the command above:
bacalhau docker run
: call to bacalhau
-i https://raw.githubusercontent.com/bacalhau-project/csv_to_avro_or_parquet/master/movies.csv
: URL: path of the input data volumes downloaded from a URL source
jsacex/csv-to-arrow-or-parque
: the name and the tag of the docker image we are using
../inputs/movies.csv
: path to the input dataset
../outputs/movies.parquet parquet
: path to the output
python3 src/converter.py
: execute the script
When a job is submitted, Bacalhau prints out the related job_id
. We store that in an environment variable so that we can reuse it later on.
Job status: You can check the status of the job using bacalhau list
.
:::note Replace the {JOB_ID}
with your generated ID. :::
When it says Published
or Completed
, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau describe
.
Job download: You can download your job results directly by using bacalhau get
. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
To view the file, run the following command:
Alternatively, you can do this.
For questions, and feedback, please reach out in our forum
Oceanography data conversion with Bacalhau
The Surface Ocean CO₂ Atlas (SOCAT) contains measurements of the fugacity of CO2 in seawater around the globe. But to calculate how much carbon the ocean is taking up from the atmosphere, these measurements need to be converted to the partial pressure of CO2. We will convert the units by combining measurements of the surface temperature and fugacity. Python libraries (xarray, pandas, numpy) and the pyseaflux package facilitate this process.
In this example tutorial, we will investigate the data and convert the workload so that it can be executed on the Bacalhau network, to take advantage of the distributed storage and compute resources.
Running oceanography dataset with Bacalhau
To get started, you need to install the Bacalhau client, see more information here
The raw data is available on the SOCAT website. We will use the SOCATv2021 dataset in the "Gridded" format to perform this calculation. First, let's take a quick look at some data:
Next let's write the requirements.txt
and install the dependencies. This file will also be used by the Dockerfile to install the dependencies.
Installing dependencies
We can see that the dataset contains lat-long coordinates, the date, and a series of seawater measurements. Above you can see a plot of the average surface sea temperature (sst) between 2010-2020, where recording buoys and boats have traveled.
To convert the data from fugacity of CO2 (fCO2) to partial pressure of CO2 (pCO2) we will combine the measurements of the surface temperature and fugacity. The conversion is performed by the pyseaflux package.
To execute this workload on the Bacalhau network we need to perform three steps:
Upload the data to IPFS
Create a docker image with the code and dependencies
Run a Bacalhau job with the docker image using the IPFS data
The first step is to upload the data to IPFS. The simplest way to do this is to use a third-party service to "pin" data to the IPFS network, to ensure that the data exists and is available. To do this you need an account with a pinning service like web3.storage or Pinata. Once registered you can use their UI or API or SDKs to upload files.
For the purposes of this example:
Downloaded the latest monthly data from the SOCAT website
Pinned the data to IPFS
This resulted in the IPFS CID of bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y
.
We will create a Dockerfile
and add the desired configuration to the file. These commands specify how the image will be built, and what extra requirements will be included.
We will run docker build
command to build the container;
Before running the command replace;
hub-user with your docker hub username, If you don’t have a docker hub account follow these instructions to create a Docker account, and use the username of the account you created
repo-name with the name of the container, you can name it anything you want
tag this is not required but you can use the latest tag
Now you can push this repository to the registry designated by its name or tag.
:::tip For more information about working with custom containers, see the custom containers example. :::
Now that we have the data in IPFS and the Docker image pushed, next is to run a job using the bacalhau docker run
command
When a job is submitted, Bacalhau prints out the related job_id
. We store that in an environment variable so that we can reuse it later on.
Job status: You can check the status of the job using bacalhau list
.
When it says Published
or Completed
, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau describe
.
Job download: You can download your job results directly by using bacalhau get
. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
To view the file, run the following command:
For questions, and feedback, please reach out in our forum
How to process images stored in IPFS with Bacalhau
In this example tutorial, we will show you how to use Bacalhau to process images on a Landsat dataset.
Bacalhau has the unique capability of operating at a massive scale in a distributed environment. This is made possible because data is naturally sharded across the IPFS network amongst many providers. We can take advantage of this to process images in parallel.
Processing of images from a dataset using Bacalhau
To get started, you need to install the Bacalhau client, see more information here
To submit a workload to Bacalhau, we will use the bacalhau docker run
command.
The job has been submitted and Bacalhau has printed out the related job id. We store that in an environment variable so that we can reuse it later on.
The bacalhau docker run
command allows to pass input data volume with a -i ipfs://CID:path
argument just like Docker, except the left-hand side of the argument is a content identifier (CID). This results in Bacalhau mounting a data volume inside the container. By default, Bacalhau mounts the input volume at the path /inputs
inside the container.
Bacalhau also mounts a data volume to store output data. The bacalhau docker run
command creates an output data volume mounted at /outputs
. This is a convenient location to store the results of your job.
Job status: You can check the status of the job using bacalhau list
.
When it says Published
or Completed
, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau describe
.
Job download: You can download your job results directly by using bacalhau get
. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
After the download has finished you should see the following contents in results directory.
To view the file, run the following command:
To view the images, we will use glob to return all file paths that match a specific pattern.
For questions, feedback, please reach out in our forum
Parallel Video Resizing via File Sharding
Many data engineering workloads consist of embarrassingly parallel workloads where you want to run a simple execution on a large number of files. In this example tutorial, we will run a simple video filter on a large number of video files.
Running video files with Bacalhau
To get started, you need to install the Bacalhau client, see more information here
To submit a workload to Bacalhau, we will use the bacalhau docker run
command.
The job has been submitted and Bacalhau has printed out the related job id. We store that in an environment variable so that we can reuse it later on.
The bacalhau docker run
command allows one to pass input data volume with a -i ipfs://CID:path
argument just like Docker, except the left-hand side of the argument is a content identifier (CID). This results in Bacalhau mounting a data volume inside the container. By default, Bacalhau mounts the input volume at the path /inputs
inside the container.
We created a 72px wide video thumbnails for all the videos in the inputs
directory. The outputs
directory will contain the thumbnails for each video. We will shard by 1 video per job, and use the linuxserver/ffmpeg
container to resize the videos.
:::tip Bacalhau overwrites the default entrypoint so we must run the full command after the --
argument. In this line you will list all of the mp4 files in the /inputs
directory and execute ffmpeg
against each instance. :::
Job status: You can check the status of the job using bacalhau list
.
When it says Published
or Completed
, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau describe
.
Job download: You can download your job results directly by using bacalhau get
. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
After the download has finished you should see the following contents in the results directory.
To view the file, run the following command:
To view the videos, we will use glob to return all file paths that match a specific pattern.
<video src={require('./scaled_Bird_flying_over_the_lake.mp4').default} controls > Your browser does not support the video
element. <video src={require('./scaled_Calm_waves_on_a_rocky_sea_gulf.mp4').default} controls > Your browser does not support the video
element. <video src={require('./scaled_Prominent_Late_Gothic_styled_architecture.mp4').default} controls > Your browser does not support the video
element.
For questions, and feedback, please reach out in our forum
DuckDB is a relational table-oriented database management system that supports SQL queries for producing analytical results. It also comes with various features that are useful for data analytics.
DuckDB is suited for the following use cases:
Processing and storing tabular datasets, e.g. from CSV or Parquet files
Interactive data analysis, e.g. Joining & aggregate multiple large tables
Concurrent large changes, to multiple large tables, e.g. appending rows, adding/removing/updating columns
Large result set transfer to client
In this example tutorial, we will show how to use DuckDB with Bacalhau. The advantage of using DuckDB with Bacalhau is that you don’t need to install, and there is no need to download the datasets since the datasets are already there on IPFS or on the web.
How to run a relational database (like DUCKDB) on Bacalhau
To get started, you need to install the Bacalhau client, see more information here
You can skip this entirely and directly go to running on Bacalhau.
If you want any additional dependencies to be installed along with DuckDB, you need to build your own container.
To build your own docker container, create a Dockerfile
, which contains instructions to build your DuckDB docker container.
See more information on how to containerize your script/app here
We will run docker build
command to build the container;
Before running the command replace;
hub-user with your docker hub username, If you don’t have a docker hub account follow these instructions to create docker account, and use the username of the account you created
repo-name with the name of the container, you can name it anything you want
tag this is not required but you can use the latest tag
In our case
Next, upload the image to the registry. This can be done by using the Docker hub username, repo name or tag.
In our case
After the repo image has been pushed to Docker Hub, we can now use the container for running on Bacalhau. To submit a job, run the following Bacalhau command:
Let's look closely at the command above:
bacalhau docker run
: call to bacalhau
davidgasquez/datadex:v0.2.0
: the name and the tag of the docker image we are using
/inputs/
: path to input dataset
'duckdb -s "select 1"'
: execute DuckDB
When a job is submitted, Bacalhau prints out the related job_id
. We store that in an environment variable so that we can reuse it later on.
Job status: You can check the status of the job using bacalhau list
.
When it says Published
or Completed
, that means the job is done, and we can get the results.
Job information: You can find out more information about your job by using bacalhau describe
.
Job download: You can download your job results directly by using bacalhau get
. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
Each job creates 3 subfolders: the combined_results,per_shard files, and the raw directory. To view the file, run the following command:
Below is the bacalhau docker run
command to to run arbitrary SQL commands over the yellow taxi trips dataset
Let's look closely at the command above:
bacalhau docker run
: call to bacalhau
-i ipfs://bafybeiejgmdpwlfgo3dzfxfv3cn55qgnxmghyv7vcarqe3onmtzczohwaq \
: CIDs to use on the job. Mounts them at '/inputs' in the execution.
davidgasquez/duckdb:latest
: the name and the tag of the docker image we are using
/inputs
: path to input dataset
duckdb -s
: execute DuckDB
When a job is submitted, Bacalhau prints out the related job_id
. We store that in an environment variable so that we can reuse it later on.
Job status: You can check the status of the job using bacalhau list
.
Job information: You can find out more information about your job by using bacalhau describe
.
Job download: You can download your job results directly by using bacalhau get
. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.
Each job creates 3 subfolders: the combined_results,per_shard files, and the raw directory. To view the file, run the following command:
For questions, and feedback, please reach out in our forum