Real-Time Optical Flow Estimation with RAFT and FlowFormer: Practical Guide for Video Motion Tracking

Updated Feb 6, 2026

Introduction

Optical flow estimation—the task of computing dense pixel-wise motion vectors between consecutive video frames—is fundamental to numerous computer vision applications including autonomous driving, video stabilization, action recognition, and object tracking. While classical methods like Lucas-Kanade and Horn-Schunck dominated for decades, recent deep learning breakthroughs have revolutionized the field with unprecedented accuracy and speed.

In this comprehensive guide, we’ll explore two state-of-the-art architectures: RAFT (Recurrent All-Pairs Field Transforms) and FlowFormer, examining their theoretical foundations, implementation details, and practical deployment strategies for real-time video motion tracking applications.

Understanding Optical Flow: Mathematical Foundation

Optical flow represents the apparent motion of brightness patterns between two consecutive frames. For a pixel at position $(x, y)$ in frame $I_1$ moving to $(x + \Delta x, y + \Delta y)$ in frame $I_2$, the optical flow vector is $(u, v) = (\Delta x, \Delta y)$.

The classic brightness constancy assumption states:

$$I_1(x, y, t) = I_2(x + u, y + v, t + \Delta t)$$

Where:
$I_1, I_2$ = intensity values in consecutive frames
$(u, v)$ = horizontal and vertical flow components
$\Delta t$ = time interval between frames

Using Taylor expansion and rearranging terms yields the optical flow constraint equation:

$$I_x u + I_y v + I_t = 0$$

Where $I_x$, $I_y$ are spatial gradients and $I_t$ is the temporal gradient. This single equation cannot be solved for the two unknowns $(u, v)$, the famous aperture problem, so additional constraints or learning-based approaches are required.
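
To see why extra constraints help, here is a minimal NumPy sketch of the classical Lucas-Kanade remedy: assume the flow is constant inside a small window around a pixel and solve the resulting over-determined system by least squares. The function and variable names are illustrative, not taken from any particular library.

import numpy as np

def lucas_kanade_patch(I1, I2, x, y, win=7):
    """Estimate (u, v) at pixel (x, y) from a win x win window of two
    float grayscale frames I1, I2."""
    half = win // 2

    # Spatial and temporal gradients (simple central / forward differences)
    Ix = (np.roll(I1, -1, axis=1) - np.roll(I1, 1, axis=1)) / 2.0
    Iy = (np.roll(I1, -1, axis=0) - np.roll(I1, 1, axis=0)) / 2.0
    It = I2 - I1

    # Stack the constraint I_x u + I_y v + I_t = 0 for every pixel in the window
    sl = (slice(y - half, y + half + 1), slice(x - half, x + half + 1))
    A = np.stack([Ix[sl].ravel(), Iy[sl].ravel()], axis=1)  # (win*win, 2)
    b = -It[sl].ravel()

    # Least-squares solution; a rank-deficient A is the aperture problem in action
    uv, *_ = np.linalg.lstsq(A, b, rcond=None)
    return uv  # (u, v)

A single pixel contributes one equation in two unknowns; the window supplies the extra equations, which is exactly the gap that learned methods such as RAFT and FlowFormer fill with data-driven priors instead.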

RAFT: Recurrent All-Pairs Field Transforms

Architecture Overview

RAFT, introduced by Teed and Deng (Princeton University, ECCV 2020), treats optical flow as a sequence prediction problem solved through iterative refinement. The architecture consists of three main components:

Component         | Function                                         | Key Innovation
------------------|--------------------------------------------------|---------------------------------------
Feature Encoder   | Extract per-pixel features from both frames      | Context-aware convolutional layers
Correlation Layer | Compute all-pairs correlations between features  | 4D correlation pyramid for efficiency
Update Operator   | Iteratively refine flow estimates                | Recurrent GRU-based updates

All-Pairs Correlation Volume

RAFT’s breakthrough lies in computing correlations between all pairs of pixels across frames. For feature maps $f_1, f_2 \in \mathbb{R}^{H \times W \times C}$, the correlation volume is:

$$\mathbf{C}(x_1, x_2) = \frac{1}{C} \sum_{c=1}^{C} f_1(x_1)_c \cdot f_2(x_2)_c$$

This creates a 4D volume $\mathbf{C} \in \mathbb{R}^{H \times W \times H \times W}$. To manage memory (see the sketch after this list):

  1. Pooling pyramid: Build multiple resolution levels (1, 2, 4, 8 pixels)
  2. Local lookup: For each pixel, only retrieve correlations in a local neighborhood
  3. Grid sampling: Index correlation volume using current flow estimate
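
The sketch below illustrates the all-pairs volume and its pooled pyramid; it omits the bilinear sampling used for the local lookup, and the function name is illustrative rather than taken from the RAFT codebase.

import torch
import torch.nn.functional as F

def build_corr_pyramid(f1, f2, num_levels=4):
    """f1, f2: feature maps of shape (B, C, H, W)."""
    B, C, H, W = f1.shape

    # All-pairs inner product, normalized by the feature dimension
    corr = torch.einsum('bchw,bcuv->bhwuv', f1, f2) / C   # (B, H, W, H, W)
    corr = corr.reshape(B * H * W, 1, H, W)               # one response map per source pixel

    # Pool only the target-frame dimensions to get 1, 2, 4, 8 pixel resolutions
    pyramid = [corr]
    for _ in range(num_levels - 1):
        corr = F.avg_pool2d(corr, kernel_size=2, stride=2)
        pyramid.append(corr)
    return pyramid  # level k has shape (B*H*W, 1, H/2^k, W/2^k)

At lookup time, the current flow estimate selects a small neighborhood (e.g. 9×9) in every pyramid level, so the per-iteration cost stays constant regardless of how large the displacement is.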

Iterative Refinement Process

Starting from zero flow $f^0 = 0$, RAFT performs $N$ iterations (typically 12-32):

import torch
import torch.nn as nn

class RAFTIterativeUpdate(nn.Module):
    """Simplified per-pixel GRU update block in the spirit of RAFT's update
    operator (the official model uses a convolutional GRU with separate
    motion and flow encoders)."""

    def __init__(self, hidden_dim=128, context_dim=128, corr_dim=324):
        super().__init__()
        # GRU input = correlation features + current flow (2 channels) + context;
        # corr_dim=324 corresponds to 4 pyramid levels x a 9x9 lookup window
        self.gru = nn.GRUCell(corr_dim + 2 + context_dim, hidden_dim)
        self.flow_head = nn.Sequential(
            nn.Conv2d(hidden_dim, 256, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 2, 3, padding=1)
        )

    def forward(self, hidden_state, context, correlation_features, flow_current):
        """
        Args:
            hidden_state: GRU hidden state (B, 128, H, W)
            context: Context features from first frame (B, 128, H, W)
            correlation_features: Indexed correlation volume (B, C, H, W)
            flow_current: Current flow estimate (B, 2, H, W)
        Returns:
            flow_delta: Additive flow correction (B, 2, H, W)
            hidden_state: Updated hidden state (B, 128, H, W)
        """
        # Concatenate inputs for GRU
        input_features = torch.cat([correlation_features, flow_current, context], dim=1)

        # Reshape for GRU: (B*H*W, C)
        B, C, H, W = input_features.shape
        input_flat = input_features.permute(0, 2, 3, 1).reshape(B * H * W, C)
        hidden_flat = hidden_state.permute(0, 2, 3, 1).reshape(B * H * W, -1)

        # GRU update
        hidden_flat = self.gru(input_flat, hidden_flat)
        hidden_state = hidden_flat.reshape(B, H, W, -1).permute(0, 3, 1, 2)

        # Predict flow delta (additive correction)
        flow_delta = self.flow_head(hidden_state)

        return flow_delta, hidden_state

The flow at iteration $k+1$ is:

$$f^{k+1} = f^k + \Delta f^k$$

Where $\Delta f^k$ is the predicted correction from the GRU update operator.

Training Strategy

RAFT supervises all intermediate iterations using the combined loss:

$$\mathcal{L} = \sum_{i=1}^{N} \gamma^{N-i} \|f^i - f_{gt}\|_1$$

Where:
$f_{gt}$ = ground truth flow
$\gamma = 0.8$ = exponential weighting (emphasizes later iterations)
$\|\cdot\|_1$ = L1 norm (robust to outliers)
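
A minimal sketch of this sequence loss (the validity mask used for sparse ground truth such as KITTI is omitted):

def raft_sequence_loss(flow_preds, flow_gt, gamma=0.8):
    """flow_preds: list of N estimates, each (B, 2, H, W); flow_gt: (B, 2, H, W)."""
    n = len(flow_preds)
    loss = 0.0
    for i, flow in enumerate(flow_preds):
        weight = gamma ** (n - i - 1)                  # later iterations count more
        loss = loss + weight * (flow - flow_gt).abs().mean()
    return loss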

FlowFormer: Transformer-Based Optical Flow

Motivation: Beyond CNN Limitations

While RAFT achieves excellent accuracy, its CNN-based architecture has limitations:

  • Local receptive fields: Struggles with large displacements
  • Fixed correlation patterns: Cannot adapt to different motion types
  • Sequential processing: Iterative updates limit parallelization

FlowFormer (Huang et al., CVPR 2022) addresses these with transformer attention mechanisms that provide:

  1. Global receptive fields for long-range motion
  2. Adaptive cost aggregation through learned attention
  3. Efficient cross-attention between frames

Architecture Components

Module              | Purpose                               | Implementation
--------------------|---------------------------------------|--------------------------
CNN Encoder         | Extract hierarchical features         | ResNet-style backbone
Transformer Encoder | Enhance features with self-attention  | 6-layer transformer
Cost Volume         | 4D correlation (like RAFT)            | All-pairs inner product
Transformer Decoder | Refine flow with cross-attention      | Queries from cost volume
Flow Head           | Predict flow + uncertainty            | Convolutional layers

Latent Cost Tokens and Cross-Attention

FlowFormer’s key innovation is representing the cost volume as latent tokens that undergo cross-attention with image features:

import torch
import torch.nn as nn

class FlowFormerDecoder(nn.Module):
    def __init__(self, num_layers=6, dim=256, num_heads=8):
        super().__init__()
        self.layers = nn.ModuleList([
            TransformerDecoderLayer(dim, num_heads) for _ in range(num_layers)
        ])
        self.flow_head = nn.Conv2d(dim, 2, 1)

    def forward(self, cost_tokens, context_features):
        """
        Args:
            cost_tokens: Latent cost volume tokens (B, N, dim)
                         where N = H*W; each token summarizes one pixel's matching costs
            context_features: Image context features (B, N, dim)
        Returns:
            flow: Predicted optical flow (B, 2, H, W)
        """
        # Multi-layer cross-attention refinement
        for layer in self.layers:
            cost_tokens = layer(
                query=cost_tokens,
                key=context_features,
                value=context_features
            )

        # Reshape tokens to a spatial grid (assumes a square H x W token grid)
        B, N, C = cost_tokens.shape
        H = W = int(N ** 0.5)
        cost_spatial = cost_tokens.transpose(1, 2).reshape(B, C, H, W)

        # Predict flow
        flow = self.flow_head(cost_spatial)
        return flow

class TransformerDecoderLayer(nn.Module):
    def __init__(self, dim=256, num_heads=8, mlp_ratio=4.0):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(dim, num_heads, batch_first=True)
        self.cross_attn = nn.MultiheadAttention(dim, num_heads, batch_first=True)
        self.mlp = nn.Sequential(
            nn.Linear(dim, int(dim * mlp_ratio)),
            nn.GELU(),
            nn.Linear(int(dim * mlp_ratio), dim)
        )
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)
        self.norm3 = nn.LayerNorm(dim)

    def forward(self, query, key, value):
        # Self-attention on cost tokens
        q = query + self.self_attn(self.norm1(query), self.norm1(query), self.norm1(query))[0]

        # Cross-attention with context features
        q = q + self.cross_attn(self.norm2(q), key, value)[0]

        # MLP feedforward
        q = q + self.mlp(self.norm3(q))
        return q
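
A quick smoke test of the sketch above with random tensors; at 1/8 resolution a 256×256 input yields a 32×32 token grid, i.e. N = 1024.

decoder = FlowFormerDecoder(num_layers=6, dim=256, num_heads=8)
cost_tokens = torch.randn(2, 1024, 256)        # (B, N, dim)
context_features = torch.randn(2, 1024, 256)

flow = decoder(cost_tokens, context_features)
print(flow.shape)  # torch.Size([2, 2, 32, 32])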

Hierarchical Flow Prediction

FlowFormer uses a coarse-to-fine pyramid:

  1. 1/8 resolution: Initial coarse flow from transformer decoder
  2. 1/4 resolution: Upsample + refine with additional decoder
  3. Full resolution: Final upsampling with convolution refinement

This hierarchy balances accuracy and computational cost, with attention only computed at lower resolutions.
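
When flow moves up a pyramid level, the vectors must be rescaled as well, because displacements are measured in pixels of the current resolution. A minimal sketch of that step (the learned convex upsampling used by RAFT-style models is omitted):

import torch.nn.functional as F

def upsample_flow(flow, scale=2):
    """flow: (B, 2, H, W) at the coarse level -> (B, 2, H*scale, W*scale)."""
    up = F.interpolate(flow, scale_factor=scale, mode='bilinear', align_corners=False)
    return up * scale   # displacement magnitudes grow with the resolution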

Practical Implementation: Real-Time Video Processing

Setting Up the Environment

# Install dependencies
pip install torch torchvision
pip install opencv-python
pip install einops  # For tensor operations in FlowFormer

# Clone RAFT repository
git clone https://github.com/princeton-vl/RAFT.git
cd RAFT

# Download pretrained weights (the repo's helper script places them under models/,
# e.g. models/raft-things.pth)
./download_models.sh

Loading RAFT for Inference

import argparse
import sys

import torch
import cv2
import numpy as np

sys.path.append('RAFT/core')

from raft import RAFT
from utils.utils import InputPadder

class OpticalFlowEstimator:
    def __init__(self, model_path='models/raft-things.pth', device='cuda'):
        self.device = device

        # RAFT reads its configuration from an argparse-style namespace
        args = argparse.Namespace(
            model=model_path,
            small=False,
            mixed_precision=True,
            alternate_corr=False
        )

        self.model = RAFT(args)

        # Released checkpoints were saved from a torch.nn.DataParallel wrapper,
        # so strip the "module." prefix before loading the weights
        state_dict = torch.load(model_path, map_location=device)
        state_dict = {k.replace('module.', '', 1): v for k, v in state_dict.items()}
        self.model.load_state_dict(state_dict)
        self.model = self.model.to(device)
        self.model.eval()

    def preprocess_frame(self, frame):
        """
        Convert a BGR frame to an RGB float tensor in the [0, 255] range RAFT expects

        Args:
            frame: numpy array (H, W, 3) in BGR format
        Returns:
            tensor: (1, 3, H, W) in RGB, range [0, 255]
        """
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        tensor = torch.from_numpy(frame_rgb).permute(2, 0, 1).float()
        return tensor[None].to(self.device)

    def compute_flow(self, frame1, frame2, iters=20):
        """
        Compute optical flow between two frames

        Args:
            frame1, frame2: numpy arrays (H, W, 3) BGR
            iters: number of refinement iterations
        Returns:
            flow: numpy array (H, W, 2) with (u, v) motion vectors
        """
        # Preprocess
        img1 = self.preprocess_frame(frame1)
        img2 = self.preprocess_frame(frame2)

        # Pad to multiple of 8
        padder = InputPadder(img1.shape)
        img1, img2 = padder.pad(img1, img2)

        # Inference
        with torch.no_grad():
            _, flow = self.model(img1, img2, iters=iters, test_mode=True)

        # Unpad and convert to numpy
        flow = padder.unpad(flow[0]).cpu().numpy().transpose(1, 2, 0)
        return flow

Real-Time Video Processing Pipeline

import time

class VideoMotionTracker:
    def __init__(self, video_path, output_path=None, visualize=True):
        self.cap = cv2.VideoCapture(video_path)
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        self.flow_estimator = OpticalFlowEstimator()
        self.visualize = visualize

        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            self.writer = cv2.VideoWriter(output_path, fourcc, self.fps, 
                                         (self.width, self.height))
        else:
            self.writer = None

    def flow_to_color(self, flow):
        """
        Convert flow to an HSV color visualization:
        Hue = motion direction, Value = motion magnitude (full saturation)
        """
        magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

        hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
        hsv[..., 0] = angle * 180 / np.pi / 2  # Hue: direction
        hsv[..., 1] = 255  # Full saturation
        hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)  # Value: magnitude

        rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
        return rgb

    def process_video(self):
        ret, prev_frame = self.cap.read()
        if not ret:
            print("Cannot read video")
            return

        frame_count = 0
        total_time = 0

        while True:
            ret, curr_frame = self.cap.read()
            if not ret:
                break

            # Compute optical flow
            start_time = time.time()
            flow = self.flow_estimator.compute_flow(prev_frame, curr_frame, iters=12)
            elapsed = time.time() - start_time
            total_time += elapsed
            frame_count += 1

            if self.visualize:
                # Visualize flow as color map
                flow_color = self.flow_to_color(flow)

                # Blend with original frame
                overlay = cv2.addWeighted(curr_frame, 0.5, flow_color, 0.5, 0)

                # Add FPS counter
                fps_text = f"FPS: {1.0 / elapsed:.1f}"
                cv2.putText(overlay, fps_text, (10, 30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                cv2.imshow('Optical Flow', overlay)

                if self.writer:
                    self.writer.write(overlay)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            prev_frame = curr_frame

        avg_fps = frame_count / total_time if total_time > 0 else 0
        print(f"Processed {frame_count} frames at {avg_fps:.2f} FPS")

        self.cap.release()
        if self.writer:
            self.writer.release()
        cv2.destroyAllWindows()

# Usage
tracker = VideoMotionTracker('input_video.mp4', output_path='output_flow.mp4')
tracker.process_video()

Performance Comparison: RAFT vs FlowFormer

Benchmark Results on Sintel and KITTI Datasets

Model        | Sintel Clean (EPE) | Sintel Final (EPE) | KITTI (F1-all) | FPS (RTX 3090)
-------------|--------------------|--------------------|----------------|---------------
PWC-Net      | 2.55               | 3.93               | 9.60%          | 35
RAFT         | 1.43               | 2.71               | 5.04%          | 25
RAFT (small) | 2.14               | 3.18               | 7.65%          | 60
FlowFormer   | 1.01               | 2.40               | 4.09%          | 12
FlowFormer++ | 0.85               | 2.19               | 3.69%          | 8

Key metrics:
EPE (End-Point Error): Average Euclidean distance between predicted and ground truth flow (lower = better)
F1-all: Percentage of outlier pixels on KITTI (error > 3 px and > 5% of the ground-truth flow magnitude; lower = better)
FPS: Frames per second on 1024×436 resolution
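
Both metrics are straightforward to compute yourself; a minimal NumPy sketch (the KITTI mask for pixels without ground truth is omitted):

import numpy as np

def epe_and_f1(flow_pred, flow_gt):
    """flow_pred, flow_gt: (H, W, 2) arrays of (u, v) vectors."""
    err = np.linalg.norm(flow_pred - flow_gt, axis=-1)   # per-pixel end-point error
    gt_mag = np.linalg.norm(flow_gt, axis=-1)

    epe = err.mean()
    # KITTI outlier: error above 3 px AND above 5% of the ground-truth magnitude
    outliers = (err > 3.0) & (err > 0.05 * gt_mag)
    f1_all = 100.0 * outliers.mean()
    return epe, f1_all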

When to Use Each Model

Choose RAFT if:
– Real-time processing is critical (≥25 FPS)
– Edge deployment with limited GPU memory
– Balanced accuracy and speed for general applications
– Iterative refinement aligns with your application (e.g., tracking)

Choose FlowFormer if:
– Maximum accuracy is paramount (autonomous vehicles, medical imaging)
– Large displacements are common (fast camera motion)
– GPU resources are abundant
– Offline processing is acceptable

Practical recommendation: Start with RAFT for prototyping due to simpler setup and faster inference. Switch to FlowFormer when accuracy requirements increase or when processing is non-real-time.

Advanced Applications: Motion-Based Object Tracking

Using Flow Magnitude for Moving Object Detection

import cv2
import numpy as np
from scipy import ndimage

class MotionDetector:
    def __init__(self, magnitude_threshold=2.0, min_area=500):
        self.threshold = magnitude_threshold
        self.min_area = min_area

    def detect_moving_objects(self, flow):
        """
        Detect moving objects from optical flow

        Args:
            flow: (H, W, 2) optical flow array
        Returns:
            bboxes: list of (x, y, w, h) bounding boxes
            motion_mask: (H, W) binary mask of moving regions
        """
        # Compute flow magnitude
        magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)

        # Threshold to get motion mask
        motion_mask = (magnitude > self.threshold).astype(np.uint8)

        # Morphological operations to clean up noise
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_CLOSE, kernel)
        motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_OPEN, kernel)

        # Find connected components
        labeled, num_features = ndimage.label(motion_mask)

        # Extract bounding boxes for each component
        bboxes = []
        for i in range(1, num_features + 1):
            component_mask = (labeled == i)
            area = np.sum(component_mask)

            if area < self.min_area:
                continue

            # Find bounding box
            coords = np.argwhere(component_mask)
            y_min, x_min = coords.min(axis=0)
            y_max, x_max = coords.max(axis=0)

            bboxes.append((int(x_min), int(y_min), 
                          int(x_max - x_min), int(y_max - y_min)))

        return bboxes, motion_mask

    def visualize_detections(self, frame, bboxes, flow):
        """
        Draw bounding boxes and motion vectors on frame
        """
        result = frame.copy()

        for (x, y, w, h) in bboxes:
            # Draw bounding box
            cv2.rectangle(result, (x, y), (x+w, y+h), (0, 255, 0), 2)

            # Draw average motion vector for this region
            roi_flow = flow[y:y+h, x:x+w]
            avg_flow = roi_flow.mean(axis=(0, 1))

            center = (x + w//2, y + h//2)
            end_point = (int(center[0] + avg_flow[0] * 5), 
                        int(center[1] + avg_flow[1] * 5))
            cv2.arrowedLine(result, center, end_point, (255, 0, 0), 2)

        return result

# Integration with the video pipeline: `cap` is a cv2.VideoCapture and
# `flow_estimator` is the OpticalFlowEstimator defined earlier
detector = MotionDetector(magnitude_threshold=3.0, min_area=800)

ret, prev_frame = cap.read()
while ret:
    ret, curr_frame = cap.read()
    if not ret:
        break

    flow = flow_estimator.compute_flow(prev_frame, curr_frame)
    bboxes, motion_mask = detector.detect_moving_objects(flow)
    result = detector.visualize_detections(curr_frame, bboxes, flow)

    cv2.imshow('Motion Detection', result)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    prev_frame = curr_frame

Optimization Strategies for Edge Deployment

Model Export and Optimization with ONNX Runtime

import torch.onnx
import onnxruntime as ort

class ONNXFlowEstimator:
    def __init__(self, onnx_path):
        # Load ONNX model with graph optimizations enabled
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        self.session = ort.InferenceSession(
            onnx_path,
            sess_options,
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
        )

        # The exported graph has two inputs: 'frame1' and 'frame2'
        self.input_names = [inp.name for inp in self.session.get_inputs()]

    def compute_flow(self, frame1, frame2):
        # frame1/frame2: preprocessed float32 arrays of shape (1, 3, H, W)
        inputs = dict(zip(self.input_names, [frame1, frame2]))

        # The first output of the exported graph is the flow tensor
        flow = self.session.run(None, inputs)[0]
        return flow

# Export a PyTorch RAFT model to ONNX.
# Note: RAFT's correlation lookup relies on grid_sample, which needs opset >= 16,
# and the model typically needs a thin wrapper that fixes iters/test_mode so that
# forward() takes only the two image tensors.
def export_raft_to_onnx(model, output_path='raft.onnx'):
    model.eval()
    dummy_input1 = torch.randn(1, 3, 480, 640).cuda()
    dummy_input2 = torch.randn(1, 3, 480, 640).cuda()

    torch.onnx.export(
        model,
        (dummy_input1, dummy_input2),
        output_path,
        export_params=True,
        opset_version=16,
        do_constant_folding=True,
        input_names=['frame1', 'frame2'],
        output_names=['flow'],
        dynamic_axes={
            'frame1': {0: 'batch', 2: 'height', 3: 'width'},
            'frame2': {0: 'batch', 2: 'height', 3: 'width'},
            'flow': {0: 'batch', 2: 'height', 3: 'width'}
        }
    )

Resolution Scaling Trade-offs

Input Resolution | RAFT FPS | FlowFormer FPS | Accuracy Loss (ΔEPE)
-----------------|----------|----------------|---------------------
1024×436 (full)  | 25       | 12             | 0.00 (baseline)
768×328 (0.75×)  | 45       | 22             | +0.15
512×218 (0.5×)   | 95       | 48             | +0.35
384×164 (0.375×) | 165      | 82             | +0.68

Strategy: Process at 0.5× resolution for real-time applications (>60 FPS), then upsample flow using bilinear interpolation scaled by the resolution factor.
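
A minimal sketch of that strategy on top of the OpticalFlowEstimator defined earlier (the scale factor and interpolation modes are illustrative choices):

import cv2

def compute_flow_downscaled(flow_estimator, frame1, frame2, scale=0.5, iters=12):
    """Estimate flow at reduced resolution, then upsample and rescale the vectors."""
    h, w = frame1.shape[:2]
    small1 = cv2.resize(frame1, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
    small2 = cv2.resize(frame2, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)

    flow_small = flow_estimator.compute_flow(small1, small2, iters=iters)

    # Upsample to full resolution and rescale the displacement vectors
    flow = cv2.resize(flow_small, (w, h), interpolation=cv2.INTER_LINEAR)
    return flow / scale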

Common Pitfalls and Solutions

Issue 1: Padding Artifacts at Boundaries

Problem: Models require input dimensions divisible by 8, causing edge artifacts after unpadding.

Solution: Use reflection padding instead of zero padding:

def pad_image_reflect(image, divisor=8):
    h, w = image.shape[-2:]
    pad_h = (divisor - h % divisor) % divisor
    pad_w = (divisor - w % divisor) % divisor

    # Apply reflection padding
    padded = torch.nn.functional.pad(
        image, 
        (0, pad_w, 0, pad_h), 
        mode='reflect'
    )
    return padded, (pad_h, pad_w)
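
Downstream, crop the estimate back to the original size; a hypothetical usage where img1, img2, and model stand in for your tensors and network:

img1_pad, (pad_h, pad_w) = pad_image_reflect(img1)
img2_pad, _ = pad_image_reflect(img2)
flow_pad = model(img1_pad, img2_pad)                      # (B, 2, H_pad, W_pad)
flow = flow_pad[..., :img1.shape[-2], :img1.shape[-1]]    # drop the padded border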

Issue 2: Temporal Flickering in Flow Estimates

Problem: Frame-by-frame processing causes inconsistent flow, especially in static regions.

Solution: Apply temporal smoothing with exponential moving average:

class TemporalFlowSmoother:
    def __init__(self, alpha=0.7):
        self.alpha = alpha
        self.prev_flow = None

    def smooth(self, flow_current):
        if self.prev_flow is None:
            self.prev_flow = flow_current
            return flow_current

        # Exponential moving average
        flow_smoothed = self.alpha * flow_current + (1 - self.alpha) * self.prev_flow
        self.prev_flow = flow_smoothed
        return flow_smoothed
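
Dropping the smoother into the earlier processing loop is then a one-line change (flow_estimator, prev_frame, and curr_frame as in the pipeline above):

smoother = TemporalFlowSmoother(alpha=0.7)

# Inside the per-frame loop:
flow = flow_estimator.compute_flow(prev_frame, curr_frame, iters=12)
flow = smoother.smooth(flow)   # temporally consistent flow for downstream use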

Issue 3: Out-of-Memory Errors on High-Resolution Video

Solution: Run inference in mixed precision (gradient checkpointing only helps during training or fine-tuning):

import torch.cuda.amp as amp

class MemoryEfficientFlowEstimator:
    def __init__(self, model):
        self.model = model

    @torch.no_grad()
    def compute_flow(self, frame1, frame2):
        # Autocast runs the forward pass in FP16 where safe, roughly halving
        # activation memory; a GradScaler is only needed when training
        with amp.autocast():
            flow = self.model(frame1, frame2)
        return flow.float()  # Convert back to FP32 for downstream tasks

Conclusion

Optical flow estimation has evolved from classical variational methods to sophisticated deep learning architectures like RAFT and FlowFormer, enabling accurate real-time motion tracking across diverse applications.

Key takeaways:

  1. RAFT provides the best balance of speed (25-60 FPS) and accuracy through iterative refinement with all-pairs correlations, ideal for real-time systems
  2. FlowFormer achieves state-of-the-art accuracy using transformer attention mechanisms but trades off inference speed (8-12 FPS)
  3. Implementation considerations: Always use reflection padding, apply temporal smoothing for video, and leverage ONNX/quantization for edge deployment
  4. Application-specific tuning: Adjust iteration count (RAFT) or resolution scaling based on speed/accuracy requirements
  5. Motion detection: Flow magnitude thresholding + connected components analysis enables effective moving object detection without explicit tracking

For production systems, start prototyping with RAFT-small for rapid iteration, then optimize with full RAFT or FlowFormer based on performance profiling. The combination of optical flow with modern object detectors (YOLO, etc.) creates powerful multi-modal tracking systems that excel in challenging scenarios like occlusions and fast motion.

The field continues advancing with hybrid architectures (GMFlow, FlowFormer++) and self-supervised training methods, promising even better performance without expensive ground truth annotation. Whether building autonomous navigation systems or creative video effects tools, mastering these optical flow techniques provides a solid foundation for sophisticated motion understanding.
