The Problem: You Can’t Just Merge CSV Files from Five Different PLCs
I spent three months building a PHM system for a chemical plant with 47 rotating assets spread across four production lines. Each line had its own SCADA system. Two were Siemens S7-1500s, one was an Allen-Bradley ControlLogix, and the last was a legacy Modbus TCP setup from 2008 that nobody wanted to touch.
The operations team wanted real-time bearing health monitoring across all of them. Sounds simple until you realize each SCADA polls at different rates (100ms, 500ms, 1s, 2s), uses different tag naming conventions, and timestamps in different formats. One system gave Unix epoch milliseconds. Another gave ISO 8601 strings with inconsistent timezone handling. The Modbus system? It didn’t timestamp at all.
You can’t just throw this into a pandas DataFrame and call it a day.
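To make that concrete: even step one, getting every reading onto a common UTC epoch-milliseconds axis, needs per-vendor handling. Here's a minimal sketch of that normalization. The source labels and the treat-naive-strings-as-UTC fallback are illustrative placeholders, not anything the vendors actually call these things.

import time
from datetime import datetime, timezone

def to_epoch_ms(raw, source):
    """Normalize heterogeneous SCADA timestamps to UTC Unix milliseconds."""
    if source == "siemens_epoch_ms":        # already Unix epoch in milliseconds
        return int(raw)
    if source == "ab_iso8601":              # ISO 8601 string, sometimes without an offset
        dt = datetime.fromisoformat(raw)
        if dt.tzinfo is None:               # naive strings get pinned to UTC here;
            dt = dt.replace(tzinfo=timezone.utc)  # in reality you attach the plant's timezone
        return int(dt.timestamp() * 1000)
    if source == "modbus_untimestamped":    # no device timestamp at all: stamp at ingest
        return int(time.time() * 1000)
    raise ValueError(f"unknown source: {source}")

print(to_epoch_ms("2024-03-01T08:15:30+02:00", "ab_iso8601"))  # -> 1709273730000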
Two Architectures: Message Queue vs Time-Series Database First
I tried two approaches. The first used MQTT as a central message broker with edge processors on each SCADA system. The second bypassed messaging entirely and wrote directly to InfluxDB with a time-series-first design. Both worked. Both had ugly failure modes.
Architecture A: MQTT + Stream Processing
The MQTT approach looked clean on paper. Each SCADA system runs a lightweight Python client (50-80 lines) that subscribes to relevant OPC UA tags, resamples to a common 1Hz cadence, and publishes JSON payloads to topic hierarchies like plant/line2/pump_04/vibration. A central Kafka Streams processor consumes these, performs sensor fusion, and triggers feature extraction pipelines.
Here’s the actual edge client I deployed on the Siemens PLCs:
import time
import json
import paho.mqtt.client as mqtt
from opcua import Client
import numpy as np
from collections import deque

class SCADAEdgeClient:
    def __init__(self, opc_url, mqtt_broker, asset_id, buffer_size=100):
        self.opc = Client(opc_url)
        self.mqtt = mqtt.Client()
        self.mqtt.connect(mqtt_broker, 1883, 60)
        self.mqtt.loop_start()  # Background network loop handles keepalives and reconnects
        self.asset_id = asset_id
        self.buffer = deque(maxlen=buffer_size)  # Rolling window for resampling

    def run(self):
        self.opc.connect()
        node = self.opc.get_node("ns=2;s=Machine.VibrationRMS")  # Actual tag from Siemens TIA Portal
        last_pub = 0.0
        while True:
            try:
                val = node.get_value()
                ts = time.time()
                self.buffer.append((ts, val))
                # Resample to 1Hz by averaging the last second of buffered reads
                if ts - last_pub >= 1.0 and len(self.buffer) >= 10:
                    recent = [v for t, v in self.buffer if ts - t < 1.0]
                    if recent:
                        avg = np.mean(recent)
                        payload = {
                            "asset_id": self.asset_id,
                            "timestamp": int(ts * 1000),  # Unix ms
                            "vibration_rms": float(avg),
                            "unit": "mm/s"
                        }
                        self.mqtt.publish(f"plant/line1/{self.asset_id}/vib",
                                          json.dumps(payload), qos=1)
                    last_pub = ts
                time.sleep(0.01)  # ~100Hz OPC polling, way faster than the 1Hz publish cadence but OPC can handle it
            except Exception as e:
                # This actually happened: network hiccup killed OPC connection
                print(f"OPC read failed: {e}, reconnecting...")
                self.opc.disconnect()
                time.sleep(5)
                self.opc.connect()
The good: Decoupled architecture. If the central server dies, edge clients keep buffering. MQTT’s QoS 1 guarantees at-least-once delivery. I could add new assets by just spinning up another client — no central config changes.
The bad: Message ordering is a nightmare with multiple publishers. Even with QoS 1, you get out-of-order delivery when network latency spikes. I had to add sequence numbers and a 2-second reordering window in the Kafka consumer. And here’s the killer: the MQTT broker became a single point of failure. When it went down during a routine update, I lost 18 minutes of data because the edge devices’ disk buffers overflowed. (I had set the buffer size to 10,000 messages, thinking that was plenty. It wasn’t.)
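For what it's worth, the reordering window itself was not much code. Here's a stripped-down sketch of the idea (field names are placeholders, and this simplest version just holds every message for the full window before releasing in sequence order):

import time
import heapq
import itertools

class ReorderBuffer:
    def __init__(self, window_s=2.0):
        self.window_s = window_s
        self.heap = []                 # (seq, arrival_time, tiebreak, message), ordered by seq
        self._tie = itertools.count()  # keeps heap comparisons away from the message dicts

    def push(self, msg):
        heapq.heappush(self.heap, (msg["seq"], time.time(), next(self._tie), msg))

    def pop_ready(self):
        """Release messages in sequence order once the head has aged past the window."""
        ready, now = [], time.time()
        while self.heap and now - self.heap[0][1] >= self.window_s:
            ready.append(heapq.heappop(self.heap)[3])
        return ready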
Architecture B: Direct Time-Series Write
The second approach skipped messaging. Each edge client wrote directly to InfluxDB 2.x using batched writes. No broker, no queue, just HTTP POST with line protocol.
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from opcua import Client
import time

class DirectTSDBClient:
    def __init__(self, opc_url, influx_url, token, org, bucket, asset_id):
        self.opc = Client(opc_url)
        self.influx = InfluxDBClient(url=influx_url, token=token, org=org)
        self.write_api = self.influx.write_api(write_options=SYNCHRONOUS)
        self.bucket = bucket
        self.asset_id = asset_id
        self.batch = []

    def run(self):
        self.opc.connect()
        vib_node = self.opc.get_node("ns=2;s=Machine.VibrationRMS")
        temp_node = self.opc.get_node("ns=2;s=Machine.BearingTemp")
        while True:
            try:
                vib = vib_node.get_value()
                temp = temp_node.get_value()
                ts = time.time_ns()  # Nanosecond precision, InfluxDB loves this
                # Multi-field point: one timestamp, multiple measurements
                point = Point("asset_health") \
                    .tag("asset_id", self.asset_id) \
                    .tag("line", "line1") \
                    .field("vibration_rms", float(vib)) \
                    .field("bearing_temp", float(temp)) \
                    .time(ts)
                self.batch.append(point)
                # Batch writes every 100 points (~1 second at the 100Hz loop rate)
                if len(self.batch) >= 100:
                    self.write_api.write(bucket=self.bucket, record=self.batch)
                    self.batch = []
                time.sleep(0.01)
            except Exception as e:
                # Influx write can fail if server is overloaded
                print(f"Write failed: {e}, will retry batch")
                time.sleep(1)
The good: Simpler. No broker to maintain. InfluxDB handles time-series indexing, so queries like “give me the last hour of vibration for pump_04” are instant. The retention policies are built-in — raw data is kept for 7 days, downsampled 1-minute averages for 90 days. And here’s the key advantage: InfluxDB’s tasks (the 2.x successor to continuous queries) let me pre-compute rolling statistics (RMS over 10s windows, FFT peak frequencies) server-side without spinning up separate stream processors.
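For reference, the 1-minute downsampling is just a small Flux task, roughly this shape. It's shown as a Python string only to keep one language in this post; the bucket names are placeholders, and the task gets registered once through the InfluxDB UI, CLI, or tasks API.

DOWNSAMPLE_1M_FLUX = """
option task = {name: "asset_health_1m", every: 1m}

from(bucket: "raw_7d")
    |> range(start: -2m)
    |> filter(fn: (r) => r._measurement == "asset_health")
    |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
    |> to(bucket: "downsampled_90d")
"""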
The bad: InfluxDB became the bottleneck. With 47 assets × 4 sensors each × 100 Hz, that’s 18,800 writes/second. InfluxDB 2.x on a 4-core VM with 16GB RAM started choking around 12,000 writes/sec. Latency spiked to 500ms. I had to downsample at the edge to 10 Hz, which lost the high-frequency bearing fault signatures above the new 5 Hz Nyquist limit. Not acceptable for inner race defects.
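A toy illustration of that trade-off (not the exact edge code): scipy's decimate() low-pass filters before throwing samples away, so anything above the new 5 Hz Nyquist is gone by design.

import numpy as np
from scipy import signal

fs = 100                                            # original edge sampling rate, Hz
t = np.arange(0, 10, 1 / fs)
x = np.sin(2 * np.pi * 2 * t) + 0.3 * np.sin(2 * np.pi * 37 * t)  # 2 Hz trend + 37 Hz "fault" tone
x_10hz = signal.decimate(x, q=10, zero_phase=True)  # anti-aliased downsample to 10 Hz

print(len(x), len(x_10hz))                          # 1000 -> 100 samples; the 37 Hz tone is filtered out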
The Hybrid Solution That Actually Worked
I ended up combining both. MQTT for high-frequency raw streams (100 Hz vibration, 50 Hz current), direct InfluxDB writes for low-frequency telemetry (1 Hz temperature, pressure, flow rate). The MQTT consumers ran on a separate 8-core box doing real-time FFT and envelope analysis, then wrote extracted features (peak frequencies, kurtosis, crest factor) to InfluxDB at 1 Hz.
This split the load. High-frequency data never hit the database — only the diagnostically relevant features did. Raw waveforms got buffered to S3 in 10-minute Parquet files for offline analysis. If an anomaly detector flagged something, I could pull the raw waveform from S3 for deep dive.
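The S3 buffering side is nothing fancy. A sketch of the 10-minute Parquet archiver (bucket name and key layout are illustrative):

import io
import time
import boto3
import pyarrow as pa
import pyarrow.parquet as pq

class WaveformArchiver:
    def __init__(self, bucket="phm-raw-waveforms", flush_s=600):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.flush_s = flush_s
        self.rows = {"asset_id": [], "timestamp_ms": [], "vibration": []}
        self.last_flush = time.time()

    def append(self, asset_id, ts_ms, value):
        self.rows["asset_id"].append(asset_id)
        self.rows["timestamp_ms"].append(ts_ms)
        self.rows["vibration"].append(value)
        if time.time() - self.last_flush >= self.flush_s:
            self.flush()

    def flush(self):
        # Write the accumulated window to an in-memory Parquet file, then upload
        table = pa.table(self.rows)
        buf = io.BytesIO()
        pq.write_table(table, buf, compression="snappy")
        key = f"raw/{time.strftime('%Y/%m/%d/%H%M')}.parquet"
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=buf.getvalue())
        self.rows = {"asset_id": [], "timestamp_ms": [], "vibration": []}
        self.last_flush = time.time()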
Here’s the stream processor that did the feature extraction (a plain Python Kafka consumer rather than actual Kafka Streams):
import json
import numpy as np
from kafka import KafkaConsumer
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class FeatureExtractor:
    def __init__(self, kafka_broker, influx_url, token, org, bucket):
        self.consumer = KafkaConsumer(
            'raw_vibration',
            bootstrap_servers=kafka_broker,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='feature_extractor'
        )
        self.influx = InfluxDBClient(url=influx_url, token=token, org=org)
        self.write_api = self.influx.write_api(write_options=SYNCHRONOUS)
        self.bucket = bucket
        self.window = {}  # Asset ID -> list of last 100 samples (1 second at 100Hz)

    def extract_features(self, samples):
        """Compute time-domain and frequency-domain features"""
        arr = np.array(samples)
        rms = np.sqrt(np.mean(arr**2))
        peak = np.max(np.abs(arr))
        crest_factor = peak / rms if rms > 0 else 0
        kurtosis = np.mean((arr - arr.mean())**4) / (np.std(arr)**4) if len(arr) > 3 else 0
        # FFT for dominant frequency (skip the DC bin so a mean offset doesn't win)
        fft = np.fft.rfft(arr)
        freqs = np.fft.rfftfreq(len(arr), d=0.01)  # 100 Hz sampling
        peak_freq = freqs[np.argmax(np.abs(fft[1:])) + 1]
        return {
            'rms': float(rms),
            'crest_factor': float(crest_factor),
            'kurtosis': float(kurtosis),
            'peak_freq': float(peak_freq)
        }

    def run(self):
        for msg in self.consumer:
            data = msg.value
            asset_id = data['asset_id']
            value = data['vibration_rms']
            ts = data['timestamp']
            if asset_id not in self.window:
                self.window[asset_id] = []
            self.window[asset_id].append(value)
            # Process every 100 samples (1 second)
            if len(self.window[asset_id]) >= 100:
                features = self.extract_features(self.window[asset_id])
                point = Point("vibration_features") \
                    .tag("asset_id", asset_id) \
                    .field("rms", features['rms']) \
                    .field("crest_factor", features['crest_factor']) \
                    .field("kurtosis", features['kurtosis']) \
                    .field("peak_freq", features['peak_freq']) \
                    .time(ts * 1000000)  # Convert ms to ns
                try:
                    self.write_api.write(bucket=self.bucket, record=point)
                except Exception as e:
                    # I've seen this when InfluxDB gets restarted mid-write
                    print(f"Influx write failed for {asset_id}: {e}")
                self.window[asset_id] = []  # Reset window
This ran for 8 months with 99.7% uptime. The only downtime was a planned Kafka upgrade and one incident where the S3 buffer filled up because I miscalculated storage costs. (Turns out 47 assets × 100 Hz × 4 bytes × 86400 seconds/day is about 1.6 GB/day of raw samples before timestamps and Parquet overhead, and months of retention add up fast. Oops.)
The Failure Modes Nobody Talks About
Sensor drift killed me twice. The Modbus-connected accelerometers (cheap MEMS sensors, $80 each) drifted by 0.3 mm/s over 6 months due to temperature cycling. My anomaly detector started flagging healthy pumps as degraded because the baseline had shifted. I had to implement adaptive baseline tracking using exponential moving averages with a 30-day window.
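The adaptive baseline is just an exponential moving average with a very small smoothing factor. A minimal sketch, assuming the health features arrive at 1 Hz; the alpha-from-window-length formula is the usual EMA span convention, not anything exotic.

class AdaptiveBaseline:
    def __init__(self, window_points=30 * 24 * 3600):   # 30 days of 1 Hz feature points
        self.alpha = 2.0 / (window_points + 1)           # standard EMA smoothing factor
        self.baseline = None

    def update(self, value):
        if self.baseline is None:
            self.baseline = value
        else:
            self.baseline += self.alpha * (value - self.baseline)
        return self.baseline

    def deviation(self, value):
        """How far the current reading sits from the slowly drifting baseline."""
        return value - self.update(value)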
Clock skew was worse. The legacy Modbus system didn’t have NTP. Its internal clock drifted +47 seconds over 3 months. My sensor fusion algorithm assumed all timestamps were synchronized within ±100ms. When I merged Modbus temperature with Siemens vibration, the time-alignment was off by nearly a minute. Correlation analysis between temperature rise and bearing wear became meaningless. Fix: I added a secondary alignment step using cross-correlation of slowly-varying signals (flow rate) to estimate and correct clock offset.
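The cross-correlation step is short once both signals are resampled onto a common 1-second grid over the same nominal time range. A sketch; the sign convention is that the return value is what you add to the skewed system's timestamps.

import numpy as np

def estimate_clock_offset(ref, skewed, dt_s=1.0):
    """Return the offset in seconds to ADD to the skewed system's timestamps."""
    ref = np.asarray(ref, dtype=float)
    skewed = np.asarray(skewed, dtype=float)
    ref -= ref.mean()
    skewed -= skewed.mean()
    corr = np.correlate(skewed, ref, mode="full")
    lag = int(np.argmax(corr)) - (len(ref) - 1)   # > 0 means the skewed series shows up later
    return -lag * dt_s

# e.g. if the Modbus clock runs 47 seconds fast, estimate_clock_offset(...) comes out near -47.0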
And network partitions. When a switch firmware update went wrong, the Allen-Bradley PLC got isolated for 11 minutes. MQTT clients on that subnet kept buffering, but the disk filled up. When the network came back, 9,000 messages flooded in simultaneously. Kafka couldn’t keep up, and I lost 4 minutes of data from the reordering window timeout. Lesson: Always set MQTT persistent sessions with file-backed storage, not just in-memory buffers.
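On the client side, the pieces of that lesson look roughly like this (broker address and client_id are placeholders). The broker-side half, persistence to disk and per-session queue depth, lives in mosquitto.conf and is not shown; note that paho's own outgoing queue is in memory, so true file-backed buffering needs either broker persistence or a local store-and-forward layer.

import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="edge-line3-ab01", clean_session=False)  # stable ID -> persistent session
client.max_queued_messages_set(0)   # 0 = no limit on queued QoS>0 messages while disconnected
client.connect("mqtt-broker.plant.local", 1883, keepalive=60)
client.loop_start()
client.publish("plant/line3/ab01/vib", '{"vibration_rms": 1.2}', qos=1)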
Performance Numbers (Because Everyone Asks)
This is all running on:
– Edge clients: Raspberry Pi 4B (4GB RAM, Python 3.11)
– MQTT broker: Mosquitto 2.0.15 on 2-core/4GB VM
– Kafka cluster: 3 nodes, 4-core/8GB each, Kafka 3.3.1
– InfluxDB: 4-core/16GB VM, InfluxDB 2.7.1
– Feature extraction: 8-core/16GB VM, Python 3.11 with numpy 1.24
Throughput:
– Raw MQTT: 12,000 msg/s sustained, 18,000 peak (before broker CPU hits 90%)
– InfluxDB writes: 8,000 points/s sustained with <50ms p99 latency
– Kafka throughput: 25,000 msg/s (wasn’t the bottleneck)
– Feature extraction latency: 8ms per 100-sample window (mostly FFT compute)
Data retention:
– Raw MQTT waveforms: 10-minute S3 Parquet files, 30-day retention (23 TB total)
– InfluxDB features: 90-day retention for 1Hz features (180 GB), 7-day retention for raw downsampled 10Hz (410 GB)
When to Use Which Architecture
If you have fewer than 10 assets, all on the same SCADA vendor, and you’re okay with 1Hz sampling, just use direct InfluxDB writes. The simplicity is worth it. You’ll be up and running in a weekend.
If you need high-frequency data (>10 Hz) from more than 20 assets, or you’re mixing SCADA vendors, go hybrid: MQTT for raw streams, feature extraction in the middle, time-series DB for features only. The complexity pays off when you need to do real-time FFT or envelope analysis.
If you’re doing RUL prediction with deep learning, you’ll want the raw waveforms in S3 or Parquet for training. Don’t try to query InfluxDB for 10,000-sample windows at training time — it’ll time out. Pre-export to Parquet files in batches.
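A sketch of what that pre-export looks like for the 1 Hz features, pulling one day at a time; the bucket name, dates, and file layout are placeholders, the measurement name matches the code earlier in this post.

from datetime import datetime, timedelta, timezone
import pandas as pd
from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://influxdb:8086", token="TOKEN", org="plant")
query_api = client.query_api()

day0 = datetime(2024, 3, 1, tzinfo=timezone.utc)
for i in range(30):                                   # one Parquet file per day
    start, stop = day0 + timedelta(days=i), day0 + timedelta(days=i + 1)
    flux = f'''
    from(bucket: "features_90d")
      |> range(start: {start:%Y-%m-%dT%H:%M:%SZ}, stop: {stop:%Y-%m-%dT%H:%M:%SZ})
      |> filter(fn: (r) => r._measurement == "vibration_features")
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''
    df = query_api.query_data_frame(flux)
    if isinstance(df, list):                          # the client returns a list if Flux yields multiple tables
        df = pd.concat(df, ignore_index=True)
    if len(df):
        df.to_parquet(f"vibration_features_{start:%Y%m%d}.parquet")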
The one thing I’m still not satisfied with: handling SCADA protocol diversity. Every time we onboard a new line, I have to write a new edge client for whatever OPC UA dialect or Modbus variant they’re using. I’ve been looking at Apache PLC4X for protocol abstraction, but I haven’t tested it at scale yet. If anyone’s done this with more than five different SCADA vendors, I’d love to hear how you handled it.