Part 5: Reward Engineering: How to Shape Behaviors in Financial/Robotic Tasks

Updated Feb 6, 2026

Introduction

Reward engineering is often cited as the most challenging aspect of applied reinforcement learning. While algorithms like PPO and SAC (discussed in Part 4) provide robust training mechanisms, the quality of the learned policy fundamentally depends on how well the reward function captures the desired behavior. A poorly designed reward can lead to reward hacking, where agents exploit loopholes to maximize scores without solving the intended task, or deceptive alignment, where behavior looks correct during training but fails in deployment.

This episode explores the art and science of reward function design, covering theoretical foundations like potential-based reward shaping, domain-specific patterns for finance and robotics, common pathologies, and intrinsic motivation techniques. We’ll implement practical examples for both a trading bot and a robotic manipulation task.

The Theory of Reward Shaping

Potential-Based Reward Shaping

Reward shaping modifies the original reward function to accelerate learning without changing the optimal policy. The key insight from Ng, Harada, and Russell (1999) is that adding a potential-based term guarantees policy invariance:

R'(s, a, s') = R(s, a, s') + \gamma \Phi(s') - \Phi(s)

Where:
R(s, a, s') is the original reward for transitioning from state s to s' via action a
R'(s, a, s') is the shaped reward
\Phi(s) is a potential function over states
\gamma is the discount factor

The shaping term F(s, a, s') = \gamma \Phi(s') - \Phi(s) telescopes along any trajectory, so it shifts every policy's return by the same amount. In effect it acts as state-dependent "advice" that steers the agent toward promising regions while leaving the ranking of policies, and hence the optimal policy, unchanged.

import numpy as np
import gymnasium as gym
from typing import Callable

class PotentialBasedShaping(gym.Wrapper):
    """Wrapper that adds potential-based reward shaping to any Gymnasium environment."""

    def __init__(self, env, potential_fn: Callable, gamma: float = 0.99):
        super().__init__(env)
        self.potential_fn = potential_fn
        self.gamma = gamma
        self.prev_potential = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self.prev_potential = self.potential_fn(obs)
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)

        # Apply potential-based shaping
        current_potential = self.potential_fn(obs)
        shaped_reward = reward + self.gamma * current_potential - self.prev_potential
        self.prev_potential = current_potential

        return obs, shaped_reward, terminated, truncated, info

# Example: distance-based potential for reaching a goal
def goal_distance_potential(obs, goal_position=np.array([10.0, 10.0])):
    """Potential function that increases as agent approaches goal."""
    agent_position = obs[:2]  # Assume first 2 dims are position
    distance = np.linalg.norm(goal_position - agent_position)
    return -distance  # Negative distance (higher potential = closer to goal)
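
One useful property to verify in practice: because the shaping term telescopes, the discounted sum of shaping rewards over any trajectory depends only on the first and last states, so it cannot change which policy is optimal. A minimal sanity check with synthetic 2-D states, reusing goal_distance_potential above:

# Sanity check of the telescoping property: the discounted sum of shaping terms
# equals gamma^T * Phi(s_T) - Phi(s_0), regardless of the path taken.
gamma = 0.99
states = [np.array([x, x]) for x in np.linspace(0.0, 10.0, 6)]  # synthetic trajectory
phi = [goal_distance_potential(s) for s in states]

shaping_sum = sum(
    gamma**t * (gamma * phi[t + 1] - phi[t]) for t in range(len(states) - 1)
)
expected = gamma ** (len(states) - 1) * phi[-1] - phi[0]
print(np.isclose(shaping_sum, expected))  # True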

Financial Domain Rewards

Risk-Adjusted Returns

For trading agents, raw profit is insufficient because it ignores risk. The Sharpe ratio provides a risk-adjusted performance metric:

\text{Sharpe Ratio} = \frac{E[R - R_f]}{\sigma_R}

Where:
E[R - R_f] is the expected excess return over the risk-free rate
\sigma_R is the standard deviation of returns
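
As a quick worked example (the risk-free rate is taken as zero and an annualization factor of 252 trading days is assumed; both are conventions rather than requirements):

# Sharpe ratio of a short series of daily returns
daily_returns = np.array([0.002, -0.001, 0.003, 0.0005, -0.002, 0.004])
sharpe_daily = np.mean(daily_returns) / (np.std(daily_returns) + 1e-9)
sharpe_annualized = sharpe_daily * np.sqrt(252)  # scale daily Sharpe to annual
print(f"Daily Sharpe: {sharpe_daily:.2f}, annualized: {sharpe_annualized:.2f}")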

We can incorporate this into a reward function:

import gymnasium as gym
from collections import deque

class TradingEnv(gym.Env):
    """Financial trading environment with risk-adjusted rewards."""

    def __init__(self, price_data, initial_capital=10000, window_size=30):
        super().__init__()
        self.price_data = price_data
        self.initial_capital = initial_capital
        self.window_size = window_size

        # Action: [position_change, leverage] where position_change in [-1, 1]
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)
        # Observation: price history + portfolio state
        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, 
            shape=(window_size + 3,), dtype=np.float32
        )

        self.returns_history = deque(maxlen=100)
        self.transaction_costs = 0.001  # 0.1% per trade

    def reset(self, *, seed=None, options=None):
        """Initialize portfolio state at the start of an episode."""
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.prev_value = self.initial_capital
        self.peak_value = self.initial_capital
        self.position = 0.0
        self.returns_history.clear()
        return self._get_observation(), {}

    def calculate_reward(self, new_portfolio_value, trade_volume):
        """Multi-component reward with risk adjustment."""
        # Component 1: Portfolio return
        portfolio_return = (new_portfolio_value - self.prev_value) / self.prev_value
        self.returns_history.append(portfolio_return)

        # Component 2: Sharpe ratio (computed over recent window)
        if len(self.returns_history) >= 30:
            returns_array = np.array(self.returns_history)
            sharpe = np.mean(returns_array) / (np.std(returns_array) + 1e-6)
            risk_adjusted_reward = sharpe * 0.1  # Scale factor
        else:
            risk_adjusted_reward = 0

        # Component 3: Drawdown penalty
        current_drawdown = (self.peak_value - new_portfolio_value) / self.peak_value
        drawdown_penalty = -10.0 * max(0, current_drawdown - 0.1)  # Penalize >10% drawdown

        # Component 4: Transaction cost
        transaction_cost = -self.transaction_costs * trade_volume * new_portfolio_value

        # Component 5: Holding penalty (encourage active trading)
        holding_penalty = -0.0001 if trade_volume < 0.01 else 0

        total_reward = (
            portfolio_return * 100 +  # Scale return to meaningful range
            risk_adjusted_reward +
            drawdown_penalty +
            transaction_cost +
            holding_penalty
        )

        return total_reward

    def step(self, action):
        position_change, leverage = action
        leverage = np.clip(leverage, 0.5, 2.0)  # Limit leverage

        # Execute trade and compute new portfolio value
        trade_volume = abs(position_change)
        new_value = self._execute_trade(position_change, leverage)

        reward = self.calculate_reward(new_value, trade_volume)

        # Update state
        self.position = float(np.clip(self.position + position_change, -1.0, 1.0))
        self.peak_value = max(self.peak_value, new_value)
        self.prev_value = new_value
        self.current_step += 1

        terminated = new_value < self.initial_capital * 0.5  # Stop if 50% loss
        truncated = self.current_step >= len(self.price_data) - self.window_size

        obs = self._get_observation()
        return obs, reward, terminated, truncated, {}

    def _execute_trade(self, position_change, leverage):
        # Simplified trade execution (actual implementation would be more complex)
        price_change = self.price_data[self.current_step] / self.price_data[self.current_step - 1] - 1
        portfolio_change = position_change * leverage * price_change
        return self.prev_value * (1 + portfolio_change)

    def _get_observation(self):
        # Return normalized price history + portfolio state
        price_window = self.price_data[self.current_step - self.window_size:self.current_step]
        price_features = (price_window - np.mean(price_window)) / (np.std(price_window) + 1e-6)
        portfolio_features = np.array([
            self.prev_value / self.initial_capital,
            (self.prev_value - self.peak_value) / self.peak_value,  # Current drawdown
            self.position
        ])
        return np.concatenate([price_features, portfolio_features]).astype(np.float32)
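
Before training, it helps to sanity-check the environment wiring with synthetic data. A minimal smoke test (assuming the reset() method sketched above; the geometric-random-walk prices are placeholder data, not market data):

# Smoke test with synthetic prices: one reset, one step, check shapes and reward
rng = np.random.default_rng(0)
prices = 100 * np.exp(np.cumsum(rng.normal(0, 0.01, size=500)))

env = TradingEnv(price_data=prices, initial_capital=10000)
obs, _ = env.reset()
obs, reward, terminated, truncated, _ = env.step(np.array([0.5, 1.0], dtype=np.float32))
print(obs.shape, reward, terminated, truncated)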

Robotics Domain Rewards

Sparse vs. Dense Rewards

Robotics tasks face the exploration challenge: sparse rewards (e.g., +1 only when goal is reached) provide little guidance, while dense rewards (e.g., negative distance at each step) can introduce local optima.

Sparse reward example (pick-and-place):

def sparse_reward(gripper_pos, object_pos, target_pos, object_grasped):
    """Binary reward: 1.0 if object placed at target, else 0.0."""
    if object_grasped and np.linalg.norm(object_pos - target_pos) < 0.05:
        return 1.0
    return 0.0

Dense reward example:

def dense_reward(gripper_pos, object_pos, target_pos, object_grasped):
    """Distance-based reward with staged objectives."""
    # Stage 1: Reach the object
    reach_distance = np.linalg.norm(gripper_pos - object_pos)
    reach_reward = -reach_distance

    # Stage 2: Grasp the object
    grasp_reward = 2.0 if object_grasped else 0.0

    # Stage 3: Move object to target
    if object_grasped:
        place_distance = np.linalg.norm(object_pos - target_pos)
        place_reward = -place_distance + 5.0 / (place_distance + 0.1)  # Asymptotic bonus
        # Stage 4: Success bonus (only defined once the object is grasped)
        success_bonus = 10.0 if place_distance < 0.05 else 0.0
    else:
        place_reward = 0.0
        success_bonus = 0.0

    return reach_reward + grasp_reward + place_reward + success_bonus
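
A quick comparison on the same (made-up) state shows why the dense variant gives the agent something to climb before the first success:

gripper = np.array([0.1, 0.0, 0.2])
obj = np.array([0.3, 0.1, 0.0])
target = np.array([0.5, 0.5, 0.1])

print(sparse_reward(gripper, obj, target, object_grasped=False))  # 0.0 -> no learning signal
print(dense_reward(gripper, obj, target, object_grasped=False))   # negative distance -> graded signal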

Curriculum Learning

Gradually increasing task difficulty helps agents learn complex behaviors:

class CurriculumRoboticArm:
    """Robotic arm environment with an automatic curriculum.

    Sketch: _sample_target, _get_observation, and _step_simulation are
    environment-specific helpers left unimplemented here.
    """

    def __init__(self):
        self.success_rate = deque(maxlen=100)
        self.current_difficulty = 0
        self.difficulties = [
            {"target_distance": 0.2, "target_height": 0.1},  # Easy: close & low
            {"target_distance": 0.4, "target_height": 0.2},  # Medium
            {"target_distance": 0.6, "target_height": 0.3},  # Hard: far & high
        ]

    def reset(self, **kwargs):
        # Update curriculum based on recent performance
        if len(self.success_rate) >= 100:
            avg_success = np.mean(self.success_rate)
            if avg_success > 0.8 and self.current_difficulty < len(self.difficulties) - 1:
                self.current_difficulty += 1
                print(f"Curriculum advanced to difficulty {self.current_difficulty}")
            elif avg_success < 0.3 and self.current_difficulty > 0:
                self.current_difficulty -= 1
                print(f"Curriculum reduced to difficulty {self.current_difficulty}")

        # Sample target based on current difficulty
        diff = self.difficulties[self.current_difficulty]
        self.target_pos = self._sample_target(diff["target_distance"], diff["target_height"])

        return self._get_observation(), {}

    def step(self, action):
        # Execute action, compute reward
        obs, reward, terminated, truncated, info = self._step_simulation(action)

        # Track success for curriculum
        if terminated and info.get("is_success", False):
            self.success_rate.append(1.0)
        elif terminated:
            self.success_rate.append(0.0)

        return obs, reward, terminated, truncated, info
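
The curriculum rule itself can be tested in isolation, without a simulator, by feeding it a synthetic stream of successes and failures (the per-level success probabilities below are made up for the demo):

# Standalone demo of the curriculum update rule driven by synthetic outcomes
success_rate = deque(maxlen=100)
difficulty, num_levels = 0, 3
rng = np.random.default_rng(0)

for episode in range(500):
    p_success = 0.9 - 0.3 * difficulty           # assume harder levels succeed less often
    success_rate.append(float(rng.random() < p_success))
    if len(success_rate) == 100:
        avg = np.mean(success_rate)
        if avg > 0.8 and difficulty < num_levels - 1:
            difficulty += 1
            success_rate.clear()                 # re-measure at the new level
        elif avg < 0.3 and difficulty > 0:
            difficulty -= 1
            success_rate.clear()

print(f"Final difficulty level: {difficulty}")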

Hindsight Experience Replay (HER)

HER addresses sparse rewards by relabeling failed episodes with alternative goals:

from stable_baselines3 import HerReplayBuffer, SAC

# Create environment with dict observations (goal-conditioned)
env = gym.make("FetchReach-v2")  # Has 'observation', 'achieved_goal', 'desired_goal'

# Train with HER
model = SAC(
    "MultiInputPolicy",
    env,
    replay_buffer_class=HerReplayBuffer,
    replay_buffer_kwargs=dict(
        n_sampled_goal=4,  # Relabel each transition with 4 alternative goals
        goal_selection_strategy="future",  # Use future states as goals
    ),
    verbose=1
)

model.learn(total_timesteps=100000)

Common Reward Pathologies

Reward Hacking

Agents exploit unintended loopholes in reward specifications. Classic example from robotics:

Problem: Reward for “moving forward” in simulation.

def bad_forward_reward(robot_velocity):
    return robot_velocity[0]  # X-axis velocity

Exploit: Robot learns to fall forward (high velocity, no actual locomotion).

Fix: Multi-component reward with constraints.

def robust_forward_reward(robot_velocity, robot_height, joint_torques):
    forward_component = robot_velocity[0]
    height_penalty = -10.0 if robot_height < 0.5 else 0.0  # Penalize falling
    energy_penalty = -0.01 * np.sum(np.abs(joint_torques))  # Penalize inefficiency
    return forward_component + height_penalty + energy_penalty
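
To see the difference concretely, evaluate both rewards on a "falling forward" state (all numbers are illustrative):

# A falling robot: large forward velocity, but the torso is near the ground
falling_velocity = np.array([2.5, 0.0, -3.0])
falling_height = 0.2
joint_torques = np.full(6, 5.0)

print(bad_forward_reward(falling_velocity))                                    # rewarded: 2.5
print(robust_forward_reward(falling_velocity, falling_height, joint_torques))  # penalized: 2.5 - 10.0 - 0.3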

Reward Sparsity

Table summarizing mitigation strategies:

Strategy – Description – Use Case
Reward Shaping – Add potential-based guidance – Navigation, reaching tasks
Curriculum Learning – Gradually increase difficulty – Complex manipulation
HER – Relabel goals in replay buffer – Goal-conditioned tasks
Intrinsic Motivation – Add exploration bonuses – Environments with no external rewards
Imitation Learning – Pre-train from demonstrations – High-dimensional action spaces

Intrinsic Motivation

Curiosity-Driven Exploration

Intrinsic Curiosity Module (ICM) adds exploration bonus based on prediction error:

r_t^{\text{intrinsic}} = \eta \cdot \left\| \hat{\phi}(s_{t+1}) - \phi(s_{t+1}) \right\|^2

Where:
\phi(s) is a learned feature representation
\hat{\phi}(s_{t+1}) is the predicted next-state feature vector given s_t and a_t
\eta is a scaling factor

The agent is rewarded for encountering surprising (unpredictable) states.

import torch
import torch.nn as nn

class ICMModule(nn.Module):
    """Intrinsic Curiosity Module for exploration."""

    def __init__(self, obs_dim, action_dim, feature_dim=64):
        super().__init__()
        # Feature encoder
        self.encoder = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, feature_dim)
        )

        # Forward model: predict next state features
        self.forward_model = nn.Sequential(
            nn.Linear(feature_dim + action_dim, 128),
            nn.ReLU(),
            nn.Linear(128, feature_dim)
        )

        # Inverse model: predict action from state transition
        self.inverse_model = nn.Sequential(
            nn.Linear(feature_dim * 2, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def compute_intrinsic_reward(self, obs, action, next_obs):
        """Compute curiosity-based intrinsic reward."""
        with torch.no_grad():
            phi = self.encoder(obs)
            phi_next = self.encoder(next_obs)
            phi_next_pred = self.forward_model(torch.cat([phi, action], dim=-1))

            # Prediction error as intrinsic reward
            intrinsic_reward = 0.5 * torch.sum((phi_next_pred - phi_next) ** 2, dim=-1)

        return intrinsic_reward.cpu().numpy()

    def train_step(self, obs, action, next_obs):
        """Train ICM on batch of transitions."""
        phi = self.encoder(obs)
        phi_next = self.encoder(next_obs)

        # Forward model loss
        phi_next_pred = self.forward_model(torch.cat([phi, action], dim=-1))
        forward_loss = nn.MSELoss()(phi_next_pred, phi_next.detach())

        # Inverse model loss
        action_pred = self.inverse_model(torch.cat([phi, phi_next], dim=-1))
        inverse_loss = nn.MSELoss()(action_pred, action)

        total_loss = forward_loss + inverse_loss
        return total_loss
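
A usage sketch under assumed dimensions (obs_dim=8, action_dim=2) and placeholder tensors: the intrinsic bonus is added to the environment reward inside the training loop, and the module is trained alongside the policy.

# Wire the ICM bonus into training (eta is a hypothetical scaling coefficient)
icm = ICMModule(obs_dim=8, action_dim=2)
icm_optimizer = torch.optim.Adam(icm.parameters(), lr=1e-3)
eta = 0.01

obs_batch = torch.randn(32, 8)       # placeholder transition batch
action_batch = torch.randn(32, 2)
next_obs_batch = torch.randn(32, 8)

bonus = eta * icm.compute_intrinsic_reward(obs_batch, action_batch, next_obs_batch)
print(bonus.shape)  # one intrinsic bonus per transition; add to the extrinsic reward

loss = icm.train_step(obs_batch, action_batch, next_obs_batch)
icm_optimizer.zero_grad()
loss.backward()
icm_optimizer.step()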

Random Network Distillation (RND)

RND uses prediction error of a fixed random network as exploration bonus:

class RNDModule(nn.Module):
    """Random Network Distillation for exploration."""

    def __init__(self, obs_dim, output_dim=64):
        super().__init__()
        # Fixed random target network
        self.target_net = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )
        for param in self.target_net.parameters():
            param.requires_grad = False  # Frozen

        # Trainable predictor network
        self.predictor_net = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def compute_intrinsic_reward(self, obs):
        """Novel states have high prediction error."""
        with torch.no_grad():
            target = self.target_net(obs)
            prediction = self.predictor_net(obs)
            intrinsic_reward = torch.sum((prediction - target) ** 2, dim=-1)
        return intrinsic_reward.cpu().numpy()
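
The module above only computes the bonus; during training the predictor is regressed onto the frozen target over visited states, so familiar states gradually lose their bonus while novel ones keep it. A sketch with placeholder data and assumed dimensions:

rnd = RNDModule(obs_dim=8)
rnd_optimizer = torch.optim.Adam(rnd.predictor_net.parameters(), lr=1e-4)

obs_batch = torch.randn(64, 8)   # placeholder batch of visited states

# Train the predictor to match the frozen random target on visited states
pred_loss = nn.MSELoss()(rnd.predictor_net(obs_batch), rnd.target_net(obs_batch).detach())
rnd_optimizer.zero_grad()
pred_loss.backward()
rnd_optimizer.step()

bonus = rnd.compute_intrinsic_reward(obs_batch)  # shape: (64,)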

Multi-Objective Rewards

Many real-world tasks involve conflicting objectives (e.g., profit vs. risk, speed vs. safety). Scalarization combines objectives:

R_{\text{total}} = \sum_{i=1}^{n} w_i \cdot R_i

Where w_i are manually tuned weights. More sophisticated approaches use Pareto optimization:

# Optional: pymoo's NSGA-II can search the Pareto frontier directly
# (not used in the simple weight-sampling sketch below)
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.optimize import minimize

class MultiObjectiveRLWrapper:
    """Train multiple agents on Pareto frontier of objectives."""

    def __init__(self, env, objectives_names):
        self.env = env
        self.objectives = objectives_names
        self.agents = []  # Population of agents

    def evaluate_agent(self, agent, n_episodes=10):
        """Evaluate agent on all objectives."""
        objective_returns = {obj: [] for obj in self.objectives}

        for _ in range(n_episodes):
            obs, _ = self.env.reset()
            episode_objectives = {obj: 0.0 for obj in self.objectives}

            while True:
                action = agent.predict(obs)
                obs, rewards_dict, terminated, truncated, _ = self.env.step(action)

                for obj in self.objectives:
                    episode_objectives[obj] += rewards_dict[obj]

                if terminated or truncated:
                    break

            for obj in self.objectives:
                objective_returns[obj].append(episode_objectives[obj])

        # Return mean performance on each objective
        return [np.mean(objective_returns[obj]) for obj in self.objectives]

    def get_pareto_optimal_weights(self, num_points=10):
        """Find Pareto-optimal weight combinations."""
        # Example: for 2 objectives, sample weights w1 ∈ [0,1], w2 = 1-w1
        if len(self.objectives) == 2:
            return [(w, 1-w) for w in np.linspace(0, 1, num_points)]
        else:
            # For >2 objectives, use Dirichlet distribution
            return np.random.dirichlet(np.ones(len(self.objectives)), size=num_points)
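
A usage sketch of the weight-sampling path (the objective names, env=None, and train_agent are placeholders for illustration, not a real training API):

wrapper = MultiObjectiveRLWrapper(env=None, objectives_names=["return", "drawdown"])

for w in wrapper.get_pareto_optimal_weights(num_points=5):
    print(f"Scalarization weights: return={w[0]:.2f}, drawdown={w[1]:.2f}")
    # One agent would be trained per weight vector, e.g.:
    # agent = train_agent(reward_fn=lambda r: w[0] * r["return"] + w[1] * r["drawdown"])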

Practical Example: Complete Trading Bot

Putting it all together:

import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import yfinance as yf

# Download real price data (squeeze to a 1-D array in case yfinance returns a single-column frame)
data = yf.download("AAPL", start="2020-01-01", end="2023-12-31")["Close"].values.squeeze()

# Create environment with the risk-adjusted reward defined above
env = TradingEnv(price_data=data, initial_capital=10000)

# Optional potential-based shaping. The navigation-style goal_distance_potential
# does not apply to price observations, so use the normalized portfolio value
# (the third-from-last observation entry) as the potential instead.
def portfolio_potential(obs):
    return obs[-3]

env = PotentialBasedShaping(env, portfolio_potential, gamma=0.99)
env = DummyVecEnv([lambda: env])

# Train agent
model = PPO(
    "MlpPolicy",
    env,
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    gamma=0.99,
    verbose=1
)

model.learn(total_timesteps=100000)

# Evaluate
obs = env.reset()
total_reward = 0
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    total_reward += reward
    if done:
        break

print(f"Final portfolio value: ${env.envs[0].prev_value:.2f}")
print(f"Total return: {(env.envs[0].prev_value / 10000 - 1) * 100:.2f}%")

Practical Example: Robotic Arm with Dense Rewards

import pybullet_envs  # Registers PyBullet tasks (legacy Gym API)
import gymnasium as gym
from stable_baselines3 import SAC

# Create robotic environment
# Note: pybullet_envs targets the legacy Gym API; depending on your versions you
# may need a compatibility shim (e.g. Shimmy) to load it through Gymnasium.
env = gym.make("KukaBulletEnv-v0")

# Custom reward wrapper
class RoboticArmReward(gym.Wrapper):
    def step(self, action):
        obs, _, terminated, truncated, info = self.env.step(action)

        # Extract state components
        # (Illustrative layout; check the actual observation spec of your environment)
        gripper_pos = obs[:3]
        object_pos = obs[3:6]
        target_pos = obs[6:9]
        object_grasped = obs[9] > 0.5

        # Apply dense reward function
        reward = dense_reward(gripper_pos, object_pos, target_pos, object_grasped)

        return obs, reward, terminated, truncated, info

env = RoboticArmReward(env)

# Train with SAC (continuous actions)
model = SAC("MlpPolicy", env, verbose=1, learning_rate=3e-4)
model.learn(total_timesteps=50000)

# Test
obs, _ = env.reset()
for _ in range(500):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)
    env.render()
    if terminated or truncated:
        obs, _ = env.reset()

Debugging Reward Functions

Common debugging techniques:

  1. Reward distribution analysis: Plot histogram of rewards per episode
  2. Component breakdown: Log individual reward components separately
  3. Manual policy testing: Run hand-coded policies to verify the reward signal (see the sketch after the analysis code below)
  4. Ablation studies: Remove reward components one at a time

import matplotlib.pyplot as plt

def analyze_rewards(env, policy, n_episodes=100):
    """Collect and visualize reward statistics."""
    episode_rewards = []
    component_rewards = {"reach": [], "grasp": [], "place": [], "success": []}

    for _ in range(n_episodes):
        obs, _ = env.reset()
        ep_reward = 0
        ep_components = {k: 0 for k in component_rewards.keys()}

        while True:
            action = policy(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            ep_reward += reward

            # Assuming env returns component breakdown in info
            for k in component_rewards.keys():
                ep_components[k] += info.get(f"reward_{k}", 0)

            if terminated or truncated:
                break

        episode_rewards.append(ep_reward)
        for k, v in ep_components.items():
            component_rewards[k].append(v)

    # Visualize
    fig, axes = plt.subplots(2, 1, figsize=(10, 8))

    # Total rewards
    axes[0].hist(episode_rewards, bins=30, alpha=0.7)
    axes[0].set_xlabel("Episode Return")
    axes[0].set_ylabel("Frequency")
    axes[0].set_title("Distribution of Episode Returns")

    # Component breakdown
    components_array = np.array([component_rewards[k] for k in component_rewards.keys()])
    axes[1].boxplot(components_array.T, labels=component_rewards.keys())
    axes[1].set_ylabel("Reward Value")
    axes[1].set_title("Reward Component Breakdown")

    plt.tight_layout()
    plt.savefig("reward_analysis.png")
    print(f"Mean episode return: {np.mean(episode_rewards):.2f} ± {np.std(episode_rewards):.2f}")

Conclusion

Reward engineering remains the critical bottleneck in applied reinforcement learning. This episode covered the theoretical foundation of potential-based reward shaping, domain-specific patterns for financial and robotic applications, common failure modes like reward hacking and sparsity, and modern techniques like intrinsic motivation and curriculum learning.

Key takeaways:
– Always validate that your reward function incentivizes the intended behavior through manual testing
– Use multi-component rewards with explicit terms for constraints and auxiliary objectives
– For sparse reward tasks, consider HER, curriculum learning, or intrinsic motivation
– Monitor reward statistics during training to detect pathologies early
– Explicit multi-objective (Pareto) formulations are often preferable to a single hand-tuned weighted sum when objectives conflict

In Part 6, we’ll address the final challenge: bridging the sim-to-real gap when deploying RL agents to physical systems. We’ll explore domain randomization, system identification, and real-world deployment strategies that ensure simulated policies transfer successfully to production environments.

Deep Reinforcement Learning: From Theory to Custom Environments Series (5/6)
