Most CBM Systems Fail at the Last Mile
You can have the best RUL prediction model in the world, but if your maintenance team doesn’t see the alert until 8 hours after the anomaly started, you’ve already lost. The deployment gap between “model works in Jupyter” and “ops team gets actionable alerts” is where most condition-based maintenance projects die.
This isn’t about model accuracy anymore. It’s about latency, reliability, and making sure the right person sees the right data at the right time. A mediocre model with a solid deployment pipeline beats a perfect model that runs once a day via cron job.
We’ve spent three parts building a CBM system from scratch: ingesting sensor data (Part 1), extracting health indicators (Part 2), and training RUL prediction models (Part 3). Now we’re deploying the whole thing as a real-time dashboard with automatic alerting. This means streaming data ingestion, continuous model inference, threshold-based alerts pushed to Slack, and a web UI that doesn’t require refreshing the page.

The Architecture That Actually Works
Forget microservices and Kubernetes for a portfolio project. Here’s what you actually need: a Flask backend, a lightweight time-series database (InfluxDB or TimescaleDB), a message queue (Redis), and a frontend that polls or uses WebSockets. That’s it.
The data flow looks like this:
- Sensor data arrives via POST request (simulated or real hardware)
- Feature extraction happens in-memory (FFT, RMS, kurtosis — same pipeline from Part 2)
- Model inference runs on every batch (LSTM from Part 3, loaded once at startup)
- Predicted RUL gets stored in TimescaleDB with timestamp
- Alert logic checks thresholds and pushes to Slack if RUL < 50 hours
- Dashboard queries recent data and renders a live chart
No Kafka, no Airflow, no Docker Compose with 12 services. You can scale this later if you actually get traction.
Here’s the backend skeleton (Flask + Redis + psycopg2 for TimescaleDB):
from flask import Flask, request, jsonify
from flask_cors import CORS
import numpy as np
import redis
import psycopg2
from datetime import datetime
import pickle
import requests
from scipy.fft import rfft, rfftfreq
from scipy.stats import kurtosis, skew
app = Flask(__name__)
CORS(app)
# Load trained LSTM model (saved from Part 3)
with open('rul_lstm_model.pkl', 'rb') as f:
    model = pickle.load(f)  # this is a keras model wrapper
# Redis for temporary queue (optional, can skip for MVP)
r = redis.Redis(host='localhost', port=6379, decode_responses=False)
# TimescaleDB connection
db_conn = psycopg2.connect(
    host="localhost",
    database="cbm_dashboard",
    user="postgres",
    password="your_password"
)
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
RUL_ALERT_THRESHOLD = 50 # hours
def extract_features(vibration_data, temp_data, sample_rate=1000):
    """Same feature extraction from Part 2, condensed."""
    # Time-domain features
    vib_rms = np.sqrt(np.mean(vibration_data**2))
    vib_peak = np.max(np.abs(vibration_data))
    vib_kurtosis = kurtosis(vibration_data)
    vib_skew = skew(vibration_data)
    # Frequency-domain features
    fft_vals = np.abs(rfft(vibration_data))
    freqs = rfftfreq(len(vibration_data), 1/sample_rate)
    # Dominant frequency and power in bearing fault bands
    bearing_band = (freqs > 100) & (freqs < 500)  # Hz, adjust for your case
    bearing_power = np.sum(fft_vals[bearing_band]**2)
    temp_mean = np.mean(temp_data)
    temp_std = np.std(temp_data)
    return np.array([
        vib_rms, vib_peak, vib_kurtosis, vib_skew,
        bearing_power, temp_mean, temp_std
    ])

def predict_rul(features):
    """Run inference on LSTM model. Expects shape (1, lookback, n_features)."""
    # In practice, you need a sliding window of past features.
    # For demo, assume we store last 10 timesteps in Redis or DB.
    # Here we'll fake it with a single timestep repeated (not ideal, but works).
    lookback = 10  # same as training
    feature_window = np.tile(features, (lookback, 1))  # cheat: repeat current features
    feature_window = feature_window.reshape(1, lookback, -1)
    rul_hours = model.predict(feature_window, verbose=0)[0][0]
    return float(rul_hours)

def send_slack_alert(asset_id, rul_hours):
    """Push alert to Slack if RUL drops below threshold."""
    message = {
        "text": f"⚠️ *CBM Alert*: Asset `{asset_id}` RUL dropped to *{rul_hours:.1f} hours*. Inspect immediately."
    }
    try:
        resp = requests.post(SLACK_WEBHOOK, json=message, timeout=5)
        if resp.status_code != 200:
            print(f"Slack webhook failed: {resp.status_code}")
    except Exception as e:
        print(f"Slack alert error: {e}")

@app.route('/ingest', methods=['POST'])
def ingest_data():
    """Receive sensor data, extract features, predict RUL, store, alert."""
    data = request.get_json()
    asset_id = data.get('asset_id', 'unknown')
    vibration = np.array(data['vibration'])  # expect list of floats
    temperature = np.array(data['temperature'])
    timestamp = data.get('timestamp', datetime.utcnow().isoformat())
    # Feature extraction
    features = extract_features(vibration, temperature)
    # RUL prediction
    rul_hours = predict_rul(features)
    # Store in TimescaleDB
    cursor = db_conn.cursor()
    cursor.execute(
        "INSERT INTO rul_predictions (asset_id, timestamp, rul_hours, features) VALUES (%s, %s, %s, %s)",
        (asset_id, timestamp, rul_hours, features.tolist())
    )
    db_conn.commit()
    cursor.close()
    # Alert logic
    if rul_hours < RUL_ALERT_THRESHOLD:
        send_slack_alert(asset_id, rul_hours)
    return jsonify({"status": "ok", "rul_hours": rul_hours})

@app.route('/dashboard/<asset_id>', methods=['GET'])
def get_dashboard_data(asset_id):
    """Fetch last 100 RUL predictions for charting."""
    cursor = db_conn.cursor()
    cursor.execute(
        "SELECT timestamp, rul_hours FROM rul_predictions WHERE asset_id = %s ORDER BY timestamp DESC LIMIT 100",
        (asset_id,)
    )
    rows = cursor.fetchall()
    cursor.close()
    data = [{"time": row[0].isoformat(), "rul": row[1]} for row in reversed(rows)]
    return jsonify(data)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)
This is 90% of your backend. The LSTM model from Part 3 gets loaded once at startup (not per request — that’s a rookie mistake). The /ingest endpoint handles everything: feature extraction, inference, storage, alerting. The /dashboard endpoint just pulls recent predictions for charting.
The Feature Window Problem (And Why I’m Cheating)
Notice the predict_rul() function has a hack: it repeats the current feature vector 10 times to fill the LSTM’s lookback window. In production, you’d maintain a sliding window of the last 10 feature vectors per asset (stored in Redis or a small in-memory cache). But for a portfolio demo, the repeated-feature trick works surprisingly well because the LSTM mostly cares about the current state, not the temporal evolution (at least for our bearing dataset).
If your model genuinely needs temporal context — say, detecting gradual drift over hours — you MUST implement a proper sliding window. Here’s the idea:
# Redis-based sliding window (pseudocode)
def get_feature_window(asset_id, new_features, lookback=10):
    key = f"features:{asset_id}"
    r.rpush(key, pickle.dumps(new_features))
    r.ltrim(key, -lookback, -1)  # keep only the last `lookback` entries
    window = [pickle.loads(x) for x in r.lrange(key, 0, -1)]
    # Pad if not enough history yet
    while len(window) < lookback:
        window.insert(0, window[0] if window else new_features)
    return np.array(window)
This adds Redis dependency but solves the temporal context issue properly. For a single-asset demo, an in-memory collections.deque works just fine.
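Here's a minimal sketch of that in-memory variant, assuming a single Flask process (the FEATURE_WINDOWS cache and helper name are made up for illustration; they aren't part of the backend above):
from collections import defaultdict, deque
import numpy as np

LOOKBACK = 10  # must match the LSTM's training lookback

# One deque per asset; maxlen drops the oldest entry automatically.
# Hypothetical module-level cache -- it lives only as long as the Flask process.
FEATURE_WINDOWS = defaultdict(lambda: deque(maxlen=LOOKBACK))

def get_feature_window_inmem(asset_id, new_features):
    """Append the newest feature vector and return a (lookback, n_features) array."""
    window = FEATURE_WINDOWS[asset_id]
    window.append(np.asarray(new_features))
    # Pad with the oldest available vector until we have enough history
    padded = list(window)
    while len(padded) < LOOKBACK:
        padded.insert(0, padded[0])
    return np.array(padded)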
TimescaleDB Setup (Because Time-Series Data Matters)
Don’t use vanilla PostgreSQL for time-series data. TimescaleDB is a free extension that adds automatic partitioning (called “hypertables”) and makes time-range queries 10-100x faster. Installation on Ubuntu:
sudo apt install postgresql postgresql-contrib
sudo add-apt-repository ppa:timescale/timescaledb-ppa
sudo apt update && sudo apt install timescaledb-postgresql-14
sudo timescaledb-tune # auto-configure postgresql.conf
sudo systemctl restart postgresql
Then create the database and enable the extension:
CREATE DATABASE cbm_dashboard;
\c cbm_dashboard
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE TABLE rul_predictions (
    asset_id TEXT NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    rul_hours REAL NOT NULL,
    features REAL[],
    PRIMARY KEY (asset_id, timestamp)
);
SELECT create_hypertable('rul_predictions', 'timestamp');
The create_hypertable() call converts the table into a time-series optimized structure. Queries like “get last 7 days” now use partitioned chunks instead of scanning the whole table. For a portfolio project with <1M rows, this is overkill — but it shows you know the difference.
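As an example of the kind of time-range roll-up the hypertable is built for, here's a hedged sketch that reuses the backend's db_conn (the function name and the one-hour bucket width are illustrative choices, not part of the code above):
# Hourly average RUL for the last 7 days, using TimescaleDB's time_bucket().
# Assumes the rul_predictions hypertable created above and the existing db_conn.
def hourly_rul_summary(asset_id):
    cursor = db_conn.cursor()
    cursor.execute(
        """
        SELECT time_bucket('1 hour', timestamp) AS bucket,
               avg(rul_hours) AS avg_rul,
               min(rul_hours) AS worst_rul
        FROM rul_predictions
        WHERE asset_id = %s
          AND timestamp > NOW() - INTERVAL '7 days'
        GROUP BY bucket
        ORDER BY bucket
        """,
        (asset_id,)
    )
    rows = cursor.fetchall()
    cursor.close()
    return rows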
Frontend: Polling vs WebSockets (And Why Polling Won)
Everyone wants WebSockets for “real-time” dashboards. But here’s the dirty secret: polling every 2 seconds is simpler, more reliable, and good enough for 90% of use cases. WebSockets add complexity (reconnection logic, server-side connection pooling, scaling issues) that you don’t need unless you’re pushing updates every 100ms.
Here’s a minimal React dashboard (or vanilla JS if you prefer):
import React, { useEffect, useState } from 'react';
import { LineChart, Line, XAxis, YAxis, CartesianGrid, Tooltip, Legend } from 'recharts';
function Dashboard() {
  const [data, setData] = useState([]);
  const [assetId, setAssetId] = useState('pump_01');
  const [latestRUL, setLatestRUL] = useState(null);

  useEffect(() => {
    const fetchData = async () => {
      try {
        const resp = await fetch(`http://localhost:5000/dashboard/${assetId}`);
        const json = await resp.json();
        setData(json);
        if (json.length > 0) {
          setLatestRUL(json[json.length - 1].rul);
        }
      } catch (err) {
        console.error('Fetch failed:', err);
      }
    };
    fetchData();
    const interval = setInterval(fetchData, 2000); // poll every 2s
    return () => clearInterval(interval);
  }, [assetId]);

  const alertStyle = latestRUL !== null && latestRUL < 50
    ? { color: 'red', fontWeight: 'bold' }
    : { color: 'green' };

  return (
    <div style={{ padding: '20px' }}>
      <h1>CBM Dashboard</h1>
      <label>
        Asset ID:
        <input
          type="text"
          value={assetId}
          onChange={(e) => setAssetId(e.target.value)}
          style={{ marginLeft: '10px' }}
        />
      </label>
      <h2 style={alertStyle}>
        Latest RUL: {latestRUL !== null ? `${latestRUL.toFixed(1)} hours` : 'N/A'}
      </h2>
      <LineChart width={800} height={400} data={data}>
        <CartesianGrid strokeDasharray="3 3" />
        <XAxis dataKey="time" />
        <YAxis label={{ value: 'RUL (hours)', angle: -90, position: 'insideLeft' }} />
        <Tooltip />
        <Legend />
        <Line type="monotone" dataKey="rul" stroke="#8884d8" dot={false} />
      </LineChart>
    </div>
  );
}

export default Dashboard;
This polls /dashboard/<asset_id> every 2 seconds, renders a line chart (via Recharts), and highlights the latest RUL in red if it’s below 50 hours. That’s the entire frontend. You can add a second chart for raw sensor data, or a table of recent alerts, but the core is just “fetch data, render chart, repeat.”
If you genuinely need sub-second updates (e.g., vibration waveform visualization), switch to WebSockets. Flask-SocketIO makes it easy:
from flask_socketio import SocketIO, emit
socketio = SocketIO(app, cors_allowed_origins="*")
@socketio.on('subscribe')
def handle_subscribe(data):
    asset_id = data['asset_id']
    # join a room for this asset
    # whenever new data arrives, emit to this room
    pass
But honestly, unless your sensors push data faster than 1 Hz, polling is cleaner.
Alert Logic: Thresholds, Hysteresis, and Rate Limiting
The naive alert logic is: “if RUL < 50, send Slack message.” This floods your channel with duplicate alerts because RUL might hover around 49-51 for hours. You need three fixes:
- Hysteresis: Only alert if RUL drops below 50 after being above 55. This prevents flapping.
- Rate limiting: Only send one alert per asset per hour (store last alert time in Redis).
- Severity levels: Warn at RUL < 50, critical at RUL < 20, emergency at RUL < 5.
Here’s the improved version:
def check_alert(asset_id, rul_hours):
    key = f"alert:{asset_id}"
    last_alert = r.get(key)
    # Rate limit: only alert once per hour
    if last_alert is not None:
        last_time = float(last_alert.decode())
        if (datetime.utcnow().timestamp() - last_time) < 3600:
            return  # skip alert
    # Hysteresis: check if we were above 55 recently
    prev_rul_key = f"prev_rul:{asset_id}"
    prev_rul = r.get(prev_rul_key)
    if prev_rul is not None:
        prev_rul = float(prev_rul.decode())
        if prev_rul < 55:  # already in danger zone, no new alert
            r.set(prev_rul_key, rul_hours, ex=3600)
            return
    # Determine severity
    if rul_hours < 5:
        severity = "🚨 EMERGENCY"
    elif rul_hours < 20:
        severity = "🔴 CRITICAL"
    elif rul_hours < 50:
        severity = "⚠️ WARNING"
    else:
        r.set(prev_rul_key, rul_hours, ex=3600)
        return  # no alert needed
    # Send alert
    message = {
        "text": f"{severity}: Asset `{asset_id}` RUL = *{rul_hours:.1f} hours*. Check now."
    }
    requests.post(SLACK_WEBHOOK, json=message, timeout=5)
    # Record alert time
    r.set(key, datetime.utcnow().timestamp(), ex=3600)
    r.set(prev_rul_key, rul_hours, ex=3600)
This avoids alert fatigue while still catching genuine degradation. The hysteresis logic isn’t perfect (what if RUL jumps from 60 to 45 in one step? — you’d still alert, which is correct). You can tune the thresholds based on your asset’s failure curve.
Simulating Sensor Data (Because You Don’t Have a Real Bearing)
Unless you work at a factory, you’re not getting live sensor data. So simulate it. Here’s a script that generates synthetic vibration + temperature data and POSTs it to the Flask backend:
import numpy as np
import requests
import time
from datetime import datetime
BASE_URL = "http://localhost:5000/ingest"
ASSET_ID = "pump_01"
def generate_sensor_data(rul_hours):
    """Simulate vibration and temperature based on remaining useful life."""
    # As RUL decreases, vibration amplitude and temp increase
    health_factor = max(0, rul_hours / 200.0)  # 0=broken, 1=healthy
    # Vibration: add bearing fault frequency (e.g., 200 Hz) with amplitude growing
    t = np.linspace(0, 1, 1000)  # 1 second at 1 kHz
    vibration = (1 - health_factor) * 0.5 * np.sin(2 * np.pi * 200 * t)  # fault component
    vibration += 0.1 * np.random.randn(1000)  # background noise
    # Temperature: baseline 40°C, rises to 80°C as bearing fails
    temp_base = 40 + (1 - health_factor) * 40
    temperature = temp_base + np.random.randn(100) * 2  # 100 samples
    return vibration.tolist(), temperature.tolist()

def simulate_degradation():
    """Simulate gradual bearing degradation over time."""
    rul_hours = 200.0  # start healthy
    while rul_hours > 0:
        vibration, temperature = generate_sensor_data(rul_hours)
        payload = {
            "asset_id": ASSET_ID,
            "timestamp": datetime.utcnow().isoformat(),
            "vibration": vibration,
            "temperature": temperature
        }
        try:
            resp = requests.post(BASE_URL, json=payload, timeout=5)
            if resp.status_code == 200:
                result = resp.json()
                print(f"[{datetime.utcnow().isoformat()}] Sent data. Predicted RUL: {result['rul_hours']:.1f} hours")
            else:
                print(f"POST failed: {resp.status_code}")
        except Exception as e:
            print(f"Request error: {e}")
        # Degrade by 5 hours per iteration (simulate accelerated aging)
        rul_hours -= 5
        time.sleep(2)  # send data every 2 seconds

if __name__ == '__main__':
    simulate_degradation()
Run this in a separate terminal while your Flask backend is running. You’ll see RUL predictions update in real-time, and Slack alerts will fire once RUL drops below 50. The degradation is exaggerated (5 hours per 2 seconds) so you can watch the full lifecycle in a few minutes.
What I’d Change If This Were Production
This portfolio setup cuts corners. If you were deploying this at a real factory, here’s what you’d need to fix:
- Model versioning: The rul_lstm_model.pkl is hardcoded. In production, use MLflow or DVC to track model versions and swap them without restarting the server.
- Sliding window storage: The repeated-feature hack breaks temporal LSTMs. Implement a proper Redis/DB-backed sliding window.
- Monitoring: Add Prometheus metrics (request latency, prediction distribution, DB connection pool size) and Grafana dashboards.
- Error handling: The code assumes sensor data always arrives in the right format. In reality, you’ll get malformed JSON, missing fields, NaN values, and occasional sensor dropout. Add validation with pydantic or marshmallow (see the sketch after this list).
- Batch inference: If you have 100 assets, don’t run LSTM inference 100 times per second. Batch predictions into a single GPU call.
- Alert escalation: Integrate with PagerDuty or OpsGenie so critical alerts page the on-call engineer, not just post to Slack.
- Model retraining: RUL models drift as equipment ages. Schedule weekly retraining on new data (automate with Airflow or Prefect).
- Security: The /ingest endpoint has no authentication. Add API keys or OAuth before exposing it to the internet.
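To make the error-handling point concrete, here's a minimal validation sketch, assuming pydantic v2 and Python 3.10+ (the IngestPayload model and its limits are made up for this example, not part of the backend above):
# Hypothetical request validation for /ingest using pydantic (v2-style API).
import math
from pydantic import BaseModel, Field, ValidationError, field_validator

class IngestPayload(BaseModel):
    asset_id: str = Field(min_length=1)
    timestamp: str | None = None
    vibration: list[float] = Field(min_length=10)   # require a reasonable chunk of samples
    temperature: list[float] = Field(min_length=1)

    @field_validator('vibration', 'temperature')
    @classmethod
    def no_nans(cls, values):
        # Reject NaN/inf that would silently poison the feature vector
        if any(not math.isfinite(v) for v in values):
            raise ValueError('non-finite sensor reading')
        return values

# Inside the route, before extract_features():
# try:
#     payload = IngestPayload(**request.get_json())
# except ValidationError as e:
#     return jsonify({"status": "error", "detail": e.errors()}), 400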
But for a portfolio project? The current setup is enough to demonstrate you understand the full stack.
The Math Behind Alert Thresholds
Choosing the RUL alert threshold isn’t arbitrary. You want to minimize false positives (alerting when equipment is fine) and false negatives (missing real failures). This is a classic precision-recall tradeoff.
Define:
– $TP$ = true positives (alerted before failure)
– $FP$ = false positives (alerted but no failure)
– $FN$ = false negatives (missed failure)
Precision and recall are:
$$\text{Precision} = \frac{TP}{TP + FP}, \qquad \text{Recall} = \frac{TP}{TP + FN}$$
You want high recall (catch all failures) but not at the cost of drowning in false alarms. The $F_1$ score balances both:
$$F_1 = \frac{2 \cdot \text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}$$
In practice, you plot a precision-recall curve by sweeping the RUL threshold from 10 to 100 hours, compute $F_1$ for each, and pick the threshold that maximizes it. This requires historical failure data (which you probably don’t have for a portfolio project). If you’re guessing, 50 hours is a safe default — enough time to order parts but not so early that you alert on healthy equipment.
Another approach: cost-sensitive thresholds. If a false negative costs $C_{FN}$ (unplanned downtime) and a false positive costs $C_{FP}$ (unnecessary inspection), weight recall 20x higher ($C_{FN}/C_{FP} = 20$). The threshold becomes the one that minimizes expected cost:
$$t^{*} = \arg\min_{t} \big[\, C_{FP} \cdot FP(t) + C_{FN} \cdot FN(t) \,\big]$$
This pushes the threshold higher (alert earlier) to avoid missing failures.
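If you do have labeled run-to-failure history, the sweep itself is only a few lines. A sketch, assuming hypothetical arrays of past RUL predictions and labels for whether a failure actually followed within your planning horizon:
import numpy as np

def sweep_rul_threshold(predicted_rul, failed_soon, thresholds=range(10, 101, 5)):
    """Pick the RUL alert threshold that maximizes F1 on historical data.

    predicted_rul : model RUL predictions (hours) at past timestamps
    failed_soon   : boolean labels, True if the asset actually failed within the
                    planning horizon after that timestamp (hypothetical labels)
    """
    predicted_rul = np.asarray(predicted_rul, dtype=float)
    failed_soon = np.asarray(failed_soon, dtype=bool)
    best = (None, -1.0)
    for t in thresholds:
        alerted = predicted_rul < t
        tp = np.sum(alerted & failed_soon)
        fp = np.sum(alerted & ~failed_soon)
        fn = np.sum(~alerted & failed_soon)
        precision = tp / (tp + fp) if (tp + fp) else 0.0
        recall = tp / (tp + fn) if (tp + fn) else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
        if f1 > best[1]:
            best = (t, f1)
    return best  # (threshold_hours, f1_score)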
Deployment Checklist (Before You Show This to Anyone)
- [ ] Backend runs without crashing for 10+ minutes
- [ ] Dashboard renders charts correctly (test in Chrome + Firefox)
- [ ] Slack alerts fire when RUL drops below threshold
- [ ] Simulate degradation script completes without errors
- [ ] TimescaleDB queries return in <100ms (check with EXPLAIN ANALYZE)
- [ ] Feature extraction matches Part 2 (same RMS, FFT, kurtosis formulas)
- [ ] LSTM model loads successfully at startup (check logs)
- [ ] Frontend handles missing data gracefully (empty chart, not crash)
- [ ] Alert rate limiting works (no duplicate Slack messages within 1 hour)
- [ ] Code is on GitHub with a README that explains how to run it
If you’re interviewing for a data science role, this is your portfolio closer. Jupyter notebooks are fine for exploration, but a working dashboard proves you can ship.
Why This Matters (And What I’d Do Next)
Most CBM projects never leave the lab. You’ll see papers with 99% RUL prediction accuracy on NASA datasets, but zero mention of how to deploy it. This post fills that gap.
The hard parts aren’t the LSTM or the FFT — those are solved problems. The hard parts are:
– Getting sensor data into the system reliably
– Running inference fast enough for real-time decisions
– Making alerts actionable (not just “something is wrong”)
– Building UI that maintenance engineers actually use
If I were extending this, I’d add:
1. Explainability: Use SHAP or integrated gradients to show which features drove the low RUL prediction (e.g., “bearing power spike at 200 Hz”).
2. Multi-asset comparison: Dashboard showing 10 pumps side-by-side, sorted by urgency.
3. Historical failure analysis: When a pump fails, backtest how early the model could’ve predicted it.
4. Anomaly detection: Flag weird sensor patterns even if RUL is high (e.g., sudden temperature spike).
But the core pipeline — data ingestion, feature extraction, inference, alerting, visualization — is done. You’ve built a complete CBM system from scratch.
Use dashboards with embedded models for internal tools where downtime risk justifies the engineering cost. For public-facing products or low-stakes monitoring, simpler heuristics (threshold alerts on raw sensor data) often suffice. The LSTM adds value when failure modes are subtle and multi-dimensional — if a simple “temp > 70°C” rule catches 90% of failures, skip the neural net.
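For reference, that heuristic baseline is only a few lines (the cutoff values below are illustrative, not recommendations):
# Illustrative rule-based baseline: alert on raw sensor thresholds only.
# The cutoffs (70°C, RMS 0.8 g) are made up for this sketch -- set them from
# your own asset's history, not from this post.
def heuristic_alert(temp_mean, vib_rms, temp_limit=70.0, rms_limit=0.8):
    reasons = []
    if temp_mean > temp_limit:
        reasons.append(f"temperature {temp_mean:.1f}°C > {temp_limit:.0f}°C")
    if vib_rms > rms_limit:
        reasons.append(f"vibration RMS {vib_rms:.2f} g > {rms_limit:.2f} g")
    return reasons  # non-empty list means "inspect this asset"
If a rule like this already catches most failures in your history, the LSTM has to earn its added complexity.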