Sensor Fusion and IoT Integration in Smart Manufacturing

Updated Feb 6, 2026

Why Sensor Fusion Beats Single-Source Monitoring

Most factories I’ve seen still treat sensors as isolated data sources. Temperature from one device, vibration from another, current draw from a third—all logged separately, analyzed in silos. The problem isn’t the data quality. It’s that each sensor only tells you part of the story.

Consider a bearing failure scenario. A vibration sensor might show elevated FFT peaks at 2x rotational frequency, suggesting misalignment. But if you also have temperature data showing a 15°C spike over three hours, plus current draw increasing by 8%, you’re not looking at misalignment anymore—you’re watching lubrication breakdown in real time. Single sensors miss this. Sensor fusion catches it before the bearing seizes.
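For concreteness, the "2x peak" check is just a band search on the vibration spectrum. Here's a minimal sketch with entirely synthetic numbers (sampling rate, shaft speed, and amplitudes are all made up):

import numpy as np

# All values hypothetical: 5 kHz sampling, 29.5 Hz shaft speed (~1770 RPM)
fs, shaft_hz = 5000.0, 29.5
t = np.arange(0, 2.0, 1 / fs)
# Synthetic waveform with an injected 2x component over broadband noise
signal = 0.8 * np.sin(2 * np.pi * 2 * shaft_hz * t) + np.random.normal(0, 0.3, t.size)

spectrum = np.abs(np.fft.rfft(signal))
freqs = np.fft.rfftfreq(t.size, 1 / fs)

# Peak in a +/-5% band around 2x shaft speed, compared to the broadband floor
band = (freqs > 1.9 * shaft_hz) & (freqs < 2.1 * shaft_hz)
ratio = spectrum[band].max() / np.median(spectrum)
print(f"2x band peak is {ratio:.0f}x the noise floor")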

The gap between theory and practice here is massive. Academic papers talk about Kalman filters and particle filters as if they’re plug-and-play solutions. In reality, you’re dealing with sensors that drift (thermocouples after six months of steam exposure), timestamps that don’t align (one logger samples at 10 Hz, another at 1 Hz with 200ms jitter), and network dropouts that leave you interpolating missing chunks. I’ll show you two approaches: a simple weighted average method that’s surprisingly robust, and a more sophisticated temporal fusion architecture that handles the messiness better but costs you in complexity.
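That alignment problem alone is worth a sketch. One workable pattern is to resample everything onto a common grid and cap how far you interpolate, so network dropouts stay visible as NaN instead of silently invented data. Everything below is synthetic, assuming one 10 Hz stream and one jittered 1 Hz stream:

import numpy as np
import pandas as pd

# Hypothetical streams: 10 Hz vibration, 1 Hz temperature with +/-200 ms jitter
t_vib = np.arange(0, 60, 0.1)
vib = pd.Series(np.random.normal(2.3, 0.15, t_vib.size),
                index=pd.to_datetime(t_vib, unit="s"))
t_temp = np.arange(1, 61, 1.0) + np.random.uniform(-0.2, 0.2, 60)
temp = pd.Series(np.random.normal(68.5, 1.2, t_temp.size),
                 index=pd.to_datetime(t_temp, unit="s"))

# Resample both onto a common 1 s grid; interpolate short gaps only,
# so longer dropouts remain NaN and downstream code has to face them
grid = pd.DataFrame({
    "vibration": vib.resample("1s").mean(),
    "temperature": temp.resample("1s").mean().interpolate(limit=3),
})
print(grid.head())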


The Naive Approach: Weighted Averaging with Manual Thresholds

Let’s start with what most factories actually implement—manual feature engineering with weighted scores. You define thresholds for each sensor, assign weights based on “expert knowledge” (read: trial and error), and sum them into a health score. Here’s a realistic implementation for a pump monitoring system:

import numpy as np
from dataclasses import dataclass
from typing import Dict, Tuple

@dataclass
class SensorReading:
    timestamp: float
    vibration_rms: float  # mm/s
    temperature: float    # °C
    current: float        # Amps
    pressure: float       # bar

class NaiveFusionMonitor:
    def __init__(self):
        # Baseline values from commissioning (not from spec sheet!)
        self.baseline = {
            'vibration': 2.3,  # measured during first week of operation
            'temperature': 68.5,
            'current': 12.1,
            'pressure': 5.8
        }
        # Weights tuned over 3 months of false alarms
        self.weights = {
            'vibration': 0.35,  # most reliable early indicator
            'temperature': 0.25,
            'current': 0.20,
            'pressure': 0.20
        }
        self.alert_threshold = 0.65  # unitless, empirically chosen

    def compute_health_score(self, reading: SensorReading) -> Tuple[float, Dict[str, float]]:
        # Normalize each sensor relative to baseline (this is where it breaks)
        vib_score = min(reading.vibration_rms / self.baseline['vibration'], 3.0)
        temp_score = max((reading.temperature - self.baseline['temperature']) / 20.0, 0.0)
        current_score = max((reading.current - self.baseline['current']) / 5.0, 0.0)
        pressure_score = max((self.baseline['pressure'] - reading.pressure) / 2.0, 0.0)

        component_scores = {
            'vibration': vib_score,
            'temperature': temp_score,
            'current': current_score,
            'pressure': pressure_score
        }

        # Weighted sum (the "fusion" part)
        health_score = sum(self.weights[k] * component_scores[k] for k in self.weights)

        return health_score, component_scores

    def should_alert(self, reading: SensorReading) -> Tuple[bool, str]:
        score, components = self.compute_health_score(reading)

        if score > self.alert_threshold:
            # Try to explain which sensor triggered it (often wrong)
            dominant = max(components.items(), key=lambda x: x[1])
            return True, f"Health score {score:.2f} (driven by {dominant[0]}: {dominant[1]:.2f})"
        return False, ""

# Simulate realistic sensor stream with correlated drift
np.random.seed(42)
monitor = NaiveFusionMonitor()

for t in range(100):
    # Normal operation for 50 steps, then gradual bearing failure
    if t < 50:
        reading = SensorReading(
            timestamp=t,
            vibration_rms=2.3 + np.random.normal(0, 0.15),
            temperature=68.5 + np.random.normal(0, 1.2),
            current=12.1 + np.random.normal(0, 0.3),
            pressure=5.8 + np.random.normal(0, 0.1)
        )
    else:
        # Correlated degradation (this is key—real failures aren't independent)
        decay_factor = (t - 50) / 50.0
        reading = SensorReading(
            timestamp=t,
            vibration_rms=2.3 + decay_factor * 1.8 + np.random.normal(0, 0.2),
            temperature=68.5 + decay_factor * 12.0 + np.random.normal(0, 1.5),
            current=12.1 + decay_factor * 2.5 + np.random.normal(0, 0.4),
            pressure=5.8 - decay_factor * 0.6 + np.random.normal(0, 0.15)
        )

    alert, msg = monitor.should_alert(reading)
    if alert:
        print(f"t={t}: ALERT — {msg}")
        break

When I run this (Python 3.11, numpy 1.26), it triggers around t=72, which isn’t terrible. But here’s the problem: the threshold and weights are static. If ambient temperature rises in summer (common in non-climate-controlled facilities), you’ll get false positives. If one sensor drifts low due to calibration error, the weighted sum masks a real problem in another channel. And the biggest issue—this approach has no concept of temporal patterns. A gradual rise over 48 hours looks the same as a sudden spike that self-corrects.
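One cheap mitigation before reaching for learned models: let each baseline drift with a very slow exponential moving average, so seasonal ambient shifts stop accumulating into false alarms. A sketch, with the caveat that alpha is an assumption and must be far slower than any plausible failure trend or it will mask real degradation:

def update_baseline(baseline: float, value: float, alpha: float = 1e-6) -> float:
    # At 10 Hz, alpha=1e-6 gives an effective window on the order of a day:
    # slow enough to track seasons, too slow to absorb an hours-long failure
    return (1 - alpha) * baseline + alpha * value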

Temporal Fusion with Attention (The Better Way)

What we actually want is a model that:
1. Learns correlations between sensors automatically (not hand-tuned weights)
2. Accounts for temporal dynamics (rate of change matters, not just absolute values)
3. Handles missing data gracefully (because sensors fail)
4. Provides uncertainty estimates (so you know when it’s guessing)

This is where something like a Temporal Fusion Transformer (Lim et al., 2021) or even a simpler LSTM-based fusion network shines. I’m going to show you a stripped-down version using PyTorch that captures the key ideas without the full TFT complexity.

import numpy as np  # needed later for the MC dropout std in predict_with_uncertainty
import torch
import torch.nn as nn
import torch.nn.functional as F

class SensorFusionLSTM(nn.Module):
    def __init__(self, n_sensors=4, hidden_dim=64, n_layers=2, dropout=0.2):
        super().__init__()
        self.n_sensors = n_sensors
        self.hidden_dim = hidden_dim

        # Separate embedding for each sensor (handles scale differences)
        self.sensor_encoders = nn.ModuleList([
            nn.Linear(1, 16) for _ in range(n_sensors)
        ])

        # LSTM processes concatenated sensor embeddings over time
        self.lstm = nn.LSTM(
            input_size=n_sensors * 16,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            batch_first=True,
            dropout=dropout if n_layers > 1 else 0
        )

        # Attention over sensors (learns which sensors matter when)
        self.sensor_attention = nn.Sequential(
            nn.Linear(hidden_dim, n_sensors),
            nn.Softmax(dim=-1)
        )

        # Health score predictor (outputs mean and log variance for uncertainty)
        self.predictor = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 2)  # mean, log_var
        )

    def forward(self, x, return_attention=False):
        # x shape: (batch, seq_len, n_sensors)
        batch_size, seq_len, _ = x.shape

        # Encode each sensor separately
        sensor_embeddings = []
        for i in range(self.n_sensors):
            # Handle potential NaNs (missing sensor data)
            sensor_data = x[:, :, i:i+1]
            mask = ~torch.isnan(sensor_data)
            sensor_data = torch.where(mask, sensor_data, torch.zeros_like(sensor_data))
            emb = self.sensor_encoders[i](sensor_data)
            sensor_embeddings.append(emb)

        # Concatenate embeddings
        concat_emb = torch.cat(sensor_embeddings, dim=-1)

        # LSTM forward pass
        lstm_out, (h_n, c_n) = self.lstm(concat_emb)

        # Use last hidden state for prediction
        last_hidden = lstm_out[:, -1, :]  # (batch, hidden_dim)

        # Compute sensor attention weights
        attn_weights = self.sensor_attention(last_hidden)  # (batch, n_sensors)

        # Predict health score with uncertainty
        pred = self.predictor(last_hidden)  # (batch, 2)
        mean = pred[:, 0]
        log_var = pred[:, 1]

        if return_attention:
            return mean, log_var, attn_weights
        return mean, log_var

# Training loop (simplified—real version needs validation set, early stopping, etc.)
def train_fusion_model(model, train_data, n_epochs=50, lr=1e-3):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    model.train()

    for epoch in range(n_epochs):
        total_loss = 0.0
        for batch_x, batch_y in train_data:  # batch_y: ground truth health (0=normal, 1=failure)
            optimizer.zero_grad()

            mean, log_var = model(batch_x)

            # Negative log-likelihood loss with learned uncertainty
            # (this is essentially a heteroscedastic regression loss)
            precision = torch.exp(-log_var)
            loss = 0.5 * (precision * (mean - batch_y)**2 + log_var).mean()

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # important for LSTM stability
            optimizer.step()

            total_loss += loss.item()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{n_epochs}, Loss: {total_loss/len(train_data):.4f}")

# Inference with uncertainty quantification
def predict_with_uncertainty(model, sensor_window, n_samples=20):
    model.eval()
    with torch.no_grad():
        mean, log_var, attn = model(sensor_window.unsqueeze(0), return_attention=True)
        std = torch.exp(0.5 * log_var)

        # Monte Carlo dropout for epistemic uncertainty (optional but recommended)
        model.train()  # enable dropout
        mc_preds = []
        for _ in range(n_samples):
            mc_mean, _ = model(sensor_window.unsqueeze(0))
            mc_preds.append(mc_mean.item())
        model.eval()

        epistemic_std = np.std(mc_preds)
        aleatoric_std = std.item()

        return mean.item(), aleatoric_std, epistemic_std, attn.squeeze(0).cpu().numpy()

This model learns temporal patterns automatically. If temperature always rises 2-3 hours before vibration spikes (thermal expansion causing misalignment), the LSTM captures that sequence. The attention mechanism tells you which sensors the model is focusing on for each prediction—something I’ve found invaluable when explaining alerts to maintenance teams. And the uncertainty estimates (aleatoric from data noise, epistemic from model uncertainty) let you distinguish between “this definitely looks bad” and “I’m seeing something unusual but I’m not confident.”
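In case it's useful, the way I collapse the two into one number for the dashboard is the standard independent-variance combination (Kendall & Gal, 2017):

\sigma_{\text{total}} = \sqrt{\sigma_{\text{aleatoric}}^2 + \sigma_{\text{epistemic}}^2}

where the aleatoric term is exp(0.5 · log_var) from the predictor head and the epistemic term is the standard deviation across the MC dropout samples.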

But here’s where theory meets reality: you need labeled training data. Specifically, you need historical sensor windows with ground truth labels (0 for normal, 1 for failure within next N hours). Most factories don’t have this. They have CMMS logs that say “bearing replaced on 2024-03-15” but no precise timestamp of when the failure actually started. You end up hand-labeling retrospectively, which introduces bias. I’m not entirely sure how to solve this cleanly without a few failure cycles to calibrate on.
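If you do go the retrospective route, the usual compromise is to take each CMMS replacement date and label the preceding N hours of windows as failing, everything else as normal. Here's a minimal sketch; the window timestamps are hypothetical and the 48-hour lead time is an assumption you'd tune per failure mode:

import numpy as np
import pandas as pd

def label_windows(window_end_times, replacement_times, lead_hours=48):
    """Label 1 if a window ends within lead_hours before a recorded replacement.

    Crude by construction: the true failure onset is unknown, so lead_hours
    encodes an assumption, and windows just after a replacement are dropped
    because the equipment state there is ambiguous.
    """
    labels = np.zeros(len(window_end_times))
    keep = np.ones(len(window_end_times), dtype=bool)
    for r in replacement_times:
        delta_h = (r - window_end_times) / pd.Timedelta(hours=1)
        labels[(delta_h >= 0) & (delta_h <= lead_hours)] = 1.0
        keep[(delta_h < 0) & (delta_h > -24)] = False  # post-repair burn-in
    return labels[keep], keep

ends = pd.date_range("2024-03-01", periods=500, freq="1h")
repl = [pd.Timestamp("2024-03-15 09:00")]
y, keep = label_windows(ends, repl)
print(f"{int(y.sum())} failure windows out of {keep.sum()} kept")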

IoT Integration: Protocol Hell and Edge Processing

Now let’s talk about getting sensor data into your model in the first place. Every factory I’ve worked with has a Frankenstein’s monster of protocols: Modbus RTU for legacy PLCs, OPC UA for newer equipment, MQTT for IoT sensors, and inevitably some proprietary garbage that only works with a Windows XP machine running vendor software from 2009.

The standard IoT architecture looks like this:

\text{Sensors} \xrightarrow{\text{Modbus/OPC}} \text{Edge Gateway} \xrightarrow{\text{MQTT/HTTP}} \text{Cloud/On-Prem Server} \xrightarrow{\text{WebSocket}} \text{Dashboard}

But the edge gateway is where things get interesting for sensor fusion. You have two options:

  1. Dumb gateway: Just forward raw sensor data to the cloud, do all processing there
  2. Smart gateway: Run lightweight inference on the edge, only send alerts + aggregated stats

For sensor fusion specifically, smart gateways win. Here’s why: if you’re doing temporal fusion with a 60-second sliding window and 10 Hz sensor sampling, that’s 600 datapoints per sensor per inference pass. Sending all that over cellular/WiFi introduces latency and costs bandwidth. Better to run inference locally and send a health score + attention weights every minute.

Here’s a realistic edge deployment using ONNX Runtime (because you’re not fitting PyTorch on a Raspberry Pi with 512MB RAM):

import onnxruntime as ort
import numpy as np
from collections import deque
import time

class EdgeSensorFusion:
    def __init__(self, model_path, window_size=60, sample_rate=10):
        # Load ONNX model (exported from PyTorch via torch.onnx.export)
        self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.window_size = window_size
        self.sample_rate = sample_rate

        # Rolling buffers for each sensor (fixed size for memory efficiency)
        self.buffers = {
            'vibration': deque(maxlen=window_size * sample_rate),
            'temperature': deque(maxlen=window_size * sample_rate),
            'current': deque(maxlen=window_size * sample_rate),
            'pressure': deque(maxlen=window_size * sample_rate)
        }

        self.last_inference = 0
        self.inference_interval = 5.0  # run model every 5 seconds

    def add_reading(self, sensor_name, value):
        # Validation (real sensors occasionally send out-of-range garbage)
        if sensor_name == 'vibration' and (value < 0 or value > 50):
            value = np.nan  # mark as invalid rather than accepting bad data
        elif sensor_name == 'temperature' and (value < -20 or value > 150):
            value = np.nan
        # (add similar checks for current and pressure)

        self.buffers[sensor_name].append(value)

    def run_inference(self):
        now = time.time()
        if now - self.last_inference < self.inference_interval:
            return None

        # Check if buffers are full enough (need at least 50% data)
        min_samples = (self.window_size * self.sample_rate) // 2
        if any(len(buf) < min_samples for buf in self.buffers.values()):
            return None

        # Downsample to fixed window size (model expects exactly window_size timesteps)
        input_data = np.zeros((1, self.window_size, 4), dtype=np.float32)
        for i, sensor in enumerate(['vibration', 'temperature', 'current', 'pressure']):
            buf_array = np.array(self.buffers[sensor])
            # Simple decimation (better would be antialiased downsampling but edge compute is tight)
            indices = np.linspace(0, len(buf_array)-1, self.window_size, dtype=int)
            input_data[0, :, i] = buf_array[indices]

        # ONNX inference
        input_name = self.session.get_inputs()[0].name
        output = self.session.run(None, {input_name: input_data})
        mean, log_var, attention = output[0][0], output[1][0], output[2][0]

        self.last_inference = now

        return {
            'timestamp': now,
            'health_score': float(mean),
            'uncertainty': float(np.exp(0.5 * log_var)),
            'attention': {sensor: float(attn) for sensor, attn in zip(
                ['vibration', 'temperature', 'current', 'pressure'], attention
            )}
        }

# MQTT publisher for sending results to cloud
import json
import paho.mqtt.client as mqtt

def publish_to_cloud(client, result):
    if result is None:
        return

    # Publish to separate topics for easy filtering
    client.publish('factory/pump01/health', result['health_score'])
    client.publish('factory/pump01/uncertainty', result['uncertainty'])
    client.publish('factory/pump01/attention', json.dumps(result['attention']))

    # Alert topic if health score exceeds threshold
    if result['health_score'] > 0.7 and result['uncertainty'] < 0.15:  # high confidence alert
        client.publish('factory/alerts', f"Pump01 health critical: {result['health_score']:.2f}")

# Main edge loop (runs continuously on gateway device)
def main():
    fusion = EdgeSensorFusion('sensor_fusion_model.onnx')
    mqtt_client = mqtt.Client()
    mqtt_client.connect('mqtt.factory.local', 1883, 60)
    mqtt_client.loop_start()

    # Connect to Modbus/OPC UA sensors (pseudo-code, real impl depends on hardware)
    # modbus_client = ModbusClient('192.168.1.100', port=502)

    while True:
        # Read sensors (in real system this would be async/callback-based)
        # vibration = modbus_client.read_holding_registers(0, 1)[0] / 100.0
        # (simulate for demo)
        vibration = 2.3 + np.random.normal(0, 0.15)
        temperature = 68.5 + np.random.normal(0, 1.2)
        current = 12.1 + np.random.normal(0, 0.3)
        pressure = 5.8 + np.random.normal(0, 0.1)

        fusion.add_reading('vibration', vibration)
        fusion.add_reading('temperature', temperature)
        fusion.add_reading('current', current)
        fusion.add_reading('pressure', pressure)

        # Run inference if enough time has passed
        result = fusion.run_inference()
        publish_to_cloud(mqtt_client, result)

        time.sleep(0.1)  # 10 Hz sampling

This runs comfortably on a Raspberry Pi 4 (tested on the 2 GB model, though 4 GB is safer). The ONNX runtime is fast enough that inference takes ~8 ms with the LSTM model I showed earlier. The key optimization is downsampling the rolling buffer before inference—you don’t need 10 Hz resolution for most mechanical failures; 1 Hz is plenty after you’ve captured the raw data.
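And if the gateway does have headroom for proper antialiasing, scipy’s decimate applies a low-pass filter before downsampling. A drop-in alternative to the index slicing in run_inference, assuming scipy fits on the device:

import numpy as np
from scipy.signal import decimate

raw = np.random.normal(2.3, 0.15, 600)  # 60 s of vibration at 10 Hz
one_hz = decimate(raw, q=10)            # antialiased 10x reduction, 60 samples out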

One gotcha I hit: ONNX export from PyTorch doesn’t always preserve dynamic batch sizes correctly. You might need to explicitly set torch.onnx.export(..., dynamic_axes={'input': {0: 'batch_size'}}) or just fix batch size to 1 for edge deployment. The error messages are cryptic (“RuntimeError: The expanded size of the tensor (60) must match the existing size (120)”)—took me an embarrassing amount of time to debug.
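For reference, the export call that eventually worked for me looked roughly like this. Shapes and names are illustrative, and the wrapper exists because ONNX graphs can’t branch on a Python flag, so return_attention=True has to be baked in at trace time:

import torch

model = SensorFusionLSTM()  # trained instance from earlier
model.eval()
dummy = torch.randn(1, 60, 4)  # (batch, window_size, n_sensors)

# Wrap the model so the traced graph always returns all three outputs
class ExportWrapper(torch.nn.Module):
    def __init__(self, m):
        super().__init__()
        self.m = m
    def forward(self, x):
        return self.m(x, return_attention=True)

torch.onnx.export(
    ExportWrapper(model), dummy, "sensor_fusion_model.onnx",
    input_names=["input"],
    output_names=["mean", "log_var", "attention"],
    # Fixed batch of 1 is simplest on the edge; uncomment to keep it dynamic:
    # dynamic_axes={"input": {0: "batch_size"}},
    opset_version=17,
)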

When to Use Which Approach

The weighted average method isn’t wrong. It’s appropriate when:
– You have stable, well-calibrated sensors (not typical, but happens in newer installations)
– The physics is well-understood and you can define thresholds confidently
– You need explainability for regulatory compliance (“vibration exceeded 4.5 mm/s per ISO 10816”)
– You’re in proof-of-concept phase and don’t have training data yet

Go with the temporal fusion model when:
– Sensor correlations are complex (multi-axis vibration + thermal + electrical)
– You have 6+ months of historical data with some labeled failure events
– False alarm rate matters (the learned model adapts to your specific equipment)
– You can afford the compute overhead (edge or cloud)

And here’s something that surprised me: hybrid approaches work well. Use the naive method as a pre-filter (“only run the expensive model if health score > 0.4”) to save compute. Or use the learned model’s attention weights to refine your manual thresholds iteratively.
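That gating is a one-function affair, reusing NaiveFusionMonitor and EdgeSensorFusion from above (and assuming readings are already being pushed into the fusion buffers elsewhere):

def hybrid_check(naive: NaiveFusionMonitor, fusion: EdgeSensorFusion,
                 reading: SensorReading):
    # Cheap weighted score acts as a gate for the expensive temporal model
    score, _ = naive.compute_health_score(reading)
    if score < 0.4:
        return None  # clearly healthy: skip LSTM inference entirely
    return fusion.run_inference()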

The IoT integration side is mostly plumbing—Modbus libraries, MQTT brokers, edge deployment scripts. It’s tedious but not conceptually hard. What’s hard is building a fusion system that maintenance teams actually trust. That means logging every alert, tracking false positive rates by equipment type, and iterating on thresholds/models every quarter as sensors drift and equipment ages.

In Part 11, we’ll tackle explainable AI for factory operations—because a black-box model that says “replace this bearing” without showing its reasoning is a non-starter in most manufacturing environments. I’m particularly curious about how to balance model accuracy with interpretability; the best-performing models (transformers, ensembles) are the least explainable, and I haven’t found a great solution yet that doesn’t involve sacrificing one for the other.

Smart Factory with AI Series (10/12)
