Fractal is a flexible, configurable data processing tool built with GoFr and Golang. Fractal is designed to handle data ingestion from multiple sources, apply powerful transformations and validations, and deliver output to a wide range of destinations. With Fractal, you can automate complex data workflows without needing to manage low-level details. Here's the documentation for setting up a new integration in your project:
The custom syntax enables users to:
- Validate incoming data to ensure it meets predefined conditions.
- Transform data fields to fit desired formats, structures, or requirements.
- Define flexible error-handling strategies for data processing pipelines.
Rules can be written for any data source or destination, such as JSON, YAML, CSV, SQL databases, message brokers, or cloud services.
Validation rules ensure that data meets specific quality and integrity requirements.
FIELD(<field_name>) <validation_condition>
Condition | Description | Example |
---|---|---|
TYPE(<data_type>) | Ensures the field is of a specified type. Data types: STRING, INT, FLOAT, BOOL, DATE. | FIELD("age") TYPE(INT) |
RANGE(<min>, <max>) | Ensures the field's value is within a specified range. | FIELD("price") RANGE(0, 1000) |
MATCHES(<regex>) | Validates that the field's value matches a regular expression pattern. | FIELD("email") MATCHES(EMAIL_REGEX) |
IN(<value_list>) | Validates that the field's value is one of the specified values. | FIELD("status") IN ("active", "inactive") |
REQUIRED | Ensures the field is present. | FIELD("name") REQUIRED |
- Validate that the field age is an integer and between 18 and 65: FIELD("age") TYPE(INT) RANGE(18, 65)
- Ensure email matches a regex pattern for valid email addresses: FIELD("email") MATCHES(EMAIL_REGEX)
- Check that status is either "active" or "inactive": FIELD("status") IN ("active", "inactive")
- Make the id field mandatory: FIELD("id") REQUIRED
Transformation rules modify or enrich data to meet specific requirements.
<operation>
Operation | Description | Example |
---|---|---|
RENAME(<old_field>, <new_field>) | Renames a field in the data. | RENAME("old_field", "new_field") |
MAP(<field_name>, {<mapping>}) | Maps values in a field to new values using a key-value pair mapping. | MAP("status", {"0": "inactive", "1": "active"}) |
ADD_FIELD(<field_name>, <value>) | Adds a new field with a specified value. | ADD_FIELD("timestamp", CURRENT_TIME()) |
IF <condition> THEN <operation> | Applies a transformation based on a condition. | IF FIELD("age") > 50 THEN ADD_FIELD("senior_discount", TRUE) |
- Rename a field old_field to new_field: RENAME("old_field", "new_field")
- Map values in the status field: MAP("status", {"0": "inactive", "1": "active"})
- Add a field processed_at with the current timestamp: ADD_FIELD("processed_at", CURRENT_TIME())
- Conditionally add a senior discount for users above 50: IF FIELD("age") > 50 THEN ADD_FIELD("senior_discount", TRUE)
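The four operations above can be pictured as small functions over a record. The Go sketch below is illustrative only: `Record`, `Rename`, `MapValues`, and `AddField` are hypothetical names, and leaving unmapped values unchanged in `MapValues` is an assumption rather than documented Fractal behavior.

```go
package main

import (
	"fmt"
	"time"
)

// Record is a hypothetical in-memory representation of one data row.
type Record map[string]any

// Rename mirrors RENAME(old, new): the value moves to the new key.
func Rename(rec Record, oldName, newName string) {
	if v, ok := rec[oldName]; ok {
		rec[newName] = v
		delete(rec, oldName)
	}
}

// MapValues mirrors MAP(field, {...}). Values with no entry in the mapping
// are left unchanged, which is an assumption of this sketch.
func MapValues(rec Record, field string, mapping map[string]string) {
	if s, ok := rec[field].(string); ok {
		if mapped, found := mapping[s]; found {
			rec[field] = mapped
		}
	}
}

// AddField mirrors ADD_FIELD(field, value).
func AddField(rec Record, field string, value any) {
	rec[field] = value
}

func main() {
	rec := Record{"old_field": "x", "status": "1", "age": 62}

	Rename(rec, "old_field", "new_field")
	MapValues(rec, "status", map[string]string{"0": "inactive", "1": "active"})
	// Stand-in for CURRENT_TIME(); the actual timestamp format is not specified.
	AddField(rec, "processed_at", time.Now().UTC().Format(time.RFC3339))

	// IF FIELD("age") > 50 THEN ADD_FIELD("senior_discount", TRUE)
	if age, ok := rec["age"].(int); ok && age > 50 {
		AddField(rec, "senior_discount", true)
	}

	fmt.Println(rec)
}
```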
Error handling defines how the system reacts when a validation or transformation fails.
ON_ERROR(<action>)
Action | Description | Example |
---|---|---|
LOG_AND_CONTINUE | Logs the error and continues processing the next record. | ON_ERROR(LOG_AND_CONTINUE) |
STOP | Stops the entire pipeline on encountering an error. | ON_ERROR(STOP) |
RETRY | Attempts to retry processing the failed record. | ON_ERROR(RETRY) |
SEND_TO_QUARANTINE | Sends the failed record to a quarantine output for further analysis. | ON_ERROR(SEND_TO_QUARANTINE) |
- Log the error and continue processing: ON_ERROR(LOG_AND_CONTINUE)
- Stop the pipeline if an error occurs: ON_ERROR(STOP)
Depending on the data source, fields can be named using:
- JSON Paths: For JSON/YAML data.
FIELD("$.user.age") TYPE(INT)
- Column Names: For CSV or SQL tables.
FIELD("Column1") TYPE(FLOAT)
- Message Keys/Values: For Kafka or RabbitMQ.
FIELD("$.message.key") MATCHES(KEY_REGEX)
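As a rough illustration of how a name such as `$.user.age` could be resolved, the Go sketch below walks a nested map using dotted keys. The `Lookup` function is hypothetical and handles only simple object paths, not full JSONPath (no array indices, wildcards, or filters); a plain column name without the `$.` prefix is treated as a top-level key.

```go
package main

import (
	"fmt"
	"strings"
)

// Lookup resolves a simple "$.a.b" path against nested maps.
// It returns the value and true if every segment of the path exists.
func Lookup(doc map[string]any, path string) (any, bool) {
	path = strings.TrimPrefix(path, "$.")
	var cur any = doc
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false // tried to descend into a non-object value
		}
		cur, ok = m[key]
		if !ok {
			return nil, false // key missing at this level
		}
	}
	return cur, true
}

func main() {
	doc := map[string]any{
		"user":    map[string]any{"age": 30},
		"Column1": 1.5,
	}

	fmt.Println(Lookup(doc, "$.user.age"))  // prints: 30 true
	fmt.Println(Lookup(doc, "Column1"))     // prints: 1.5 true
	fmt.Println(Lookup(doc, "$.user.name")) // prints: <nil> false
}
```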
The rules can be embedded into the YAML configuration for pipelines:
pipeline:
  error-handling:
    strategy: LOG_AND_CONTINUE
  inputconfig:
    csvsourcefilename: sample.csv
  inputmethod: CSV
  outputconfig:
    csvdestinationfilename: test.csv
  outputmethod: CSV
  cronjob:
    repetition_interval: "1h"
  monitoring:
    job_status: "pending"
  transformations:
    - ADD_FIELD("processed_at", CURRENT_TIME())
  validations:
    - FIELD("age") RANGE(30,35)
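Each entry under validations is a single rule string. As a sketch of how such a string might be split into its field name and condition once the YAML list marker has been removed, the Go example below uses a regular expression. `ParseValidation` and the pattern are hypothetical and cover only the `FIELD("<name>") <condition>` shape shown in this document.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// fieldRule matches lines such as: FIELD("age") RANGE(30,35)
var fieldRule = regexp.MustCompile(`^FIELD\("([^"]+)"\)\s+(.+)$`)

// ParseValidation splits a validation rule into the field name and the
// remaining condition text. It does not interpret the condition itself.
func ParseValidation(line string) (field, condition string, err error) {
	m := fieldRule.FindStringSubmatch(strings.TrimSpace(line))
	if m == nil {
		return "", "", fmt.Errorf("not a FIELD rule: %q", line)
	}
	return m[1], m[2], nil
}

func main() {
	field, condition, err := ParseValidation(`FIELD("age") RANGE(30,35)`)
	fmt.Println(field, condition, err) // prints: age RANGE(30,35) <nil>
}
```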
The system is designed to make it simple to add new data integrations for both input and output. Each integration should define methods to read (input) and write (output) data, following a unified interface approach.
- Create a New File:
  In the integrations directory, create a new file named after the integration. For example, if adding support for RabbitMQ, create rabbitmq.go.
- Implement the Source and Destination Interfaces:
  In this file, you need to define structs that implement the Source and Destination interfaces.
  - Source Interface: This interface should define the Read method, which reads data from the input source and returns it in a standardized format.
  - Destination Interface: This interface should define the Write method, which writes data to the output destination.

  Here's an example structure for rabbitmq.go:

      package integrations

      import "fmt"

      // RabbitMQInput struct to handle input from RabbitMQ
      type RabbitMQInput struct {
          // Add any necessary fields, such as connection settings, queues, etc.
      }

      // Read method to read from RabbitMQ
      func (r *RabbitMQInput) Read() (interface{}, error) {
          // Implement logic to read from RabbitMQ
          fmt.Println("Reading from RabbitMQ...")
          return nil, nil
      }

      // RabbitMQOutput struct to handle output to RabbitMQ
      type RabbitMQOutput struct {
          // Add any necessary fields, such as connection settings, queues, etc.
      }

      // Write method to write to RabbitMQ
      func (r *RabbitMQOutput) Write(data interface{}) error {
          // Implement logic to write to RabbitMQ
          fmt.Println("Writing to RabbitMQ...")
          return nil
      }

      // Initialize the new integration
      func init() {
          RegisterSource("rabbitmq", &RabbitMQInput{})
          RegisterDestination("rabbitmq", &RabbitMQOutput{})
      }
- Register the Integration:
  In the init() function, use RegisterSource and RegisterDestination to add the integration to the system. This makes it available for both CLI and HTTP server modes.
- Configuration:
  If the integration requires additional configuration (like credentials or connection strings), make sure to add relevant fields to the struct and include a way to parse this information from the user-provided configuration.
- Testing the Integration:
  Run the application and select the new integration in either CLI or HTTP mode. Verify that data can be read from and written to the integration correctly.
With this setup, adding integrations is straightforward. Each integration can now be quickly defined and registered, keeping your system scalable and modular.
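The steps above call RegisterSource and RegisterDestination without showing what sits behind them. The Go sketch below is one plausible shape for such a registry, written as an assumption rather than a copy of Fractal's code: the interface method signatures follow the rabbitmq.go example, while `GetSource`, `GetDestination`, and the in-memory test types are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Source and Destination use the method signatures from the rabbitmq.go example.
type Source interface {
	Read() (interface{}, error)
}

type Destination interface {
	Write(data interface{}) error
}

var (
	mu           sync.RWMutex
	sources      = map[string]Source{}
	destinations = map[string]Destination{}
)

// RegisterSource makes a source available under the given name.
func RegisterSource(name string, s Source) {
	mu.Lock()
	defer mu.Unlock()
	sources[name] = s
}

// RegisterDestination makes a destination available under the given name.
func RegisterDestination(name string, d Destination) {
	mu.Lock()
	defer mu.Unlock()
	destinations[name] = d
}

// GetSource is a hypothetical lookup helper.
func GetSource(name string) (Source, bool) {
	mu.RLock()
	defer mu.RUnlock()
	s, ok := sources[name]
	return s, ok
}

// GetDestination is a hypothetical lookup helper.
func GetDestination(name string) (Destination, bool) {
	mu.RLock()
	defer mu.RUnlock()
	d, ok := destinations[name]
	return d, ok
}

// memorySource and memoryDestination are in-memory stand-ins used to
// exercise the registry without a real broker.
type memorySource struct{ data []string }

func (m *memorySource) Read() (interface{}, error) { return m.data, nil }

type memoryDestination struct{ written []interface{} }

func (m *memoryDestination) Write(data interface{}) error {
	m.written = append(m.written, data)
	return nil
}

func main() {
	RegisterSource("memory", &memorySource{data: []string{"a", "b"}})
	RegisterDestination("memory", &memoryDestination{})

	src, _ := GetSource("memory")
	dst, _ := GetDestination("memory")

	data, err := src.Read()
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	if err := dst.Write(data); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("copied:", data) // prints: copied: [a b]
}
```

An in-memory pair like this is also a convenient way to test a new integration's counterpart: pair a real source with the in-memory destination, or the reverse, before wiring both ends to external systems.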
- Multi-Source Data Ingestion: Supports data ingestion from HTTP, CSV files, SQL databases, Pub-Sub systems, cloud storage, and more.
- Customizable Data Transformations: Apply data transformations, including data mapping, filtering, aggregation, and enrichment, with built-in or custom functions.
- Validation Rules: Define validation schemas to ensure incoming data meets quality standards before processing.
- Flexible Output Options: Output processed data to databases (SQL/NoSQL), CSV files, messaging queues, HTTP responses, or cloud storage.
- YAML Configuration: Configure data workflows and transformation rules through a YAML file for easy setup and customization.
- Go 1.18+
- GoFr Framework installed
Clone the repository and navigate to the Fractal directory:
git clone https://github.com/SkySingh04/fractal.git
cd fractal
Install the dependencies:
go mod tidy
Set up a .yaml configuration file in the root directory. Define inputs, transformations, validations, and outputs as per your workflow needs. Here's a basic example:
error-handling:
  strategy: LOG_AND_CONTINUE
inputconfig:
  csvsourcefilename: sample.csv
inputmethod: CSV
outputconfig:
  csvdestinationfilename: test.csv
outputmethod: CSV
cronjob:
  repetition_interval: "1h"
monitoring:
  job_status: "pending"
transformations:
  - ADD_FIELD("processed_at", CURRENT_TIME())
validations:
  - FIELD("age") RANGE(30,35)
Start the pipeline using:
go run main.go -config=config.yaml
- Data Migration: Migrate data from legacy systems to cloud databases or NoSQL databases.
- Log Aggregation: Aggregate logs from multiple sources and send them to a searchable data store.
- Content Syndication: Ingest and format content from RSS feeds or APIs, and distribute it across platforms.
- Data Quality Checker: Validate incoming data streams to ensure data quality before storing.
Contributions are welcome! Feel free to submit pull requests for new features, bug fixes, or documentation improvements.
For detailed guidelines on how to contribute, please refer to the Contributing Guide.
This project is licensed under the MIT License. See LICENSE for details.