Oceanography - Data Conversion


Introduction

The Surface Ocean CO₂ Atlas (SOCAT) contains measurements of the fugacity of CO₂ in seawater around the globe. But to calculate how much carbon the ocean is taking up from the atmosphere, these measurements need to be converted to the partial pressure of CO₂. We will convert the units by combining measurements of the surface temperature and fugacity. Python libraries (xarray, pandas, numpy) and the pyseaflux package facilitate this process.

In this example tutorial, we will run this oceanography workload on Bacalhau: first we investigate the data locally, then we package the conversion step so that it can execute on the Bacalhau network, allowing us to leverage its distributed storage and compute resources.

Prerequisites

To get started, you need to install the Bacalhau client; see the installation instructions for more information.
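For reference, here is a sketch of the documented one-line installer (check the installation docs for the current command for your platform):

curl -sL https://get.bacalhau.org/install.sh | bash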

Running Locally

Downloading the dataset

For the purposes of this example we will use the SOCATv2022 dataset in the "Gridded" format from the SOCAT website, along with long-term global sea surface temperature data from NOAA; more information about the latter dataset is available on the NOAA website.

mkdir -p inputs
curl -L --output ./inputs/SOCATv2022_tracks_gridded_monthly.nc.zip https://www.socat.info/socat_files/v2022/SOCATv2022_tracks_gridded_monthly.nc.zip
curl --output ./inputs/sst.mnmean.nc https://downloads.psl.noaa.gov/Datasets/noaa.oisst.v2/sst.mnmean.nc
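Optionally, confirm that both files arrived in the inputs directory before moving on:

ls -lh inputs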

Installing dependencies

Next, let's write the requirements.txt. This file will also be used by the Dockerfile to install the dependencies.

# requirements.txt
Bottleneck==1.3.5
dask==2022.2.0
fsspec==2022.5.0
netCDF4==1.6.0
numpy==1.21.6
pandas==1.3.5
pip==22.1.2
pyseaflux==2.2.1
scipy==1.7.3
xarray==0.20.2
zarr>=2.0.0

Install the dependencies locally:

pip install -r requirements.txt > /dev/null
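If you'd rather keep these pinned versions isolated from other projects, a virtual environment works too (a sketch, assuming Python 3 with the venv module):

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt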

Reading and Viewing Data

import fsspec # for reading remote files
import xarray as xr

# Open the zip archive using fsspec and load the data into xarray.Dataset
with fsspec.open("./inputs/SOCATv2022_tracks_gridded_monthly.nc.zip", compression='zip') as fp:
    ds = xr.open_dataset(fp)

# Display information about the dataset
ds.info()

# Select a decade of data and compute the mean sea surface temperature
time_slice = slice("2010", "2020") # select a decade
res = ds['sst_ave_unwtd'].sel(tmnth=time_slice).mean(dim='tmnth') # compute the mean for this period
res.plot() # plot the result

We can see that the dataset contains latitude-longitude coordinates, the date, and a series of seawater measurements. Below is a plot of the average sea surface temperature (SST) between 2010 and 2020, where data have been collected by buoys and vessels.
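If you run the snippet above as a plain script rather than in a notebook, the plot will not display by itself; matplotlib (which xarray uses for plotting) can save it to a file instead. A sketch, with a hypothetical output filename:

import matplotlib.pyplot as plt

res.plot()  # `res` computed in the snippet above
plt.savefig("sst_mean_2010_2020.png")  # hypothetical output filename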

Data Conversion

To convert the data from fugacity of CO₂ (fCO2) to partial pressure of CO₂ (pCO2), we combine the measurements of the surface temperature and fugacity. The conversion is performed by the pyseaflux package.
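As a minimal illustration of the conversion (with made-up input values), pyseaflux exposes fCO2_to_pCO2 directly:

import pyseaflux

# Illustrative values: fCO2 in uatm, sea surface temperature in °C
pco2 = pyseaflux.fCO2_to_pCO2(380.0, 14.0)
print(pco2)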

Writing the Script

Let's create a new file called main.py and paste the following script in it:

# main.py
import fsspec
import xarray as xr
import pandas as pd
import numpy as np
import pyseaflux


def lon_360_to_180(ds=None, lonVar=None):
    lonVar = "lon" if lonVar is None else lonVar
    return (ds.assign_coords({lonVar: (((ds[lonVar] + 180) % 360) - 180)})
            .sortby(lonVar)
            .astype(dtype='float32', order='C'))


def center_dates(ds):
    # start and end date
    start_date = str(ds.time[0].dt.strftime('%Y-%m').values)
    end_date = str(ds.time[-1].dt.strftime('%Y-%m').values)

    # monthly dates centered on 15th of each month
    dates = pd.date_range(start=f'{start_date}-01T00:00:00.000000000',
                          end=f'{end_date}-01T00:00:00.000000000',
                          freq='MS') + np.timedelta64(14, 'D')

    return ds.assign(time=dates)


def get_and_process_sst(url=None):
    # get noaa sst
    if url is None:
        url = ("/inputs/sst.mnmean.nc")

    with fsspec.open(url) as fp:
        ds = xr.open_dataset(fp)
        ds = lon_360_to_180(ds)
        ds = center_dates(ds)
        return ds


def get_and_process_socat(url=None):
    if url is None:
        url = ("/inputs/SOCATv2022_tracks_gridded_monthly.nc.zip")

    with fsspec.open(url, compression='zip') as fp:
        ds = xr.open_dataset(fp)
        ds = ds.rename({"xlon": "lon", "ylat": "lat", "tmnth": "time"})
        ds = center_dates(ds)
        return ds


def main():
    print("Load SST and SOCAT data")
    ds_sst = get_and_process_sst()
    ds_socat = get_and_process_socat()

    print("Merge datasets together")
    time_slice = slice("1981-12", "2022-05")
    ds_out = xr.merge([ds_sst['sst'].sel(time=time_slice),
                       ds_socat['fco2_ave_unwtd'].sel(time=time_slice)])

    print("Calculate pco2 from fco2")
    ds_out['pco2_ave_unwtd'] = xr.apply_ufunc(
        pyseaflux.fCO2_to_pCO2,
        ds_out['fco2_ave_unwtd'],
        ds_out['sst'])

    print("Add metadata")
    ds_out['pco2_ave_unwtd'].attrs['units'] = 'uatm'
    ds_out['pco2_ave_unwtd'].attrs['notes'] = ("calculated using " +
                                               "NOAA OI SST V2 " +
                                               "and pyseaflux package")

    print("Save data")
    ds_out.to_zarr("/processed.zarr")
    import shutil
    shutil.make_archive("/outputs/processed.zarr", 'zip', "/processed.zarr")
    print("Zarr file written to disk, job completed successfully")

if __name__ == "__main__":
    main()

This code loads and processes SST and SOCAT data, combines them, computes pCO2, and saves the results for further use.

Upload the Data to IPFS

The simplest way to upload the data to IPFS is to use a third-party service to "pin" the data to the IPFS network, ensuring that the data exists and is available. To do this you need an account with a pinning service such as NFT.storage or Pinata; once registered you can use their UI, API, or SDKs to upload files.

Uploading the inputs directory this way resulted in the IPFS CID of bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y.
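If you run your own IPFS node instead, the directory can be added with the IPFS CLI. A sketch (note that the CID above came from the original upload; adding modified inputs would yield a different CID):

ipfs add -r inputs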

Setting up Docker Container

We will create a Dockerfile and add the desired configuration to the file. These commands specify how the image will be built, and what extra requirements will be included.

FROM python:slim

RUN apt-get update && apt-get -y upgrade \
    && apt-get install -y --no-install-recommends \
    g++ \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /project

COPY ./requirements.txt /project

RUN pip3 install -r requirements.txt

COPY ./main.py /project

CMD ["python","main.py"]

Build the container

We will run the docker build command to build the container:

docker build -t <hub-user>/<repo-name>:<tag> .

Before running the command, replace:

hub-user with your Docker Hub username; if you don't have a Docker Hub account, follow the Docker documentation to create one, and use the username of the account you created

repo-name with the name of the container; you can name it anything you want

tag this is not required, but you can use the latest tag
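Before pushing, you can optionally smoke-test the image locally by mounting the downloaded dataset and an outputs directory at the absolute paths the script expects (a sketch, run from the project directory):

mkdir -p outputs
docker run --rm \
  -v "$(pwd)/inputs:/inputs" \
  -v "$(pwd)/outputs:/outputs" \
  <hub-user>/<repo-name>:<tag>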

Push the container

Now you can push this repository to the registry designated by its name or tag.

docker push <hub-user>/<repo-name>:<tag>
For more information about working with custom containers, see the custom containers example.

Running a Bacalhau Job

Now that we have the data in IPFS and the Docker image pushed, the next step is to run a job using the bacalhau docker run command:

export JOB_ID=$(bacalhau docker run \
    --input ipfs://bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y \
    --memory 10Gb \
    ghcr.io/bacalhau-project/examples/socat:0.0.11 \
    -- python main.py)

Structure of the command

Let's look closely at the command above:

  1. bacalhau docker run: call to Bacalhau

  2. --input ipfs://bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y: the CID of the data to use in the job; Bacalhau mounts it at /inputs in the execution

  3. --memory 10Gb: the amount of memory to reserve for the job

  4. ghcr.io/bacalhau-project/examples/socat:0.0.11: the name and the tag of the image we are using

  5. python main.py: execute the script

When a job is submitted, Bacalhau prints out the related job_id. We store that in an environment variable so that we can reuse it later on.
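Depending on your client version, the command may print more than just the ID; if so, adding the --id-only flag makes the command substitution capture a clean job ID (a hedged variant of the same command):

export JOB_ID=$(bacalhau docker run \
    --id-only \
    --input ipfs://bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y \
    --memory 10Gb \
    ghcr.io/bacalhau-project/examples/socat:0.0.11 \
    -- python main.py)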

Declarative job description

The same job can be presented in the declarative format. In this case, the description will look like this:

name: Oceanography
type: batch
count: 1
tasks:
  - name: My main task
    Engine:
      type: docker
      params:
        Image: ghcr.io/bacalhau-project/examples/socat:0.0.11
        Entrypoint:
          - /bin/bash
        Parameters:
          - -c
          - python main.py
    Publisher:
      Type: ipfs
    ResultPaths:
      - Name: outputs
        Path: /outputs
    InputSources:
      - Target: "/inputs"
        Source:
          Type: "ipfs"
          Params:
            CID: "bafybeidunikexxu5qtuwc7eosjpuw6a75lxo7j5ezf3zurv52vbrmqwf6y"
    Resources:
      Memory: 10gb

The job description should be saved in .yaml format, e.g. ocean.yaml, and then run with the command:

bacalhau job run ocean.yaml
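The job ID can be captured here as well, assuming the --id-only flag is available for bacalhau job run in your client version:

export JOB_ID=$(bacalhau job run --id-only ocean.yaml)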

Checking the State of your Jobs

Job status: You can check the status of the job using bacalhau job list.

bacalhau job list --id-filter ${JOB_ID}

When it says Published or Completed, that means the job is done, and we can get the results.

Job information: You can find out more information about your job by using bacalhau job describe.

bacalhau job describe ${JOB_ID}

Viewing your Job Output

Job download: You can download your job results directly by using bacalhau job get. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory (results) and downloaded our job output to be stored in that directory.

rm -rf results
mkdir -p ./results # Temporary directory to store the results
bacalhau job get ${JOB_ID} --output-dir ./results # Download the results

To view the file, run the following command:

ls results/outputs

processed.zarr.zip
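To inspect the converted data, you can unpack the archive and open the Zarr store with xarray (a sketch using the libraries from requirements.txt; paths assume the download step above):

import shutil
import xarray as xr

# Unpack the zipped Zarr store produced by the job
shutil.unpack_archive("results/outputs/processed.zarr.zip", "processed.zarr")

# Open the store and inspect the computed pCO2 variable
ds = xr.open_zarr("processed.zarr")
print(ds["pco2_ave_unwtd"])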

Support

If you have questions or need support or guidance, please reach out to the Bacalhau team via Slack (#general channel).