Custom Function
Enable document editing in the pipeline with Python using custom functions
The Custom Function component allows you to write Python functions to manipulate and transform documents within your pipeline. This flexibility enables you to perform a wide range of tasks, from simple data cleaning to merging documents. Whether you're filtering documents, enriching data, or transforming formats, the Custom Function component empowers you to customize your pipeline to meet your specific needs.
With this component, you can perform a variety of tasks on documents:
- Create, modify, merge, split, or delete documents.
- Add, modify, merge, or delete fields from the documents.
- Clean data by removing unnecessary whitespace, special characters, or invalid values.
- Validate document fields to ensure they meet specific criteria (e.g., check if a field is of the correct type or format).
- Perform calculations or aggregations on document fields (e.g., sum, average, or count).
- Convert data from one format to another (e.g., timestamps to human-readable dates).
- Identify and remove duplicate documents based on specific fields.
- Clean text using NLP libraries (e.g., remove stopwords, URLs, or punctuation).
As well as several other tasks.
Getting started
You can choose whether your function will receive a single document at a time or a batch of documents. The function that receives a single document must be named process_document, and the one that receives a batch of documents must be named process_batch. You can also create new functions and call them from these main functions.
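In outline, the two entry points look like this (a minimal skeleton; as in the examples below, both return a list of documents to forward):
def process_document(document):
    # Receives one document at a time; return a list of documents to forward
    return [document]

def process_batch(documents):
    # Receives a list of documents; return the list of documents to forward
    return documents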

Python function that adds the field 'current_date' to each document
Below are some examples of Python functions that can be used as Custom Functions in your pipeline and that accept a single document at a time:
from datetime import datetime

def process_document(document):
    current_date_iso = datetime.utcnow().isoformat() + 'Z'  # UTC time with 'Z'
    document['current_date'] = current_date_iso
    return [document]
def process_document(document):
    if 'value' in document and isinstance(document['value'], (int, float)):
        document['value'] += 1  # Hardcoded field and increment
    return [document]
def process_document(document):
    if 'old_name' in document:
        document['new_name'] = document.pop('old_name')
    return [document]
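The field-validation task mentioned above follows the same single-document pattern. Here is a minimal sketch; the 'email' field name and the regular expression are only illustrative assumptions about your data:
import re

def process_document(document):
    # Flag whether the hypothetical 'email' field looks like a valid address
    email = document.get('email')
    is_valid = isinstance(email, str) and re.match(r'^[^@\s]+@[^@\s]+\.[^@\s]+$', email) is not None
    document['email_valid'] = is_valid
    return [document]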
Below are some examples of Python functions that can be used as Custom Functions in your pipeline and that accept a batch of documents:
def process_batch(documents: list) -> list[dict]:
    # Example: remove documents that are missing the 'id' field
    filtered = [x for x in documents if 'id' in x]
    # Return the documents to forward
    return filtered
from datetime import datetime

def process_batch(documents: list) -> list[dict]:
    # Add the current date in ISO format to each document
    current_date_iso = datetime.utcnow().isoformat() + 'Z'  # UTC time with 'Z'
    for document in documents:
        document['current_date'] = current_date_iso
    return documents
def process_batch(documents: list) -> list[dict]:
    # Sum price * quantity over each document's 'items' list
    for document in documents:
        if 'items' in document and isinstance(document['items'], list):
            total_value = sum(item.get('price', 0) * item.get('quantity', 0) for item in document['items'])
            document['total_value'] = total_value
    return documents
def process_batch(documents: list) -> list[dict]:
    # Split each document into one document per category
    # (documents without a 'categories' list are not forwarded)
    split_documents = []
    for document in documents:
        if 'categories' in document and isinstance(document['categories'], list):
            for category in document['categories']:
                new_doc = document.copy()
                new_doc['category'] = category
                split_documents.append(new_doc)
    return split_documents
from datetime import datetime

def process_batch(documents: list) -> list[dict]:
    # Convert a Unix timestamp into a human-readable date string
    for document in documents:
        if 'timestamp' in document:
            timestamp = document['timestamp']
            document['date'] = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
    return documents
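Deduplication, listed among the tasks above, also fits the batch pattern. A minimal sketch that keeps the first occurrence of each 'id' (the field name is just an example):
def process_batch(documents: list) -> list[dict]:
    seen_ids = set()
    unique_documents = []
    for document in documents:
        doc_id = document.get('id')
        if doc_id is None:
            unique_documents.append(document)  # Keep documents without an 'id'
        elif doc_id not in seen_ids:
            seen_ids.add(doc_id)
            unique_documents.append(document)
    return unique_documents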
With the Custom Function component, you can test your function code without having to deploy the pipeline. Just copy and paste your code to try it!

The function added the field 'current_date' to the document
Since the Datastreamer Pipeline can work with various document schemas, the document format will depend on the schema of the documents sent by the previous step. You can add components like the Document Inspector before the Custom Function component to ensure you are aware of the content of the documents you are receiving.
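If you are not sure which fields will be present, read them defensively instead of indexing them directly. A short sketch; the nested 'content'/'body' layout mirrors the NLP examples below and is only an assumption about your data:
def process_document(document):
    # .get() with defaults avoids a KeyError when the nested field is missing
    body = document.get('content', {}).get('body', '')
    document['body_length'] = len(body)  # Illustrative output field
    return [document]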
Supported Libraries for Natural Language Processing
The Custom Function component supports popular text-processing libraries, including NLTK (Natural Language Toolkit) for tasks like tokenization, stemming, and part-of-speech tagging, and TextBlob, which simplifies text cleaning, sentiment analysis, and noun phrase extraction. These libraries enable advanced text preprocessing and analysis within the component, which is especially important for Data Science professionals.
Below you can see an example of a function using TextBlob:
from textblob import TextBlob

def get_subjectivity(text):
    """
    Input: text string
    Return: subjectivity level as a float
    """
    testimonial = TextBlob(text)
    return testimonial.subjectivity

def process_document(document):
    body = document.get("content", {}).get("body", None)
    if body is not None:
        document["subjectivity"] = get_subjectivity(body)
    return [document]
Here you can find an example of a function using NLTK:
import string
import re
from nltk.corpus import stopwords

additional_stopwords = [
    "a", "an", "the", "in", "on", "at", "with",
    "for", "of", "and", "or", "but", "to",
]

# Requires the NLTK stopwords corpus (e.g., nltk.download('stopwords'))
stopwords_to_remove = set(stopwords.words("english"))
stopwords_to_remove.update(additional_stopwords)

def remove_stopwords(text):
    return " ".join(
        [word for word in text.split() if word.lower() not in stopwords_to_remove]
    )

def count_words(text):
    return len(text.split())

def remove_url(text):
    sen = re.sub(r"http\S+|www\S+", "", text)
    return sen

def remove_punctuation(text):
    translator = str.maketrans(string.punctuation, " " * len(string.punctuation))
    return text.translate(translator)

def remove_spaces(text):
    return re.sub(r" +", " ", text.replace("\n", " ").replace("\r", " ")).strip()

def process_batch(documents):
    for document in documents:
        body = document.get("content", {}).get("body", "")
        body = remove_stopwords(body)
        body = remove_url(body)
        body = remove_punctuation(body)
        body = remove_spaces(body)
        document["cleaned_text"] = body
    return documents
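Stemming, also mentioned above, can be added in the same way. A minimal sketch using NLTK's PorterStemmer; the 'stemmed_text' output field is just an illustration:
from nltk.stem import PorterStemmer

stemmer = PorterStemmer()

def stem_text(text):
    # Reduce each word to its stem, e.g. "running" -> "run"
    return " ".join(stemmer.stem(word) for word in text.split())

def process_document(document):
    body = document.get("content", {}).get("body", "")
    document["stemmed_text"] = stem_text(body)
    return [document]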