Introduction
Optical flow estimation—the task of computing dense pixel-wise motion vectors between consecutive video frames—is fundamental to numerous computer vision applications including autonomous driving, video stabilization, action recognition, and object tracking. While classical methods like Lucas-Kanade and Horn-Schunck dominated for decades, recent deep learning breakthroughs have revolutionized the field with unprecedented accuracy and speed.
In this comprehensive guide, we’ll explore two state-of-the-art architectures: RAFT (Recurrent All-Pairs Field Transforms) and FlowFormer, examining their theoretical foundations, implementation details, and practical deployment strategies for real-time video motion tracking applications.
Understanding Optical Flow: Mathematical Foundation
Optical flow represents the apparent motion of brightness patterns between two consecutive frames. For a pixel at position $(x, y)$ in frame $t$ moving to $(x + \Delta x, y + \Delta y)$ in frame $t + \Delta t$, the optical flow vector is $(u, v) = (\Delta x, \Delta y)$.
The classic brightness constancy assumption states:

$$I(x, y, t) = I(x + u, y + v, t + \Delta t)$$

Where:
– $I$ = intensity values in consecutive frames
– $u, v$ = horizontal and vertical flow components
– $\Delta t$ = time interval between frames
Using a first-order Taylor expansion and rearranging terms yields the optical flow constraint equation:

$$I_x u + I_y v + I_t = 0$$

Where $I_x$, $I_y$ are the spatial image gradients and $I_t$ is the temporal gradient. This single equation cannot solve for the two unknowns $(u, v)$—the famous aperture problem—requiring additional constraints or learning-based approaches.
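Classical methods add such a constraint explicitly. Lucas-Kanade, for instance, assumes constant flow within a small window around each pixel and solves the resulting over-determined system by least squares. A minimal NumPy sketch (central-difference gradients, no boundary handling, purely illustrative):
import numpy as np

def lucas_kanade_at(I1, I2, x, y, win=7):
    """Estimate (u, v) at pixel (x, y) by least squares over a win x win
    window. I1, I2 are grayscale float arrays of identical shape; the pixel
    is assumed to lie far enough from the image border for the window."""
    half = win // 2
    # Central-difference spatial gradients and a simple temporal gradient
    Ix = (np.roll(I1, -1, axis=1) - np.roll(I1, 1, axis=1)) / 2.0
    Iy = (np.roll(I1, -1, axis=0) - np.roll(I1, 1, axis=0)) / 2.0
    It = I2 - I1
    window = (slice(y - half, y + half + 1), slice(x - half, x + half + 1))
    A = np.stack([Ix[window].ravel(), Iy[window].ravel()], axis=1)  # N x 2
    b = -It[window].ravel()
    # Solve I_x*u + I_y*v = -I_t in the least-squares sense
    (u, v), *_ = np.linalg.lstsq(A, b, rcond=None)
    return u, v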
RAFT: Recurrent All-Pairs Field Transforms
Architecture Overview
RAFT, introduced by Teed and Deng (Princeton University, ECCV 2020), treats optical flow as a sequence prediction problem solved through iterative refinement. The architecture consists of three main components:
| Component | Function | Key Innovation |
|---|---|---|
| Feature Encoder | Extract per-pixel features from both frames | Context-aware convolutional layers |
| Correlation Layer | Compute all-pairs correlations between features | 4D correlation pyramid for efficiency |
| Update Operator | Iteratively refine flow estimates | Recurrent GRU-based updates |
All-Pairs Correlation Volume
RAFT’s breakthrough lies in computing correlations between all pairs of pixels across frames. For feature maps $F_1, F_2 \in \mathbb{R}^{H \times W \times D}$, the correlation volume is:

$$C(i, j, k, l) = \sum_{d=1}^{D} F_1(i, j, d) \cdot F_2(k, l, d)$$

This creates a 4D volume of shape $H \times W \times H \times W$. To manage memory (a minimal construction sketch follows the list below):
- Pooling pyramid: Build multiple resolution levels (1, 2, 4, 8 pixels)
- Local lookup: For each pixel, only retrieve correlations in a local neighborhood
- Grid sampling: Index correlation volume using current flow estimate
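A minimal sketch of the all-pairs correlation and pooling pyramid, assuming feature maps of shape (B, D, H, W); the per-pixel local lookup via grid sampling is omitted, and this is an illustration rather than RAFT's implementation:
import torch
import torch.nn.functional as F

def build_correlation_pyramid(fmap1, fmap2, num_levels=4):
    """All-pairs correlation followed by repeated 2x average pooling over
    the second frame's spatial dimensions."""
    B, D, H, W = fmap1.shape
    f1 = fmap1.view(B, D, H * W)
    f2 = fmap2.view(B, D, H * W)
    # (B, H*W, H*W): correlation between every pixel pair, scaled by sqrt(D)
    corr = torch.matmul(f1.transpose(1, 2), f2) / D ** 0.5
    # Treat the second frame's pixels as spatial dims so pooling coarsens them
    corr = corr.view(B * H * W, 1, H, W)
    pyramid = [corr]
    for _ in range(num_levels - 1):
        corr = F.avg_pool2d(corr, kernel_size=2, stride=2)
        pyramid.append(corr)
    return pyramid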
Iterative Refinement Process
Starting from zero flow $f_0 = 0$, RAFT performs $N$ update iterations (typically 12-32):
import torch
import torch.nn as nn
class RAFTIterativeUpdate(nn.Module):
    def __init__(self, corr_dim=126, hidden_dim=128, context_dim=128):
        super().__init__()
        # GRU input = indexed correlation features + 2 flow channels + context
        self.gru = nn.GRUCell(corr_dim + 2 + context_dim, hidden_dim)
self.flow_head = nn.Sequential(
nn.Conv2d(hidden_dim, 256, 3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(256, 2, 3, padding=1)
)
def forward(self, hidden_state, context, correlation_features, flow_current):
"""
Args:
hidden_state: GRU hidden state (B, 128, H, W)
context: Context features from first frame (B, 128, H, W)
correlation_features: Indexed correlation volume (B, C, H, W)
flow_current: Current flow estimate (B, 2, H, W)
Returns:
flow_delta: Additive flow correction (B, 2, H, W)
hidden_state: Updated hidden state (B, 128, H, W)
"""
# Concatenate inputs for GRU
input_features = torch.cat([correlation_features, flow_current, context], dim=1)
# Reshape for GRU: (B*H*W, C)
B, C, H, W = input_features.shape
input_flat = input_features.permute(0, 2, 3, 1).reshape(B * H * W, C)
hidden_flat = hidden_state.permute(0, 2, 3, 1).reshape(B * H * W, -1)
# GRU update
hidden_flat = self.gru(input_flat, hidden_flat)
hidden_state = hidden_flat.reshape(B, H, W, -1).permute(0, 3, 1, 2)
# Predict flow delta (additive correction)
flow_delta = self.flow_head(hidden_state)
return flow_delta, hidden_state
The flow estimate at iteration $k+1$ is:

$$f_{k+1} = f_k + \Delta f_k$$

Where $\Delta f_k$ is the predicted correction from the GRU update operator.
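Tying the pieces together, the refinement loop might look like the following sketch. The correlation lookup is replaced by a random stand-in tensor and the sizes are illustrative; this is not RAFT's actual API:
import torch

B, H, W, num_iters = 1, 46, 62, 12      # example sizes at 1/8 resolution
corr_dim = 126                          # must match RAFTIterativeUpdate above
context = torch.randn(B, 128, H, W)     # stand-in for the context encoder output

update_block = RAFTIterativeUpdate(corr_dim=corr_dim)
flow = torch.zeros(B, 2, H, W)          # f_0 = 0
hidden = torch.tanh(context)            # hidden state initialized from context features

for k in range(num_iters):
    # Stand-in for indexing the correlation pyramid around the current flow;
    # the real lookup samples a local neighborhood of the 4D volume
    corr_features = torch.randn(B, corr_dim, H, W)
    flow_delta, hidden = update_block(hidden, context, corr_features, flow)
    flow = flow + flow_delta            # f_{k+1} = f_k + delta_f_k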
Training Strategy
RAFT supervises all intermediate iterations using an exponentially weighted sum of L1 distances:

$$\mathcal{L} = \sum_{k=1}^{N} \gamma^{N-k} \, \lVert f_{gt} - f_k \rVert_1$$

Where:
– $f_{gt}$ = ground truth flow
– $\gamma$ = exponential weighting factor that emphasizes later iterations (0.8 in the original paper)
– $\lVert \cdot \rVert_1$ = L1 norm (robust to outliers)
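A compact sketch of this sequence loss; the official implementation additionally masks invalid pixels and extreme ground-truth magnitudes, which is omitted here:
def sequence_loss(flow_preds, flow_gt, gamma=0.8):
    """Exponentially weighted L1 loss over all N iterations.
    flow_preds: list of (B, 2, H, W) predictions, one per iteration
    flow_gt:    (B, 2, H, W) ground-truth flow
    """
    n = len(flow_preds)
    loss = 0.0
    for k, flow_k in enumerate(flow_preds):
        weight = gamma ** (n - k - 1)   # later iterations receive larger weights
        loss = loss + weight * (flow_gt - flow_k).abs().mean()
    return loss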
FlowFormer: Transformer-Based Optical Flow
Motivation: Beyond CNN Limitations
While RAFT achieves excellent accuracy, its CNN-based architecture has limitations:
- Local receptive fields: Struggles with large displacements
- Fixed correlation patterns: Cannot adapt to different motion types
- Sequential processing: Iterative updates limit parallelization
FlowFormer (Huang et al., CVPR 2022) addresses these with transformer attention mechanisms that provide:
- Global receptive fields for long-range motion
- Adaptive cost aggregation through learned attention
- Efficient cross-attention between frames
Architecture Components
| Module | Purpose | Implementation |
|---|---|---|
| CNN Encoder | Extract hierarchical features | ResNet-style backbone |
| Transformer Encoder | Enhance features with self-attention | 6-layer transformer |
| Cost Volume | 4D correlation (like RAFT) | All-pairs inner product |
| Transformer Decoder | Refine flow with cross-attention | Queries from cost volume |
| Flow Head | Predict flow + uncertainty | Convolutional layers |
Latent Cost Tokens and Cross-Attention
FlowFormer’s key innovation is representing the cost volume as latent tokens that undergo cross-attention with image features:
import torch
import torch.nn as nn
class FlowFormerDecoder(nn.Module):
def __init__(self, num_layers=6, dim=256, num_heads=8):
super().__init__()
self.layers = nn.ModuleList([
TransformerDecoderLayer(dim, num_heads) for _ in range(num_layers)
])
self.flow_head = nn.Conv2d(dim, 2, 1)
def forward(self, cost_tokens, context_features):
"""
Args:
cost_tokens: Latent cost volume tokens (B, N, dim)
                where N = H*W and each token represents one pixel's matching costs
context_features: Image context features (B, N, dim)
Returns:
flow: Predicted optical flow (B, 2, H, W)
"""
# Multi-layer cross-attention refinement
for layer in self.layers:
cost_tokens = layer(
query=cost_tokens,
key=context_features,
value=context_features
)
# Reshape tokens to spatial grid
B, N, C = cost_tokens.shape
H = W = int(N ** 0.5)
cost_spatial = cost_tokens.transpose(1, 2).reshape(B, C, H, W)
# Predict flow
flow = self.flow_head(cost_spatial)
return flow
class TransformerDecoderLayer(nn.Module):
def __init__(self, dim=256, num_heads=8, mlp_ratio=4.0):
super().__init__()
self.self_attn = nn.MultiheadAttention(dim, num_heads, batch_first=True)
self.cross_attn = nn.MultiheadAttention(dim, num_heads, batch_first=True)
self.mlp = nn.Sequential(
nn.Linear(dim, int(dim * mlp_ratio)),
nn.GELU(),
nn.Linear(int(dim * mlp_ratio), dim)
)
self.norm1 = nn.LayerNorm(dim)
self.norm2 = nn.LayerNorm(dim)
self.norm3 = nn.LayerNorm(dim)
def forward(self, query, key, value):
# Self-attention on cost tokens
q = query + self.self_attn(self.norm1(query), self.norm1(query), self.norm1(query))[0]
# Cross-attention with context features
q = q + self.cross_attn(self.norm2(q), key, value)[0]
# MLP feedforward
q = q + self.mlp(self.norm3(q))
return q
Hierarchical Flow Prediction
FlowFormer uses a coarse-to-fine pyramid:
- 1/8 resolution: Initial coarse flow from transformer decoder
- 1/4 resolution: Upsample + refine with additional decoder
- Full resolution: Final upsampling with convolution refinement
This hierarchy balances accuracy and computational cost, with attention only computed at lower resolutions.
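The recurring operation in any such pyramid is moving a flow field to a finer level; because flow vectors are measured in pixels, they must be rescaled together with the spatial resolution. A generic sketch (plain bilinear upsampling, not FlowFormer's learned upsampling module):
import torch
import torch.nn.functional as F

def upsample_flow(flow, scale_factor=2):
    """Bilinearly upsample a (B, 2, H, W) flow field and rescale the vectors
    to the finer pixel grid."""
    up = F.interpolate(flow, scale_factor=scale_factor,
                       mode='bilinear', align_corners=False)
    return up * scale_factor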
Practical Implementation: Real-Time Video Processing
Setting Up the Environment
# Install dependencies
pip install torch torchvision
pip install opencv-python
pip install einops # For tensor operations in FlowFormer
# Clone RAFT repository
git clone https://github.com/princeton-vl/RAFT.git
cd RAFT
# Download pretrained weights
wget https://github.com/princeton-vl/RAFT/releases/download/v1.0/raft-things.pth
Loading RAFT for Inference
import torch
import cv2
import numpy as np
from argparse import Namespace
import sys
sys.path.append('RAFT/core')
from raft import RAFT
from utils.utils import InputPadder
class OpticalFlowEstimator:
def __init__(self, model_path='raft-things.pth', device='cuda'):
self.device = device
        # Initialize RAFT model; an argparse-style Namespace supports the
        # "'dropout' in args" membership checks inside the RAFT code
        args = Namespace(
            model=model_path,
            small=False,
            mixed_precision=True,
            alternate_corr=False
        )
self.model = RAFT(args)
        # Official RAFT checkpoints were saved from a torch.nn.DataParallel
        # wrapper; strip the 'module.' prefix if present before loading
        state_dict = torch.load(model_path, map_location=device)
        state_dict = {k.replace('module.', '', 1): v for k, v in state_dict.items()}
        self.model.load_state_dict(state_dict)
self.model = self.model.to(device)
self.model.eval()
def preprocess_frame(self, frame):
"""
        Convert a BGR frame to an RGB float tensor in the range [0, 255] (RAFT expects unnormalized pixel values)
Args:
frame: numpy array (H, W, 3) in BGR format
Returns:
tensor: (1, 3, H, W) in RGB, range [0, 255]
"""
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(frame_rgb).permute(2, 0, 1).float()
return tensor[None].to(self.device)
def compute_flow(self, frame1, frame2, iters=20):
"""
Compute optical flow between two frames
Args:
frame1, frame2: numpy arrays (H, W, 3) BGR
iters: number of refinement iterations
Returns:
flow: numpy array (H, W, 2) with (u, v) motion vectors
"""
# Preprocess
img1 = self.preprocess_frame(frame1)
img2 = self.preprocess_frame(frame2)
# Pad to multiple of 8
padder = InputPadder(img1.shape)
img1, img2 = padder.pad(img1, img2)
# Inference
with torch.no_grad():
_, flow = self.model(img1, img2, iters=iters, test_mode=True)
# Unpad and convert to numpy
flow = padder.unpad(flow[0]).cpu().numpy().transpose(1, 2, 0)
return flow
Real-Time Video Processing Pipeline
import time
class VideoMotionTracker:
def __init__(self, video_path, output_path=None, visualize=True):
self.cap = cv2.VideoCapture(video_path)
self.fps = self.cap.get(cv2.CAP_PROP_FPS)
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.flow_estimator = OpticalFlowEstimator()
self.visualize = visualize
if output_path:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.writer = cv2.VideoWriter(output_path, fourcc, self.fps,
(self.width, self.height))
else:
self.writer = None
def flow_to_color(self, flow):
"""
Convert flow to HSV color visualization
        Hue = motion direction, Value (brightness) = motion magnitude, full saturation
"""
magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
hsv[..., 0] = angle * 180 / np.pi / 2 # Hue: direction
hsv[..., 1] = 255 # Full saturation
hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX) # Value: magnitude
rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
return rgb
def process_video(self):
ret, prev_frame = self.cap.read()
if not ret:
print("Cannot read video")
return
frame_count = 0
total_time = 0
while True:
ret, curr_frame = self.cap.read()
if not ret:
break
# Compute optical flow
start_time = time.time()
flow = self.flow_estimator.compute_flow(prev_frame, curr_frame, iters=12)
elapsed = time.time() - start_time
total_time += elapsed
frame_count += 1
if self.visualize:
# Visualize flow as color map
flow_color = self.flow_to_color(flow)
# Blend with original frame
overlay = cv2.addWeighted(curr_frame, 0.5, flow_color, 0.5, 0)
# Add FPS counter
fps_text = f"FPS: {1.0 / elapsed:.1f}"
cv2.putText(overlay, fps_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('Optical Flow', overlay)
if self.writer:
self.writer.write(overlay)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
prev_frame = curr_frame
avg_fps = frame_count / total_time if total_time > 0 else 0
print(f"Processed {frame_count} frames at {avg_fps:.2f} FPS")
self.cap.release()
if self.writer:
self.writer.release()
cv2.destroyAllWindows()
# Usage
tracker = VideoMotionTracker('input_video.mp4', output_path='output_flow.mp4')
tracker.process_video()
Performance Comparison: RAFT vs FlowFormer
Benchmark Results on Sintel and KITTI Datasets
| Model | Sintel Clean (EPE) | Sintel Final (EPE) | KITTI (F1-all) | FPS (RTX 3090) |
|---|---|---|---|---|
| PWC-Net | 2.55 | 3.93 | 9.60% | 35 |
| RAFT | 1.43 | 2.71 | 5.04% | 25 |
| RAFT (small) | 2.14 | 3.18 | 7.65% | 60 |
| FlowFormer | 1.01 | 2.40 | 4.09% | 12 |
| FlowFormer++ | 0.85 | 2.19 | 3.69% | 8 |
Key metrics:
– EPE (End-Point Error): Average Euclidean distance between predicted and ground truth flow (lower = better)
– F1-all: Percentage of outlier pixels (error > 3 px and > 5% of the ground-truth flow magnitude) on KITTI (lower = better)
– FPS: Frames per second on 1024×436 resolution
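Both metrics are easy to compute from dense flow fields. A minimal NumPy sketch, ignoring KITTI's validity masks:
import numpy as np

def end_point_error(flow_pred, flow_gt):
    """Mean Euclidean distance between predicted and ground-truth flow,
    both shaped (H, W, 2)."""
    return np.linalg.norm(flow_pred - flow_gt, axis=-1).mean()

def f1_all(flow_pred, flow_gt):
    """Fraction of outlier pixels: error > 3 px and > 5% of the
    ground-truth magnitude (returned as a percentage)."""
    err = np.linalg.norm(flow_pred - flow_gt, axis=-1)
    mag = np.linalg.norm(flow_gt, axis=-1)
    outliers = (err > 3.0) & (err > 0.05 * mag)
    return 100.0 * outliers.mean()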
When to Use Each Model
Choose RAFT if:
– Real-time processing is critical (≥25 FPS)
– Edge deployment with limited GPU memory
– Balanced accuracy and speed for general applications
– Iterative refinement aligns with your application (e.g., tracking)
Choose FlowFormer if:
– Maximum accuracy is paramount (autonomous vehicles, medical imaging)
– Large displacements are common (fast camera motion)
– GPU resources are abundant
– Offline processing is acceptable
Practical recommendation: Start with RAFT for prototyping due to simpler setup and faster inference. Switch to FlowFormer when accuracy requirements increase or when processing is non-real-time.
Advanced Applications: Motion-Based Object Tracking
Using Flow Magnitude for Moving Object Detection
import cv2
import numpy as np
from scipy import ndimage
class MotionDetector:
def __init__(self, magnitude_threshold=2.0, min_area=500):
self.threshold = magnitude_threshold
self.min_area = min_area
def detect_moving_objects(self, flow):
"""
Detect moving objects from optical flow
Args:
flow: (H, W, 2) optical flow array
Returns:
bboxes: list of (x, y, w, h) bounding boxes
motion_mask: (H, W) binary mask of moving regions
"""
# Compute flow magnitude
magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
# Threshold to get motion mask
motion_mask = (magnitude > self.threshold).astype(np.uint8)
# Morphological operations to clean up noise
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_CLOSE, kernel)
motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_OPEN, kernel)
# Find connected components
labeled, num_features = ndimage.label(motion_mask)
# Extract bounding boxes for each component
bboxes = []
for i in range(1, num_features + 1):
component_mask = (labeled == i)
area = np.sum(component_mask)
if area < self.min_area:
continue
# Find bounding box
coords = np.argwhere(component_mask)
y_min, x_min = coords.min(axis=0)
y_max, x_max = coords.max(axis=0)
bboxes.append((int(x_min), int(y_min),
int(x_max - x_min), int(y_max - y_min)))
return bboxes, motion_mask
def visualize_detections(self, frame, bboxes, flow):
"""
Draw bounding boxes and motion vectors on frame
"""
result = frame.copy()
for (x, y, w, h) in bboxes:
# Draw bounding box
cv2.rectangle(result, (x, y), (x+w, y+h), (0, 255, 0), 2)
# Draw average motion vector for this region
roi_flow = flow[y:y+h, x:x+w]
avg_flow = roi_flow.mean(axis=(0, 1))
center = (x + w//2, y + h//2)
end_point = (int(center[0] + avg_flow[0] * 5),
int(center[1] + avg_flow[1] * 5))
cv2.arrowedLine(result, center, end_point, (255, 0, 0), 2)
return result
# Integration with a capture loop
detector = MotionDetector(magnitude_threshold=3.0, min_area=800)
flow_estimator = OpticalFlowEstimator()
cap = cv2.VideoCapture('input_video.mp4')
ret, prev_frame = cap.read()
while ret:
    ret, curr_frame = cap.read()
    if not ret:
        break
    flow = flow_estimator.compute_flow(prev_frame, curr_frame)
    bboxes, motion_mask = detector.detect_moving_objects(flow)
    result = detector.visualize_detections(curr_frame, bboxes, flow)
    cv2.imshow('Motion Detection', result)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    prev_frame = curr_frame
cap.release()
cv2.destroyAllWindows()
Optimization Strategies for Edge Deployment
ONNX Export for Optimized Inference
import torch.onnx
import onnxruntime as ort
class ONNXFlowEstimator:
def __init__(self, onnx_path):
# Load ONNX model with optimized runtime
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
self.session = ort.InferenceSession(
onnx_path,
sess_options,
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
        self.input_names = [inp.name for inp in self.session.get_inputs()]
    def compute_flow(self, frame1, frame2):
        # The exported graph has two named inputs ('frame1', 'frame2');
        # feed both preprocessed (1, 3, H, W) float32 arrays by name
        inputs = dict(zip(self.input_names, (frame1, frame2)))
        flow = self.session.run(None, inputs)[0]
        return flow
# Export PyTorch RAFT model to ONNX
def export_raft_to_onnx(model, output_path='raft.onnx'):
model.eval()
dummy_input1 = torch.randn(1, 3, 480, 640).cuda()
dummy_input2 = torch.randn(1, 3, 480, 640).cuda()
torch.onnx.export(
model,
(dummy_input1, dummy_input2),
output_path,
export_params=True,
        opset_version=16,  # RAFT's grid_sample-based correlation lookup needs opset >= 16
do_constant_folding=True,
input_names=['frame1', 'frame2'],
output_names=['flow'],
dynamic_axes={
'frame1': {0: 'batch', 2: 'height', 3: 'width'},
'frame2': {0: 'batch', 2: 'height', 3: 'width'},
'flow': {0: 'batch', 2: 'height', 3: 'width'}
}
)
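Putting the two pieces together might look like the following; the paths are illustrative, the preprocessing and padding shown earlier still apply before calling the ONNX wrapper, and whether the export succeeds depends on your PyTorch and opset versions:
# Hypothetical end-to-end usage of the export function and ONNX wrapper above
estimator = OpticalFlowEstimator('raft-things.pth')   # PyTorch model from earlier
export_raft_to_onnx(estimator.model, 'raft.onnx')     # one-time export
onnx_flow = ONNXFlowEstimator('raft.onnx')            # optimized runtime session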
Resolution Scaling Trade-offs
| Input Resolution | RAFT FPS | FlowFormer FPS | EPE Increase vs. Full Res |
|---|---|---|---|
| 1024×436 (full) | 25 | 12 | 0.00 (baseline) |
| 768×328 (0.75×) | 45 | 22 | +0.15 |
| 512×218 (0.5×) | 95 | 48 | +0.35 |
| 384×164 (0.375×) | 165 | 82 | +0.68 |
Strategy: Process at 0.5× resolution for real-time applications (>60 FPS), then upsample the flow with bilinear interpolation and multiply the vectors by the inverse of the scale factor, as sketched below.
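A sketch of this strategy as a thin wrapper around the OpticalFlowEstimator defined earlier; the scale value and interpolation choices are illustrative:
import cv2

def compute_flow_downscaled(estimator, frame1, frame2, scale=0.5, iters=12):
    """Run flow at reduced resolution, then upsample the result and rescale
    the vectors back to the original pixel grid."""
    h, w = frame1.shape[:2]
    small1 = cv2.resize(frame1, None, fx=scale, fy=scale)
    small2 = cv2.resize(frame2, None, fx=scale, fy=scale)
    flow_small = estimator.compute_flow(small1, small2, iters=iters)
    flow = cv2.resize(flow_small, (w, h), interpolation=cv2.INTER_LINEAR)
    return flow / scale  # flow vectors are in pixels, so divide by the scale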
Common Pitfalls and Solutions
Issue 1: Padding Artifacts at Boundaries
Problem: Models require input dimensions divisible by 8, causing edge artifacts after unpadding.
Solution: Use reflection padding instead of zero padding:
def pad_image_reflect(image, divisor=8):
h, w = image.shape[-2:]
pad_h = (divisor - h % divisor) % divisor
pad_w = (divisor - w % divisor) % divisor
# Apply reflection padding
padded = torch.nn.functional.pad(
image,
(0, pad_w, 0, pad_h),
mode='reflect'
)
return padded, (pad_h, pad_w)
Issue 2: Temporal Flickering in Flow Estimates
Problem: Frame-by-frame processing causes inconsistent flow, especially in static regions.
Solution: Apply temporal smoothing with exponential moving average:
class TemporalFlowSmoother:
def __init__(self, alpha=0.7):
self.alpha = alpha
self.prev_flow = None
def smooth(self, flow_current):
if self.prev_flow is None:
self.prev_flow = flow_current
return flow_current
# Exponential moving average
flow_smoothed = self.alpha * flow_current + (1 - self.alpha) * self.prev_flow
self.prev_flow = flow_smoothed
return flow_smoothed
Issue 3: Out-of-Memory Errors on High-Resolution Video
Solution: Run inference in mixed precision (FP16); gradient checkpointing additionally helps if you are fine-tuning the model:
import torch
class MemoryEfficientFlowEstimator:
    def __init__(self, model):
        self.model = model
    @torch.no_grad()
    def compute_flow(self, frame1, frame2):
        # Autocast runs eligible ops in FP16; no GradScaler is needed
        # because no gradients are computed during inference
        with torch.cuda.amp.autocast():
            flow = self.model(frame1, frame2)
        return flow.float()  # Convert back to FP32 for downstream tasks
Conclusion
Optical flow estimation has evolved from classical variational methods to sophisticated deep learning architectures like RAFT and FlowFormer, enabling accurate real-time motion tracking across diverse applications.
Key takeaways:
- RAFT provides the best balance of speed (25-60 FPS) and accuracy through iterative refinement with all-pairs correlations, ideal for real-time systems
- FlowFormer achieves state-of-the-art accuracy using transformer attention mechanisms but trades off inference speed (8-12 FPS)
- Implementation considerations: Always use reflection padding, apply temporal smoothing for video, and leverage ONNX/quantization for edge deployment
- Application-specific tuning: Adjust iteration count (RAFT) or resolution scaling based on speed/accuracy requirements
- Motion detection: Flow magnitude thresholding + connected components analysis enables effective moving object detection without explicit tracking
For production systems, start prototyping with RAFT-small for rapid iteration, then optimize with full RAFT or FlowFormer based on performance profiling. Combining optical flow with modern object detectors (YOLO, etc.) pairs motion cues with appearance cues, yielding tracking systems that cope better with challenging scenarios like occlusions and fast motion.
The field continues advancing with hybrid architectures (GMFlow, FlowFormer++) and self-supervised training methods, promising even better performance without expensive ground truth annotation. Whether building autonomous navigation systems or creative video effects tools, mastering these optical flow techniques provides a solid foundation for sophisticated motion understanding.