Edge AI vs Cloud AI: Choosing the Right Architecture for Factories

Updated Feb 6, 2026

When 50ms Feels Like an Eternity

A conveyor belt moves at 2 meters per second. Your vision system spots a defect. By the time the image travels to the cloud, gets processed, and the rejection signal returns, that part has moved 20 centimeters down the line. You just shipped a bad unit.

This isn’t a theoretical problem. I’ve seen factories lose 3-5% yield because their cloud-based QC system couldn’t return a verdict fast enough to fire the pneumatic ejectors. The model itself wasn’t the issue: 96% accuracy, beautifully trained on thousands of labeled defects. The architecture killed it.

So here’s the actual question: when do you need edge AI, and when is cloud perfectly fine?


The Latency Tax (And When You Can’t Afford It)

Network round-trip time from a factory floor to the nearest AWS region averages 80-150ms. Add inference time (20-50ms for a typical ResNet-50), and you’re looking at 100-200ms total. For quality inspection at 60 items per minute, that’s borderline acceptable. At 300 items per minute? You’re toast.
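It’s worth making that budget explicit. Here’s a quick back-of-the-envelope check (a minimal sketch; the latency numbers are illustrative, not measurements from any particular line):

def per_item_budget_ms(items_per_minute):
    """Time available to inspect and act on each item before the next one arrives."""
    return 60_000 / items_per_minute

def fits_budget(network_rtt_ms, inference_ms, items_per_minute, margin=0.5):
    """Rough go/no-go: end-to-end latency should use at most `margin` of the budget,
    leaving the rest for jitter, actuation, and everything else on the line."""
    total = network_rtt_ms + inference_ms
    return total <= per_item_budget_ms(items_per_minute) * margin

print(fits_budget(150, 40, items_per_minute=60))   # True: 190ms against a 1000ms budget
print(fits_budget(150, 40, items_per_minute=300))  # False: 190ms against a 200ms budget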

But latency isn’t just about speed—it’s about predictability. Cloud latency has a long tail. P50 might be 120ms, but P99 could spike to 800ms when someone else’s workload floods the network. On a production line, that P99 case means random missed detections.

Edge AI gives you deterministic latency. A Jetson Orin running YOLOv8 takes 18ms per frame, every frame. No spikes. No network weather.

Here’s what edge inference looks like for defect detection:

import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit  # creates the CUDA context on import (required before mem_alloc)
import numpy as np
import cv2
from collections import deque
import time

class EdgeDefectDetector:
    def __init__(self, engine_path, conf_threshold=0.7):
        # TensorRT engine loaded once at startup
        self.logger = trt.Logger(trt.Logger.WARNING)
        with open(engine_path, 'rb') as f:
            self.engine = trt.Runtime(self.logger).deserialize_cuda_engine(f.read())
        self.context = self.engine.create_execution_context()

        # Pre-allocate GPU memory (this is key for consistent latency)
        self.input_shape = (1, 3, 640, 640)
        self.output_shape = (1, 25200, 85)  # YOLO output: [batch, anchors, 4+1+80]

        self.d_input = cuda.mem_alloc(np.zeros(self.input_shape, dtype=np.float32).nbytes)
        self.d_output = cuda.mem_alloc(np.zeros(self.output_shape, dtype=np.float32).nbytes)

        self.stream = cuda.Stream()
        self.conf_threshold = conf_threshold

        # Track latency distribution (for monitoring)
        self.latency_buffer = deque(maxlen=1000)

    def preprocess(self, img):
        # Resize and normalize (letterbox to preserve aspect ratio)
        h, w = img.shape[:2]
        scale = min(640/h, 640/w)
        nh, nw = int(h*scale), int(w*scale)
        resized = cv2.resize(img, (nw, nh))

        # Pad to square
        canvas = np.full((640, 640, 3), 114, dtype=np.uint8)
        y_offset = (640 - nh) // 2
        x_offset = (640 - nw) // 2
        canvas[y_offset:y_offset+nh, x_offset:x_offset+nw] = resized

        # HWC to CHW, normalize to [0,1]
        # Force a contiguous buffer so the async GPU copy gets the layout it expects
        blob = np.ascontiguousarray(canvas.transpose(2, 0, 1), dtype=np.float32) / 255.0
        return blob[np.newaxis, ...], scale, (x_offset, y_offset)

    def infer(self, img):
        t0 = time.perf_counter()

        blob, scale, offset = self.preprocess(img)

        # Copy to GPU
        cuda.memcpy_htod_async(self.d_input, blob, self.stream)

        # Run inference
        self.context.execute_async_v2(
            bindings=[int(self.d_input), int(self.d_output)],
            stream_handle=self.stream.handle
        )

        # Copy result back
        output = np.empty(self.output_shape, dtype=np.float32)
        cuda.memcpy_dtoh_async(output, self.d_output, self.stream)
        self.stream.synchronize()

        # Post-process (NMS, coordinate scaling)
        detections = self._postprocess(output[0], scale, offset)

        latency_ms = (time.perf_counter() - t0) * 1000
        self.latency_buffer.append(latency_ms)

        return detections, latency_ms

    def _postprocess(self, output, scale, offset):
        # Apply confidence threshold
        mask = output[:, 4] > self.conf_threshold
        candidates = output[mask]

        if len(candidates) == 0:
            return []

        # Extract boxes, scores, class IDs
        boxes = candidates[:, :4]  # x_center, y_center, w, h
        scores = candidates[:, 4]
        class_probs = candidates[:, 5:]
        class_ids = np.argmax(class_probs, axis=1)

        # Convert to x1, y1, x2, y2 and scale back to original image
        x_c, y_c = boxes[:, 0].copy(), boxes[:, 1].copy()
        w, h = boxes[:, 2].copy(), boxes[:, 3].copy()
        boxes[:, 0] = (x_c - w / 2 - offset[0]) / scale
        boxes[:, 1] = (y_c - h / 2 - offset[1]) / scale
        boxes[:, 2] = (x_c + w / 2 - offset[0]) / scale
        boxes[:, 3] = (y_c + h / 2 - offset[1]) / scale

        # Simple NMS (you'd use cv2.dnn.NMSBoxes in production)
        keep = self._nms(boxes, scores, iou_threshold=0.45)

        return [(boxes[i], scores[i], class_ids[i]) for i in keep]

    def _nms(self, boxes, scores, iou_threshold):
        # Greedy NMS implementation
        order = scores.argsort()[::-1]
        keep = []

        while len(order) > 0:
            i = order[0]
            keep.append(i)

            if len(order) == 1:
                break

            # Compute IoU with remaining boxes
            ious = self._compute_iou(boxes[i], boxes[order[1:]])

            # Keep only boxes with IoU below threshold
            order = order[1:][ious < iou_threshold]

        return keep

    def _compute_iou(self, box, boxes):
        # box: [x1, y1, x2, y2], boxes: [N, 4]
        x1 = np.maximum(box[0], boxes[:, 0])
        y1 = np.maximum(box[1], boxes[:, 1])
        x2 = np.minimum(box[2], boxes[:, 2])
        y2 = np.minimum(box[3], boxes[:, 3])

        intersection = np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1)
        area_box = (box[2] - box[0]) * (box[3] - box[1])
        area_boxes = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

        return intersection / (area_box + area_boxes - intersection + 1e-6)

    def get_latency_stats(self):
        if len(self.latency_buffer) == 0:
            return {}

        latencies = np.array(self.latency_buffer)
        return {
            'p50': np.percentile(latencies, 50),
            'p95': np.percentile(latencies, 95),
            'p99': np.percentile(latencies, 99),
            'mean': latencies.mean(),
            'std': latencies.std()
        }

On a Jetson Orin Nano (8GB), this runs at 16-18ms per frame consistently. The P99 latency is 19ms—only 1ms worse than median. That’s the edge advantage.

But edge has costs. That Jetson costs about $500, and you need one per camera. For a factory with 20 inspection stations, that’s $10k in hardware before considering mounting, power, and cooling.

The Economics of Data Transfer (And Why Cloud Still Wins for Training)

Here’s the uncomfortable truth: edge is fast for inference, but terrible for everything else.

Let’s say you’re collecting images for retraining. A single 1920×1080 image at 24-bit color is ~6MB. At 10 fps for an 8-hour shift, that’s 1.7TB per camera per day. Uploading that over a typical factory network (10-50 Mbps uplink) would take 3-15 days. You’d need to physically ship hard drives.
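The arithmetic, in case you want to plug in your own numbers (a quick sketch; frame size, fps, and uplink speed are the knobs that matter):

bytes_per_frame = 1920 * 1080 * 3            # 24-bit color, uncompressed: ~6.2 MB
fps = 10
shift_seconds = 8 * 3600

daily_bytes = bytes_per_frame * fps * shift_seconds
print(f"{daily_bytes / 1e12:.2f} TB per camera per day")       # ~1.79 TB

for uplink_mbps in (10, 50):                 # typical factory uplink range
    upload_days = daily_bytes * 8 / (uplink_mbps * 1e6) / 86400
    print(f"{upload_days:.1f} days to upload at {uplink_mbps} Mbps")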

Cloud AI sidesteps this by storing everything in the same region where training happens. Your labeled dataset grows from 10k images to 500k images? Just scale up the EC2 instance. No shipping drives, no local storage headaches.

But here’s where hybrid architectures shine. Run inference on edge. Log predictions and confidence scores (tiny—kilobytes per image). Upload only the uncertain cases and false positives flagged by operators. If your model is 95% confident, you don’t need that image in the cloud. If it’s 60% confident and the operator corrects it, that’s gold for retraining.

import boto3
import cv2
import json
from datetime import datetime
import threading
import queue

class HybridInferenceLogger:
    """Edge inference with selective cloud upload for retraining."""

    def __init__(self, s3_bucket, uncertainty_threshold=0.85):
        self.s3 = boto3.client('s3')
        self.bucket = s3_bucket
        self.uncertainty_threshold = uncertainty_threshold

        # Background upload queue (don't block inference)
        self.upload_queue = queue.Queue(maxsize=100)
        self.upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
        self.upload_thread.start()

    def log_inference(self, img, detections, metadata):
        """Log inference result, upload to S3 if uncertain."""

        # Find max confidence across all detections
        max_conf = max([d[1] for d in detections], default=0.0)

        # Decide if we need to upload
        should_upload = (
            max_conf < self.uncertainty_threshold or  # Low confidence
            metadata.get('operator_flagged', False) or  # Operator override
            len(detections) == 0  # No detection (possible false negative)
        )

        # Always log metadata locally (cheap)
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'detections': [
                {'bbox': d[0].tolist(), 'conf': float(d[1]), 'class': int(d[2])}
                for d in detections
            ],
            'max_confidence': float(max_conf),
            'uploaded': should_upload,
            **metadata
        }

        # Local JSON log (this shouldn't fail even if S3 is down)
        with open('/var/log/inference.jsonl', 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

        # Upload image to cloud if uncertain (non-blocking)
        if should_upload:
            try:
                self.upload_queue.put_nowait((img, log_entry))
            except queue.Full:
                # Queue full—skip this upload (we're falling behind)
                # This shouldn't happen in practice, but better than blocking
                pass

    def _upload_worker(self):
        """Background thread for S3 uploads."""
        while True:
            img, log_entry = self.upload_queue.get()

            try:
                # Generate unique key
                timestamp = log_entry['timestamp'].replace(':', '-')
                key = f"uncertain/{timestamp}.jpg"

                # Encode image to JPEG
                _, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 90])

                # Upload image and metadata
                self.s3.put_object(
                    Bucket=self.bucket,
                    Key=key,
                    Body=buffer.tobytes(),
                    Metadata={
                        'max_confidence': str(log_entry['max_confidence']),
                        'num_detections': str(len(log_entry['detections'])),
                        'station': log_entry.get('station', 'unknown')
                    }
                )

                # Upload JSON sidecar
                self.s3.put_object(
                    Bucket=self.bucket,
                    Key=key.replace('.jpg', '.json'),
                    Body=json.dumps(log_entry),
                    ContentType='application/json'
                )

            except Exception as e:
                # Log error but don't crash the thread
                print(f"Upload failed: {e}")  # In production, use proper logging

            finally:
                self.upload_queue.task_done()

This approach cuts upload bandwidth by 90-95%. Instead of 1.7TB/day, you’re uploading maybe 50-100GB—manageable over a typical network.

And here’s the clever bit: those uncertain cases are exactly the images you want for active learning. Your next training run focuses on the decision boundary, not the easy cases the model already nails.
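Pulling those samples back out is easy because every inference is already in the local JSONL log. A minimal sketch of selecting retraining candidates near the decision boundary (the confidence band is an assumption; tune it to your model):

import json

def select_retraining_candidates(log_path, low=0.4, high=0.85):
    """Pull logged inferences whose top confidence landed near the decision boundary,
    plus anything an operator explicitly flagged."""
    candidates = []
    with open(log_path) as f:
        for line in f:
            entry = json.loads(line)
            if low <= entry['max_confidence'] <= high or entry.get('operator_flagged'):
                candidates.append(entry)
    return candidates

# candidates = select_retraining_candidates('/var/log/inference.jsonl')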

Power, Thermal, and Why Data Centers Don’t Have Fans Covered in Metal Dust

Factories are hostile to electronics. I’m not entirely sure why anyone thought putting GPUs next to grinding wheels was a good idea, but here we are.

Edge devices need IP65-rated enclosures (dust and water resistant). A Jetson Orin draws 15-25W under load. That doesn’t sound like much, but in a sealed enclosure on a factory floor at 35°C ambient, you need active cooling or the GPU throttles at 80°C junction temperature. Add a fan, and you’ve introduced a mechanical failure point in a dusty environment. Fan dies, GPU overheats, production stops.
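If you do deploy actively cooled edge boxes, at least watch the temperatures so a dying fan shows up as an alert instead of a line stop. A minimal sketch, assuming the standard Linux thermal sysfs interface that Jetson-class devices expose (zone names vary by board):

import glob
import time

def read_thermal_zones():
    """Read every thermal zone (millidegrees C in sysfs) into {zone_name: temp_c}."""
    temps = {}
    for zone in glob.glob('/sys/class/thermal/thermal_zone*'):
        try:
            with open(f'{zone}/type') as f:
                name = f.read().strip()
            with open(f'{zone}/temp') as f:
                temps[name] = int(f.read().strip()) / 1000.0
        except (OSError, ValueError):
            continue
    return temps

def watch_thermals(warn_at_c=75.0, interval_s=30):
    """Poll temperatures and complain before the GPU reaches its throttle point."""
    while True:
        for name, temp_c in read_thermal_zones().items():
            if temp_c >= warn_at_c:
                print(f"WARNING: {name} at {temp_c:.1f}C (warn threshold {warn_at_c}C)")
        time.sleep(interval_s)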

Cloud AI runs in temperature-controlled data centers. No dust. No vibration. No thermal throttling. AWS Inferentia chips run at steady-state 60°C with enterprise-grade cooling. They’ll outlive any edge device in a harsh environment.

But there’s a middle ground: fanless edge AI with older, lower-power models. A Jetson Nano (10W TDP) running MobileNetV2 or EfficientNet-Lite can do defect detection at 30 fps with passive cooling in an IP65 case. You give up some accuracy (maybe 92% instead of 96%), but you gain reliability. In some environments, that’s the right trade.

Model Size and the “Download a 2GB Model Over Factory WiFi” Problem

Cloud AI can serve massive models because storage and RAM are cheap at scale. A 7B parameter LLM for natural language work instructions? Fine—just spin up an instance with 32GB RAM.

Edge devices are constrained. The Jetson Orin Nano has 8GB RAM total, shared between the OS, application, and GPU memory. A full YOLOv8-X model (87M parameters, ~350MB) is fine. A Segment Anything Model (SAM, 638M parameters, ~2.4GB) barely fits, and you can’t run anything else.

Quantization helps. Int8 quantization cuts model size by 4× with minimal accuracy loss (usually <1% drop). Here’s what that looks like with TensorRT:

import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit  # CUDA context for the calibration buffers

def export_int8_trt_engine(onnx_path, engine_path, calibration_data):
    """Convert ONNX model to TensorRT INT8 engine."""

    logger = trt.Logger(trt.Logger.WARNING)
    builder = trt.Builder(logger)
    network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
    parser = trt.OnnxParser(network, logger)

    # Parse ONNX model
    with open(onnx_path, 'rb') as f:
        if not parser.parse(f.read()):
            for error in range(parser.num_errors):
                print(parser.get_error(error))
            raise RuntimeError("Failed to parse ONNX")

    # Configure builder
    config = builder.create_builder_config()
    config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 2 << 30)  # 2GB workspace

    # Enable INT8 quantization
    config.set_flag(trt.BuilderFlag.INT8)

    # Provide calibration data (this is key for accuracy)
    calibrator = EntropyCalibrator(calibration_data, batch_size=8)
    config.int8_calibrator = calibrator

    # Build engine
    print("Building TensorRT engine (this takes 5-10 min)...")
    engine = builder.build_serialized_network(network, config)

    # Save to disk
    with open(engine_path, 'wb') as f:
        f.write(engine)

    print(f"Saved INT8 engine to {engine_path}")

class EntropyCalibrator(trt.IInt8EntropyCalibrator2):
    """Calibrator for INT8 quantization using entropy minimization."""

    def __init__(self, calibration_images, batch_size):
        super().__init__()
        self.images = calibration_images  # List of preprocessed numpy arrays
        self.batch_size = batch_size
        self.current_index = 0

        # Allocate GPU memory for calibration batches
        self.device_input = cuda.mem_alloc(self.images[0].nbytes * batch_size)

    def get_batch_size(self):
        return self.batch_size

    def get_batch(self, names):
        if self.current_index + self.batch_size > len(self.images):
            return None  # Done calibrating

        batch = np.stack(self.images[self.current_index:self.current_index + self.batch_size])
        cuda.memcpy_htod(self.device_input, batch)

        self.current_index += self.batch_size
        return [int(self.device_input)]

    def read_calibration_cache(self):
        # No cached calibration
        return None

    def write_calibration_cache(self, cache):
        # Save calibration to disk for future use
        with open('calibration.cache', 'wb') as f:
            f.write(cache)

On the Orin Nano, INT8 quantization of YOLOv8-Large drops inference time from 28ms to 11ms and model size from 350MB to 90MB. Accuracy on our defect dataset went from 94.2% mAP to 93.8% mAP—totally acceptable.

But even quantized models need to be deployed to edge devices. Over-the-air updates sound great until you try pushing a 90MB model to 50 edge devices over factory WiFi. With 10 Mbps shared across devices, each download effectively gets a megabit or two, so you’re looking at roughly ten minutes per device, and far longer if you push to everything at once. In practice, stagger the rollout (see the sketch below) or you’ll DOS your own infrastructure.
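One way to stagger it (a sketch under stated assumptions: `push_model(device, path)` and `healthy(device)` are hypothetical hooks for whatever transport and monitoring you actually use):

import time

def staggered_rollout(devices, model_path, push_model, healthy,
                      batch_size=5, pause_s=300):
    """Push a new model to edge devices a few at a time, pausing between batches
    and halting if any device in a batch fails its health check afterwards."""
    for i in range(0, len(devices), batch_size):
        batch = devices[i:i + batch_size]
        for device in batch:
            push_model(device, model_path)
        time.sleep(pause_s)  # give the batch time to download and reload the model
        failed = [d for d in batch if not healthy(d)]
        if failed:
            raise RuntimeError(f"Rollout halted: unhealthy devices {failed}")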

When Cloud Actually Makes Sense (And It’s More Often Than You’d Think)

Not every factory task needs millisecond latency. Predictive maintenance—the topic we covered in Part 3—is perfectly fine on cloud. You’re predicting failures hours or days in advance. An extra 200ms doesn’t matter.

Same for digital twin simulations (Part 5). Those run on historical data, not real-time streams. You want beefy CPUs and lots of RAM. An AWS r6i.8xlarge instance (32 cores, 256GB RAM) costs $2/hour on-demand. Running that workload on edge hardware would cost tens of thousands in upfront CapEx.

Here’s a quick decision heuristic:

Use Case | Latency Requirement | Data Volume | Architecture
QC defect detection (high-speed line) | <50ms | Low (inference only) | Edge
QC defect detection (manual inspection) | <500ms | Low | Cloud (cheaper)
Predictive maintenance | Minutes to hours | High (continuous sensor streams) | Cloud
Production scheduling optimization | Seconds | Medium | Cloud
Anomaly detection (critical equipment) | <100ms | Low | Edge
Anomaly detection (non-critical) | <1s | Low | Cloud
Digital twin simulation | N/A (batch) | Very high | Cloud
Worker safety monitoring (PPE detection) | <200ms | Low | Edge (liability if offline)

Notice the pattern? Latency-critical + safety-critical = edge. Everything else, default to cloud unless you have a specific reason not to.
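If you want that heuristic in code form (a toy sketch of the table above, not a substitute for measuring your own line):

def choose_architecture(latency_budget_ms, safety_critical=False):
    """Rough edge-vs-cloud heuristic mirroring the table above: latency-critical
    or safety-critical goes to edge, everything else defaults to cloud."""
    if safety_critical or latency_budget_ms < 100:
        return 'edge'
    return 'cloud'

print(choose_architecture(50))                          # edge  (high-speed QC line)
print(choose_architecture(500))                         # cloud (manual inspection)
print(choose_architecture(200, safety_critical=True))   # edge  (PPE monitoring)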

And don’t overlook the operational burden. Edge devices need firmware updates, security patches, and physical maintenance. Cloud instances are someone else’s problem. For a factory with 5 edge devices, self-managing is fine. For 500 devices across 10 sites? You need a full-time DevOps person just for edge fleet management.

The Hybrid Sweet Spot: Inference on Edge, Everything Else in Cloud

My recommendation for most smart factory deployments: run inference on the edge; run training, monitoring, and analytics in the cloud.

Here’s the data flow:

  1. Edge device: Runs TensorRT-optimized model, logs predictions locally, uploads uncertain cases to S3
  2. Cloud (S3): Stores uncertain images + operator corrections
  3. Cloud (Lambda): Triggers when N new uncertain images accumulate, kicks off retraining job
  4. Cloud (SageMaker): Retrains model on expanded dataset (old data + new corrections), exports ONNX
  5. Edge device: Downloads new model over secure channel, hot-swaps in production

This gives you the best of both worlds. Edge handles the latency-critical path. Cloud handles the data-heavy, compute-heavy, and operationally complex tasks.
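Step 3 is the part people tend to hand-wave. Here’s a minimal sketch of that trigger, assuming the S3 layout from the logger earlier; the bucket name, training image URI, and IAM role are placeholders you’d swap for your own:

import boto3

s3 = boto3.client('s3')
sagemaker = boto3.client('sagemaker')

RETRAIN_THRESHOLD = 500  # new uncertain images before retraining is worth it

def lambda_handler(event, context):
    """Count uncertain images in S3 and kick off a SageMaker training job once
    enough new samples have accumulated."""
    count = 0
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket='my-factory-qc-bucket', Prefix='uncertain/'):
        count += sum(1 for obj in page.get('Contents', []) if obj['Key'].endswith('.jpg'))

    if count < RETRAIN_THRESHOLD:
        return {'retrain': False, 'uncertain_images': count}

    sagemaker.create_training_job(
        TrainingJobName=f"defect-retrain-{context.aws_request_id[:8]}",
        AlgorithmSpecification={
            'TrainingImage': '<your-training-image-uri>',
            'TrainingInputMode': 'File',
        },
        RoleArn='<your-sagemaker-execution-role>',
        InputDataConfig=[{
            'ChannelName': 'train',
            'DataSource': {'S3DataSource': {
                'S3DataType': 'S3Prefix',
                'S3Uri': 's3://my-factory-qc-bucket/uncertain/',
                'S3DataDistributionType': 'FullyReplicated',
            }},
        }],
        OutputDataConfig={'S3OutputPath': 's3://my-factory-qc-bucket/models/'},
        ResourceConfig={'InstanceType': 'ml.g5.xlarge', 'InstanceCount': 1,
                        'VolumeSizeInGB': 100},
        StoppingCondition={'MaxRuntimeInSeconds': 4 * 3600},
    )
    return {'retrain': True, 'uncertain_images': count}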

The tricky part is model versioning. You can’t just push a new model and hope it works. You need A/B testing on edge. Run the new model in shadow mode (inference happens, but results aren’t acted on) for 1000 samples. Compare predictions to the current production model. If accuracy is equal or better and latency is acceptable, promote to production. If not, roll back.

This is where having both models in memory helps (if you have the RAM):

import random
import numpy as np

class ABTestingInference:
    """Run two models in parallel for A/B testing on edge."""

    def __init__(self, model_a_path, model_b_path, test_ratio=0.1):
        self.model_a = EdgeDefectDetector(model_a_path)  # Current production
        self.model_b = EdgeDefectDetector(model_b_path)  # Candidate
        self.test_ratio = test_ratio

        # Track metrics for both models
        self.metrics = {'a': [], 'b': []}

    def infer(self, img, ground_truth=None):
        # Always run model A (production)
        det_a, lat_a = self.model_a.infer(img)

        # Run model B for test_ratio% of traffic
        if random.random() < self.test_ratio:
            det_b, lat_b = self.model_b.infer(img)

            # If we have ground truth (from operator), record accuracy
            if ground_truth is not None:
                self.metrics['a'].append(self._compute_accuracy(det_a, ground_truth))
                self.metrics['b'].append(self._compute_accuracy(det_b, ground_truth))

        # Always use model A for production decisions
        return det_a

    def _compute_accuracy(self, detections, ground_truth):
        # Simplified accuracy: IoU > 0.5 with any ground truth box = TP
        # (Real implementation would be more sophisticated)
        if len(ground_truth) == 0:
            return 1.0 if len(detections) == 0 else 0.0

        # ... IoU computation omitted for brevity
        pass

    def should_promote_model_b(self, min_samples=1000):
        """Decide if model B is better than A."""
        if len(self.metrics['b']) < min_samples:
            return False  # Not enough data yet

        acc_a = np.mean(self.metrics['a'][-min_samples:])
        acc_b = np.mean(self.metrics['b'][-min_samples:])

        # Promote if B is at least as good as A (within 0.5%)
        return acc_b >= acc_a - 0.005

This adds complexity, but it prevents bad models from reaching production. I’ve seen cases where a retrained model looked great in validation (96% → 97%) but failed spectacularly on a new defect type that appeared post-training. Shadow testing caught it before it cost real yield.

What I Still Don’t Know (And What I’m Watching)

I’m not entirely sure how the economics will shake out as edge chips get cheaper. The Hailo-8 ($70, 26 TOPS) and Google Coral ($60, 4 TOPS) are making edge inference absurdly cheap. At some point, it becomes cheaper to throw a $70 accelerator at the problem than to pay $50/month for cloud inference.

But cloud providers are responding with cheaper inference options—AWS Inferentia2, Google TPU v5e. The cost gap is narrowing from both sides.

I’m also curious about federated learning for factories. Train local models on each site’s data without uploading raw images. Aggregate model updates in the cloud. The privacy story is compelling (especially for factories that don’t want to upload proprietary product images), but the infrastructure complexity is still high. Most factories I’ve worked with aren’t ready for that.

For now, my stance: start with cloud unless you have a hard latency requirement under 100ms or a data privacy/sovereignty constraint. Cloud is operationally simpler, cheaper to prototype, and easier to scale. Once you’ve proven the value, then consider edge for the latency-critical components. Don’t start with edge just because it sounds cool.

Next up in Part 8: time series forecasting for demand planning. We’ll look at when LSTMs actually beat ARIMA (spoiler: less often than you’d think), how to handle supply chain shocks, and why your forecast accuracy metric is probably lying to you.

Smart Factory with AI Series (7/12)
