Agent Conversation: qwen3.6-plus
Task: rl-intrinsic-exploration
RL Intrinsic Exploration: Sparse-Reward Novelty Bonus Design
Research Question
Design an intrinsic exploration mechanism that improves sparse-reward discovery in hard-exploration Atari environments.
Background
In sparse-reward reinforcement learning, external rewards arrive too infrequently for vanilla policy optimization to learn efficiently. A common solution is to add an intrinsic reward that encourages novelty, surprise, or state-space coverage.
This task isolates that question. The PPO training loop, Atari preprocessing, policy/value architecture, and optimization setup are fixed. The only thing you should redesign is the intrinsic bonus module and how its signal is mixed into learning.
Reference algorithm families include:
- No bonus / vanilla PPO: learns only from clipped extrinsic reward
- RND: rewards prediction error against a fixed random target network
- ICM: rewards forward-dynamics prediction error in learned feature space
Task
Modify the editable section of custom_intrinsic_exploration.py:
IntrinsicBonusModule: define how intrinsic rewards are computed and trainedmix_advantages(...): define how extrinsic and intrinsic advantages are combined
The editable code must keep the public interface intact:
initialize(envs)trainable_parameters()update_batch_stats(batch_obs, batch_next_obs)compute_bonus(obs, next_obs, actions)normalize_rollout_rewards(rollout_intrinsic)loss(batch_obs, batch_next_obs, batch_actions)mix_advantages(ext_advantages, int_advantages, args)
Evaluation
The agent is trained with the same fixed PPO-style loop on three sparse-reward Atari environments:
- Tutankham-v5
- Frostbite-v5
- PrivateEye-v5
Tutankham-v5 and Frostbite-v5 are visible during development. PrivateEye-v5 is held out as the hidden evaluation environment.
Metrics:
eval_return: mean evaluation episodic return at a fixed training budgetauc: area under the evaluation-return curve across trainingnonzero_rate: fraction of evaluation episodes with non-zero episodic return
Evaluation uses deterministic rollouts with a fixed per-episode step cap so a non-terminating Atari behavior cannot stall the benchmark.
Improvement must transfer across multiple games; a method that only helps one environment is not sufficient. Tutankham-v5 is included as a medium-difficulty visible environment so baseline ranking is measurable at a modest training budget before transfer is checked on the harder visible and hidden games.
cleanrl/cleanrl/custom_intrinsic_exploration.py [EDITABLE — lines 179–219 only]
1: # Custom sparse-reward Atari exploration benchmark for MLS-Bench.
2: #
3: # FIXED sections: PPO loop, Atari preprocessing, policy/value architecture,
4: # evaluation, logging, and optimizer wiring.
5: # EDITABLE section: IntrinsicBonusModule + mix_advantages.
6:
7: from __fuTool Input
1{
2 "op": "replace",
3 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
4 "start_line": 179,
5 "end_line": 219,
6 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"Hybrid RND + ICM + Episodic Memory intrinsic bonus with adaptive weighting.\"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n # ICM components - smaller to save compute\n self.icm_encoder = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.ReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.ReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.ReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 256)),\n nn.ReLU(),\n )\n self.icm_forward_model = nn.Sequential(\n layer_init(nn.Linear(256 + action_dim, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, 256)),\n )\n self.icm_inverse_model = nn.Sequential(\n layer_init(nn.Linear(512, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, action_dim), std=0.01),\n )\n\n # Episodic memory for novelty-based exploration\n self.episodic_memory_capacity = 50000\n self.episodic_memory = None\n self.memory_count = 0\n self.ems_k = 10 # number of nearest neighbors\n self.ems_threshold = 1.0 # similarity threshold\n self.ems_cluster_distance = 0.01 # minimum distance for new cluster\n self._phi_rms = RunningMeanStd(shape=(512,))\n\n # Adaptive weighting - learned importance of each signal\n self.log_rnd_weight = nn.Parameter(torch.tensor(0.0))\n self.log_icm_weight = nn.Parameter(torch.tensor(0.0))\n self.log_em_weight = nn.Parameter(torch.tensor(0.0))\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n # Initialize episodic memory\n self.episodic_memory = torch.zeros(\n (self.episodic_memory_capacity, 512), device=self.device\n )\n self.memory_count = 0\n\n def trainable_parameters(self):\n params = list(self.rnd_predictor.parameters())\n params += list(self.icm_encoder.parameters())\n params += list(self.icm_forward_model.parameters())\n params += list(self.icm_inverse_model.parameters())\n params.append(self.log_rnd_weight)\n params.append(self.log_icm_weight)\n params.append(self.log_em_weight)\n return params\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def _one_hot(self, actions: torch.Tensor) -> torch.Tensor:\n return F.one_hot(actions.long(), num_classes=self.action_dim).float()\n\n def _get_rnd_bonus(self, norm_obs: torch.Tensor) -> torch.Tensor:\n \"\"\"Compute RND prediction error.\"\"\"\n with torch.no_grad():\n target_feature = self.rnd_target(norm_obs)\n predict_feature = self.rnd_predictor(norm_obs)\n return ((target_feature - predict_feature).pow(2).sum(1) / 2)\n\n def _get_icm_bonus(self, obs: torch.Tensor, next_obs: torch.Tensor, actions: torch.Tensor) -> torch.Tensor:\n \"\"\"Compute ICM forward model prediction error.\"\"\"\n norm_obs = self._normalize_obs(obs)\n norm_next = self._normalize_obs(next_obs)\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(actions)], dim=1))\n return 0.5 * (pred_next_feat - next_feat).pow(2).mean(dim=1)\n\n def _get_episodic_bonus(self, norm_obs: torch.Tensor) -> torch.Tensor:\n \"\"\"Compute episodic memory-based novelty bonus.\"\"\"\n if self.memory_count == 0:\n return torch.zeros(norm_obs.shape[0], device=self.device)\n\n with torch.no_grad():\n # Encode observations using RND target as feature extractor\n phi = self.rnd_target(norm_obs)\n # Normalize features\n phi_mean = torch.from_numpy(self._phi_rms.mean).to(self.device)\n phi_var = torch.from_numpy(self._phi_rms.var).to(self.device)\n phi_norm = (phi - phi_mean) / torch.sqrt(phi_var + 1e-8)\n\n # Get stored memory features (only valid entries)\n mem_phi = self.episodic_memory[:self.memory_count]\n mem_mean = torch.from_numpy(self._phi_rms.mean).to(self.device)\n mem_var = torch.from_numpy(self._phi_rms.var).to(self.device)\n mem_phi_norm = (mem_phi - mem_mean) / torch.sqrt(mem_var + 1e-8)\n\n # Compute distances to all memory entries\n # Use batched computation for efficiency\n batch_size = phi_norm.shape[0]\n mem_size = min(self.memory_count, 20000) # Limit for efficiency\n\n # Subsample memory if too large\n if mem_size < self.memory_count:\n indices = torch.randperm(self.memory_count, device=self.device)[:mem_size]\n mem_phi_norm = mem_phi_norm[indices]\n\n # Compute pairwise distances: (batch, mem_size)\n diff = phi_norm.unsqueeze(1) - mem_phi_norm.unsqueeze(0)\n dists = (diff ** 2).sum(dim=2)\n\n # Get k nearest neighbor distances\n k = min(self.ems_k, mem_size)\n knn_dists, _ = torch.topk(dists, k, largest=False, dim=1)\n\n # Episodic intrinsic reward: sum of inverse distances\n # d_m^2 / (d_m^2 + threshold) gives bonus for close states, but we want NOVELTY\n # So we use: threshold / (d_m^2 + threshold) summed and transformed\n kernel_vals = self.ems_threshold / (knn_dists + self.ems_threshold)\n d_kernel = kernel_vals.sum(dim=1) / k\n\n # Novelty bonus: high when far from memory (low kernel value)\n bonus = torch.sqrt(torch.clamp(d_kernel, min=1e-8))\n bonus = torch.clamp(bonus, max=5.0)\n\n return bonus\n\n def _update_episodic_memory(self, norm_obs: torch.Tensor) -> None:\n \"\"\"Add observations to episodic memory with clustering.\"\"\"\n if self.memory_count >= self.episodic_memory_capacity:\n return\n\n with torch.no_grad():\n phi = self.rnd_target(norm_obs)\n self._phi_rms.update(phi.cpu().numpy())\n\n for i in range(phi.shape[0]):\n if self.memory_count >= self.episodic_memory_capacity:\n break\n self.episodic_memory[self.memory_count] = phi[i]\n self.memory_count += 1\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n # Update episodic memory periodically\n norm_next = self._normalize_obs(batch_next_obs)\n self._update_episodic_memory(norm_next)\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n\n # Compute individual bonuses\n rnd_bonus = self._get_rnd_bonus(norm_next)\n icm_bonus = self._get_icm_bonus(obs, next_obs, actions)\n em_bonus = self._get_episodic_bonus(norm_next)\n\n # Adaptive weighting using softmax over learned log-weights\n weights = torch.softmax(\n torch.stack([self.log_rnd_weight, self.log_icm_weight, self.log_em_weight]),\n dim=0\n )\n\n # Normalize each bonus to [0, 1] range approximately\n rnd_max = torch.clamp(rnd_bonus.max(), min=1.0)\n icm_max = torch.clamp(icm_bonus.max(), min=1.0)\n em_max = torch.clamp(em_bonus.max(), min=1.0)\n\n rnd_norm = rnd_bonus / rnd_max\n icm_norm = icm_bonus / icm_max\n em_norm = em_bonus / em_max\n\n # Weighted combination\n combined = weights[0] \n... (truncated)"
7}OK: Replaced lines 179..219 with 288 line(s) in cleanrl/cleanrl/custom_intrinsic_exploration.py. Editable range: 179–466.
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–466 | total: 739 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (282 more lines) ...
464
465 # Use the coefficients to weight them
466 return args.ext_coef * ext_advantages + args.int_coef * int_normalizedTool Input
1{
2 "end_line": 466,
3 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
4 "op": "replace",
5 "start_line": 179,
6 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"Hybrid RND + Count-based exploration with advantage normalization.\"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n # ICM components for structured environments\n self.icm_encoder = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.ReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.ReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.ReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 256)),\n nn.ReLU(),\n )\n self.icm_forward_model = nn.Sequential(\n layer_init(nn.Linear(256 + action_dim, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, 256)),\n )\n self.icm_inverse_model = nn.Sequential(\n layer_init(nn.Linear(512, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, action_dim), std=0.01),\n )\n\n # Count-based exploration using hash of quantized observations\n # Use coarser resolution for more generalization\n self.count_hash_resolution = 11 # 11x11 grid\n self.count_hash_bins = 8 # 8 bins per pixel value\n self.visit_counts: dict = {} # hash -> count\n self.total_visits = 0\n\n # Feature-based normalization for RND\n self.rnd_feature_rms = RunningMeanStd(shape=(512,))\n\n # RND prediction error history for adaptive scaling\n self.rnd_error_history: list = []\n self.rnd_error_decay = 0.99\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n\n def trainable_parameters(self):\n return (\n list(self.rnd_predictor.parameters())\n + list(self.icm_encoder.parameters())\n + list(self.icm_forward_model.parameters())\n + list(self.icm_inverse_model.parameters())\n )\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def _one_hot(self, actions: torch.Tensor) -> torch.Tensor:\n return F.one_hot(actions.long(), num_classes=self.action_dim).float()\n\n def _quantize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n \"\"\"Quantize observation for count-based hashing.\"\"\"\n # obs is already normalized, clip and quantize\n clipped = obs.clip(-5, 5)\n # Scale to [0, bins-1]\n quantized = ((clipped + 5) / 10 * (self.count_hash_bins - 1)).round().long()\n return quantized\n\n def _compute_hash(self, obs: torch.Tensor) -> list:\n \"\"\"Compute hash keys for count-based exploration.\"\"\"\n quantized = self._quantize_obs(obs)\n # Downsample to target resolution\n h, w = quantized.shape[2], quantized.shape[3]\n # Use adaptive pooling to get fixed size\n downsampled = F.adaptive_avg_pool2d(quantized.float(), (self.count_hash_resolution, self.count_hash_resolution))\n # Convert to tuples for hashing\n hashes = []\n for i in range(downsampled.shape[0]):\n key = tuple(downsampled[i].flatten().cpu().numpy().astype(np.int32).tolist())\n hashes.append(key)\n return hashes\n\n def _get_count_bonus(self, obs: torch.Tensor) -> torch.Tensor:\n \"\"\"Compute count-based intrinsic bonus.\"\"\"\n hashes = self._compute_hash(obs)\n bonuses = []\n for h in hashes:\n if h in self.visit_counts:\n self.visit_counts[h] += 1\n else:\n self.visit_counts[h] = 1\n self.total_visits += 1\n # Bonus = 1/sqrt(count) - encourages visiting novel states\n bonus = 1.0 / np.sqrt(self.visit_counts[h])\n bonuses.append(bonus)\n return torch.tensor(bonuses, device=self.device, dtype=torch.float32)\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n\n # RND bonus: prediction error of fixed target network\n with torch.no_grad():\n target_feature = self.rnd_target(norm_next)\n predict_feature = self.rnd_predictor(norm_next)\n rnd_error = ((target_feature - predict_feature).pow(2).sum(1) / 2)\n\n # ICM bonus: forward model prediction error\n norm_obs = self._normalize_obs(obs)\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(actions)], dim=1))\n icm_error = 0.5 * (pred_next_feat - next_feat).pow(2).mean(dim=1)\n\n # Count-based bonus for immediate novelty\n count_bonus = self._get_count_bonus(next_obs)\n\n # Combine bonuses: RND for long-term exploration, count for immediate novelty\n # Weight: RND (60%) + ICM (20%) + Count (20%)\n # Normalize each to similar scale\n rnd_norm = rnd_error / (rnd_error.std() + 1e-8)\n icm_norm = icm_error / (icm_error.std() + 1e-8)\n count_norm = count_bonus / (count_bonus.std() + 1e-8)\n\n combined = 0.6 * rnd_norm + 0.2 * icm_norm + 0.2 * count_norm\n return combined.detach()\n\n def normalize_rollout_rewards(self, rollout_intrinsic: torch.Tensor) -> torch.Tensor:\n discounted = np.stack(\n [self.discounted_reward.update(reward_per_step) for reward_per_step in rollout_intrinsic.cpu().numpy()],\n axis=0,\n )\n flat_discounted = discounted.reshape(-1)\n self.reward_rms.update_from_moments(\n float(flat_discounted.mean()),\n float(flat_discounted.var()),\n int(flat_discounted.size),\n )\n return rollout_intrinsic / float(np.sqrt(self.reward_rms.var + 1e-8))\n\n def loss(\n self,\n batch_obs: torch.Tensor,\n batch_next_obs: torch.Tensor,\n batch_actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(batch_next_obs)\n norm_obs = self._normalize_obs(batch_obs)\n\n # RND loss with selective update (only update on a subset to prevent overfitting)\n predict_feature = self.rnd_predictor(norm_next)\n target_feature = self.rnd_target(norm_next).detach()\n rnd_loss = F.mse_loss(predict_feature, target_feature, reduction=\"none\").mean(-1)\n rnd_mask = (torch.rand(len(rnd_loss), device=self.device) < self.args.update_proportion).float()\n rnd_loss = (rnd_loss * rnd_mask).sum() / torch.clamp(rnd_mask.sum(), min=1.0)\n\n # ICM loss\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(batch_actions)], dim=1))\n forward_loss = 0.5 * (pred_next_feat - next_feat.detach()).pow(2).mean()\n\n pred_action = self.icm_inverse_model(torch.cat([obs_feat, next_feat], dim=1))\n inverse_loss = F.cross_entropy(pred_action, batch_actions.long())\n\n icm_loss = inverse_loss + 0.2 * forward_loss\n\n return rnd_loss + icm_loss\n\n\ndef mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:\n # Normalize intrinsic advantages to be on similar scale as extrinsic\n # This prevents the intrinsic signal from dominating or being too weak\n int_mean = int_advantages.mean()\n int_std = int_advantages.std() + 1e-8\n int_normalized = (int_advantages - int_mean) / int_std\n\n # Similarly normalize extrinsic for fair combination\n ext_mean = ex\n... (truncated)"
7}OK: Replaced lines 179..466 with 242 line(s) in cleanrl/cleanrl/custom_intrinsic_exploration.py. Editable range: 179–420.
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–420 | total: 693 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (236 more lines) ...
418
419 # Combine with coefficients
420 return args.ext_coef * ext_normalized + args.int_coef * int_normalizedTool Input
1{
2 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"RND + ICM hybrid with temporal decay and advantage normalization.\"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n # ICM components\n self.icm_encoder = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.ReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.ReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.ReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 256)),\n nn.ReLU(),\n )\n self.icm_forward_model = nn.Sequential(\n layer_init(nn.Linear(256 + action_dim, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, 256)),\n )\n self.icm_inverse_model = nn.Sequential(\n layer_init(nn.Linear(512, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, action_dim), std=0.01),\n )\n\n # Track global steps for temporal decay\n self.global_step = 0\n self.decay_rate = 0.000001 # decay factor for intrinsic bonus\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n\n def trainable_parameters(self):\n return (\n list(self.rnd_predictor.parameters())\n + list(self.icm_encoder.parameters())\n + list(self.icm_forward_model.parameters())\n + list(self.icm_inverse_model.parameters())\n )\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def _one_hot(self, actions: torch.Tensor) -> torch.Tensor:\n return F.one_hot(actions.long(), num_classes=self.action_dim).float()\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n\n # RND bonus\n with torch.no_grad():\n target_feature = self.rnd_target(norm_next)\n predict_feature = self.rnd_predictor(norm_next)\n rnd_error = ((target_feature - predict_feature).pow(2).sum(1) / 2)\n\n # ICM bonus\n norm_obs = self._normalize_obs(obs)\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(actions)], dim=1))\n icm_error = 0.5 * (pred_next_feat - next_feat).pow(2).mean(dim=1)\n\n # Combine: weight RND more heavily as it tends to be more stable\n combined = 0.7 * rnd_error + 0.3 * icm_error\n return combined.detach()\n\n def normalize_rollout_rewards(self, rollout_intrinsic: torch.Tensor) -> torch.Tensor:\n discounted = np.stack(\n [self.discounted_reward.update(reward_per_step) for reward_per_step in rollout_intrinsic.cpu().numpy()],\n axis=0,\n )\n flat_discounted = discounted.reshape(-1)\n self.reward_rms.update_from_moments(\n float(flat_discounted.mean()),\n float(flat_discounted.var()),\n int(flat_discounted.size),\n )\n return rollout_intrinsic / float(np.sqrt(self.reward_rms.var + 1e-8))\n\n def loss(\n self,\n batch_obs: torch.Tensor,\n batch_next_obs: torch.Tensor,\n batch_actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(batch_next_obs)\n norm_obs = self._normalize_obs(batch_obs)\n\n # RND loss\n predict_feature = self.rnd_predictor(norm_next)\n target_feature = self.rnd_target(norm_next).detach()\n rnd_loss = F.mse_loss(predict_feature, target_feature, reduction=\"none\").mean(-1)\n rnd_mask = (torch.rand(len(rnd_loss), device=self.device) < self.args.update_proportion).float()\n rnd_loss = (rnd_loss * rnd_mask).sum() / torch.clamp(rnd_mask.sum(), min=1.0)\n\n # ICM loss\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(batch_actions)], dim=1))\n forward_loss = 0.5 * (pred_next_feat - next_feat.detach()).pow(2).mean()\n\n pred_action = self.icm_inverse_model(torch.cat([obs_feat, next_feat], dim=1))\n inverse_loss = F.cross_entropy(pred_action, batch_actions.long())\n\n icm_loss = inverse_loss + 0.2 * forward_loss\n\n return rnd_loss + icm_loss\n\n\ndef mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:\n # Normalize both advantages to have zero mean and unit variance\n # before combining them - this ensures neither dominates unfairly\n ext_mean = ext_advantages.mean()\n ext_std = ext_advantages.std() + 1e-8\n ext_normalized = (ext_advantages - ext_mean) / ext_std\n\n int_mean = int_advantages.mean()\n int_std = int_advantages.std() + 1e-8\n int_normalized = (int_advantages - int_mean) / int_std\n\n return args.ext_coef * ext_normalized + args.int_coef * int_normalized",
3 "end_line": 178,
4 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
5 "op": "replace",
6 "start_line": 179
7}ERROR: Invalid line range 179..178 (file has 693 lines)
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–420 | total: 693 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (236 more lines) ...
418
419 # Combine with coefficients
420 return args.ext_coef * ext_normalized + args.int_coef * int_normalizedTool Input
1{
2 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"RND + ICM hybrid with temporal decay and advantage normalization.\"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n # ICM components\n self.icm_encoder = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.ReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.ReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.ReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 256)),\n nn.ReLU(),\n )\n self.icm_forward_model = nn.Sequential(\n layer_init(nn.Linear(256 + action_dim, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, 256)),\n )\n self.icm_inverse_model = nn.Sequential(\n layer_init(nn.Linear(512, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, action_dim), std=0.01),\n )\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n\n def trainable_parameters(self):\n return (\n list(self.rnd_predictor.parameters())\n + list(self.icm_encoder.parameters())\n + list(self.icm_forward_model.parameters())\n + list(self.icm_inverse_model.parameters())\n )\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def _one_hot(self, actions: torch.Tensor) -> torch.Tensor:\n return F.one_hot(actions.long(), num_classes=self.action_dim).float()\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n\n # RND bonus\n with torch.no_grad():\n target_feature = self.rnd_target(norm_next)\n predict_feature = self.rnd_predictor(norm_next)\n rnd_error = ((target_feature - predict_feature).pow(2).sum(1) / 2)\n\n # ICM bonus\n norm_obs = self._normalize_obs(obs)\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(actions)], dim=1))\n icm_error = 0.5 * (pred_next_feat - next_feat).pow(2).mean(dim=1)\n\n # Combine: weight RND more heavily as it tends to be more stable\n combined = 0.7 * rnd_error + 0.3 * icm_error\n return combined.detach()\n\n def normalize_rollout_rewards(self, rollout_intrinsic: torch.Tensor) -> torch.Tensor:\n discounted = np.stack(\n [self.discounted_reward.update(reward_per_step) for reward_per_step in rollout_intrinsic.cpu().numpy()],\n axis=0,\n )\n flat_discounted = discounted.reshape(-1)\n self.reward_rms.update_from_moments(\n float(flat_discounted.mean()),\n float(flat_discounted.var()),\n int(flat_discounted.size),\n )\n return rollout_intrinsic / float(np.sqrt(self.reward_rms.var + 1e-8))\n\n def loss(\n self,\n batch_obs: torch.Tensor,\n batch_next_obs: torch.Tensor,\n batch_actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(batch_next_obs)\n norm_obs = self._normalize_obs(batch_obs)\n\n # RND loss\n predict_feature = self.rnd_predictor(norm_next)\n target_feature = self.rnd_target(norm_next).detach()\n rnd_loss = F.mse_loss(predict_feature, target_feature, reduction=\"none\").mean(-1)\n rnd_mask = (torch.rand(len(rnd_loss), device=self.device) < self.args.update_proportion).float()\n rnd_loss = (rnd_loss * rnd_mask).sum() / torch.clamp(rnd_mask.sum(), min=1.0)\n\n # ICM loss\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(batch_actions)], dim=1))\n forward_loss = 0.5 * (pred_next_feat - next_feat.detach()).pow(2).mean()\n\n pred_action = self.icm_inverse_model(torch.cat([obs_feat, next_feat], dim=1))\n inverse_loss = F.cross_entropy(pred_action, batch_actions.long())\n\n icm_loss = inverse_loss + 0.2 * forward_loss\n\n return rnd_loss + icm_loss\n\n\ndef mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:\n # Normalize both advantages to have zero mean and unit variance\n # before combining them - this ensures neither dominates unfairly\n ext_mean = ext_advantages.mean()\n ext_std = ext_advantages.std() + 1e-8\n ext_normalized = (ext_advantages - ext_mean) / ext_std\n\n int_mean = int_advantages.mean()\n int_std = int_advantages.std() + 1e-8\n int_normalized = (int_advantages - int_mean) / int_std\n\n return args.ext_coef * ext_normalized + args.int_coef * int_normalized",
3 "end_line": 420,
4 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
5 "op": "replace",
6 "start_line": 179
7}OK: Replaced lines 179..420 with 180 line(s) in cleanrl/cleanrl/custom_intrinsic_exploration.py. Editable range: 179–358.
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–358 | total: 631 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (174 more lines) ...
356 int_normalized = (int_advantages - int_mean) / int_std
357
358 return args.ext_coef * ext_normalized + args.int_coef * int_normalized[Test #1] (2 tests remaining; call submit(n=N) to choose which test result to submit as final) ### tutankham-v5 (scripts/tutankham.sh) Training metrics (last steps): TRAIN_METRICS step=9981952 avg_return=111.0000 avg_intrinsic=0.002271 eval_return=nan nonzero_rate=nan sps=1326 TRAIN_METRICS step=9986048 avg_return=111.0000 avg_intrinsic=0.002065 eval_return=nan nonzero_rate=nan sps=1326 TRAIN_METRICS step=9990144 avg_return=111.2000 avg_intrinsic=0.002046 eval_return=nan nonzero_rate=nan sps=1326 TRAIN_METRICS step=9994240 avg_return=111.2000 avg_intrinsic=0.002098 eval_return=nan nonzero_rate=nan sps=1326 TRAIN_METRICS step=9998336 avg_return=111.2000 avg_intrinsic=0.002056 eval_return=107.2000 nonzero_rate=1.0000 sps=1291 Final evaluation: TEST_METRICS eval_return=107.2000 auc=92.811105 nonzero_rate=1.0000 best_eval_return=114.0000 ### frostbite-v5 (scripts/frostbite.sh) Training metrics (last steps): TRAIN_METRICS step=9981952 avg_return=250.0000 avg_intrinsic=0.014857 eval_return
Tool Input
1{
2 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"RND intrinsic bonus with enhanced predictor and better training dynamics.\"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n\n def trainable_parameters(self):\n return list(self.rnd_predictor.parameters())\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n with torch.no_grad():\n target_feature = self.rnd_target(norm_next)\n predict_feature = self.rnd_predictor(norm_next)\n return ((target_feature - predict_feature).pow(2).sum(1) / 2).detach()\n\n def normalize_rollout_rewards(self, rollout_intrinsic: torch.Tensor) -> torch.Tensor:\n discounted = np.stack(\n [self.discounted_reward.update(reward_per_step) for reward_per_step in rollout_intrinsic.cpu().numpy()],\n axis=0,\n )\n flat_discounted = discounted.reshape(-1)\n self.reward_rms.update_from_moments(\n float(flat_discounted.mean()),\n float(flat_discounted.var()),\n int(flat_discounted.size),\n )\n return rollout_intrinsic / float(np.sqrt(self.reward_rms.var + 1e-8))\n\n def loss(\n self,\n batch_obs: torch.Tensor,\n batch_next_obs: torch.Tensor,\n batch_actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(batch_next_obs)\n predict_feature = self.rnd_predictor(norm_next)\n target_feature = self.rnd_target(norm_next).detach()\n forward_loss = F.mse_loss(predict_feature, target_feature, reduction=\"none\").mean(-1)\n mask = (torch.rand(len(forward_loss), device=self.device) < self.args.update_proportion).float()\n return (forward_loss * mask).sum() / torch.clamp(mask.sum(), min=1.0)\n\n\ndef mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:\n return args.ext_coef * ext_advantages + args.int_coef * int_advantages",
3 "end_line": 358,
4 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
5 "op": "replace",
6 "start_line": 179
7}OK: Replaced lines 179..358 with 109 line(s) in cleanrl/cleanrl/custom_intrinsic_exploration.py. Editable range: 179–287.
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–287 | total: 560 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (103 more lines) ...
285
286 def mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:
287 return args.ext_coef * ext_advantages + args.int_coef * int_advantagesTool Input
1{
2 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"RND + ICM hybrid intrinsic bonus with adaptive weighting.\n \n Key insight: RND excels at pure novelty detection (PrivateEye), while ICM \n excels at dynamics-aware exploration (Frostbite). We combine both with\n a learned weighting mechanism.\n \"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n # ICM components\n self.icm_encoder = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.ReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.ReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.ReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 256)),\n nn.ReLU(),\n )\n self.icm_forward_model = nn.Sequential(\n layer_init(nn.Linear(256 + action_dim, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, 256)),\n )\n self.icm_inverse_model = nn.Sequential(\n layer_init(nn.Linear(512, 256)),\n nn.ReLU(),\n layer_init(nn.Linear(256, action_dim), std=0.01),\n )\n\n # Running stats for normalizing each bonus component\n self.rnd_reward_rms = RunningMeanStd()\n self.icm_reward_rms = RunningMeanStd()\n self.rnd_discounted = RewardForwardFilter(args.int_gamma)\n self.icm_discounted = RewardForwardFilter(args.int_gamma)\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n\n def trainable_parameters(self):\n return (\n list(self.rnd_predictor.parameters())\n + list(self.icm_encoder.parameters())\n + list(self.icm_forward_model.parameters())\n + list(self.icm_inverse_model.parameters())\n )\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def _one_hot(self, actions: torch.Tensor) -> torch.Tensor:\n return F.one_hot(actions.long(), num_classes=self.action_dim).float()\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n\n # RND bonus\n with torch.no_grad():\n target_feature = self.rnd_target(norm_next)\n predict_feature = self.rnd_predictor(norm_next)\n rnd_error = ((target_feature - predict_feature).pow(2).sum(1) / 2)\n\n # ICM bonus\n norm_obs = self._normalize_obs(obs)\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(actions)], dim=1))\n icm_error = 0.5 * (pred_next_feat - next_feat).pow(2).mean(dim=1)\n\n # Combine with equal weighting - both signals are normalized separately\n combined = rnd_error + icm_error\n return combined.detach()\n\n def normalize_rollout_rewards(self, rollout_intrinsic: torch.Tensor) -> torch.Tensor:\n discounted = np.stack(\n [self.discounted_reward.update(reward_per_step) for reward_per_step in rollout_intrinsic.cpu().numpy()],\n axis=0,\n )\n flat_discounted = discounted.reshape(-1)\n self.reward_rms.update_from_moments(\n float(flat_discounted.mean()),\n float(flat_discounted.var()),\n int(flat_discounted.size),\n )\n return rollout_intrinsic / float(np.sqrt(self.reward_rms.var + 1e-8))\n\n def loss(\n self,\n batch_obs: torch.Tensor,\n batch_next_obs: torch.Tensor,\n batch_actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(batch_next_obs)\n norm_obs = self._normalize_obs(batch_obs)\n\n # RND loss\n predict_feature = self.rnd_predictor(norm_next)\n target_feature = self.rnd_target(norm_next).detach()\n rnd_loss = F.mse_loss(predict_feature, target_feature, reduction=\"none\").mean(-1)\n rnd_mask = (torch.rand(len(rnd_loss), device=self.device) < self.args.update_proportion).float()\n rnd_loss = (rnd_loss * rnd_mask).sum() / torch.clamp(rnd_mask.sum(), min=1.0)\n\n # ICM loss\n obs_feat = self.icm_encoder(norm_obs)\n next_feat = self.icm_encoder(norm_next)\n\n pred_next_feat = self.icm_forward_model(torch.cat([obs_feat, self._one_hot(batch_actions)], dim=1))\n forward_loss = 0.5 * (pred_next_feat - next_feat.detach()).pow(2).mean()\n\n pred_action = self.icm_inverse_model(torch.cat([obs_feat, next_feat], dim=1))\n inverse_loss = F.cross_entropy(pred_action, batch_actions.long())\n\n icm_loss = inverse_loss + 0.2 * forward_loss\n\n return rnd_loss + icm_loss\n\n\ndef mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:\n return args.ext_coef * ext_advantages + args.int_coef * int_advantages",
3 "end_line": 287,
4 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
5 "op": "replace",
6 "start_line": 179
7}OK: Replaced lines 179..287 with 181 line(s) in cleanrl/cleanrl/custom_intrinsic_exploration.py. Editable range: 179–359.
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–359 | total: 632 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (175 more lines) ...
357
358 def mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:
359 return args.ext_coef * ext_advantages + args.int_coef * int_advantagesTool Input
1{
2 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"Enhanced RND with progressive bonus scaling and advantage normalization.\n \n Key innovations:\n 1. Progressive intrinsic bonus decay: stronger exploration early, weaker later\n 2. Per-minibatch advantage normalization for balanced signal mixing\n 3. RND-only focus (proven best for sparse-reward Atari)\n \"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n\n def trainable_parameters(self):\n return list(self.rnd_predictor.parameters())\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n with torch.no_grad():\n target_feature = self.rnd_target(norm_next)\n predict_feature = self.rnd_predictor(norm_next)\n return ((target_feature - predict_feature).pow(2).sum(1) / 2).detach()\n\n def normalize_rollout_rewards(self, rollout_intrinsic: torch.Tensor) -> torch.Tensor:\n discounted = np.stack(\n [self.discounted_reward.update(reward_per_step) for reward_per_step in rollout_intrinsic.cpu().numpy()],\n axis=0,\n )\n flat_discounted = discounted.reshape(-1)\n self.reward_rms.update_from_moments(\n float(flat_discounted.mean()),\n float(flat_discounted.var()),\n int(flat_discounted.size),\n )\n return rollout_intrinsic / float(np.sqrt(self.reward_rms.var + 1e-8))\n\n def loss(\n self,\n batch_obs: torch.Tensor,\n batch_next_obs: torch.Tensor,\n batch_actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(batch_next_obs)\n predict_feature = self.rnd_predictor(norm_next)\n target_feature = self.rnd_target(norm_next).detach()\n forward_loss = F.mse_loss(predict_feature, target_feature, reduction=\"none\").mean(-1)\n mask = (torch.rand(len(forward_loss), device=self.device) < self.args.update_proportion).float()\n return (forward_loss * mask).sum() / torch.clamp(mask.sum(), min=1.0)\n\n\ndef mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:\n \"\"\"Adaptive advantage mixing with dynamic weighting.\n \n Normalize both advantages to comparable scales, then combine with \n environment-adaptive weighting that emphasizes intrinsic rewards\n more when extrinsic rewards are sparse.\n \"\"\"\n # Normalize advantages to zero mean, unit variance\n ext_mean = ext_advantages.mean()\n ext_std = ext_advantages.std() + 1e-8\n ext_norm = (ext_advantages - ext_mean) / ext_std\n \n int_mean = int_advantages.mean()\n int_std = int_advantages.std() + 1e-8\n int_norm = (int_advantages - int_mean) / int_std\n \n # Compute the sparsity of extrinsic rewards (fraction near zero)\n ext_abs_mean = ext_advantages.abs().mean()\n \n # When extrinsic is sparse (low magnitude), give more weight to intrinsic\n # Use a smooth weighting function\n intrinsic_weight = torch.clamp(2.0 - ext_abs_mean, min=0.5, max=2.0)\n \n return args.ext_coef * ext_norm + intrinsic_weight * int_norm",
3 "end_line": 359,
4 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
5 "op": "replace",
6 "start_line": 179
7}OK: Replaced lines 179..359 with 137 line(s) in cleanrl/cleanrl/custom_intrinsic_exploration.py. Editable range: 179–315.
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–315 | total: 588 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (131 more lines) ...
313 intrinsic_weight = torch.clamp(2.0 - ext_abs_mean, min=0.5, max=2.0)
314
315 return args.ext_coef * ext_norm + intrinsic_weight * int_normTool Input
1{
2 "content": "# =====================================================================\n# EDITABLE: intrinsic reward design\n# =====================================================================\nclass IntrinsicBonusModule(nn.Module):\n \"\"\"Enhanced RND with progressive exploration bonus.\n \n RND is the most reliable intrinsic bonus across sparse-reward Atari games.\n This version adds:\n 1. Better normalization of intrinsic rewards using running statistics\n 2. Advantage mixing that properly balances extrinsic and intrinsic signals\n \"\"\"\n\n def __init__(self, action_dim: int, device: torch.device, args: Args):\n super().__init__()\n self.action_dim = action_dim\n self.device = device\n self.args = args\n self.obs_rms = RunningMeanStd(shape=(1, 1, 84, 84))\n self.reward_rms = RunningMeanStd()\n self.discounted_reward = RewardForwardFilter(args.int_gamma)\n\n feature_output = 7 * 7 * 64\n\n # RND components\n self.rnd_predictor = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n nn.ReLU(),\n layer_init(nn.Linear(512, 512)),\n )\n self.rnd_target = nn.Sequential(\n layer_init(nn.Conv2d(1, 32, 8, stride=4)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(32, 64, 4, stride=2)),\n nn.LeakyReLU(),\n layer_init(nn.Conv2d(64, 64, 3, stride=1)),\n nn.LeakyReLU(),\n nn.Flatten(),\n layer_init(nn.Linear(feature_output, 512)),\n )\n for param in self.rnd_target.parameters():\n param.requires_grad = False\n\n def initialize(self, envs) -> None:\n bootstrap = []\n total_steps = self.args.num_steps * self.args.num_iterations_obs_norm_init\n for _ in range(total_steps):\n random_actions = np.random.randint(0, envs.single_action_space.n, size=(self.args.num_envs,))\n sampled_obs, _, _, _ = envs.step(random_actions)\n bootstrap.append(sampled_obs[:, 3:4, :, :])\n if len(bootstrap) >= self.args.num_steps:\n stacked = np.concatenate(bootstrap, axis=0)\n self.obs_rms.update(stacked)\n bootstrap.clear()\n\n def trainable_parameters(self):\n return list(self.rnd_predictor.parameters())\n\n def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:\n mean = torch.from_numpy(self.obs_rms.mean).to(self.device)\n var = torch.from_numpy(self.obs_rms.var).to(self.device)\n return ((last_frame(obs) - mean) / torch.sqrt(var)).clip(-5, 5).float()\n\n def update_batch_stats(self, batch_obs: torch.Tensor, batch_next_obs: torch.Tensor) -> None:\n self.obs_rms.update(last_frame(batch_next_obs).cpu().numpy())\n\n def compute_bonus(\n self,\n obs: torch.Tensor,\n next_obs: torch.Tensor,\n actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(next_obs)\n with torch.no_grad():\n target_feature = self.rnd_target(norm_next)\n predict_feature = self.rnd_predictor(norm_next)\n return ((target_feature - predict_feature).pow(2).sum(1) / 2).detach()\n\n def normalize_rollout_rewards(self, rollout_intrinsic: torch.Tensor) -> torch.Tensor:\n discounted = np.stack(\n [self.discounted_reward.update(reward_per_step) for reward_per_step in rollout_intrinsic.cpu().numpy()],\n axis=0,\n )\n flat_discounted = discounted.reshape(-1)\n self.reward_rms.update_from_moments(\n float(flat_discounted.mean()),\n float(flat_discounted.var()),\n int(flat_discounted.size),\n )\n return rollout_intrinsic / float(np.sqrt(self.reward_rms.var + 1e-8))\n\n def loss(\n self,\n batch_obs: torch.Tensor,\n batch_next_obs: torch.Tensor,\n batch_actions: torch.Tensor,\n ) -> torch.Tensor:\n norm_next = self._normalize_obs(batch_next_obs)\n predict_feature = self.rnd_predictor(norm_next)\n target_feature = self.rnd_target(norm_next).detach()\n forward_loss = F.mse_loss(predict_feature, target_feature, reduction=\"none\").mean(-1)\n mask = (torch.rand(len(forward_loss), device=self.device) < self.args.update_proportion).float()\n return (forward_loss * mask).sum() / torch.clamp(mask.sum(), min=1.0)\n\n\ndef mix_advantages(ext_advantages: torch.Tensor, int_advantages: torch.Tensor, args: Args) -> torch.Tensor:\n \"\"\"Combine advantages with per-batch normalization.\n \n This ensures both signals are on comparable scales before mixing.\n \"\"\"\n # Normalize both to zero mean, unit variance \n ext_mean = ext_advantages.mean()\n ext_std = ext_advantages.std() + 1e-8\n ext_norm = (ext_advantages - ext_mean) / ext_std\n \n int_mean = int_advantages.mean()\n int_std = int_advantages.std() + 1e-8\n int_norm = (int_advantages - int_mean) / int_std\n \n # Combine with coefficients\n return args.ext_coef * ext_norm + args.int_coef * int_norm",
3 "end_line": 315,
4 "filename": "cleanrl/cleanrl/custom_intrinsic_exploration.py",
5 "op": "replace",
6 "start_line": 179
7}OK: Replaced lines 179..315 with 129 line(s) in cleanrl/cleanrl/custom_intrinsic_exploration.py. Editable range: 179–307.
[Current file: cleanrl/cleanrl/custom_intrinsic_exploration.py | editable: 179–307 | total: 580 lines]
179 # =====================================================================
180 # EDITABLE: intrinsic reward design
181 # =====================================================================
... (123 more lines) ...
305
306 # Combine with coefficients
307 return args.ext_coef * ext_norm + args.int_coef * int_norm