Convert CSV To Parquet Or Avro

Introduction

Converting from CSV to Parquet or Avro reduces file size and allows for faster read and write speeds. With Bacalhau, you can convert CSV files stored on IPFS or on the web without needing to download the files and install dependencies locally.

In this example tutorial, we will convert a CSV file from a URL to Parquet format and save the converted Parquet file to IPFS.

Prerequisites

To get started, you need to install the Bacalhau client. See the installation instructions for more information.

Running CSV to Avro or Parquet Locally

You can skip this section entirely and go directly to Running a Bacalhau Job.

Downloading the CSV file

Let's download the transactions.csv file:

wget https://cloudflare-ipfs.com/ipfs/QmfKJT13h5k1b23ja3ZCVg5nFL9oKz2bVXc8oXgtwiwhjz/transactions.csv

You can also use any other CSV file you have at hand.
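
If you already have pandas installed locally, an optional sanity check of the downloaded file might look like this (transactions.csv is assumed to be in your working directory):

# Optional: preview the first rows of the downloaded CSV
import pandas as pd

print(pd.read_csv('transactions.csv').head())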

Writing the Script

Write the converter.py Python script, which converts a CSV file to Avro or Parquet format:

# converter.py
import os
import sys
from abc import ABCMeta, abstractmethod

import fastavro
import numpy as np
import pandas as pd
from pyarrow import Table, parquet


class BaseConverter(metaclass=ABCMeta):
    """
    Base class for converters.

    Validate received parameters for future use.
    """
    def __init__(
        self,
        csv_file_path: str,
        target_file_path: str,
    ) -> None:
        self.csv_file_path = csv_file_path
        self.target_file_path = target_file_path

    @property
    def csv_file_path(self):
        return self._csv_file_path

    @csv_file_path.setter
    def csv_file_path(self, path):
        if not os.path.isabs(path):
            path = os.path.join(os.getcwd(), path)
        _, extension = os.path.splitext(path)
        if not os.path.isfile(path) or extension != '.csv':
            raise FileNotFoundError(
                f'No such csv file: {path}'
            )
        self._csv_file_path = path

    @property
    def target_file_path(self):
        return self._target_file_path

    @target_file_path.setter
    def target_file_path(self, path):
        if not os.path.isabs(path):
            path = os.path.join(os.getcwd(), path)
        target_dir = os.path.dirname(path)
        if not os.path.isdir(target_dir):
            raise FileNotFoundError(
                f'No such directory: {target_dir}\n'
                'Choose an existing directory or create one for the result file.'
            )
        if os.path.isfile(path):
            raise FileExistsError(
                f'File {path} already exists. '
                'Using an existing file may result in data loss.'
            )
        self._target_file_path = path

    def get_csv_reader(self):
        """Return csv reader which read csv file as a stream"""
        return pd.read_csv(
            self.csv_file_path,
            iterator=True,
            chunksize=100000
        )

    @abstractmethod
    def convert(self):
        """Should be implemented in child class"""
        pass


class ParquetConverter(BaseConverter):
    """
    Convert received csv file to parquet file.

    Take path to csv file and path to result file.
    """
    def convert(self):
        """Read csv file as a stream and write data to parquet file."""
        csv_reader = self.get_csv_reader()
        writer = None
        for chunk in csv_reader:
            table = Table.from_pandas(chunk)
            if not writer:
                # Create the writer lazily, using the schema of the first chunk
                writer = parquet.ParquetWriter(
                    self.target_file_path, table.schema
                )
            writer.write_table(table)
        if writer:
            writer.close()


class AvroConverter(BaseConverter):
    """
    Convert received csv file to avro file.

    Take path to csv file and path to result file.
    """
    NUMPY_TO_AVRO_TYPES = {
        np.dtype('?'): 'boolean',
        np.dtype('int8'): 'int',
        np.dtype('int16'): 'int',
        np.dtype('int32'): 'int',
        np.dtype('uint8'): 'int',
        np.dtype('uint16'): 'int',
        np.dtype('uint32'): 'int',
        np.dtype('int64'): 'long',
        np.dtype('uint64'): 'long',
        np.dtype('O'): ['null', 'string', 'float'],
        np.dtype('unicode_'): 'string',
        np.dtype('float32'): 'float',
        np.dtype('float64'): 'double',
        np.dtype('datetime64'): {
            'type': 'long',
            'logicalType': 'timestamp-micros'
        },
    }

    def get_avro_schema(self, pandas_df):
        """Generate avro schema."""
        column_dtypes = pandas_df.dtypes
        schema_name = os.path.basename(self.target_file_path)
        schema = {
            'type': 'record',
            'name': schema_name,
            'fields': [
                {
                    'name': name,
                    'type': AvroConverter.NUMPY_TO_AVRO_TYPES[dtype]
                } for (name, dtype) in column_dtypes.items()
            ]
        }
        return fastavro.parse_schema(schema)

    def convert(self):
        """Read csv file as a stream and write data to avro file."""
        csv_reader = self.get_csv_reader()
        schema = None
        with open(self.target_file_path, 'a+b') as f:
            for chunk in csv_reader:
                if not schema:
                    schema = self.get_avro_schema(chunk)
                fastavro.writer(
                    f,
                    schema=schema,
                    records=chunk.to_dict('records')
                )


if __name__ == '__main__':
    converters = {
        'parquet': ParquetConverter,
        'avro': AvroConverter
    }
    csv_file, result_path, result_type = sys.argv[1], sys.argv[2], sys.argv[3]
    if result_type.lower() not in converters:
        raise ValueError(
            'Invalid target type. Available types: avro, parquet.'
        )
    converter = converters[result_type.lower()](csv_file, result_path)
    converter.convert()

Installing Dependencies

pip install fastavro numpy pandas pyarrow

Converting the CSV file to Parquet format

python converter.py <path_to_csv> <path_to_result_file> <extension>

In our case:

python3 converter.py transactions.csv transactions.parquet parquet
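
Since the script also accepts avro as the target type, the equivalent local conversion to Avro would be:

python3 converter.py transactions.csv transactions.avro avro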

Viewing the Parquet file

import pandas as pd
pd.read_parquet('transactions.parquet').head()
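
If you also produced transactions.avro as shown above, a hedged way to read a few records back is with fastavro (already installed as a dependency):

# Read the first few records from the Avro file produced by converter.py
import fastavro

with open('transactions.avro', 'rb') as f:
    for i, record in enumerate(fastavro.reader(f)):
        print(record)
        if i == 4:  # only show the first five records
            break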

Containerize Script with Docker

To build your own Docker container, create a Dockerfile, which contains the instructions to build your image. The Dockerfile below is a minimal sketch: it assumes converter.py sits next to the Dockerfile in the build context and is copied to src/converter.py, matching the path used in the job commands later on.

FROM python:3.8

# Work one level below the root so that ../inputs resolves to /inputs at runtime
WORKDIR /project

# Install the same dependencies used for the local run
RUN pip3 install fastavro numpy pandas pyarrow

# Copy the converter script to the path used by the job commands
COPY converter.py src/converter.py

Build the container

We will run the docker build command to build the container:

docker build -t <hub-user>/<repo-name>:<tag> .

Before running the command, replace:

hub-user with your Docker Hub username. If you don't have a Docker Hub account, create one and use the username of the account you created.

repo-name with the name of the container; you can name it anything you want.

tag this is not required, but you can use the latest tag.

In our case:

docker build -t jsacex/csv-to-arrow-or-parquet .
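
Before pushing, you can optionally smoke-test the image locally. This is a hedged example: it assumes the image does not define a conflicting entrypoint and mounts the current directory (which should contain transactions.csv) at /data, a path chosen only for this test:

docker run --rm -v $(pwd):/data jsacex/csv-to-arrow-or-parquet \
    python3 src/converter.py /data/transactions.csv /data/transactions-local.parquet parquet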

Push the container

Next, upload the image to the registry, referencing it by your Docker Hub username, the repo name and, optionally, a tag:

docker push <hub-user>/<repo-name>:<tag>

In our case:

docker push jsacex/csv-to-arrow-or-parquet

Running a Bacalhau Job

With the command below, we mount the transactions CSV file from IPFS and convert it to Parquet:

export JOB_ID=$(bacalhau docker run \
    -i ipfs://QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W  \
    --wait \
    --id-only \
    --output outputs:/outputs \
    --publisher ipfs \
    jsacex/csv-to-arrow-or-parquet \
    -- python3 src/converter.py ../inputs/transactions.csv  /outputs/transactions.parquet parquet)

Structure of the command

Let's look closely at the command above:

  1. bacalhau docker run: call to Bacalhau

  2. -i ipfs://QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W: the CID of the input data to use in the job; it is mounted at /inputs inside the execution environment

  3. jsacex/csv-to-arrow-or-parquet: the name (and optionally the tag) of the Docker image we are using

  4. ../inputs/transactions.csv: the path to the input dataset

  5. /outputs/transactions.parquet parquet: the path of the output file, followed by the target format (parquet)

  6. python3 src/converter.py: execute the script

When a job is submitted, Bacalhau prints out the related job_id. We store that in an environment variable so that we can reuse it later on.
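
The same image can also produce Avro output. As a hedged variant of the command above (only the output file name and the format argument change), you could run:

export JOB_ID=$(bacalhau docker run \
    -i ipfs://QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W \
    --wait \
    --id-only \
    --output outputs:/outputs \
    --publisher ipfs \
    jsacex/csv-to-arrow-or-parquet \
    -- python3 src/converter.py ../inputs/transactions.csv /outputs/transactions.avro avro)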

Declarative job description

The same job can be presented in the declarative format. In this case, the description will look like this:

name: Convert CSV To Parquet Or Avro
type: batch
count: 1
tasks:
  - name: My main task
    Engine:
      type: docker
      params:
        Image: jsacex/csv-to-arrow-or-parquet
        Entrypoint:
          - /bin/bash
        Parameters:
          - -c
          - python3 src/converter.py ../inputs/transactions.csv  ../outputs/transactions.parquet parquet
    Publisher:
      Type: ipfs
    ResultPaths:
      - Name: outputs
        Path: /outputs
    InputSources:
      - Target: "/inputs"
        Source:
          Type: "ipfs"
          Params:
            CID: "QmTAQMGiSv9xocaB4PUCT5nSBHrf9HZrYj21BAZ5nMTY2W"

The job description should be saved in .yaml format, e.g. convertcsv.yaml, and then run with the command:

bacalhau job run convertcsv.yaml
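
If your version of bacalhau job run supports the same --id-only and --wait flags as bacalhau docker run (this is an assumption; check bacalhau job run --help), you can capture the job ID the same way so that the status commands below work unchanged:

export JOB_ID=$(bacalhau job run convertcsv.yaml --id-only --wait)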

Checking the State of your Jobs

Job status: You can check the status of the job using bacalhau job list.

bacalhau job list --id-filter ${JOB_ID} 

When it says Published or Completed, that means the job is done, and we can get the results.

Job information: You can find out more information about your job by using bacalhau job describe.

bacalhau job describe ${JOB_ID}

Job download: You can download your job results directly by using bacalhau job get. Alternatively, you can choose to create a directory to store your results. In the command below, we create a directory called results and download our job output into it.

rm -rf results && mkdir -p results # Temporary directory to store the results
bacalhau job get ${JOB_ID} --output-dir results # Download the results

Viewing your Job Output

To view the file, run the following command:

ls results/outputs

transactions.parquet

Alternatively, you can load the result directly with pandas:

import pandas as pd
import os
pd.read_parquet('results/outputs/transactions.parquet')
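
To see the size reduction mentioned in the introduction, a hedged comparison (it assumes the original transactions.csv is still in your working directory) could look like this:

# Compare the size of the original CSV with the converted Parquet file
import os

csv_size = os.path.getsize('transactions.csv')
parquet_size = os.path.getsize('results/outputs/transactions.parquet')
print(f'CSV: {csv_size:,} bytes; Parquet: {parquet_size:,} bytes '
      f'(~{csv_size / parquet_size:.1f}x smaller)')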

Support

If you have questions or need support or guidance, please reach out to the Bacalhau team via Slack (#general channel).
