Real-Time Data Processing: From Scraping to Insights
Learn how to process and analyze scraped data in real-time, turning raw information into actionable business insights with streaming pipelines and instant analytics.
Collecting data is only the first step. The real value comes from processing that data into insights while it's still fresh. This guide walks through building an end-to-end real-time pipeline, from stream ingestion to dashboards and alerts, with patterns you can copy into production.
TL;DR
- Pick ingestion tech by event rate: Redis or managed Kafka under 10K/sec, self-managed Kafka or Kinesis above.
- Apply three core patterns: map for enrichment, aggregate for windows, filter for routing.
- Tier storage into hot, warm, and cold layers, and wire alerts to latency and anomaly thresholds.
What does a real-time data pipeline look like?
A real-time pipeline is a chain of four stages that keep scraped data in motion: ingest, process, store, and serve. Each stage has its own latency budget and failure mode. The diagram below is the mental model we use on every streaming engagement, and the rest of this post expands each box.
The ASCII sketch below shows how the four stages connect to the dashboard and alerting layer on top.
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│ Scraped │───▶│ Stream  │───▶│ Process │───▶│ Store   │
│  Data   │    │ Ingest  │    │ & Enrich│    │ & Serve │
└─────────┘    └─────────┘    └─────────┘    └─────────┘
                                   │              │
                                   └──────┬───────┘
                                          ▼
                                  ┌──────────────┐
                                  │  Dashboard   │
                                  │   / Alerts   │
                                  └──────────────┘
How do I choose stream ingestion technology?
Match the tool to your event rate. Under 10K events per second, managed services are almost always cheaper and faster to ship. Above that, self-managed Kafka, Kinesis, or Pub/Sub earn their keep with throughput and partitioning control.
Choice of technology
For moderate scale (< 10K events/sec)
- Redis Streams
- Apache Kafka (managed, via Confluent Cloud or AWS MSK)
- Postgres logical replication
For high scale (> 10K events/sec)
- Apache Kafka (self-managed)
- Amazon Kinesis Data Streams
- Google Pub/Sub
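As a sketch, the rule of thumb reduces to a single branch on the expected event rate; the 10K/sec cutoff and the option lists simply mirror this section and are illustrative, not a sizing tool:

```javascript
// Recommend an ingestion tier from the expected event rate.
// The 10K events/sec cutoff matches the rule of thumb above.
function recommendIngestion(eventsPerSec) {
  if (eventsPerSec < 10000) {
    return {
      tier: 'moderate',
      options: [
        'Redis Streams',
        'Managed Kafka (Confluent Cloud / AWS MSK)',
        'Postgres logical replication'
      ]
    }
  }
  return {
    tier: 'high',
    options: [
      'Self-managed Kafka',
      'Amazon Kinesis Data Streams',
      'Google Pub/Sub'
    ]
  }
}
```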
Implementation example
The snippet below publishes a scraped record to a Redis stream. We use XADD with * so Redis assigns the ID, which keeps ordering monotonic even with multiple producers.
// Redis Streams example (node-redis v4)
const redis = require('redis')
const client = redis.createClient()

async function publishScrapedData(data) {
  if (!client.isOpen) await client.connect()
  // Stream field values must be strings in node-redis v4
  await client.xAdd('scraped-data', '*', {
    url: data.url,
    timestamp: String(Date.now()),
    content: JSON.stringify(data),
    source: data.source
  })
}
How do I process streams in real time?
Stream processing in Node.js usually comes down to two choices: the built-in stream module for lightweight work, or a dedicated framework like Flink when you need stateful windows, exactly-once semantics, and horizontal scaling across a cluster.
Stream processing frameworks
The pipeline helper below composes a source, a transform, and a destination into one backpressure-aware chain. It's the simplest building block before reaching for a full framework.
// Using Node.js streams
const { pipeline } = require('stream/promises')
await pipeline(
sourceStream,
transformStream,
destinationStream
)
Dedicated frameworks:
- Apache Flink (Java/Scala)
- Apache Storm (Java)
- Hazelcast Jet (Java)
Processing patterns
Three patterns cover most streaming workloads: map, aggregate, and filter. Mix them, don't reinvent them.
1. Map pattern
Use the map pattern to transform each event independently. The function below enriches the raw event with a timestamp, a category, and a sentiment score before passing it downstream.
function enrichWithMetadata(event) {
return {
...event,
processedAt: Date.now(),
category: classifyContent(event.content),
sentiment: analyzeSentiment(event.content)
}
}
2. Aggregate pattern
Use aggregates when the business question is "how many" or "what's the average" over a time window. This snippet buckets events into one-minute windows and emits results when each window closes.
const windowedCounts = new Map()

function processInWindow(event, windowSize = 60000) {
  const windowKey = Math.floor(event.timestamp / windowSize)
  if (!windowedCounts.has(windowKey)) {
    windowedCounts.set(windowKey, { count: 0, events: [] })
  }
  const window = windowedCounts.get(windowKey)
  window.count++
  window.events.push(event)
  // Emit the previous window once events start arriving in a new one
  const previousKey = windowKey - 1
  if (windowedCounts.has(previousKey)) {
    emitWindowResult(previousKey, windowedCounts.get(previousKey))
    windowedCounts.delete(previousKey)
  }
}
3. Filter pattern
Use filtering to route events to different queues based on priority or type. Splitting the stream early keeps slow consumers from blocking urgent work.
function routeEvent(event) {
if (event.priority === 'urgent') {
return urgentQueue
} else if (event.category === 'pricing') {
return pricingQueue
} else {
return standardQueue
}
}
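Composed, the three patterns become a single pass over a batch: filter out incomplete events, map each survivor through enrichment, then route by category. A minimal sketch with classifyContent stubbed and the queue names assumed for illustration:

```javascript
// Stand-in classifier for illustration only
function classifyContent(content) {
  return content.includes('$') ? 'pricing' : 'general'
}

// filter -> map -> route, composed over one batch of events
function routeBatch(events) {
  return events
    .filter(e => e.content && e.url)      // filter: drop incomplete events
    .map(e => ({                          // map: enrich each event
      ...e,
      processedAt: Date.now(),
      category: classifyContent(e.content)
    }))
    .reduce((routes, e) => {              // route by category
      const queue = e.category === 'pricing' ? 'pricing' : 'standard'
      routes[queue].push(e)
      return routes
    }, { pricing: [], standard: [] })
}
```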
What metrics should real-time analytics track?
Track three metric families: volume, quality, and timeliness. Volume tells you the system is alive. Quality tells you the data is trustworthy. Timeliness tells you the insights are still actionable by the time they reach a human or a downstream system.
Key metrics to track
- Volume metrics — events per second, throughput in MB/sec, and active sources.
- Quality metrics — completeness percentage, schema validation errors, duplicate rate.
- Timeliness metrics — end-to-end latency, processing lag, alert response time.
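As a rough sketch, all three families can be computed from one batch of processed events. The field names here (scrapedAt, processedAt) are assumptions for illustration, not a fixed schema:

```javascript
// Compute volume, quality, and timeliness metrics over a batch.
// Each event is assumed to carry scrapedAt/processedAt timestamps in ms.
function computeMetrics(events, windowMs) {
  const complete = events.filter(e => e.url && e.content)
  const latencies = events.map(e => e.processedAt - e.scrapedAt)
  return {
    // Volume: how much is flowing
    eventsPerSec: events.length / (windowMs / 1000),
    // Quality: how much of it is trustworthy
    completenessPct: events.length
      ? (complete.length / events.length) * 100
      : 100,
    // Timeliness: how fresh it is when it lands
    avgLatencyMs: latencies.length
      ? latencies.reduce((a, b) => a + b, 0) / latencies.length
      : 0
  }
}
```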
Dashboard implementation
The WebSocket server below pushes updates to every connected dashboard client. It's the simplest way to replace polling when your users expect live tiles.
// WebSocket updates to dashboard
const WebSocket = require('ws')
const wss = new WebSocket.Server({ port: 8080 })
function broadcastUpdate(data) {
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(data))
}
})
}
How should I design the alerting system?
A good alerting system combines static thresholds with statistical anomaly detection. Static rules catch the obvious. Anomaly rules catch the drifts you didn't think to write a rule for. Pattern rules catch failure bursts before they become outages.
Alert types
Threshold alerts fire when a single metric crosses a fixed line. Simple, loud, and correct when the line is well-known.
if (metric.value > threshold) {
triggerAlert({
type: 'threshold',
metric: metric.name,
value: metric.value,
severity: 'warning'
})
}
Anomaly detection flags values more than 3 standard deviations from the rolling mean. Useful when "normal" shifts across the day or week.
function detectAnomaly(history, current) {
  const mean = history.reduce((a, b) => a + b, 0) / history.length
  const stdDev = Math.sqrt(
    history.map(x => Math.pow(x - mean, 2))
      .reduce((a, b) => a + b, 0) / history.length
  )
  // Alert if more than 3 standard deviations from the mean
  return Math.abs(current - mean) > 3 * stdDev
}
Pattern alerts fire when a sequence of events matches a known failure shape. The snippet below trips when more than 10 scraping failures stack up in five minutes.
// Detect scraping failures
const recentFailures = failures.filter(
f => Date.now() - f.timestamp < 300000 // Last 5 min
)
if (recentFailures.length > 10) {
triggerAlert('Scraping failure rate elevated')
}
How should I tier storage for streaming data?
Split storage by access pattern. Hot data needs sub-millisecond reads. Warm data needs fast aggregations. Cold data needs cheap durability. Using one database for all three is how teams burn cash on Redis or starve dashboards on S3.
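One way to make the split explicit is a small router that picks a tier from record age. The cutoffs below (1 hour hot, 30 days warm) are illustrative, not prescriptive; tune them to your access patterns:

```javascript
const HOUR = 60 * 60 * 1000
const DAY = 24 * HOUR

// Route a record to a storage tier based on its age.
function storageTier(record, now = Date.now()) {
  const age = now - record.timestamp
  if (age < HOUR) return 'hot'       // Redis: sub-millisecond reads
  if (age < 30 * DAY) return 'warm'  // timeseries DB: fast aggregations
  return 'cold'                      // S3: cheap durability
}
```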
Hot data (recent)
Redis sorted sets keep the last 1,000 scraped items ready for instant dashboard reads. The zRemRangeByRank call trims the tail so memory stays bounded.
// Redis for recent data
async function storeRecentData(data) {
await client.zAdd('recent:scraped', {
score: Date.now(),
value: JSON.stringify(data)
})
// Keep only last 1000 items
await client.zRemRangeByRank('recent:scraped', 0, -1001)
}
Warm data (analytics)
A timeseries database like InfluxDB handles rolled-up metrics for analytics queries. Tags index cheaply, fields store the numbers you'll aggregate.
// Timeseries write (generic client API, shown for illustration)
async function storeMetrics(metric) {
await influxDB.write({
measurement: 'scraped_data',
tags: { source: metric.source },
fields: {
count: metric.count,
duration: metric.duration
},
timestamp: metric.timestamp
})
}
Cold data (archive)
S3 is the default archive target. Partitioning by date keeps Athena queries and lifecycle rules simple later.
// S3 for long-term storage
const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3')
const s3 = new S3Client()
async function archiveData(data) {
await s3.send(new PutObjectCommand({
Bucket: 'scraped-data-archive',
Key: `${datePartition(data.timestamp)}/${data.id}.json`,
Body: JSON.stringify(data)
}))
}
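The archiveData call above assumes a datePartition helper that isn't shown. A minimal sketch producing Hive-style year=/month=/day= prefixes, which Athena can partition on directly:

```javascript
// Build a year=/month=/day= S3 key prefix from a millisecond timestamp.
// Hive-style partitioning keeps Athena queries and lifecycle rules simple.
function datePartition(timestamp) {
  const d = new Date(timestamp)
  const pad = n => String(n).padStart(2, '0')
  return `year=${d.getUTCFullYear()}/month=${pad(d.getUTCMonth() + 1)}/day=${pad(d.getUTCDate())}`
}
```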
What are the most common real-time use cases?
Price monitoring is the canonical use case and sits squarely inside a competitive intelligence program; checks like the one below are how CI dashboards refresh in seconds instead of days. Content and stock monitoring follow the same shape: diff the current snapshot against the previous one, then act on the delta.
1. Price monitoring
The function compares the latest price to the last recorded one and fires a notification when the change exceeds 5%.
// Detect price changes
async function checkPriceChange(product) {
  const previous = await getPreviousPrice(product.id)
  if (!previous || previous.price === 0) return // No baseline yet
  const change = (product.price - previous.price) / previous.price
  if (Math.abs(change) > 0.05) { // 5% change
    await notify({
      type: 'price_change',
      product: product.id,
      change: change * 100
    })
  }
}
2. Content monitoring
This pattern diffs headline lists between polls and routes only the new URLs downstream, avoiding reprocessing.
// Detect new articles
async function checkForNewContent(source) {
  const current = await getCurrentHeadlines(source)
  const previousUrls = await getPreviousHeadlines(source) // Stored as a list of URLs
  const newArticles = current.filter(
    item => !previousUrls.includes(item.url)
  )
  if (newArticles.length > 0) {
    await processNewArticles(newArticles)
  }
}
3. Stock monitoring
A one-liner check that triggers a restock alert when inventory drops below a configured threshold.
// Monitor inventory levels
async function checkStockLevel(product) {
if (product.stock < threshold) {
await triggerRestockAlert(product)
}
}
What latency targets should I aim for?
Set target and maximum latencies per stage, then budget the total. The table below is the baseline we use for most client pipelines. Treat "target" as the SLO and "maximum" as the paging threshold.
Latency targets
| Stage | Target | Maximum |
|---|---|---|
| Ingestion | < 100ms | 500ms |
| Processing | < 1s | 5s |
| Storage | < 500ms | 2s |
| Alerts | < 1s | 10s |
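The table translates into a simple SLO check: over target is a warning, over maximum is a page. A sketch with the budgets above hard-coded:

```javascript
// Per-stage latency budgets in ms, mirroring the table above.
const BUDGETS = {
  ingestion: { target: 100, maximum: 500 },
  processing: { target: 1000, maximum: 5000 },
  storage: { target: 500, maximum: 2000 },
  alerts: { target: 1000, maximum: 10000 }
}

// 'ok' under target, 'warn' over target (SLO breach), 'page' over maximum.
function checkLatency(stage, measuredMs) {
  const budget = BUDGETS[stage]
  if (measuredMs > budget.maximum) return 'page'
  if (measuredMs > budget.target) return 'warn'
  return 'ok'
}
```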
Optimization techniques
Three levers move latency the most: batching, async fan-out, and compression.
1. Batch processing amortizes per-call overhead by flushing accumulated items on an interval.
const batch = []
setInterval(() => {
  if (batch.length > 0) {
    // splice hands the current items to the processor and clears the buffer,
    // so items pushed during processing aren't lost
    processBatch(batch.splice(0, batch.length))
  }
}, 1000)
2. Async processing keeps the hot path unblocked: await the critical operation, fire and forget the non-critical side effects.
// Await the critical path; fire and forget non-critical side effects
await processData(data)
updateMetrics(data) // Fire and forget
3. Data compression cuts transmission time for large payloads between services.
// Compress before transmission (Node's built-in zlib)
const zlib = require('zlib')
const compressed = zlib.gzipSync(JSON.stringify(largeData))
How do I monitor a real-time system in production?
Expose a health endpoint and record processing times as histograms. The health endpoint gives load balancers something to probe. Histograms give you p50, p95, and p99 latencies, which are the numbers that matter when something starts to drift.
Health checks
The endpoint returns uptime, memory, queue depth, and processing rate. When the queue depth exceeds 1,000, the status flips to degraded so upstream load balancers can react.
// System health endpoint
app.get('/health', async (req, res) => {
const health = {
status: 'ok',
uptime: process.uptime(),
memory: process.memoryUsage(),
queueDepth: await getQueueDepth(),
processingRate: getProcessingRate()
}
if (health.queueDepth > 1000) {
health.status = 'degraded'
}
res.json(health)
})
Performance metrics
A histogram records every processing duration so you can chart percentile latency over time.
// Track processing times (prom-client Histogram)
const { Histogram } = require('prom-client')

const histogram = new Histogram({
  name: 'processing_time_ms',
  help: 'Processing time in milliseconds',
  buckets: [10, 50, 100, 500, 1000, 5000]
})

function recordProcessingTime(duration) {
  histogram.observe(duration)
}
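If you're not using a metrics library, percentile latencies can also be computed directly from the recorded durations. A sketch using the nearest-rank method:

```javascript
// Nearest-rank percentile over a list of recorded durations.
function percentile(durations, p) {
  const sorted = [...durations].sort((a, b) => a - b)
  const rank = Math.ceil((p / 100) * sorted.length)
  return sorted[Math.max(rank - 1, 0)]
}

// p50/p95/p99 are the numbers that matter when latency starts to drift.
function latencySummary(durations) {
  return {
    p50: percentile(durations, 50),
    p95: percentile(durations, 95),
    p99: percentile(durations, 99)
  }
}
```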
What are the common streaming challenges?
Three problems show up in every production pipeline: out-of-order events, duplicates, and backpressure. Solve them early, or they compound into silent data corruption.
1. Out-of-order events
Events arrive out of order when network paths differ. Use the event's own timestamp, not wall-clock time, and branch late events into a historical update path.
// Use event timestamps, not processing times
function processEvent(event) {
const eventTime = event.timestamp
// Handle late events appropriately
if (eventTime < lastProcessedTime) {
// Late event - update historical data
updateHistoricalData(event)
} else {
// Normal processing
processData(event)
}
}
2. Duplicate events
Make processing idempotent by keying on a stable event ID. The Set below short-circuits replays without touching the downstream system.
// Idempotent processing
const processed = new Set()
function processEvent(event) {
const id = generateEventId(event)
if (processed.has(id)) return
processed.add(id)
// Actual processing...
}
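A plain Set grows without bound under sustained traffic. One way to cap memory, relying on the fact that JS Sets iterate in insertion order, is to evict the oldest IDs once the set passes a limit:

```javascript
// Deduplication set with a hard size cap.
// JS Sets iterate in insertion order, so the first entry is the oldest.
class BoundedDedup {
  constructor(maxSize = 100000) {
    this.maxSize = maxSize
    this.seen = new Set()
  }

  // Returns true the first time an ID is seen, false on replays.
  firstTime(id) {
    if (this.seen.has(id)) return false
    this.seen.add(id)
    if (this.seen.size > this.maxSize) {
      // Evict the oldest entry to keep memory bounded
      this.seen.delete(this.seen.values().next().value)
    }
    return true
  }
}
```

The cap trades a small correctness window for bounded memory: an event replayed after its ID has been evicted will be processed again, so size the cap to comfortably cover your replay horizon.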
3. Backpressure
When producers outpace consumers, either slow the producers or scale the workers. Doing neither is how queues explode.
// Implement backpressure
if (queueDepth > maxDepth) {
// Slow down producers
await throttleProducers()
// Or scale up consumers
await scaleUpWorkers()
}
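One concrete way to slow producers is a token bucket: each publish spends a token, tokens refill at a fixed rate, and an empty bucket tells the caller to back off. A sketch with an injectable clock so it tests deterministically; the rate and capacity are illustrative:

```javascript
// Token-bucket throttle for producers.
// tryPublish() spends one token; an empty bucket means back off.
class TokenBucket {
  constructor(ratePerSec, capacity, now = () => Date.now()) {
    this.ratePerSec = ratePerSec
    this.capacity = capacity
    this.tokens = capacity
    this.now = now
    this.last = now()
  }

  refill() {
    const t = this.now()
    const elapsed = (t - this.last) / 1000
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.ratePerSec)
    this.last = t
  }

  tryPublish() {
    this.refill()
    if (this.tokens >= 1) {
      this.tokens -= 1
      return true
    }
    return false // Bucket empty: producer should pause or drop
  }
}
```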
Conclusion
Building real-time pipelines is an exercise in latency budgeting and failure planning. Start with the smallest architecture that meets your SLO, measure every stage, and scale the specific bottleneck instead of the whole stack.
For the scraping side of the pipeline (workers, queues, proxy rotation), see our technical scaling guide. And if you're piping events into LLMs for classification or summarization, AI-powered scraping is the next layer up.
About SIÁN Team
SIÁN Agency builds automated data pipelines for small businesses — from web scraping to AI processing to workflow integration. We write about what we know from building these systems every day.