What is a Dynamic Pipeline?
Understand Dynamic Pipelines: Explore Datastreamer's flexible, component-based data processing, covering core concepts, lifecycle, and use cases
This page will introduce you to the concept of data pipelines, explain what makes them "dynamic" in the context of Datastreamer, and cover their lifecycle, core concepts, and practical examples.
What is a pipeline?
In computer science, and specially in data processing, a pipeline is a set of data processing elements connected in series, where the output of one element is the input of the next element.
Imagine that you have a simple task: you need to get data from an external service (like an REST API), pick out some specific information, and then save it into your database. You might write a script or code that:
- Calls the API.
- Parses the response.
- Transforms the data into the format you need.
- Writes it to your database.
This sequence of steps is a basic form of a pipeline. Data flows in, get processed and them flows out. Pipelines formalize this concept, specially for more complex or high-volume data tasks.
graph LR A[Data Source] --> B(Process 1); B --> C(Process 2); C --> D[Data Destination];
Why "Dynamic"?
When we talk about Dynamic Pipelines at Datastreamer, we are emphasizing their flexibility and adaptability, specially when compared to traditional, more rigid data processing scripts or systems.
If your experience primarily involves developing systems for a specific API-to-database task, you understand that API changes, the need for new data transformations, the addition of new data destinations, retrying API calls, error handling, scaling resources, or optimizing CPU and memory often require substantial code refactoring and retesting.
Dynamic Pipelines are designed to make these kinds of changes much easier:
- Modularity: Pipelines are built from reusable components (or "steps"). You can swap out one transformation component for another, or reconfigure it, without rebuilding the entire pipeline.
- Configurability: You can change how a component in the pipeline behaves by adjusting settings rather than rewriting code. For example, changing which data fields to extract or where to send the output.
- Adaptability: Dynamic Pipelines can often adapt to varying data loads and can be evolved to use different data sources, news business logic, or new destinations.
Essentially, "dynamic" means our pipelines are designed to be easily modified, extended, and managed, allowing you to respond quickly to new requirements.
How it works (Lifecycle)
A dynamic pipeline goes though some stages from its creation to its operation:
- Definition:
- This is where you plan what your pipeline needs to do. You decide the data source, the sequence of steps (operations) that needs to go though, and where the result should go
- In Datastreamer, this involves selecting and configuring pre-built components from our catalog and arranging them in a specific order, using a visual interface available in our Portal or using our API.
- Deployment:
- Once defined, the pipeline needs to be "deployed". This makes it live, running services within the Datastreamer environment
- Execution:
- This is the running phase where data flows through the pipeline.
- Data starts to enter at the "ingress" points.
- It passes sequentially through each configured component.
- The processed data then exits at the "egress" endpoint
- This happens continuously or based on a schedule, depending on the pipeline's design
- Monitoring & Management:
- While the pipeline is running, it is important to monitor. Are there errors? Are the results the ones you expected?
- Updating/Versioning:
- Dynamic pipeline support versioning, allowing you to revisit to older versions if something goes wrong with a new version that you configured.
- Every new deployment of a updated pipeline will generate a new version of the pipeline automatically.
Core Concepts
A pipeline in Datastreamer is composed of four main types of components.
Ingress
Entry point for data ingestion from various sources
Transformation
Modify data structure or format
Operation
Enrich data
Egress
Exit point for sending or storing processed data.
- Data Connectors (or Ingress Components):
- What it is: The entry point for data into you pipeline. It is how the pipeline "ingests" or receives data in a form of documents
- Examples: Listening for data on an HTTP endpoint, reading messages from Google Pub/Sub, fetching files from an S3 bucket. In Datastreamer, this could be components like
Amazon S3 Storage Ingress
orDirect Data Upload
.
- Transformations:
- What it is: These components modify the structure or format of your data.
- Examples: In Datastreamer, this includes components like
Unify Transformer
orJSON Transform
- Operations:
- What it is: These components perform more complex processing on your data, often adding new information or insights. Unlike transformations that primarily change structure, operations enrich the data with new content.
- Examples: In Datastreamer, this includes components like
AI Sentiment Classifier
that analyzes sentiment,Language Detection
that identifies languages, orAI Entity Recognition Classifier
that extracts entities from text.
- Destination Connectors (or Egress Components):
- What it is: The exit point for data from your pipeline. It's where the processed data is sent or stored.
- Examples: Writing data to a database (like Elasticsearch), sending it to another API, publishing messages to a message queue, storing files, triggering notifications. In Datastreamer, this could be components like
Amazon S3 Storage Egress
orBig Query JSON Writer
.
Pipeline Features
Datastreamer pipelines come with several powerful features that enhance their reliability, maintainability, and scalability.
Isolation
Ensures optimal performance and reliability
Auto-scaling
Seamlessly adjust their computational resources to meet demand
Versioning
Automatically generates a new version
Isolation
Dynamic pipelines (and their individual components) run in isolation from each other. This means:
- Resources (CPU, memory) used by one pipeline don't affect others.
- Problems in one pipeline (errors, crashes) stay contained.
- Each pipeline can be monitored and managed independently.
- Resource allocation is controlled per pipeline.
This isolation improves reliability and stability.
Auto-scaling
Pipelines can automatically adjust their computational resources based on:
- Current volume of data being processed
- Queue sizes
- Other performance metrics
This ensures consistent performance and cost-effectiveness by:
- Scaling up when load increases
- Preventing bottlenecks
If an API suddenly sends 10 times more data than usual, a simple pipeline solution might slow down or crash. At Datastreamer, our auto-scaling pipeline can detect this increased load and automatically provision more resources to handle it.
Versioning
Every new deploy of your pipeline will automatically create a new version. Versioning in dynamic pipelines allows you to:
- Track Changes: Keep a history of how the pipeline's structure (which steps are included, in what order) and configuration (settings for each step) have changed over time.
- Rollback: If a new version of a pipeline introduces an issue, you can quickly revert to a previously known good version.

