Introduction
In Part 1, we explored Markov Decision Processes (MDPs) as the theoretical foundation of reinforcement learning. Now, we’ll bridge theory and practice by building custom RL environments using Gymnasium, the Farama Foundation’s actively maintained successor to OpenAI’s original Gym library. Whether you’re training agents for trading strategies, robotics, or game AI, mastering custom environment creation is essential for real-world RL applications.
This hands-on guide walks you through the Gymnasium API, demonstrates how to build a stock trading environment from scratch, and covers advanced topics like environment wrappers and vectorization for parallel training.
Why Custom Environments?
While Gymnasium provides classic benchmarks like CartPole and MountainCar, real-world problems rarely fit pre-built environments. Custom environments let you:
- Model domain-specific problems: Trading strategies, industrial control, healthcare optimization
- Control state/action representations: Design exactly what your agent observes and can do
- Engineer reward functions: Shape agent behavior toward your business objectives
- Test hypotheses: Experiment with environment dynamics before deploying to production
The Gymnasium API: Core Components
Every Gymnasium environment inherits from gymnasium.Env and implements five essential methods:
Required Methods
| Method | Purpose | Returns |
|---|---|---|
| __init__() | Initialize environment, define spaces | None |
| reset() | Reset to initial state | (observation, info) |
| step(action) | Execute action, transition state | (obs, reward, terminated, truncated, info) |
| render() | Visualize current state (optional) | None or render output |
| close() | Clean up resources | None |
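Before building anything realistic, it helps to see the smallest environment that satisfies this contract. The counter task below is a toy sketch, purely to show where each method fits:

```python
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class MinimalEnv(gym.Env):
    """Smallest possible Gymnasium environment: a 10-step counter."""

    def __init__(self):
        super().__init__()
        self.observation_space = spaces.Box(low=0, high=10, shape=(1,), dtype=np.float32)
        self.action_space = spaces.Discrete(2)
        self._step_count = 0

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)  # Seeds self.np_random for reproducibility
        self._step_count = 0
        return np.array([0.0], dtype=np.float32), {}

    def step(self, action):
        self._step_count += 1
        obs = np.array([self._step_count], dtype=np.float32)
        reward = float(action)              # Toy reward: 1 for action 1, 0 otherwise
        terminated = self._step_count >= 10  # Episode ends after 10 steps
        truncated = False
        return obs, reward, terminated, truncated, {}

    def render(self):
        print(f"step={self._step_count}")

    def close(self):
        pass
```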
Space Definitions
Two critical attributes define your environment’s interface:
observation_space: Defines what the agent sees. Common types:
– Box(low, high, shape): Continuous vectors (e.g., prices, positions)
– Discrete(n): Integer values
– MultiDiscrete([n1, n2, ...]): Multiple discrete variables
– Dict({"key": space, ...}): Structured observations
action_space: Defines what the agent can do:
– Discrete(n): Choose one of n discrete actions
– Box(low, high, shape): Continuous control (e.g., torque, trade size)
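As a quick standalone illustration of how these spaces are constructed and sampled (the names here are arbitrary and not part of the trading environment below):

```python
from gymnasium import spaces
import numpy as np

# Continuous vector, e.g. 6 normalized market features in [0, 10]
obs_space = spaces.Box(low=0.0, high=10.0, shape=(6,), dtype=np.float32)

# Three discrete actions: 0, 1, 2
act_space = spaces.Discrete(3)

# Two discrete variables with 5 and 3 possible values respectively
multi = spaces.MultiDiscrete([5, 3])

# Structured observation combining simpler spaces
structured = spaces.Dict({"market": obs_space, "position": spaces.Discrete(2)})

sample = structured.sample()                  # Random draw from the space
print(obs_space.contains(sample["market"]))   # True: samples always lie in their space
```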
Building a Stock Trading Environment
Let’s create TradingEnv: an environment where an agent learns to buy/sell stocks to maximize portfolio value.
Design Decisions
State representation: Portfolio value, cash, shares held, recent price history, technical indicators (RSI, moving averages)
Action space: Discrete(3) — {0: Hold, 1: Buy, 2: Sell}
Reward function: Change in portfolio value at each step
Implementation: Basic Structure
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
class TradingEnv(gym.Env):
"""Custom stock trading environment for RL agents."""
metadata = {"render_modes": ["human"], "render_fps": 4}
def __init__(self, df, initial_balance=10000, commission=0.001, render_mode=None):
super().__init__()
# Store price data (DataFrame with columns: Open, High, Low, Close, Volume)
self.df = df
self.commission = commission # Transaction cost (0.1%)
self.initial_balance = initial_balance
self.render_mode = render_mode
# Action space: 0=Hold, 1=Buy, 2=Sell
self.action_space = spaces.Discrete(3)
# Observation space: [balance, shares, price, RSI, MA_20, MA_50]
# All normalized to [0, 10] range for stable learning
self.observation_space = spaces.Box(
low=0, high=10, shape=(6,), dtype=np.float32
)
self.current_step = 0
self.balance = initial_balance
self.shares_held = 0
self.total_shares_bought = 0
self.total_shares_sold = 0
def _calculate_indicators(self, idx):
"""Calculate RSI and moving averages for current step."""
# RSI (Relative Strength Index)
window = min(14, idx + 1)
prices = self.df['Close'].iloc[max(0, idx-window):idx+1]
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).mean()
loss = (-delta.where(delta < 0, 0)).mean()
        # A zero average loss means every move in the window was up; RSI is conventionally 100 then
        rsi = 100.0 if loss == 0 else 100 - (100 / (1 + gain / loss))
# Moving averages
ma_20 = self.df['Close'].iloc[max(0, idx-19):idx+1].mean()
ma_50 = self.df['Close'].iloc[max(0, idx-49):idx+1].mean()
return rsi, ma_20, ma_50
def _get_observation(self):
"""Construct state vector from current environment state."""
current_price = self.df['Close'].iloc[self.current_step]
rsi, ma_20, ma_50 = self._calculate_indicators(self.current_step)
# Normalize observation values to [0, 10] range
obs = np.array([
self.balance / self.initial_balance * 5, # Balance ratio
self.shares_held / 100, # Shares held (scaled)
current_price / 100, # Current price (scaled)
rsi / 10, # RSI normalized
ma_20 / 100, # 20-day MA (scaled)
ma_50 / 100 # 50-day MA (scaled)
], dtype=np.float32)
return np.clip(obs, 0, 10) # Ensure bounds
def reset(self, seed=None, options=None):
"""Reset environment to initial state."""
super().reset(seed=seed)
self.current_step = 0
self.balance = self.initial_balance
self.shares_held = 0
self.total_shares_bought = 0
self.total_shares_sold = 0
observation = self._get_observation()
info = {"balance": self.balance, "shares": self.shares_held}
return observation, info
def step(self, action):
"""Execute action and return next state."""
current_price = self.df['Close'].iloc[self.current_step]
prev_portfolio_value = self.balance + self.shares_held * current_price
# Execute action
if action == 1: # Buy
shares_to_buy = int(self.balance / (current_price * (1 + self.commission)))
if shares_to_buy > 0:
cost = shares_to_buy * current_price * (1 + self.commission)
self.balance -= cost
self.shares_held += shares_to_buy
self.total_shares_bought += shares_to_buy
elif action == 2: # Sell
if self.shares_held > 0:
revenue = self.shares_held * current_price * (1 - self.commission)
self.balance += revenue
self.total_shares_sold += self.shares_held
self.shares_held = 0
# Move to next time step
self.current_step += 1
# Check if episode ended
terminated = self.current_step >= len(self.df) - 1
truncated = False # No time limit for now
# Calculate reward (change in portfolio value)
        current_price = self.df['Close'].iloc[self.current_step]  # Still a valid index on the final step
current_portfolio_value = self.balance + self.shares_held * current_price
reward = current_portfolio_value - prev_portfolio_value
        observation = self._get_observation()
info = {
"balance": self.balance,
"shares": self.shares_held,
"portfolio_value": current_portfolio_value,
"total_bought": self.total_shares_bought,
"total_sold": self.total_shares_sold
}
return observation, reward, terminated, truncated, info
def render(self):
"""Print current state (human-readable)."""
if self.render_mode == "human":
current_price = self.df['Close'].iloc[self.current_step]
portfolio_value = self.balance + self.shares_held * current_price
profit = portfolio_value - self.initial_balance
print(f"Step: {self.current_step}")
print(f"Price: ${current_price:.2f}")
print(f"Balance: ${self.balance:.2f}")
print(f"Shares: {self.shares_held}")
print(f"Portfolio Value: ${portfolio_value:.2f}")
print(f"Profit: ${profit:.2f} ({profit/self.initial_balance*100:.2f}%)")
print("-" * 40)
Understanding the Reward Function
The reward at step $t$ is defined as the change in portfolio value:

$$r_t = V_t - V_{t-1}$$

where the portfolio value $V_t$ is:

$$V_t = C_t + H_t \cdot P_t$$

- $C_t$: Available cash
- $H_t$: Number of shares held
- $P_t$: Current stock price
This delta reward encourages the agent to maximize portfolio growth. Alternative reward designs:
- Sharpe ratio: $\frac{\mathbb{E}[r_t] - r_f}{\sigma_{r}}$, i.e. risk-adjusted returns (a rolling-window sketch follows this list)
- Sparse rewards: Only reward at episode end (harder to learn)
- Shaped rewards: Add intermediate incentives (e.g., penalize excessive trading)
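To make the Sharpe-ratio alternative concrete, here is a minimal sketch of a wrapper that replaces the raw portfolio delta with a rolling Sharpe-style reward. It relies on the portfolio_value entry our TradingEnv already reports in info; the window length and the epsilon guard are illustrative choices, not prescribed values:

```python
import numpy as np
import gymnasium as gym
from collections import deque

class SharpeReward(gym.Wrapper):
    """Replace the raw portfolio delta with a rolling Sharpe-style reward."""

    def __init__(self, env, window=30):
        super().__init__(env)
        self._returns = deque(maxlen=window)  # Rolling window of step returns
        self._prev_value = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self._returns.clear()
        self._prev_value = None
        return obs, info

    def step(self, action):
        obs, _, terminated, truncated, info = self.env.step(action)
        value = info["portfolio_value"]  # Provided by TradingEnv's info dict
        if self._prev_value is not None:
            self._returns.append((value - self._prev_value) / max(self._prev_value, 1e-8))
        self._prev_value = value
        if len(self._returns) < 2:
            reward = 0.0  # Not enough history for a meaningful ratio yet
        else:
            r = np.array(self._returns)
            reward = float(r.mean() / (r.std() + 1e-8))  # Risk-free rate assumed 0
        return obs, reward, terminated, truncated, info
```

Usage is simply env = SharpeReward(TradingEnv(stock_data)).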
Environment Registration and Usage
Register your environment for easy instantiation:
from gymnasium.envs.registration import register
register(
id='TradingEnv-v0',
entry_point='trading_env:TradingEnv', # module_name:ClassName
max_episode_steps=252, # One trading year
)
# Now you can create it like built-in envs
env = gym.make('TradingEnv-v0', df=stock_data)
Testing Your Environment
import yfinance as yf
# Download real stock data
stock_data = yf.download('AAPL', start='2020-01-01', end='2023-12-31')
# Create environment
env = TradingEnv(stock_data, initial_balance=10000, render_mode="human")  # render_mode needed for env.render() output
# Random agent test
observation, info = env.reset()
for _ in range(100):
action = env.action_space.sample() # Random action
observation, reward, terminated, truncated, info = env.step(action)
env.render()
if terminated or truncated:
observation, info = env.reset()
env.close()
Environment Wrappers: Enhancing Functionality
Gymnasium wrappers modify environment behavior without changing core logic. Think of them as decorators for environments.
Common Built-in Wrappers
TimeLimit: Enforce maximum episode length
from gymnasium.wrappers import TimeLimit
env = TradingEnv(stock_data)
env = TimeLimit(env, max_episode_steps=500) # Auto-truncate after 500 steps
NormalizeObservation: Maintain running mean/std of observations
from gymnasium.wrappers import NormalizeObservation
env = NormalizeObservation(TradingEnv(stock_data))
# Observations automatically normalized to ~N(0,1)
RecordVideo: Save episode videos (requires a render mode that returns image frames, e.g. rgb_array; the TradingEnv above only implements human rendering, so it would need an rgb_array renderer added first)
from gymnasium.wrappers import RecordVideo
env = RecordVideo(
TradingEnv(stock_data, render_mode="rgb_array"),
video_folder="./videos",
episode_trigger=lambda x: x % 10 == 0 # Record every 10th episode
)
Custom Wrapper Example: Portfolio Normalization
class NormalizePortfolio(gym.ObservationWrapper):
"""Normalize portfolio value to [0, 1] range."""
def __init__(self, env):
super().__init__(env)
# Update observation space bounds
self.observation_space = spaces.Box(
low=0, high=1, shape=env.observation_space.shape, dtype=np.float32
)
def observation(self, obs):
"""Normalize observation vector."""
# Divide each component by its expected maximum
normalized = obs / 10.0 # Our original space was [0, 10]
return normalized.astype(np.float32)
# Usage
env = TradingEnv(stock_data)
env = NormalizePortfolio(env)
Vectorized Environments: Parallel Training
Training on multiple environment instances simultaneously drastically speeds up data collection. Gymnasium provides vectorization utilities.
AsyncVectorEnv: Multi-Process Parallelization
from gymnasium.vector import SyncVectorEnv, AsyncVectorEnv
# Create 8 parallel environments
def make_env():
def _init():
return TradingEnv(stock_data)
return _init
env_fns = [make_env() for _ in range(8)]
vec_env = AsyncVectorEnv(env_fns) # Asynchronous execution
# Now step() returns batched results
observations, infos = vec_env.reset()
for _ in range(1000):
actions = vec_env.action_space.sample() # Shape: (8,)
observations, rewards, terminateds, truncateds, infos = vec_env.step(actions)
# observations.shape: (8, 6)
# rewards.shape: (8,)
vec_env.close()
Performance comparison (indicative steps/second on typical hardware; a quick way to measure this on your own setup is sketched after the table):
| Configuration | Speed | Use Case |
|---|---|---|
| Single env | 1000 | Debugging, simple tasks |
| SyncVectorEnv (4 envs) | 3500 | CPU-bound environments |
| AsyncVectorEnv (8 envs) | 7000 | I/O-bound or slow environments |
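To check throughput on your own machine, a rough benchmark along these lines can be used. It reuses the stock_data and make_env defined earlier; the step budget and the coarse reset-on-termination handling are simplifications for illustration:

```python
import time
import numpy as np
from gymnasium.vector import AsyncVectorEnv

def benchmark(env, n_steps=1000):
    """Rough throughput measurement: environment steps per second."""
    env.reset()
    start = time.perf_counter()
    for _ in range(n_steps):
        actions = env.action_space.sample()
        _, _, terminated, truncated, _ = env.step(actions)
        if np.any(terminated) or np.any(truncated):
            env.reset()  # Restart finished episodes (coarse, but fine for timing)
    elapsed = time.perf_counter() - start
    env.close()
    return n_steps / elapsed

single_sps = benchmark(TradingEnv(stock_data))
vec_sps = benchmark(AsyncVectorEnv([make_env() for _ in range(8)])) * 8  # 8 env-steps per call
print(f"Single env: {single_sps:.0f} steps/s | 8x AsyncVectorEnv: {vec_sps:.0f} env-steps/s")
```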
Integration with Stable Baselines3
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
# Create vectorized environment
env = SubprocVecEnv([make_env() for _ in range(4)])
# Train agent
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100000)
# Evaluate
obs = env.reset()
for _ in range(1000):
action, _states = model.predict(obs, deterministic=True)
obs, rewards, dones, info = env.step(action)
Debugging and Validation
Gymnasium provides env_checker to catch common implementation errors:
from gymnasium.utils.env_checker import check_env
env = TradingEnv(stock_data)
try:
check_env(env, warn=True, skip_render_check=True)
print("Environment passed all checks!")
except Exception as e:
print(f"Environment check failed: {e}")
Common issues detected:
– Observation/action outside defined space bounds
– reset() not returning correct tuple format
– step() returning wrong number of values
– Incorrect dtype in spaces
– Mismatch between actual and declared observation shape
Manual Testing Checklist
# 1. Space consistency
obs, info = env.reset()
assert env.observation_space.contains(obs), "Observation out of bounds!"
# 2. Action validation
for action in range(env.action_space.n):
obs, reward, term, trunc, info = env.step(action)
assert isinstance(reward, (int, float)), "Reward must be scalar!"
assert isinstance(term, bool), "Terminated must be boolean!"
# 3. Episode termination
for _ in range(10000): # Force termination
obs, reward, term, trunc, info = env.step(env.action_space.sample())
if term or trunc:
print(f"Episode ended after {env.current_step} steps")
break
# 4. Deterministic reset
env.reset(seed=42)
obs1, _ = env.reset(seed=42)
obs2, _ = env.reset(seed=42)
assert np.allclose(obs1, obs2), "Reset not deterministic!"
Advanced Topics: Stochastic Environments
Real markets have randomness. Add stochasticity to make training more robust:
class StochasticTradingEnv(TradingEnv):
"""Trading environment with random price shocks."""
def step(self, action):
obs, reward, terminated, truncated, info = super().step(action)
# Add random market shock with 5% probability
if self.np_random.random() < 0.05:
shock = self.np_random.normal(0, 0.02) # ±2% volatility
            # Apply the shock in place. Use positional indexing because the DataFrame
            # index is typically dates, not integer positions; note this also mutates
            # the stored price data for subsequent episodes.
            col = self.df.columns.get_loc('Close')
            self.df.iloc[self.current_step, col] *= (1 + shock)
return obs, reward, terminated, truncated, info
This models market microstructure noise where prices deviate from fundamentals temporarily. Agents must learn to handle uncertainty.
Best Practices Summary
| Principle | Implementation |
|---|---|
| Bounded spaces | Always use Box(low, high) with finite bounds |
| Normalized observations | Scale to [0,1] or [-1,1] for stable learning |
| Reward scaling | Keep rewards in [-10, 10] range initially |
| Deterministic reset | Support seed parameter for reproducibility |
| Fast step() | Avoid heavy computation; precompute when possible |
| Informative info dict | Return diagnostic data for analysis |
| Wrapper composition | Chain wrappers for modular functionality |
| Vectorization | Use 4-16 parallel envs for sample efficiency |
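As one example of turning these practices into code, here is a minimal sketch of a reward-scaling wrapper; the scale factor and clip range are arbitrary illustrative values:

```python
import numpy as np
import gymnasium as gym

class ScaleClipReward(gym.RewardWrapper):
    """Scale rewards and clip them to a bounded range for stable learning."""

    def __init__(self, env, scale=0.01, clip=10.0):
        super().__init__(env)
        self.scale = scale
        self.clip = clip

    def reward(self, reward):
        # Dollar-valued portfolio deltas can be large; shrink and bound them
        return float(np.clip(reward * self.scale, -self.clip, self.clip))

# Usage: env = ScaleClipReward(TradingEnv(stock_data), scale=0.01)
```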
Conclusion
You’ve now mastered the fundamentals of custom Gymnasium environment creation:
- Understanding the Env API and space definitions
- Building a realistic stock trading environment with technical indicators
- Designing reward functions that align with your objectives
- Leveraging wrappers for observation normalization and video recording
- Accelerating training with vectorized environments
- Validating implementations with env_checker
In Part 3, we’ll dive into agent selection strategies: comparing policy gradient methods (PPO, A2C) against Q-learning approaches (DQN, SAC) and understanding when to use each for your custom environments. You’ll learn how to match algorithm characteristics to environment properties for optimal performance.
The code examples in this tutorial are production-ready starting points—experiment with different state representations, action spaces, and reward functions to tackle your specific RL challenges. Happy environment building!