Get Started

Custom Function

Enable document editing in the pipeline with Python using customized functions

The Custom Function component allows you to write Python functions to manipulate and transform documents within your pipeline. This flexibility enables you to perform a wide range of tasks, from simple data cleaning to merge documents. Whether you're filtering documents, enriching data, or transforming formats, the Custom Function component empowers you to customize your pipeline to meet your specific needs.

With this component, you can perform a variety of tasks on documents:

  • Create, modify, merge, split, or delete documents.
  • Add, modify, merge, or delete fields from the documents.
  • Clean data by removing unnecessary whitespace, special characters, or invalid values.
  • Validate document fields to ensure they meet specific criteria (e.g., check if a field is of the correct type or format).
  • Perform calculations or aggregations on document fields (e.g., sum, average, or count).
  • Convert data from one format to another (e.g., timestamps to human-readable dates).
  • Identify and remove duplicate documents based on specific fields.
  • Text cleaning

As well as several others.

Getting started

You can choose if you want to write your function that will receive a single document each time or receive a batch of documents. The function that receive a single document should be called process_document and the one that receive a batch of document should be called process_batch. You can also create new functions and call them from these main functions.

Python function that add the field 'current_date' to each document

Python function that add the field 'current_date' to each document


Below are some examples of Python functions that can be used as Custom Functions in your pipeline that accepts a single document each time:

from datetime import datetime

def process_document(document):
		current_date_iso = datetime.utcnow().isoformat() + 'Z'  # UTC time with 'Z'
    document['current_date'] = current_date_iso
    return [document]
def process_document(document):
    if 'value' in document and isinstance(document['value'], (int, float)):
        document['value'] += 1  # Hardcoded field and increment
    return [document]
def process_document(document):
    if 'old_name' in document:
        document['new_name'] = document.pop('old_name')
    return [document]

Below are some examples of Python functions that can be used as Custom Functions in your pipeline that accepts a batch of documents:

def process_batch(documents: list) -> list[dict[any, any]]:
    # Example removing documents that are missing ID field
    filtered = [x for x in documents if 'id' in x]
    # Return documents to forward
    return filtered
from datetime import datetime

def process_batch(documents: list) -> list[dict[any, any]]:
    # Add the current date in ISO format to each document
    current_date_iso = datetime.utcnow().isoformat() + 'Z'  # UTC time with 'Z'
    for document in documents:
        document['current_date'] = current_date_iso
    return documents
def process_batch(documents: list) -> list[dict[any, any]]:
    for document in documents:
        if 'items' in document and isinstance(document['items'], list):
            total_value = sum(item.get('price', 0) * item.get('quantity', 0) for item in document['items'])
            document['total_value'] = total_value
    return documents
def process_batch(documents: list) -> list[dict[any, any]]:
    split_documents = []
    for document in documents:
        if 'categories' in document and isinstance(document['categories'], list):
            for category in document['categories']:
                new_doc = document.copy()
                new_doc['category'] = category
                split_documents.append(new_doc)
    return split_documents
from datetime import datetime

def process_batch(documents: list) -> list[dict[any, any]]:
    for document in documents:
        if 'timestamp' in document:
            timestamp = document['timestamp']
            document['date'] = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
    return documents

With the Custom Function, you can test your function code without the necessity to deploy the pipeline. You can just copy and paste to try it!

The function added the field 'current_date' to the document

The function added the field 'current_date' to the document


Since the Datastreamer Pipeline can work with various document schemas, the document format will depend on the schema of the documents sent by the previous step. You can add components like the Document Inspector before the Custom Function component to ensure you are aware of the content of the documents you are receiving.

Supported Libraries for Natural Language Processing

The Custom Function component supports popular text-processing libraries, including NLTK (Natural Language Toolkit) for tasks like tokenization, stemming, and part-of-speech tagging, and TextBlob, which simplifies text cleaning, sentiment analysis, and noun phrase extraction. These libraries enable advanced text preprocessing and analysis within the component, what is specially important to Data Science professionals.

Below you can see an example of a function using TextBlob:

from textblob import TextBlob

def get_subjectivity(text):    
    """    
    Input: text string    
    Return: extract subjectivity level as float    
    """    
    testimonial = TextBlob(text)    
    return testimonial.subjectivity

def process_document(document):    
    body = document.get("content", {}).get("body", None)    
    if body is not None:        
        document["subjectivity"] = get_subjectivity(body)
    return [document]

Here you can find an example of a function using a NLTK:

import string
import re
from nltk.corpus import stopwords

additional_stopwords = [
    "a",
    "an",
    "the",
    "in",
    "on",
    "at",
    "with",
    "for",
    "of",
    "and",
    "or",
    "but",
    "to",
]
stopwords_to_remove = set(stopwords.words("english"))
stopwords_to_remove.update(additional_stopwords)


def remove_stopwords(text):
    return " ".join(
        [word for word in text.split() if word.lower() not in stopwords_to_remove]
    )


def count_words(text):
    return len(text.split())


def remove_url(text):
    sen = re.sub(r"http\S+|www\S+", "", text)
    return sen


def remove_punctuation(text):
    translator = str.maketrans(string.punctuation, " " * len(string.punctuation))
    return text.translate(translator)


def remove_spaces(text):
    return re.sub(r"  +", " ", text.replace("\n", " ").replace("\r", " ")).strip()


def process_batch(documents):
    for document in documents:
        body = document.get("content", {}).get("body", "")
        body = remove_stopwords(body)
        body = remove_url(body)
        body = remove_punctuation(body)
        body = remove_spaces(body)
        document["cleaned_text"] = body

    return documents