Image showing 3 different versions of a pipeline
Components Catalog
Dynamic pipelines in Datastreamer are built by combining various types of components. Each component is a specialized unit that performs a specific task. Here are some examples from our catalog:
Data Connectors (Getting Data In):
Amazon S3 Storage Ingress
: Reads files from an Amazon S3 bucket managed by your organization.Google Cloud Storage Ingress
: Reads content from a Google Cloud Storage Bucket.Direct Data Upload
: Enables you to submit documents through the UI.WebSightLine Augmented Instagram
: Provides a feed with Instagram posts each day.Datastreamer File Storage Ingress
: Reads files from Datastreamer file storage.
Transformation Components (Changing Data Structure):
Unify Transformer
: This component transforms documents to the Datastreamer JSON schema.JSON Transform
: Performs a JSON transformation on each document, allowing you to restructure or modify data.Concatenate
: Combines the text from multiple fields into a single field.Map
: Maps a field to another field.
Operation Components (Processing & Enriching Data):
AI Sentiment Classifier
: Analyzes content in multiple languages, identifying sentiment as positive, neutral, or negative.AI Entity Recognition Classifier
: Uses AI to identify person, location, or organization entities.Google Translate
: Applies Google Translate to any field in a document to translate its content.Language Detection (Datastreamer)
: Detects the language used in any field from given inputs.Custom Function
: Enables document editing in the pipeline with Python using customized functions.
Destination Connectors (Sending Data Out):
Amazon S3 Storage Egress
: Stores files in an S3 bucket managed by your organization.Google Cloud Storage Egress
: Stores files in a Google storage bucket managed by your organization.Big Query JSON Writer
: Writes data in JSON format to Google BigQuery.Datastreamer Searchable Storage
: Saves content to Datastreamer's searchable storage, allowing powerful Lucene queries.Webhook Egress
: Sends processed data to a specified external HTTP webhook URL.
Example Use Cases
Automated Daily Sales Reporting
Goal: Fetch daily sales data from an e-commerce platform (via Amazon S3), calculate regional summaries, store them for analysis, and email a summary.
Pipeline:
- Ingress:
Amazon S3 Storage Ingress
(reading sales data). - Transformation:
JSON Transform
(to standardize sales data format). - Operation:
Custom Function
(Python script to filter sales by region, aggregate totals, and prepare email summary). - Egress 1:
Big Query External JSON Writer
(stores aggregated regional summaries). - Egress 2:
Webhook Egress
(sends a summary payload to an internal emailing service).
Diagram:
graph TD A["Ingress: Amazon S3 Storage Ingress<br/>(Sales Data)"] --> B(Transform: JSON Transform<br/>Standardize Format); B --> C["Operation: Custom Function<br/>Filter, Aggregate, Prep Summary"]; C --> D["Egress: Big Query Writer<br/>(Regional Summaries)"]; C --> E["Egress: Webhook Egress<br/>(Email Summary)"];
Real-time Social Media Feedback Analysis & Alerting
Goal: Monitor TikTok posts (using Socialgist TikTok
), analyze sentiment, and immediately alert support for negative feedback.
Pipeline:
- Ingress:
Socialgist TikTok
(configured with Lucene to filter for brand mentions). - Operation 1:
AI Sentiment Classifier
(to determine the sentiment of the post). - Transformation:
JSON Document Router
(routes documents based on sentiment: negative sentiment goes one way, others another). - Operation 2 (for negative sentiment path):
AI Entity Recognition Classifier
(to identify any products or people mentioned). - Egress 1:
Datastreamer Searchable Storage
(stores all ingested posts and their enrichments). - Egress 2 (for negative sentiment path):
Webhook Egress
(sends details of negative posts to a support ticketing system or an alerting platform).
Diagram:
graph TD subgraph "Main Flow" A["Ingress: WebSightLine Instagram Live Feed<br/>(Brand Mentions)"] --> B["Operation: AI Sentiment Classifier"]; B --> C{"Transform: JSON Document Router<br/>(Route by Sentiment)"}; end subgraph "Negative Sentiment Path" C -- Negative --> D["Operation: AI Entity Recognition<br/>(Identify Products/People)"]; D --> E["Egress: Webhook Egress<br/>(Alert Support)"]; D --> F["Egress: Datastreamer Searchable Storage<br/>(Store Enriched Negative Post)"]; end C -- Positive/Neutral --> G["Egress: Datastreamer Searchable Storage<br/>(Store Post)"];
Updated 1 day ago