Apache Airflow Provider for Bacalhau

This is bacalhau-airflow, a Python package that integrates Bacalhau with Apache Airflow. The benefit is twofold. First, this package lets you write complex pipelines for Bacalhau. For instance, jobs can communicate their outputs' CIDs to downstream jobs, which can use them as inputs. Second, Apache Airflow provides a solid solution to reliably orchestrate your DAGs.

Until https://github.com/bacalhau-project/bacalhau/issues/2038 is fixed, you can try this out using a local devstack. Please set the following environment variables: AIRFLOW_VAR_BACALHAU_API_HOST and AIRFLOW_VAR_BACALHAU_API_PORT.
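As a minimal sketch, a small pre-flight check can confirm that both variables are present before Airflow is launched. The helper name missing_bacalhau_vars is hypothetical and not part of bacalhau-airflow; only the two variable names come from the text above.

```python
import os

# The two variable names are taken from the documentation above.
REQUIRED_VARS = (
    "AIRFLOW_VAR_BACALHAU_API_HOST",
    "AIRFLOW_VAR_BACALHAU_API_PORT",
)


def missing_bacalhau_vars(env=None):
    """Return the required variable names that are unset or empty.

    `env` defaults to the current process environment; passing a dict
    makes the check easy to exercise without touching os.environ.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_bacalhau_vars() in the shell session that will start Airflow returns an empty list when both variables are set. Airflow exposes environment variables named AIRFLOW_VAR_<KEY> as Airflow Variables, which is presumably how the operator picks these values up.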

Features

  • Create Airflow tasks that run on Bacalhau (via custom operator!)

  • Support for sharded jobs: output shards can be passed downstream (via XComs)

  • Coming soon...

    • Lineage (see the OpenLineage proof-of-concept integration)

    • Various working code examples

    • Hosting instructions

Requirements

  • Python 3.8+

  • bacalhau-sdk 0.3.25+

  • apache-airflow 2.3+

Installation

The integration automatically registers itself for Airflow 2.3+ if it's installed in the Python environment used by the Airflow worker.

From PyPI

pip install bacalhau-airflow
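
Because the package must live in the same Python environment as the Airflow worker, it can help to confirm what that interpreter actually sees. The following is a minimal sketch using only the standard library; the helper names installed_version and report are hypothetical, and the distribution names are the ones used in the pip commands on this page.

```python
from importlib import metadata


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def report(dist_names=("apache-airflow", "bacalhau-airflow")):
    """Map each distribution name to its installed version (or None)."""
    return {name: installed_version(name) for name in dist_names}
```

Running report() with the interpreter that Airflow uses should show a version string for both entries; a None value means that package is not visible to that interpreter.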

From source

Clone the public repository:

git clone https://github.com/bacalhau-project/bacalhau/

Once you have a copy of the source, you can install it with:

cd bacalhau/integration/airflow/
pip install .

Worked example

Setup

If you're just curious and want to give it a try on your local machine, please follow the steps below.

First, install and initialize Airflow:

$ pip install apache-airflow
$ export AIRFLOW_HOME=~/airflow
$ airflow db init

My config file looks like what follows:

[core]
dags_folder = /Users/enricorotundo/bacalhau/integration/airflow/example_dags
...

Optionally, to reduce clutter in the Airflow UI, you could disable the loading of the default example DAGs by setting load_examples to False.
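
If you prefer to script these airflow.cfg changes rather than edit the file by hand, a minimal sketch with the standard library's configparser could look like the following. The function name set_core_options is hypothetical. Note that writing a file back through configparser drops any comments it contained, so this is best suited to throwaway local setups.

```python
import configparser
import io


def set_core_options(cfg_text, dags_folder, load_examples=False):
    """Return airflow.cfg text with [core] dags_folder and load_examples set.

    Interpolation is disabled so that literal '%' characters in other
    options are left untouched.
    """
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("core"):
        parser.add_section("core")
    parser.set("core", "dags_folder", dags_folder)
    parser.set("core", "load_examples", str(load_examples))
    out = io.StringIO()
    parser.write(out)
    return out.getvalue()
```

A typical use would be to read ${AIRFLOW_HOME}/airflow.cfg into a string, pass it through set_core_options with the absolute path of your DAGs folder, and write the result back.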

Finally, we can launch Airflow locally:

airflow standalone

Example DAG: chaining jobs

While Airflow's pinwheel is warming up in the background, let's take a look at the hello_world.py breakdown below.

In brief, the first task of this DAG prints "Hello World" to stdout, then automatically pipes its output into the subsequent task as an input file. The second task simply prints the content of its input file.

All you need to import from this package is the BacalhauSubmitJobOperator. It allows you to submit a job spec composed of the usual fields such as engine, image, etc.

from datetime import datetime
from airflow import DAG
from bacalhau_airflow.operators import BacalhauSubmitJobOperator

This operator supports chaining multiple jobs without the need to manually pass any CID along. In this regard, the input_volumes parameter deserves a special note (see task_2 below). Every time the operator runs a task, it stores a comma-separated string with the output shard CIDs in an internal key-value store under the cids key. Downstream tasks can then read those CIDs via the input_volumes parameter.

with DAG("bacalhau-helloworld-dag", start_date=datetime(2023, 3, 1)) as dag:
    task_1 = BacalhauSubmitJobOperator(
        task_id="task_1",
        api_version="V1beta1",
        job_spec=dict(
            engine="Docker",
            verifier="Noop",
            publisher="IPFS",
            docker=dict(
                image="ubuntu",
                entrypoint=["echo", "Hello World"],
            ),
            deal=dict(concurrency=1, confidence=0, min_bids=0),
        ),
    )

    task_2 = BacalhauSubmitJobOperator(
        task_id="task_2",
        api_version="V1beta1",
        input_volumes=[
            "{{ task_instance.xcom_pull(task_ids='task_1', key='cids') }}:/task_1_output",
        ],
        job_spec=dict(
            engine="Docker",
            verifier="Noop",
            publisher="IPFS",
            docker=dict(
                image="ubuntu",
                entrypoint=["cat", "/task_1_output/stdout"],
            ),
            deal=dict(concurrency=1, confidence=0, min_bids=0),
        ),
    )

    task_1 >> task_2

Run it

Now that we understand what the example DAG is supposed to do, let's just run it! Head over to http://0.0.0.0:8080, where the Airflow UI is being served. The DAG list should show that our hello world DAG has been loaded correctly.

When you inspect a DAG, Airflow renders a graph depicting a color-coded topology. For active (i.e. running) pipelines, this is useful for overseeing the status of each task.

To trigger the DAG, please enable its toggle in the UI.

When all tasks have completed, we want to fetch the output of our pipeline. To do so, we need to retrieve the job ID of the last task. Click on a green box in the task_2 line and then open the XCom tab.

Here we find the bacalhau_job_id. Select that value and copy it into your clipboard.
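
The same value can also be read programmatically from a downstream task instead of the UI. The sketch below assumes only what the text states, namely that the job ID is stored in XCom under the bacalhau_job_id key; the function name get_bacalhau_job_id and the FakeTaskInstance stand-in are hypothetical. In a real DAG, the ti argument would be the task instance that Airflow passes to a Python callable.

```python
def get_bacalhau_job_id(ti, task_id="task_2"):
    """Read the job ID that the given task pushed to XCom.

    `ti` is expected to behave like an Airflow TaskInstance, i.e. to
    offer xcom_pull(task_ids=..., key=...).
    """
    return ti.xcom_pull(task_ids=task_id, key="bacalhau_job_id")


class FakeTaskInstance:
    """Tiny stand-in used only to demonstrate the call without Airflow."""

    def __init__(self, store):
        self._store = store

    def xcom_pull(self, task_ids, key):
        return self._store.get((task_ids, key))
```

For example, wrapping get_bacalhau_job_id in a PythonOperator placed after task_2 would make the job ID available to later steps without any manual copying.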

Lastly, we can use the Bacalhau CLI's job get command to fetch the output data as follows:

$ bacalhau job get 8fdab73b-00fd-4d13-941c-8ba002f8178d
Fetching results of job '8fdab73b-00fd-4d13-941c-8ba002f8178d'...
...
Results for job '8fdab73b-00fd-4d13-941c-8ba002f8178d' have been written to...
/tmp/dag-example/job-8fdab73b

$ cat /tmp/dag-example/job-8fdab73b/combined_results/stdout
Hello World
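
If you want to run the same step from a script, a minimal sketch using subprocess is shown below. It assumes the bacalhau binary is on your PATH and only uses the job get subcommand shown above; the function names build_get_command and fetch_results are hypothetical.

```python
import subprocess


def build_get_command(job_id):
    """Build the argument list for `bacalhau job get <job_id>`."""
    return ["bacalhau", "job", "get", job_id]


def fetch_results(job_id):
    """Run the CLI and return its stdout; raises if the command fails."""
    completed = subprocess.run(
        build_get_command(job_id),
        check=True,            # raise CalledProcessError on non-zero exit
        capture_output=True,   # collect stdout/stderr instead of printing
        text=True,             # decode output as str
    )
    return completed.stdout
```

Passing the command as a list avoids shell quoting issues when the job ID comes from another system such as XCom.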

Development

pip install -r dev-requirements.txt

Unit tests

tox

You can also skip using tox and run pytest on your own dev environment.

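In a production environment, you may want to follow the official Airflow instructions or pick one of the suggested hosted solutions instead of the local setup described in the Setup section.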

After initializing Airflow, we need to point it to the absolute path of the folder where your pipelines live. To do that, edit the dags_folder field in the ${AIRFLOW_HOME}/airflow.cfg file. This example uses the hello_world.py DAG shipped with this repository; for the sake of completeness, the "Example DAG: chaining jobs" section walks you through the actual code.

To pass outputs downstream via input_volumes, all you need to do is (1) use the XComs syntax (in curly brackets) to specify the "sender" task ids and the cids key (e.g. {{ task_instance.xcom_pull(task_ids='task_1', key='cids') }}), and (2) define a target mount point separated by a colon (e.g. :/task_1_output).
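
The two parts of an input_volumes entry can be assembled with a small helper, sketched below. The function name xcom_volume is hypothetical and not part of bacalhau-airflow; it only produces the string format described above.

```python
def xcom_volume(sender_task_id, mount_point, key="cids"):
    """Build one input_volumes entry: '<XCom template>:<mount point>'.

    The template is left unrendered here; Airflow fills it in at run time.
    """
    if not mount_point.startswith("/"):
        raise ValueError("mount_point must be an absolute path, e.g. /task_1_output")
    # %-formatting keeps the literal double curly brackets of the template intact.
    template = "{{ task_instance.xcom_pull(task_ids='%s', key='%s') }}" % (
        sender_task_id,
        key,
    )
    return f"{template}:{mount_point}"
```

With this helper, the input_volumes list of task_2 in the example DAG could be written as [xcom_volume("task_1", "/task_1_output")].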

Lastly, we define task dependencies simply with task_1 >> task_2. To learn more about Airflow's DAG syntax, please check out the Airflow documentation.
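
For readers new to this notation, the toy class below illustrates the general idea behind the >> operator: Python lets a class overload it through __rshift__. This is only an illustration with a hypothetical Task class, not Airflow's actual implementation.

```python
class Task:
    """Toy task that records which tasks should run after it."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `self >> other` registers `other` as a downstream task and
        # returns it, so chains such as a >> b >> c read left to right.
        self.downstream.append(other)
        return other


a, b, c = Task("a"), Task("b"), Task("c")
a >> b >> c
```

After the last line, b is recorded as downstream of a, and c as downstream of b.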

That's all folks 🌈.
