Introduction
In the previous five episodes of this series, we explored the foundations of reinforcement learning—from MDPs and custom Gym environments to policy gradient methods, Stable Baselines3, and reward engineering. Now we face the ultimate challenge: deploying RL policies in the real world.
Simulation is where most RL breakthroughs happen. It’s safe, fast, and cheap. But policies trained in simulation often fail catastrophically when deployed on real robots, trading systems, or industrial equipment. This phenomenon is known as the sim-to-real gap.
In this final episode, we’ll explore why this gap exists, techniques to bridge it (domain randomization, system identification, transfer learning), and real-world deployment considerations for both robotics and financial applications. We’ll also look at emerging approaches like world models and offline RL that promise safer, more robust real-world deployment.
The Sim-to-Real Gap: Why Policies Fail in Reality
The sim-to-real gap arises from discrepancies between simulation and reality across multiple dimensions:
| Dimension | Simulation | Reality |
|---|---|---|
| Physics | Perfect equations, no friction modeling errors | Complex contact dynamics, wear, temperature effects |
| Sensing | Noise-free observations | Sensor noise, occlusions, calibration drift |
| Actuation | Instantaneous, precise control | Latency, backlash, actuator limits |
| Dynamics | Deterministic or controlled stochastic | Unmodeled disturbances, regime shifts |
| Visual | Simplified textures, lighting | Complex lighting, reflections, motion blur |
Example: Robot Grasping
Consider a robotic arm trained in simulation to grasp objects:
- In simulation: The gripper closes with perfect force control, the object has known mass and friction coefficients, vision provides clean RGB-D data.
- In reality: Gripper force sensors are noisy, objects slip due to unmodeled surface properties, lighting changes affect vision, and cable stiffness creates unexpected forces.
The policy overfits to the simulator’s idealized physics and fails when these assumptions break.
Example: Algorithmic Trading
A trading agent trained on historical backtests:
- In simulation: Perfect execution at closing prices, no slippage, infinite liquidity.
- In reality: Orders take time to fill, prices move against you (slippage), market microstructure matters, and regime changes (e.g., COVID-19) invalidate historical patterns.
The policy learns to exploit backtesting artifacts rather than robust market inefficiencies.
Domain Randomization: Embracing Uncertainty
Domain randomization (DR) is a simple yet powerful technique: randomize simulation parameters during training so the policy learns to be robust to variation. If the randomization is broad enough, reality looks to the policy like just another sample from the training distribution.
What to Randomize
- Physics parameters: Mass, friction, damping, restitution coefficients
- Visual appearance: Textures, colors, lighting, camera position
- Dynamics: Actuator gains, time delays, action noise
- Environmental factors: Object positions, sizes, external forces
Implementation Example
Let’s randomize a simple CartPole environment:
import gymnasium as gym
import numpy as np
from gymnasium.wrappers import TimeLimit
class RandomizedCartPoleEnv(gym.Wrapper):
"""
Domain randomization wrapper for CartPole.
    Randomizes pole length, pole mass, and the integration timestep on each episode reset.
"""
def __init__(self, env):
super().__init__(env)
# Store default parameters
self.default_length = self.env.unwrapped.length
self.default_masspole = self.env.unwrapped.masspole
        self.default_tau = self.env.unwrapped.tau  # Integration timestep (CartPole exposes no friction parameter)
def reset(self, **kwargs):
# Randomize pole length ±30%
self.env.unwrapped.length = self.default_length * np.random.uniform(0.7, 1.3)
# Randomize pole mass ±50%
self.env.unwrapped.masspole = self.default_masspole * np.random.uniform(0.5, 1.5)
        # Randomize the integration timestep ±20% as a proxy for unmodeled timing/dynamics variation
        self.env.unwrapped.tau = self.default_tau * np.random.uniform(0.8, 1.2)
# Update derived quantities
self.env.unwrapped.total_mass = (self.env.unwrapped.masspole +
self.env.unwrapped.masscart)
self.env.unwrapped.polemass_length = (self.env.unwrapped.masspole *
self.env.unwrapped.length)
return self.env.reset(**kwargs)
# Usage with Stable Baselines3
from stable_baselines3 import PPO
env = RandomizedCartPoleEnv(gym.make('CartPole-v1'))
model = PPO('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=50000)
Visual Domain Randomization for Vision-Based Tasks
For robotic vision tasks, randomize textures, lighting, and backgrounds:
import cv2
import numpy as np
class VisualRandomizationWrapper(gym.ObservationWrapper):
"""
Randomizes brightness, contrast, and adds noise to image observations.
"""
def observation(self, obs):
# Assume obs is RGB image (H, W, 3)
img = obs.astype(np.float32)
# Random brightness adjustment ±30%
brightness = np.random.uniform(0.7, 1.3)
img *= brightness
# Random contrast adjustment ±20%
contrast = np.random.uniform(0.8, 1.2)
img = (img - 128) * contrast + 128
# Add Gaussian noise
noise = np.random.normal(0, 5, img.shape)
img += noise
# Clip to valid range
img = np.clip(img, 0, 255).astype(np.uint8)
return img
When Domain Randomization Works
- Large randomization range: The real-world parameters fall within the randomized distribution.
- Sufficient training data: The policy sees enough diversity to generalize.
- Robust strategies exist: The task admits conservative behaviors that work across the whole parameter range (e.g., grasping with a higher force margin).
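A quick way to check the first condition (that real-world parameters fall inside the randomized range) is to sweep the trained policy over a grid of parameter values and watch where performance drops off. A minimal sketch, assuming the PPO policy from the randomized CartPole example above was saved under the hypothetical name 'randomized_cartpole_policy':

import numpy as np
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy

model = PPO.load('randomized_cartpole_policy')  # assumed filename, saved after training above
for length_scale in np.linspace(0.5, 1.5, 11):
    env = gym.make('CartPole-v1')
    env.unwrapped.length = env.unwrapped.length * length_scale  # fix one pole length per evaluation
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=20)
    print(f"length x{length_scale:.2f}: {mean_reward:.1f} +/- {std_reward:.1f}")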
Limitations
- Over-conservatism: Policies may become overly cautious, sacrificing performance.
- Sample inefficiency: Training on diverse scenarios requires more data.
- Unknown unknowns: You can only randomize what you anticipate varying.
System Identification: Calibrating the Simulator
Instead of randomizing, we can calibrate the simulator to match reality using real-world data. This is called system identification.
The Process
- Collect real-world trajectory data: Execute actions on the real system, record states.
- Define parameterized simulator: Create a simulator with tunable parameters $\theta$ (e.g., friction coefficient $\mu$, damping $b$).
- Optimize parameters: Minimize the difference between simulated and real trajectories:

$$\theta^* = \arg\min_{\theta} \sum_{t=1}^{T} \left\lVert s_t^{\text{real}} - s_t^{\text{sim}}(\theta) \right\rVert^2$$

Where:
– $\theta^*$ = optimal simulator parameters
– $s_t^{\text{real}}$ = real-world state at time $t$
– $s_t^{\text{sim}}(\theta)$ = simulated state with parameters $\theta$
– $T$ = trajectory length
Example: Calibrating a Pendulum Simulator
import numpy as np
from scipy.optimize import minimize
class PendulumSimulator:
"""Simple pendulum simulator with tunable damping."""
def __init__(self, damping=0.1, length=1.0, mass=1.0, dt=0.01):
self.damping = damping # Tunable parameter
self.length = length
self.mass = mass
self.g = 9.81
self.dt = dt
def step(self, theta, theta_dot, torque):
"""Simulate one step. Returns next (theta, theta_dot)."""
# Equation of motion: theta_ddot = (torque - damping*theta_dot - m*g*L*sin(theta)) / (m*L^2)
theta_ddot = (torque - self.damping * theta_dot -
self.mass * self.g * self.length * np.sin(theta)) / (self.mass * self.length**2)
theta_dot_new = theta_dot + theta_ddot * self.dt
theta_new = theta + theta_dot_new * self.dt
return theta_new, theta_dot_new
def simulate_trajectory(self, initial_state, actions):
"""Simulate full trajectory given initial state and action sequence."""
theta, theta_dot = initial_state
states = [(theta, theta_dot)]
for action in actions:
theta, theta_dot = self.step(theta, theta_dot, action)
states.append((theta, theta_dot))
return np.array(states)
def calibrate_damping(real_trajectory, actions, initial_state):
"""
Find optimal damping parameter that matches real trajectory.
Args:
real_trajectory: (T+1, 2) array of (theta, theta_dot) from real system
actions: (T,) array of torques applied
initial_state: (theta_0, theta_dot_0)
"""
def loss(damping):
sim = PendulumSimulator(damping=damping[0])
sim_trajectory = sim.simulate_trajectory(initial_state, actions)
return np.sum((real_trajectory - sim_trajectory)**2)
result = minimize(loss, x0=[0.1], bounds=[(0.0, 1.0)], method='L-BFGS-B')
return result.x[0]
# Example usage
real_trajectory = np.load('real_pendulum_trajectory.npy') # From real robot
actions = np.load('pendulum_actions.npy')
initial_state = real_trajectory[0]
optimal_damping = calibrate_damping(real_trajectory, actions, initial_state)
print(f"Optimal damping coefficient: {optimal_damping:.4f}")
Advanced: Bayesian System Identification
Use Bayesian optimization to handle noisy observations and uncertainty quantification:
from skopt import gp_minimize
from skopt.space import Real
def bayesian_calibrate(real_trajectory, actions, initial_state):
"""
    Bayesian optimization for system identification.
    Uses a Gaussian-process surrogate, which copes better with noisy objectives
    and provides uncertainty estimates; returns the best parameters found.
"""
space = [Real(0.0, 1.0, name='damping'),
Real(0.5, 1.5, name='length')]
def objective(params):
damping, length = params
sim = PendulumSimulator(damping=damping, length=length)
sim_trajectory = sim.simulate_trajectory(initial_state, actions)
return np.sum((real_trajectory - sim_trajectory)**2)
result = gp_minimize(objective, space, n_calls=50, random_state=42)
return result.x # [optimal_damping, optimal_length]
Transfer Learning Approaches
When the sim-to-real gap is large, we can use transfer learning: start with a policy trained in simulation, then adapt it using limited real-world data.
Fine-Tuning
The simplest approach: continue training the simulated policy on real-world data.
from stable_baselines3 import SAC
# Step 1: Train in simulation
sim_env = gym.make('Pendulum-v1')
sim_model = SAC('MlpPolicy', sim_env, verbose=1)
sim_model.learn(total_timesteps=100000)
# Step 2: Fine-tune on real robot (assuming real_env wrapper exists)
real_env = RealPendulumEnv() # Connects to actual hardware
sim_model.set_env(real_env)
sim_model.learn(total_timesteps=5000) # Limited real-world data
sim_model.save('fine_tuned_policy')
Challenges:
– Risk of catastrophic forgetting (losing simulation knowledge)
– Real-world data collection is expensive and risky
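One practical mitigation for the forgetting risk is to keep the simulation policy on disk and fine-tune a fresh copy at a much smaller learning rate, initialized from the simulation weights. A minimal sketch using SB3's get_parameters/set_parameters, reusing sim_model and real_env from the example above (the learning rate and timestep budget are illustrative):

from stable_baselines3 import SAC

sim_model.save('sim_policy_backup')  # keep the simulation policy intact

# Fresh model on the real environment with a much smaller learning rate,
# initialized from the simulation policy's weights
fine_tune_model = SAC('MlpPolicy', real_env, learning_rate=3e-5, verbose=1)
fine_tune_model.set_parameters(sim_model.get_parameters())
fine_tune_model.learn(total_timesteps=5000)
fine_tune_model.save('fine_tuned_policy_low_lr')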
Progressive Neural Networks
Progressive nets freeze the simulation policy and add new “columns” for real-world adaptation, preventing forgetting:
import torch
import torch.nn as nn
class ProgressiveNet(nn.Module):
"""
Two-column progressive network.
Column 1 (frozen): trained in simulation
Column 2 (trainable): adapts to real world using lateral connections
"""
def __init__(self, obs_dim, act_dim, hidden_dim=64):
super().__init__()
# Column 1: Simulation policy (frozen)
self.sim_fc1 = nn.Linear(obs_dim, hidden_dim)
self.sim_fc2 = nn.Linear(hidden_dim, hidden_dim)
self.sim_fc3 = nn.Linear(hidden_dim, act_dim)
# Column 2: Real-world policy (trainable)
self.real_fc1 = nn.Linear(obs_dim, hidden_dim)
self.real_fc2 = nn.Linear(hidden_dim * 2, hidden_dim) # Lateral connection from col1
self.real_fc3 = nn.Linear(hidden_dim * 2, act_dim) # Lateral connection from col1
# Freeze simulation column
        # Freeze simulation column
        for layer in (self.sim_fc1, self.sim_fc2, self.sim_fc3):
            for param in layer.parameters():
                param.requires_grad = False
def forward(self, x, use_real=True):
# Column 1 forward pass (always computed)
h1_sim = torch.relu(self.sim_fc1(x))
h2_sim = torch.relu(self.sim_fc2(h1_sim))
if not use_real:
return torch.tanh(self.sim_fc3(h2_sim))
# Column 2 forward pass with lateral connections
h1_real = torch.relu(self.real_fc1(x))
h2_real = torch.relu(self.real_fc2(torch.cat([h1_real, h1_sim], dim=-1)))
action = torch.tanh(self.real_fc3(torch.cat([h2_real, h2_sim], dim=-1)))
return action
Domain Adaptation with Adversarial Training
Train a domain discriminator to make simulation and real-world state distributions indistinguishable:
$$\mathcal{L}_{\text{total}} = \mathcal{L}_{\text{RL}} - \lambda \, \mathcal{L}_{\text{discriminator}}$$

Where:
– $\mathcal{L}_{\text{RL}}$ = standard RL loss (policy gradient, Q-learning)
– $\mathcal{L}_{\text{discriminator}}$ = binary classification loss (sim vs real)
– $\lambda$ = adversarial loss weight
The policy learns features that work in both domains by fooling the discriminator.
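A minimal PyTorch sketch of this idea: a shared feature extractor and a small discriminator trained in alternation. The dimensions, optimizers, and weight value are placeholders, and the RL loss itself is omitted for brevity:

import torch
import torch.nn as nn

obs_dim = 8  # placeholder observation dimension
feature_extractor = nn.Sequential(nn.Linear(obs_dim, 64), nn.ReLU())
discriminator = nn.Linear(64, 1)  # logit: sim (label 0) vs real (label 1)

disc_opt = torch.optim.Adam(discriminator.parameters(), lr=1e-3)
feat_opt = torch.optim.Adam(feature_extractor.parameters(), lr=1e-3)
bce = nn.BCEWithLogitsLoss()
adv_weight = 0.1  # lambda in the loss above

def adversarial_update(sim_obs, real_obs):
    labels = torch.cat([torch.zeros(sim_obs.shape[0], 1),
                        torch.ones(real_obs.shape[0], 1)])

    # 1) Train the discriminator to separate sim features from real features
    with torch.no_grad():
        feats = torch.cat([feature_extractor(sim_obs), feature_extractor(real_obs)])
    d_loss = bce(discriminator(feats), labels)
    disc_opt.zero_grad()
    d_loss.backward()
    disc_opt.step()

    # 2) Train the feature extractor to fool the discriminator
    #    (in a full agent, add this term to the usual RL loss)
    feats = torch.cat([feature_extractor(sim_obs), feature_extractor(real_obs)])
    adv_loss = -adv_weight * bce(discriminator(feats), labels)
    feat_opt.zero_grad()
    adv_loss.backward()
    feat_opt.step()

# Toy usage with random batches standing in for sim and real observations
adversarial_update(torch.randn(32, obs_dim), torch.randn(32, obs_dim))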
Real-World Deployment Considerations
Safety Constraints
Real systems can be damaged. Use constrained RL or safety wrappers:
class SafetyWrapper(gym.ActionWrapper):
"""
Clips actions and enforces joint limits for robot safety.
"""
def __init__(self, env, max_joint_velocity=1.0, forbidden_zones=None):
super().__init__(env)
self.max_velocity = max_joint_velocity
self.forbidden_zones = forbidden_zones or [] # List of (min, max) tuples
def action(self, act):
# Clip velocities
act = np.clip(act, -self.max_velocity, self.max_velocity)
# Check forbidden zones (e.g., collision regions)
current_pos = self.env.unwrapped.robot.get_joint_positions()
for i, (min_pos, max_pos) in enumerate(self.forbidden_zones):
if min_pos <= current_pos[i] <= max_pos:
act[i] = 0.0 # Stop motion in forbidden zone
return act
Action Smoothing
RL policies can produce jerky actions. Apply exponential smoothing:
class ActionSmoothingWrapper(gym.ActionWrapper):
"""
Exponential moving average smoothing for actions.
"""
def __init__(self, env, alpha=0.3):
super().__init__(env)
self.alpha = alpha # Smoothing factor (0 = all previous, 1 = all current)
self.prev_action = None
def action(self, act):
if self.prev_action is None:
self.prev_action = act
# Exponential smoothing: a_smooth = alpha * a_current + (1 - alpha) * a_prev
smoothed = self.alpha * act + (1 - self.alpha) * self.prev_action
self.prev_action = smoothed
return smoothed
def reset(self, **kwargs):
self.prev_action = None
return self.env.reset(**kwargs)
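In deployment these wrappers are typically stacked. A small example of the composition, where RealRobotEnv, the velocity limit, and the forbidden zone are placeholders for your own hardware interface and limits:

# Smooth the agent's actions first, then apply safety clipping closest to the hardware
real_env = RealRobotEnv()  # hypothetical hardware interface
env = ActionSmoothingWrapper(
    SafetyWrapper(real_env, max_joint_velocity=0.5, forbidden_zones=[(1.2, 1.6)]),
    alpha=0.3,
)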
Latency Handling
Real systems have observation and actuation delays. Model latency explicitly:
from collections import deque
class LatencyWrapper(gym.Wrapper):
"""
Simulates observation and action latency.
"""
def __init__(self, env, obs_delay=2, act_delay=1):
super().__init__(env)
self.obs_delay = obs_delay # Steps of observation delay
self.act_delay = act_delay # Steps of action delay
self.obs_buffer = deque(maxlen=obs_delay + 1)
self.act_buffer = deque(maxlen=act_delay + 1)
def reset(self, **kwargs):
obs, info = self.env.reset(**kwargs)
self.obs_buffer.clear()
self.act_buffer.clear()
# Fill buffers with initial state
for _ in range(self.obs_delay + 1):
self.obs_buffer.append(obs)
return self.obs_buffer[0], info # Return delayed observation
def step(self, action):
# Add action to buffer
self.act_buffer.append(action)
# Execute delayed action
if len(self.act_buffer) > self.act_delay:
actual_action = self.act_buffer[0]
else:
actual_action = np.zeros_like(action) # Default action during warmup
obs, reward, terminated, truncated, info = self.env.step(actual_action)
# Add observation to buffer
self.obs_buffer.append(obs)
# Return delayed observation
return self.obs_buffer[0], reward, terminated, truncated, info
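To make this pay off, train (or fine-tune) in simulation with the same delays you measure on the real system, so the policy learns to act under latency. The delay values below are assumptions to be replaced with measured ones:

import gymnasium as gym
from stable_baselines3 import SAC

delayed_env = LatencyWrapper(gym.make('Pendulum-v1'), obs_delay=2, act_delay=1)
model = SAC('MlpPolicy', delayed_env, verbose=1)
model.learn(total_timesteps=50000)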
Financial Sim-to-Real: Backtesting vs Live Trading
The sim-to-real gap in finance manifests as the difference between backtesting and live trading.
Key Challenges
| Challenge | Description | Mitigation |
|---|---|---|
| Slippage | Order fills at worse prices than expected | Model slippage explicitly in simulation |
| Market impact | Large orders move prices | Use volume-weighted execution, limit order size |
| Latency | Delayed data feeds and execution | Include realistic latency in backtests |
| Regime changes | Market conditions shift (e.g., volatility spikes) | Train on multiple market regimes, use online adaptation |
| Lookahead bias | Accidentally using future data | Strict event-driven backtesting framework |
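To illustrate the lookahead-bias row above: in an event-driven loop the strategy only ever sees data up to the current bar, and a decision made at bar t takes effect at bar t+1. A minimal sketch with a toy momentum rule standing in for the policy:

import numpy as np

def event_driven_backtest(prices, policy_fn):
    position = 0.0
    pnl = 0.0
    for t in range(len(prices) - 1):
        observation = prices[: t + 1]                   # only data available at time t
        target = policy_fn(observation)                 # desired position in [-1, 1]
        pnl += position * (prices[t + 1] - prices[t])   # P&L of the position held over (t, t+1]
        position = target                               # decision at t is live from t+1 onward
    return pnl

# Toy usage: random-walk prices and a simple momentum rule
prices = 100 + np.cumsum(np.random.randn(500))
momentum = lambda obs: float(np.sign(obs[-1] - obs[-20])) if len(obs) > 20 else 0.0
print(event_driven_backtest(prices, momentum))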
Modeling Slippage
import numpy as np
class SlippageModel:
"""
Realistic slippage model for backtesting.
Slippage increases with order size and volatility.
"""
def __init__(self, base_slippage_bps=5, volume_impact=0.01):
self.base_slippage_bps = base_slippage_bps # Base slippage in basis points
        self.volume_impact = volume_impact  # Extra slippage per unit fraction of daily volume traded (0.01 -> 1 bp per 1% of volume)
def calculate_fill_price(self, desired_price, order_size, daily_volume, direction):
"""
Calculate actual fill price including slippage.
Args:
desired_price: Intended execution price
order_size: Number of shares to trade
daily_volume: Average daily trading volume
direction: 1 for buy, -1 for sell
"""
# Base slippage
slippage_pct = self.base_slippage_bps / 10000
# Volume impact (trading 1% of daily volume adds 0.01% slippage)
volume_fraction = order_size / daily_volume
slippage_pct += self.volume_impact * volume_fraction
# Random component (bid-ask bounce)
slippage_pct += np.random.normal(0, 0.0002) # ±2 bps noise
        # Apply slippage (higher fill price for buys, lower for sells)
fill_price = desired_price * (1 + direction * slippage_pct)
return fill_price
# Example: Backtest with slippage
class TradingEnvWithSlippage(gym.Env):
def __init__(self, price_data, initial_cash=100000):
self.prices = price_data
self.cash = initial_cash
self.position = 0
self.slippage_model = SlippageModel()
        self.initial_cash = initial_cash
        self.current_step = 0

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.cash = self.initial_cash
        self.position = 0
        self.current_step = 0
        return self._get_obs(), {}
def step(self, action):
# action: fraction of portfolio to allocate (-1 to 1)
current_price = self.prices[self.current_step]
portfolio_value = self.cash + self.position * current_price
target_position = int(action * portfolio_value / current_price)
order_size = abs(target_position - self.position)
direction = 1 if target_position > self.position else -1
if order_size > 0:
# Calculate fill price with slippage
daily_volume = 1000000 # Example: 1M shares/day
fill_price = self.slippage_model.calculate_fill_price(
current_price, order_size, daily_volume, direction
)
# Execute trade
cost = order_size * fill_price
self.cash -= direction * cost
self.position = target_position
# Move to next step
self.current_step += 1
reward = (self.cash + self.position * self.prices[self.current_step]) - portfolio_value
terminated = self.current_step >= len(self.prices) - 1
return self._get_obs(), reward, terminated, False, {}
def _get_obs(self):
return np.array([self.position, self.cash, self.prices[self.current_step]])
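A quick sanity check of the slippage model on a hypothetical 50,000-share buy against 1M shares of daily volume:

slippage = SlippageModel(base_slippage_bps=5, volume_impact=0.01)
fill = slippage.calculate_fill_price(desired_price=100.0, order_size=50_000,
                                     daily_volume=1_000_000, direction=1)
print(f"fill price: {fill:.3f}")  # ~100.10 (5 bps base + 5 bps volume impact, plus noise)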
Handling Regime Changes
Train on multiple market regimes (bull, bear, high volatility) and use context-aware policies:
class RegimeAwarePolicy(nn.Module):
"""
Policy that adapts to different market regimes.
Uses volatility as regime indicator.
"""
def __init__(self, obs_dim, act_dim, hidden_dim=64):
super().__init__()
self.regime_encoder = nn.Sequential(
nn.Linear(1, 32), # Input: volatility estimate
nn.ReLU()
)
self.policy_net = nn.Sequential(
nn.Linear(obs_dim + 32, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, act_dim),
nn.Tanh()
)
def forward(self, obs, volatility):
regime_embedding = self.regime_encoder(volatility.unsqueeze(-1))
combined = torch.cat([obs, regime_embedding], dim=-1)
return self.policy_net(combined)
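A small usage example with illustrative shapes, deriving the volatility signal from a window of recent prices (the window and dimensions are placeholders):

import torch

price_window = torch.tensor([100.0, 101.2, 100.8, 102.5, 101.9, 103.1])
returns = torch.diff(torch.log(price_window))
volatility = returns.std().reshape(1)        # shape (1,) for a batch of one

policy = RegimeAwarePolicy(obs_dim=10, act_dim=1)
obs = torch.zeros(1, 10)                     # placeholder market observation
action = policy(obs, volatility)
print(action.shape)                          # torch.Size([1, 1])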
Robotics Sim-to-Real: Tools and Pipeline
Modern Simulation Platforms
| Platform | Strengths | Use Case |
|---|---|---|
| Isaac Gym | GPU-accelerated, massive parallelism (1000s envs) | Large-scale RL training |
| MuJoCo | Accurate contact dynamics, fast | Manipulation, locomotion |
| PyBullet | Free, easy to use, ROS integration | Prototyping, education |
| Gazebo | ROS native, sensor simulation | Full robot system testing |
Example: Isaac Gym Domain Randomization
# Pseudocode for Isaac Gym randomization
from isaacgym import gymapi
def create_randomized_env(gym, sim, env_idx):
"""
Create environment with randomized parameters.
"""
# Randomize object mass
object_props = gymapi.RigidBodyProperties()
object_props.mass = np.random.uniform(0.1, 2.0)
# Randomize friction
object_props.friction = np.random.uniform(0.5, 1.5)
# Randomize visual appearance
color = gymapi.Vec3(np.random.rand(), np.random.rand(), np.random.rand())
# Create actor with randomized properties
actor_handle = gym.create_actor(env_idx, object_asset,
object_props, color)
return actor_handle
Real Robot Deployment Pipeline
- Train in simulation (Isaac Gym / MuJoCo) with domain randomization
- System identification on real robot (calibrate key parameters)
- Safe policy evaluation using safety wrappers and human supervision
- Collect real-world data (1-2 hours of robot operation)
- Fine-tune policy on real data using small learning rate
- Iterate: Improve simulation based on failure modes observed in reality
Future Directions
World Models
World models learn a predictive model of the environment, enabling policy training in the learned model instead of the true simulator. This reduces the sim-to-real gap if the world model is learned from real data.
import torch
import torch.nn as nn
class WorldModel(nn.Module):
"""
Simple world model: predicts next state given current state and action.
Train on real-world data, then use for policy optimization.
"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, state_dim)
)
def forward(self, state, action):
"""Predict next state."""
x = torch.cat([state, action], dim=-1)
delta = self.net(x)
return state + delta # Predict state delta for stability
def train_on_real_data(self, real_transitions, epochs=50):
"""Train world model on real (s, a, s') tuples."""
optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
for epoch in range(epochs):
for s, a, s_next in real_transitions:
pred_next = self.forward(s, a)
loss = nn.MSELoss()(pred_next, s_next)
optimizer.zero_grad()
loss.backward()
optimizer.step()
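A toy training run, with random tensors standing in for logged real-world (s, a, s') transitions:

import torch

wm = WorldModel(state_dim=3, action_dim=1)
real_transitions = [(torch.randn(3), torch.randn(1), torch.randn(3)) for _ in range(256)]
wm.train_on_real_data(real_transitions, epochs=5)
print(wm(torch.zeros(3), torch.zeros(1)))  # predicted next state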
Model-Based RL
Combine learned dynamics models with planning (e.g., Model Predictive Control) for safer deployment:
class MPCController:
"""
Model Predictive Control using learned world model.
Plans actions over horizon, executes first action, replans.
"""
    def __init__(self, world_model, action_dim, horizon=10, num_samples=100):
        self.world_model = world_model
        self.action_dim = action_dim  # Needed to sample candidate action sequences
        self.horizon = horizon
        self.num_samples = num_samples
def plan(self, current_state, reward_fn):
"""Find best action sequence via random shooting."""
best_return = -float('inf')
best_action_seq = None
for _ in range(self.num_samples):
# Sample random action sequence
action_seq = torch.randn(self.horizon, self.action_dim)
# Rollout in world model
state = current_state
total_reward = 0
for t in range(self.horizon):
state = self.world_model(state, action_seq[t])
total_reward += reward_fn(state, action_seq[t])
if total_reward > best_return:
best_return = total_reward
best_action_seq = action_seq
return best_action_seq[0] # Return first action only
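Example usage with assumed dimensions, planning a single action with the world model defined above (ideally trained on real data first):

import torch

world_model = WorldModel(state_dim=3, action_dim=1)
controller = MPCController(world_model, action_dim=1, horizon=10, num_samples=100)

def pendulum_like_reward(state, action):
    # Illustrative objective: stay near the origin with small control effort
    return -(state ** 2).sum() - 0.01 * (action ** 2).sum()

first_action = controller.plan(torch.zeros(3), pendulum_like_reward)
print(first_action)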
Offline RL for Safer Deployment
Offline RL trains policies purely from pre-collected datasets, avoiding risky online exploration:
# Note: Stable Baselines3 does not ship offline algorithms; Conservative Q-Learning
# (CQL) is available in dedicated offline RL libraries such as d3rlpy.
# A sketch (exact API depends on the d3rlpy version you install):
import d3rlpy

# Train on logged data (no environment interaction)
observations, actions, rewards, terminals = load_real_world_trajectories()  # hypothetical loader for logged arrays
dataset = d3rlpy.dataset.MDPDataset(observations, actions, rewards, terminals)
cql = d3rlpy.algos.CQLConfig().create()
cql.fit(dataset, n_steps=100000)

# Deploy the trained policy (no online training needed)
cql.save('offline_policy.d3')
Offline RL is promising for high-stakes domains (robotics, healthcare, finance) where exploration is expensive or dangerous.
Conclusion
The sim-to-real gap is the final boss of reinforcement learning deployment. While simulation provides a safe, fast training ground, real-world success requires bridging the inevitable discrepancies through:
- Domain randomization: Training robust policies that generalize across parameter variations
- System identification: Calibrating simulators to match real-world dynamics
- Transfer learning: Leveraging simulation while adapting to real data
- Deployment best practices: Safety wrappers, action smoothing, latency handling
For financial applications, the gap manifests as backtesting vs live trading—requiring explicit slippage modeling, regime awareness, and conservative risk management. For robotics, modern tools like Isaac Gym and MuJoCo, combined with careful real-world fine-tuning, enable increasingly reliable deployment.
Emerging techniques—world models, model-based RL, offline RL—promise to further narrow the gap by learning directly from limited real-world data or enabling safer policy optimization.
Throughout this 6-part series, we’ve journeyed from the theoretical foundations of MDPs to the practical realities of deploying RL in the wild. The path from simulation to reality is challenging, but with the right tools and techniques, reinforcement learning is ready to solve real-world problems at scale.
Thank you for following along. Now go build something amazing—and remember to test it in the real world!