Introduction
Reward engineering is often cited as the most challenging aspect of applied reinforcement learning. While algorithms like PPO and SAC (discussed in Part 4) provide robust training mechanisms, the quality of the learned policy fundamentally depends on how well the reward function captures the desired behavior. A poorly designed reward can lead to reward hacking, where agents exploit loopholes to maximize scores without solving the intended task, or deceptive alignment, where behavior looks correct during training but fails in deployment.
This episode explores the art and science of reward function design, covering theoretical foundations like potential-based reward shaping, domain-specific patterns for finance and robotics, common pathologies, and intrinsic motivation techniques. We’ll implement practical examples for both a trading bot and a robotic manipulation task.
The Theory of Reward Shaping
Potential-Based Reward Shaping
Reward shaping modifies the original reward function to accelerate learning without changing the optimal policy. The key insight from Ng, Harada, and Russell (1999) is that adding a potential-based term guarantees policy invariance:
$$R'(s, a, s') = R(s, a, s') + \gamma \, \Phi(s') - \Phi(s)$$
Where:
– $R(s, a, s')$ is the original reward for transitioning from state $s$ to $s'$ via action $a$
– $R'(s, a, s')$ is the shaped reward
– $\Phi(s)$ is a potential function over states
– $\gamma$ is the discount factor
This formulation is equivalent to adding a state-dependent “advice” term that guides the agent toward promising regions without altering the optimal policy.
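A quick way to see why the optimal policy is preserved is the standard telescoping argument: the discounted sum of the shaping terms along any (infinite-horizon, $\gamma < 1$) trajectory collapses to a quantity that depends only on the start state,

$$\sum_{t=0}^{\infty} \gamma^t \big( \gamma \Phi(s_{t+1}) - \Phi(s_t) \big) = -\Phi(s_0),$$

so the shaped return differs from the original return by a constant offset for each start state, which leaves the ranking of policies, and hence the optimal policy, unchanged.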
import numpy as np
import gymnasium as gym
from typing import Callable

class PotentialBasedShaping(gym.Wrapper):
    """Wrapper that adds potential-based reward shaping to any Gymnasium environment."""

    def __init__(self, env, potential_fn: Callable, gamma: float = 0.99):
        super().__init__(env)
        self.potential_fn = potential_fn
        self.gamma = gamma
        self.prev_potential = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self.prev_potential = self.potential_fn(obs)
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        # Apply potential-based shaping: F = gamma * Phi(s') - Phi(s)
        current_potential = self.potential_fn(obs)
        shaped_reward = reward + self.gamma * current_potential - self.prev_potential
        self.prev_potential = current_potential
        return obs, shaped_reward, terminated, truncated, info

# Example: distance-based potential for reaching a goal
def goal_distance_potential(obs, goal_position=np.array([10.0, 10.0])):
    """Potential function that increases as the agent approaches the goal."""
    agent_position = obs[:2]  # Assumes the first 2 dims are the agent's position
    distance = np.linalg.norm(goal_position - agent_position)
    return -distance  # Negative distance (higher potential = closer to goal)
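A minimal usage sketch, assuming a 2-D navigation environment whose observation begins with the agent's (x, y) position; the environment id here is hypothetical:

import gymnasium as gym

env = gym.make("MyNav2D-v0")  # hypothetical 2-D navigation environment
env = PotentialBasedShaping(env, goal_distance_potential, gamma=0.99)

obs, info = env.reset(seed=0)
for _ in range(200):
    action = env.action_space.sample()
    obs, shaped_reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        obs, info = env.reset()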
Financial Domain Rewards
Risk-Adjusted Returns
For trading agents, raw profit is insufficient because it ignores risk. The Sharpe ratio provides a risk-adjusted performance metric:
$$\text{Sharpe} = \frac{\mathbb{E}[R_p - R_f]}{\sigma_p}$$
Where:
– $\mathbb{E}[R_p - R_f]$ is the expected excess return over the risk-free rate
– $\sigma_p$ is the standard deviation of returns
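As a small standalone sketch, a rolling Sharpe estimate can be computed from a window of per-period returns; the square-root-of-252 annualization factor assumes daily data and is an illustrative choice:

import numpy as np

def rolling_sharpe(returns, risk_free_rate=0.0, periods_per_year=252):
    """Annualized Sharpe ratio over a window of per-period returns."""
    excess = np.asarray(returns) - risk_free_rate / periods_per_year
    return np.sqrt(periods_per_year) * np.mean(excess) / (np.std(excess) + 1e-6)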
We can incorporate this into a reward function:
import gymnasium as gym
from collections import deque

class TradingEnv(gym.Env):
    """Financial trading environment with risk-adjusted rewards."""

    def __init__(self, price_data, initial_capital=10000, window_size=30):
        super().__init__()
        self.price_data = price_data
        self.initial_capital = initial_capital
        self.window_size = window_size
        # Action: [position_change, leverage] where position_change is in [-1, 1]
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)
        # Observation: price history + portfolio state
        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(window_size + 3,), dtype=np.float32
        )
        self.returns_history = deque(maxlen=100)
        self.transaction_costs = 0.001  # 0.1% per trade
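    def reset(self, *, seed=None, options=None):
        # A minimal reset sketch: initializes the bookkeeping that step(),
        # calculate_reward() and _get_observation() rely on. Assumes the episode
        # starts once a full price window is available.
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.prev_value = float(self.initial_capital)
        self.peak_value = float(self.initial_capital)
        self.position = 0.0
        self.returns_history.clear()
        return self._get_observation(), {}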
    def calculate_reward(self, new_portfolio_value, trade_volume):
        """Multi-component reward with risk adjustment."""
        # Component 1: Portfolio return
        portfolio_return = (new_portfolio_value - self.prev_value) / self.prev_value
        self.returns_history.append(portfolio_return)

        # Component 2: Sharpe ratio (computed over recent window)
        if len(self.returns_history) >= 30:
            returns_array = np.array(self.returns_history)
            sharpe = np.mean(returns_array) / (np.std(returns_array) + 1e-6)
            risk_adjusted_reward = sharpe * 0.1  # Scale factor
        else:
            risk_adjusted_reward = 0.0

        # Component 3: Drawdown penalty
        current_drawdown = (self.peak_value - new_portfolio_value) / self.peak_value
        drawdown_penalty = -10.0 * max(0, current_drawdown - 0.1)  # Penalize >10% drawdown

        # Component 4: Transaction cost
        transaction_cost = -self.transaction_costs * trade_volume * new_portfolio_value

        # Component 5: Holding penalty (encourage active trading)
        holding_penalty = -0.0001 if trade_volume < 0.01 else 0.0

        total_reward = (
            portfolio_return * 100 +  # Scale return to a meaningful range
            risk_adjusted_reward +
            drawdown_penalty +
            transaction_cost +
            holding_penalty
        )
        return total_reward

    def step(self, action):
        position_change, leverage = action
        leverage = np.clip(leverage, 0.5, 2.0)  # Limit leverage

        # Execute trade and compute new portfolio value
        trade_volume = abs(position_change)
        new_value = self._execute_trade(position_change, leverage)
        reward = self.calculate_reward(new_value, trade_volume)

        # Update state (net position is tracked and clipped to [-1, 1])
        self.position = float(np.clip(self.position + position_change, -1.0, 1.0))
        self.peak_value = max(self.peak_value, new_value)
        self.prev_value = new_value
        self.current_step += 1

        terminated = new_value < self.initial_capital * 0.5  # Stop if 50% loss
        truncated = self.current_step >= len(self.price_data) - self.window_size
        obs = self._get_observation()
        return obs, reward, terminated, truncated, {}

    def _execute_trade(self, position_change, leverage):
        # Simplified trade execution (a real implementation would model fills and slippage)
        price_change = self.price_data[self.current_step] / self.price_data[self.current_step - 1] - 1
        portfolio_change = position_change * leverage * price_change
        return self.prev_value * (1 + portfolio_change)

    def _get_observation(self):
        # Return normalized price history + portfolio state
        price_window = self.price_data[self.current_step - self.window_size:self.current_step]
        price_features = (price_window - np.mean(price_window)) / (np.std(price_window) + 1e-6)
        portfolio_features = np.array([
            self.prev_value / self.initial_capital,
            (self.prev_value - self.peak_value) / self.peak_value,  # Current drawdown
            self.position
        ])
        return np.concatenate([price_features, portfolio_features]).astype(np.float32)
Robotics Domain Rewards
Sparse vs. Dense Rewards
Robotics tasks face the exploration challenge: sparse rewards (e.g., +1 only when goal is reached) provide little guidance, while dense rewards (e.g., negative distance at each step) can introduce local optima.
Sparse reward example (pick-and-place):
def sparse_reward(gripper_pos, object_pos, target_pos, object_grasped):
    """Binary reward: 1.0 if the object is placed at the target, else 0.0."""
    if object_grasped and np.linalg.norm(object_pos - target_pos) < 0.05:
        return 1.0
    return 0.0
Dense reward example:
def dense_reward(gripper_pos, object_pos, target_pos, object_grasped):
    """Distance-based reward with staged objectives."""
    # Stage 1: Reach the object
    reach_distance = np.linalg.norm(gripper_pos - object_pos)
    reach_reward = -reach_distance

    # Stage 2: Grasp the object
    grasp_reward = 2.0 if object_grasped else 0.0

    # Stage 3: Move the object to the target
    if object_grasped:
        place_distance = np.linalg.norm(object_pos - target_pos)
        place_reward = -place_distance + 5.0 / (place_distance + 0.1)  # Asymptotic bonus
    else:
        place_distance = np.inf  # Not yet grasped, so no placement progress
        place_reward = 0.0

    # Stage 4: Success bonus
    success_bonus = 10.0 if (object_grasped and place_distance < 0.05) else 0.0

    return reach_reward + grasp_reward + place_reward + success_bonus
Curriculum Learning
Gradually increasing task difficulty helps agents learn complex behaviors:
class CurriculumRoboticArm:
    """Robotic arm environment with an automatic curriculum.

    The simulator-specific helpers _sample_target, _get_observation and
    _step_simulation are assumed to be implemented elsewhere.
    """

    def __init__(self):
        self.success_rate = deque(maxlen=100)
        self.current_difficulty = 0
        self.difficulties = [
            {"target_distance": 0.2, "target_height": 0.1},  # Easy: close & low
            {"target_distance": 0.4, "target_height": 0.2},  # Medium
            {"target_distance": 0.6, "target_height": 0.3},  # Hard: far & high
        ]

    def reset(self, **kwargs):
        # Update the curriculum based on recent performance
        if len(self.success_rate) >= 100:
            avg_success = np.mean(self.success_rate)
            if avg_success > 0.8 and self.current_difficulty < len(self.difficulties) - 1:
                self.current_difficulty += 1
                print(f"Curriculum advanced to difficulty {self.current_difficulty}")
            elif avg_success < 0.3 and self.current_difficulty > 0:
                self.current_difficulty -= 1
                print(f"Curriculum reduced to difficulty {self.current_difficulty}")

        # Sample a target based on the current difficulty
        diff = self.difficulties[self.current_difficulty]
        self.target_pos = self._sample_target(diff["target_distance"], diff["target_height"])
        return self._get_observation(), {}

    def step(self, action):
        # Execute the action and compute the reward
        obs, reward, terminated, truncated, info = self._step_simulation(action)

        # Track success for the curriculum
        if terminated and info.get("is_success", False):
            self.success_rate.append(1.0)
        elif terminated:
            self.success_rate.append(0.0)

        return obs, reward, terminated, truncated, info
Hindsight Experience Replay (HER)
HER addresses sparse rewards by relabeling failed episodes with alternative goals:
import gymnasium as gym
from stable_baselines3 import HerReplayBuffer, SAC

# Create a goal-conditioned environment with dict observations
# ('observation', 'achieved_goal', 'desired_goal'). The Fetch tasks require the
# gymnasium-robotics package (and, depending on the version, registering its envs).
env = gym.make("FetchReach-v2")

# Train with HER
model = SAC(
    "MultiInputPolicy",
    env,
    replay_buffer_class=HerReplayBuffer,
    replay_buffer_kwargs=dict(
        n_sampled_goal=4,                  # Relabel each transition with 4 alternative goals
        goal_selection_strategy="future",  # Use future states as goals
    ),
    verbose=1,
)
model.learn(total_timesteps=100000)
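If you build your own goal-conditioned task instead of using the Fetch suite, SB3's HerReplayBuffer also needs the environment to expose a compute_reward method that it can call when relabeling goals. A minimal sketch, where the 2-D reaching dynamics are purely illustrative:

import numpy as np
import gymnasium as gym

class GoalReachEnv(gym.Env):
    """Toy 2-D goal-reaching env with the dict layout HER expects (illustrative)."""

    def __init__(self, goal_threshold=0.05, max_steps=50):
        super().__init__()
        self.observation_space = gym.spaces.Dict({
            "observation": gym.spaces.Box(-np.inf, np.inf, shape=(2,), dtype=np.float32),
            "achieved_goal": gym.spaces.Box(-np.inf, np.inf, shape=(2,), dtype=np.float32),
            "desired_goal": gym.spaces.Box(-np.inf, np.inf, shape=(2,), dtype=np.float32),
        })
        self.action_space = gym.spaces.Box(-1.0, 1.0, shape=(2,), dtype=np.float32)
        self.goal_threshold = goal_threshold
        self.max_steps = max_steps

    def compute_reward(self, achieved_goal, desired_goal, info):
        # Called by HerReplayBuffer when relabeling transitions with new goals
        distance = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        return -(distance > self.goal_threshold).astype(np.float32)

    def _obs(self):
        return {
            "observation": self.pos.copy(),
            "achieved_goal": self.pos.copy(),
            "desired_goal": self.goal.copy(),
        }

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        self.pos = self.np_random.uniform(-1, 1, size=2).astype(np.float32)
        self.goal = self.np_random.uniform(-1, 1, size=2).astype(np.float32)
        self.steps = 0
        return self._obs(), {}

    def step(self, action):
        self.pos = np.clip(self.pos + 0.1 * np.asarray(action), -1, 1).astype(np.float32)
        self.steps += 1
        reward = float(self.compute_reward(self.pos, self.goal, {}))
        terminated = bool(np.linalg.norm(self.pos - self.goal) <= self.goal_threshold)
        truncated = self.steps >= self.max_steps
        return self._obs(), reward, terminated, truncated, {"is_success": terminated}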
Common Reward Pathologies
Reward Hacking
Agents exploit unintended loopholes in reward specifications. Classic example from robotics:
Problem: Reward for “moving forward” in simulation.
def bad_forward_reward(robot_velocity):
    return robot_velocity[0]  # X-axis velocity
Exploit: Robot learns to fall forward (high velocity, no actual locomotion).
Fix: Multi-component reward with constraints.
def robust_forward_reward(robot_velocity, robot_height, joint_torques):
    forward_component = robot_velocity[0]
    height_penalty = -10.0 if robot_height < 0.5 else 0.0   # Penalize falling
    energy_penalty = -0.01 * np.sum(np.abs(joint_torques))  # Penalize inefficiency
    return forward_component + height_penalty + energy_penalty
Reward Sparsity
Sparse rewards provide feedback too rarely for efficient learning. The table below summarizes common mitigation strategies:
| Strategy | Description | Use Case |
|---|---|---|
| Reward Shaping | Add potential-based guidance | Navigation, reaching tasks |
| Curriculum Learning | Gradually increase difficulty | Complex manipulation |
| HER | Relabel goals in replay buffer | Goal-conditioned tasks |
| Intrinsic Motivation | Add exploration bonuses | Environments with no external rewards |
| Imitation Learning | Pre-train from demonstrations | High-dimensional action spaces |
Intrinsic Motivation
Curiosity-Driven Exploration
The Intrinsic Curiosity Module (ICM) adds an exploration bonus based on prediction error:
$$r^{\text{int}}_t = \frac{\eta}{2} \left\| \hat{\phi}(s_{t+1}) - \phi(s_{t+1}) \right\|^2$$
Where:
– $\phi(s)$ is a learned feature representation
– $\hat{\phi}(s_{t+1})$ is the predicted next-state feature vector given $\phi(s_t)$ and $a_t$
– $\eta$ is a scaling factor
The agent is rewarded for encountering surprising (unpredictable) states.
import torch
import torch.nn as nn

class ICMModule(nn.Module):
    """Intrinsic Curiosity Module for exploration."""

    def __init__(self, obs_dim, action_dim, feature_dim=64):
        super().__init__()
        # Feature encoder
        self.encoder = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, feature_dim)
        )
        # Forward model: predict next-state features
        self.forward_model = nn.Sequential(
            nn.Linear(feature_dim + action_dim, 128),
            nn.ReLU(),
            nn.Linear(128, feature_dim)
        )
        # Inverse model: predict the action from the state transition
        self.inverse_model = nn.Sequential(
            nn.Linear(feature_dim * 2, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def compute_intrinsic_reward(self, obs, action, next_obs):
        """Compute the curiosity-based intrinsic reward."""
        with torch.no_grad():
            phi = self.encoder(obs)
            phi_next = self.encoder(next_obs)
            phi_next_pred = self.forward_model(torch.cat([phi, action], dim=-1))
            # Prediction error as intrinsic reward
            intrinsic_reward = 0.5 * torch.sum((phi_next_pred - phi_next) ** 2, dim=-1)
        return intrinsic_reward.cpu().numpy()

    def train_step(self, obs, action, next_obs):
        """Compute the ICM loss on a batch of transitions (caller backpropagates)."""
        phi = self.encoder(obs)
        phi_next = self.encoder(next_obs)

        # Forward model loss
        phi_next_pred = self.forward_model(torch.cat([phi, action], dim=-1))
        forward_loss = nn.MSELoss()(phi_next_pred, phi_next.detach())

        # Inverse model loss (MSE, assuming continuous actions)
        action_pred = self.inverse_model(torch.cat([phi, phi_next], dim=-1))
        inverse_loss = nn.MSELoss()(action_pred, action)

        total_loss = forward_loss + inverse_loss
        return total_loss
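A sketch of how the module might be wired into a rollout loop. The dimensions, the value of the scaling factor, and the optimizer settings are illustrative choices, and the observation/action inputs are assumed to be batched torch tensors:

obs_dim, action_dim = 8, 2                # illustrative dimensions
icm = ICMModule(obs_dim, action_dim)
icm_optimizer = torch.optim.Adam(icm.parameters(), lr=1e-3)
eta = 0.01                                # weight of the curiosity bonus (illustrative)

def augment_rewards(extrinsic_rewards, obs, actions, next_obs):
    """Add the curiosity bonus to a batch of extrinsic rewards.

    obs/actions/next_obs are batched torch tensors; extrinsic_rewards is a
    NumPy array with the same batch size.
    """
    intrinsic = icm.compute_intrinsic_reward(obs, actions, next_obs)  # NumPy array
    return extrinsic_rewards + eta * intrinsic

def update_icm(obs, actions, next_obs):
    """One gradient step on the forward + inverse model losses."""
    loss = icm.train_step(obs, actions, next_obs)
    icm_optimizer.zero_grad()
    loss.backward()
    icm_optimizer.step()
    return loss.item()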
Random Network Distillation (RND)
RND uses the prediction error of a trainable predictor network against a fixed, randomly initialized target network as the exploration bonus:
class RNDModule(nn.Module):
    """Random Network Distillation for exploration."""

    def __init__(self, obs_dim, output_dim=64):
        super().__init__()
        # Fixed random target network
        self.target_net = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )
        for param in self.target_net.parameters():
            param.requires_grad = False  # Frozen

        # Trainable predictor network
        self.predictor_net = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def compute_intrinsic_reward(self, obs):
        """Novel states have high prediction error."""
        with torch.no_grad():
            target = self.target_net(obs)
            prediction = self.predictor_net(obs)
            intrinsic_reward = torch.sum((prediction - target) ** 2, dim=-1)
        return intrinsic_reward.cpu().numpy()
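Either bonus can be folded into training through a thin wrapper that augments the environment reward on every step. A minimal sketch using the RNDModule above, reusing the gymnasium and torch imports from earlier; the bonus coefficient is an illustrative choice, and the predictor network still has to be trained on visited observations separately:

class IntrinsicRewardWrapper(gym.Wrapper):
    """Adds an RND exploration bonus to the extrinsic reward (sketch)."""

    def __init__(self, env, rnd: RNDModule, bonus_coef: float = 0.1):
        super().__init__(env)
        self.rnd = rnd
        self.bonus_coef = bonus_coef

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        obs_tensor = torch.as_tensor(obs, dtype=torch.float32).unsqueeze(0)
        bonus = float(self.rnd.compute_intrinsic_reward(obs_tensor)[0])
        info["intrinsic_reward"] = bonus  # Log the bonus for debugging
        return obs, reward + self.bonus_coef * bonus, terminated, truncated, info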
Multi-Objective Rewards
Many real-world tasks involve conflicting objectives (e.g., profit vs. risk, speed vs. safety). Scalarization combines objectives:
$$r_t = \sum_{i=1}^{k} w_i \, r_t^{(i)}$$
where the $w_i$ are manually tuned weights, as in the sketch below.
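The wrapper below assumes the environment reports per-objective rewards in its info dict under an "objectives" key; the key name and example weights are hypothetical:

class ScalarizedRewardWrapper(gym.Wrapper):
    """Collapses per-objective rewards into a single scalar (sketch)."""

    def __init__(self, env, weights: dict):
        super().__init__(env)
        self.weights = weights  # e.g. {"profit": 1.0, "risk": -0.5} (hypothetical keys)

    def step(self, action):
        obs, _, terminated, truncated, info = self.env.step(action)
        # Assumes the wrapped env reports per-objective rewards in info["objectives"]
        reward = sum(w * info["objectives"][name] for name, w in self.weights.items())
        return obs, reward, terminated, truncated, info

More sophisticated approaches use Pareto optimization: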
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.optimize import minimize
# NSGA-II (via pymoo) can search the objective space directly; the wrapper below
# only sweeps candidate weight vectors and evaluates one agent per vector.

class MultiObjectiveRLWrapper:
    """Train multiple agents along the Pareto frontier of the objectives."""

    def __init__(self, env, objectives_names):
        self.env = env
        self.objectives = objectives_names
        self.agents = []  # Population of agents

    def evaluate_agent(self, agent, n_episodes=10):
        """Evaluate an agent on all objectives."""
        objective_returns = {obj: [] for obj in self.objectives}

        for _ in range(n_episodes):
            obs, _ = self.env.reset()
            episode_objectives = {obj: 0.0 for obj in self.objectives}

            while True:
                action = agent.predict(obs)
                obs, rewards_dict, terminated, truncated, _ = self.env.step(action)
                for obj in self.objectives:
                    episode_objectives[obj] += rewards_dict[obj]
                if terminated or truncated:
                    break

            for obj in self.objectives:
                objective_returns[obj].append(episode_objectives[obj])

        # Return mean performance on each objective
        return [np.mean(objective_returns[obj]) for obj in self.objectives]

    def get_pareto_optimal_weights(self, num_points=10):
        """Sample candidate weight vectors for sweeping the trade-off frontier."""
        # Example: for 2 objectives, sample weights w1 in [0, 1], w2 = 1 - w1
        if len(self.objectives) == 2:
            return [(w, 1 - w) for w in np.linspace(0, 1, num_points)]
        else:
            # For >2 objectives, sample from a Dirichlet distribution
            return np.random.dirichlet(np.ones(len(self.objectives)), size=num_points)
Practical Example: Complete Trading Bot
Putting it all together:
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import yfinance as yf

# Download real price data (recent yfinance versions return a DataFrame,
# so squeeze the "Close" column down to a 1-D array)
data = yf.download("AAPL", start="2020-01-01", end="2023-12-31")["Close"].values.squeeze()

# Potential over the portfolio state rather than a spatial goal: the normalized
# portfolio value is the third-from-last feature in TradingEnv's observation
def portfolio_value_potential(obs):
    return obs[-3]

# Create environment with the risk-adjusted reward plus potential-based shaping
env = TradingEnv(price_data=data, initial_capital=10000)
env = PotentialBasedShaping(env, portfolio_value_potential, gamma=0.99)
env = DummyVecEnv([lambda: env])

# Train agent
model = PPO(
    "MlpPolicy",
    env,
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    gamma=0.99,
    verbose=1
)
model.learn(total_timesteps=100000)

# Evaluate (VecEnv API: reset() returns only obs; step() returns arrays and
# auto-resets the underlying env when an episode ends)
obs = env.reset()
total_reward = 0.0
final_value = env.envs[0].unwrapped.prev_value
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    obs, rewards, dones, infos = env.step(action)
    total_reward += rewards[0]
    if dones[0]:
        break  # the underlying env has already been auto-reset at this point
    final_value = env.envs[0].unwrapped.prev_value

print(f"Final portfolio value: ${final_value:.2f}")
print(f"Total return: {(final_value / 10000 - 1) * 100:.2f}%")
Practical Example: Robotic Arm with Dense Rewards
import pybullet_envs  # registers PyBullet envs with the legacy gym API
import gymnasium as gym
from stable_baselines3 import SAC

# Create a robotic manipulation environment. "KukaBulletEnv-v0" is used as a
# placeholder here; PyBullet's envs predate Gymnasium, so in practice you may
# need a Gymnasium-native manipulation env or a compatibility wrapper.
env = gym.make("KukaBulletEnv-v0")

# Custom reward wrapper. The observation layout assumed below (gripper position,
# object position, target position, grasp flag) is illustrative and must be
# adapted to the actual environment.
class RoboticArmReward(gym.Wrapper):
    def step(self, action):
        obs, _, terminated, truncated, info = self.env.step(action)
        # Extract state components (assumed layout)
        gripper_pos = obs[:3]
        object_pos = obs[3:6]
        target_pos = obs[6:9]
        object_grasped = obs[9] > 0.5
        # Apply the dense reward function defined earlier
        reward = dense_reward(gripper_pos, object_pos, target_pos, object_grasped)
        return obs, reward, terminated, truncated, info

env = RoboticArmReward(env)

# Train with SAC (continuous actions)
model = SAC("MlpPolicy", env, verbose=1, learning_rate=3e-4)
model.learn(total_timesteps=50000)

# Test
obs, _ = env.reset()
for _ in range(500):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)
    env.render()
    if terminated or truncated:
        obs, _ = env.reset()
Debugging Reward Functions
Common debugging techniques:
- Reward distribution analysis: Plot histogram of rewards per episode
- Component breakdown: Log individual reward components separately
- Manual policy testing: Test hand-coded policies to verify reward signal
- Ablation studies: Remove reward components one at a time
import matplotlib.pyplot as plt

def analyze_rewards(env, policy, n_episodes=100):
    """Collect and visualize reward statistics."""
    episode_rewards = []
    component_rewards = {"reach": [], "grasp": [], "place": [], "success": []}

    for _ in range(n_episodes):
        obs, _ = env.reset()
        ep_reward = 0.0
        ep_components = {k: 0.0 for k in component_rewards.keys()}

        while True:
            action = policy(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            ep_reward += reward
            # Assumes the env reports a component breakdown in info
            for k in component_rewards.keys():
                ep_components[k] += info.get(f"reward_{k}", 0)
            if terminated or truncated:
                break

        episode_rewards.append(ep_reward)
        for k, v in ep_components.items():
            component_rewards[k].append(v)

    # Visualize
    fig, axes = plt.subplots(2, 1, figsize=(10, 8))

    # Total rewards
    axes[0].hist(episode_rewards, bins=30, alpha=0.7)
    axes[0].set_xlabel("Episode Return")
    axes[0].set_ylabel("Frequency")
    axes[0].set_title("Distribution of Episode Returns")

    # Component breakdown
    components_array = np.array([component_rewards[k] for k in component_rewards.keys()])
    axes[1].boxplot(components_array.T, labels=list(component_rewards.keys()))
    axes[1].set_ylabel("Reward Value")
    axes[1].set_title("Reward Component Breakdown")

    plt.tight_layout()
    plt.savefig("reward_analysis.png")

    print(f"Mean episode return: {np.mean(episode_rewards):.2f} ± {np.std(episode_rewards):.2f}")
Conclusion
Reward engineering remains the critical bottleneck in applied reinforcement learning. This episode covered the theoretical foundation of potential-based reward shaping, domain-specific patterns for financial and robotic applications, common failure modes like reward hacking and sparsity, and modern techniques like intrinsic motivation and curriculum learning.
Key takeaways:
– Always validate that your reward function incentivizes the intended behavior through manual testing
– Use multi-component rewards with explicit terms for constraints and auxiliary objectives
– For sparse reward tasks, consider HER, curriculum learning, or intrinsic motivation
– Monitor reward statistics during training to detect pathologies early
– When objectives genuinely conflict, explicit multi-objective formulations can outperform a single hand-tuned weighted sum
In Part 6, we’ll address the final challenge: bridging the sim-to-real gap when deploying RL agents to physical systems. We’ll explore domain randomization, system identification, and real-world deployment strategies that ensure simulated policies transfer successfully to production environments.