Bacalhau and BigQuery
Use Bacalhau to process logs remotely across clouds and land the results on BigQuery.
Introduction
This example demonstrates how to build a sophisticated distributed log processing pipeline using Bacalhau, Google BigQuery, and DuckDB. You'll learn how to process and analyze logs across distributed nodes, with progressively more advanced techniques for handling, sanitizing, and aggregating log data.
The combination of Bacalhau and BigQuery offers several key advantages:
Process logs directly where they are generated, eliminating the need for centralized collection
Scale processing across multiple nodes in different cloud providers
Leverage BigQuery's powerful analytics capabilities for processed data
Implement privacy-conscious data handling and efficient aggregation strategies
Through this example, you'll evolve from basic log collection to implementing a production-ready system with privacy protection and smart aggregation. Whether you're handling application logs, system metrics, or security events, this pipeline provides a robust foundation for distributed log analytics.
Prerequisites
Bacalhau client installed
A Google Cloud Project with BigQuery enabled
Service account credentials with BigQuery access
A running Bacalhau cluster with nodes across different cloud providers
Follow the standard Bacalhau network setup guide
Ensure nodes are properly configured across your cloud providers (AWS, GCP, Azure). You can see more about setting up your nodes here
To run the jobs below unchanged, your nodes need at least the following configurations installed (in addition to anything else you may require).
Components
DuckDB Processing: Process and analyze data using DuckDB's SQL capabilities
BigQuery Integration: Store processed results in Google BigQuery for further analysis
Before You Start
Make sure your config file points at the correct BigQuery project and dataset. You can do this by copying config.yaml.example to config.yaml and editing the values to match your project and dataset.
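As a rough illustration, the config can then be read at upload time like this. This is only a sketch: the key names project and dataset are assumptions, so check config.yaml.example for the exact field names this example uses.

```python
import yaml  # pip install pyyaml

# Read the BigQuery settings from config.yaml. The key names "project" and
# "dataset" are assumptions for illustration; see config.yaml.example for
# the exact keys this example expects.
with open("config.yaml") as f:
    config = yaml.safe_load(f)

project_id = config["project"]
dataset = config["dataset"]
print(f"Results will be written to {project_id}.{dataset}")
```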
Ensure your Google Cloud service account has these roles:
BigQuery Data Editor
BigQuery Job User
Have your service account key file (JSON format) ready
Configure your BigQuery settings:
Create a dataset for log analytics
Note your project ID and dataset name
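If you want to create the dataset by hand, a minimal sketch like the following works. The project ID and the dataset name log_analytics are placeholders; substitute the values you noted above.

```python
from google.cloud import bigquery

# Assumes GOOGLE_APPLICATION_CREDENTIALS points at your service account key.
# "your-project-id" and "log_analytics" are placeholders.
client = bigquery.Client(project="your-project-id")

dataset = bigquery.Dataset("your-project-id.log_analytics")
dataset.location = "US"  # pick the location that matches your project
client.create_dataset(dataset, exists_ok=True)
print("Dataset ready:", dataset.dataset_id)
```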
We have provided some utility scripts to help you set up your BigQuery project and tables. You can run the following commands to set up your project and tables if you haven't already:
Interactive setup for your BigQuery project and tables. It walks you through creating the necessary BigQuery projects.
Creates sample tables in BigQuery for testing.
Checks that the service account specified in log_uploader_credentials.json has the necessary permissions to write to BigQuery tables from the Bacalhau nodes.
Confirms your BigQuery project and dataset, and creates the tables with the correct schema if they don't exist. Note that this also zeroes out tables that already exist, so be careful! (Useful for debugging.)
Distributes the credentials to /bacalhau_data on all nodes in a Bacalhau network.
Ensures the service account specified in log_uploader_credentials.json has the necessary permissions to write to BigQuery tables from the Bacalhau nodes.
One more thing to set up is the log faker on the nodes. This will generate logs for you to work with. You can run the following command to start the log faker:
Give it a couple of minutes to start up and then you can start processing the logs.
Demo Walkthrough
Let's walk through each stage of the demo, seeing how we can progressively improve our data processing pipeline!
Stage 1: Raw Power - Basic Log Upload 🚀
Let's start by looking at the raw logs.
That will print out the logs to stdout, which we can then read from the job.
After running the job, you will see a job ID, something like this:
When you run the describe command, you will see the details of the job, including the log output.
Now let's upload the raw logs to BigQuery. This is the simplest approach - just get the data there:
This will upload the Python script to all the nodes, which will in turn upload the raw logs from every node to BigQuery. When you check BigQuery, you'll see:
Millions of rows uploaded (depending on how many nodes you have and how long you let the job run)
Each log line as raw text
No structure or parsing
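Under the hood, the upload step boils down to reading the local log file and streaming its lines into BigQuery. Here is a minimal sketch of that idea, not the exact script shipped with this example; the table ID, log path, and field mapping are assumptions.

```python
from datetime import datetime, timezone
import socket

from google.cloud import bigquery

# Placeholders: adjust the project, dataset, table, and log path to your setup.
TABLE_ID = "your-project-id.log_analytics.log_results"
LOG_PATH = "/var/log/app/access.log"

client = bigquery.Client(project="your-project-id")

rows = []
with open(LOG_PATH) as f:
    for line in f:
        rows.append({
            "message": line.rstrip("\n"),  # raw, unparsed log line
            "hostname": socket.gethostname(),
            "sync_time": datetime.now(timezone.utc).isoformat(),
        })

# Stream the rows into BigQuery (a production script would batch these inserts).
errors = client.insert_rows_json(TABLE_ID, rows)
print("insert errors:", errors)
```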
To query the logs, you can use the following SQL:
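For example, to count how many raw lines each node contributed. The SQL is shown here wrapped in a small Python snippet that uses the BigQuery client; the project and dataset names are placeholders.

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-project-id")
query = """
SELECT nodeName, COUNT(*) AS log_lines
FROM `your-project-id.log_analytics.log_results`
GROUP BY nodeName
ORDER BY log_lines DESC
"""
for row in client.query(query).result():
    print(row.nodeName, row.log_lines)
```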
Stage 2: Adding Structure - Making Sense of Chaos 📊
Now let's do something more advanced by parsing those logs into structured data before upload:
Your logs are now parsed into fields like:
IP Address
Timestamp
HTTP Method
Endpoint
Status Code
Response Size
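Conceptually, the parsing step uses DuckDB's regular-expression functions to split each raw line into those fields. The sketch below illustrates the idea; the log format and regular expressions are illustrative and may not match the log faker's output or the actual script exactly.

```python
import duckdb

con = duckdb.connect()

# Two sample access-log style lines to parse (illustrative format).
con.execute("CREATE TABLE raw_logs(line VARCHAR)")
con.execute("""
    INSERT INTO raw_logs VALUES
      ('203.0.113.42 - - [10/Jan/2025:13:55:36 +0000] "GET /api/users HTTP/1.1" 200 512'),
      ('198.51.100.7 - - [10/Jan/2025:13:55:37 +0000] "POST /api/orders HTTP/1.1" 500 128')
""")

# Extract structured fields from each raw line.
rows = con.execute(r"""
    SELECT
      regexp_extract(line, '^(\S+)', 1)                      AS ip_address,
      regexp_extract(line, '\[([^\]]+)\]', 1)                AS log_time,
      regexp_extract(line, '"(\w+) ', 1)                     AS http_method,
      regexp_extract(line, '"\w+ (\S+)', 1)                  AS endpoint,
      CAST(regexp_extract(line, '" (\d{3}) ', 1) AS INTEGER) AS status_code,
      CAST(regexp_extract(line, ' (\d+)$', 1) AS INTEGER)    AS response_size
    FROM raw_logs
""").fetchall()

for row in rows:
    print(row)
```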
To query the logs, you can use the following SQL:
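For example, a status-code breakdown per endpoint. The column names endpoint and status_code are assumptions about how the parsed fields are stored (the reference schemas at the end of this page only list the common columns), so adjust them to your actual table.

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-project-id")
query = """
SELECT endpoint, status_code, COUNT(*) AS requests
FROM `your-project-id.log_analytics.log_results`
GROUP BY endpoint, status_code
ORDER BY requests DESC
LIMIT 20
"""
for row in client.query(query).result():
    print(row.endpoint, row.status_code, row.requests)
```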
Stage 3: Privacy First - Responsible Data Handling 🔒
Now let's handle the data responsibly by sanitizing PII (like IP addresses):
This:
Zeros out the last octet of IPv4 addresses
Zeros out the last 64 bits of IPv6 addresses
Maintains data utility while ensuring compliance
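A helper along these lines implements the masking described above. It is a sketch using Python's ipaddress module; the actual pipeline may apply the same masking inside its processing SQL instead.

```python
import ipaddress

def sanitize_ip(ip_str: str) -> str:
    """Zero out the host portion of an IP address (illustrative helper)."""
    addr = ipaddress.ip_address(ip_str)
    if addr.version == 4:
        # IPv4: keep the first three octets, zero the last one.
        network = ipaddress.ip_network(f"{ip_str}/24", strict=False)
    else:
        # IPv6: keep the upper 64 bits, zero the interface identifier.
        network = ipaddress.ip_network(f"{ip_str}/64", strict=False)
    return str(network.network_address)

print(sanitize_ip("203.0.113.42"))             # -> 203.0.113.0
print(sanitize_ip("2001:db8::8a2e:370:7334"))  # -> 2001:db8::
```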
Again, to query the logs, you can use the following SQL:
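For example, grouping by the sanitized public_ip column (wrapped in a small Python snippet; the project and dataset names are placeholders):

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-project-id")
query = """
SELECT public_ip, COUNT(*) AS log_lines
FROM `your-project-id.log_analytics.log_results`
GROUP BY public_ip
ORDER BY log_lines DESC
LIMIT 20
"""
for row in client.query(query).result():
    print(row.public_ip, row.log_lines)
```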
Notice that the IP addresses are now sanitized.
Stage 4: Smart Aggregation - Efficiency at Scale 📈
Finally, let's be smart about what we upload:
This creates two streams:
Aggregated normal logs:
Grouped in 5-minute windows
Counts by status code
Average response sizes
Total requests per endpoint
Real-time emergency events:
Critical errors
Security alerts
System failures
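In rough terms, the processing step buckets parsed logs into 5-minute windows and pulls out emergency events separately before anything is uploaded. The DuckDB sketch below shows the idea; the table and column names are illustrative rather than the exact script used in this example.

```python
import duckdb

con = duckdb.connect()

# Illustrative parsed-log table; in the real pipeline this comes from the
# parsing stage shown earlier.
con.execute("""
    CREATE TABLE parsed_logs AS
    SELECT * FROM (VALUES
      (TIMESTAMP '2025-01-10 13:55:36', '/api/users',  200, 512, 'INFO',      'ok'),
      (TIMESTAMP '2025-01-10 13:56:10', '/api/orders', 500, 128, 'EMERGENCY', 'database unreachable'),
      (TIMESTAMP '2025-01-10 13:58:42', '/api/users',  200, 640, 'INFO',      'ok')
    ) AS t(ts, endpoint, status_code, response_size, alert_level, message)
""")

# Stream 1: 5-minute aggregates (floor the epoch to 300-second buckets).
aggregates = con.execute("""
    SELECT
      to_timestamp(floor(epoch(ts) / 300) * 300) AS time_window,
      endpoint,
      status_code,
      COUNT(*)           AS log_count,
      AVG(response_size) AS avg_response_size
    FROM parsed_logs
    GROUP BY 1, 2, 3
    ORDER BY 1
""").fetchall()

# Stream 2: emergency events, forwarded individually in near real time.
emergencies = con.execute("""
    SELECT ts, endpoint, status_code, message
    FROM parsed_logs
    WHERE alert_level = 'EMERGENCY'
""").fetchall()

print("aggregates:", aggregates)
print("emergencies:", emergencies)
```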
To query the logs, you can use the following SQL:
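For example, to pull the busiest 5-minute windows and the most recent emergency events. The column names come from the table schemas below; the project and dataset names are placeholders.

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-project-id")

# Busiest 5-minute windows across the fleet.
windows = client.query("""
    SELECT time_window, nodeName, log_count
    FROM `your-project-id.log_analytics.log_aggregates`
    ORDER BY log_count DESC
    LIMIT 10
""").result()
for row in windows:
    print(row.time_window, row.nodeName, row.log_count)

# Most recent critical events.
emergencies = client.query("""
    SELECT timestamp, hostname, message
    FROM `your-project-id.log_analytics.emergency_logs`
    ORDER BY timestamp DESC
    LIMIT 10
""").result()
for row in emergencies:
    print(row.timestamp, row.hostname, row.message)
```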
What's Next?
Now that you've seen the power of distributed processing with Bacalhau:
Try processing your own log files
Experiment with different aggregation windows
Add your own privacy-preserving transformations
Scale to even more nodes!
Remember: The real power comes from processing data where it lives, rather than centralizing everything first. Happy distributed processing! 🚀
Table Schemas
log_results (Main Table):
project_id: STRING - Project identifier
region: STRING - Deployment region
nodeName: STRING - Node name
timestamp: TIMESTAMP - Event time
version: STRING - Log version
message: STRING - Log content
sync_time: TIMESTAMP - Upload time
remote_log_id: STRING - Original log ID
hostname: STRING - Source host
public_ip: STRING - Sanitized public IP
private_ip: STRING - Internal IP
alert_level: STRING - Event severity
provider: STRING - Cloud provider
log_aggregates (5-minute windows):
project_id: STRING - Project identifier
region: STRING - Deployment region
nodeName: STRING - Node name
provider: STRING - Cloud provider
hostname: STRING - Source host
time_window: TIMESTAMP - Aggregation window
log_count: INT64 - Events in window
messages: ARRAY - Event details
emergency_logs (Critical Events):
project_id: STRING - Project identifier
region: STRING - Deployment region
nodeName: STRING - Node name
provider: STRING - Cloud provider
hostname: STRING - Source host
timestamp: TIMESTAMP - Event time
version: STRING - Log version
message: STRING - Alert details
remote_log_id: STRING - Original log ID
alert_level: STRING - Always "EMERGENCY"
public_ip: STRING - Sanitized public IP
private_ip: STRING - Internal IP