Agent Conversation: deepseek-reasoner
Task: optimization-online-bandit
Online Bandits: Exploration-Exploitation Strategy Design
Objective
Design and implement a bandit policy that minimizes cumulative regret across diverse multi-armed bandit settings. Your code goes in custom_bandit.py. Three reference implementations (UCB1, Thompson Sampling, KL-UCB) are available as read-only in the SMPyBandits package.
Background
The multi-armed bandit problem is a fundamental model of the exploration-exploitation tradeoff in sequential decision-making. At each round, an agent selects one of K arms and observes a stochastic reward. The goal is to minimize cumulative regret -- the gap between the cumulative reward of the best fixed arm in hindsight and the agent's realized cumulative reward.
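As a toy illustration of this interaction loop (hypothetical arm means and a uniform-random placeholder policy, not part of the task files):

```python
import numpy as np

rng = np.random.default_rng(0)
mu = np.array([0.2, 0.5, 0.8])  # hypothetical true arm means; best arm is 2
T = 1000
total_reward = 0.0
for t in range(T):
    arm = int(rng.integers(len(mu)))        # placeholder policy: pick uniformly at random
    reward = float(rng.random() < mu[arm])  # Bernoulli reward with mean mu[arm]
    total_reward += reward

# Regret relative to always playing the best arm in hindsight
regret = T * mu.max() - total_reward
```

A real policy replaces the random arm choice with a rule that learns from observed rewards; that is exactly what the algorithms below do.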
Classic algorithms include:
- UCB1 (Auer et al., 2002): plays the arm with the highest upper confidence bound on its mean reward, achieving O(sqrt(KT log T)) minimax regret.
- Thompson Sampling (Thompson 1933; Agrawal & Goyal 2012): samples from a Bayesian posterior and plays the arm with the highest sample, achieving optimal Bayesian regret.
- KL-UCB (Garivier & Cappe 2011; Cappe et al. 2013): uses Kullback-Leibler divergence for tighter confidence bounds, provably optimal for Bernoulli bandits.
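For concreteness, the UCB1 index from the first bullet can be sketched as follows (a minimal sketch, not the SMPyBandits implementation):

```python
import numpy as np

def ucb1_index(mean_rewards, counts, t):
    """UCB1 index per arm: empirical mean plus an exploration bonus (Auer et al., 2002).

    mean_rewards: empirical mean reward of each arm
    counts: number of times each arm has been pulled (all > 0)
    t: current timestep
    """
    return mean_rewards + np.sqrt(2.0 * np.log(t) / counts)

# Toy example: 3 arms, each pulled 10 times; equal counts means the
# arm with the highest empirical mean also has the highest index.
means = np.array([0.4, 0.55, 0.5])
counts = np.array([10, 10, 10])
chosen = int(np.argmax(ucb1_index(means, counts, t=30)))
```

The bonus term shrinks as an arm accumulates pulls, so under-explored arms are periodically revisited even when their empirical mean is lower.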
Key challenges include adapting to different reward distributions, handling contextual information, and detecting non-stationarity.
Task
Modify the BanditPolicy class in custom_bandit.py (the EDITABLE section). You must implement:
- __init__(K, context_dim): initialize your policy for K arms, with optional context dimension
- select_arm(t, context): choose which arm to pull at timestep t
- update(arm, reward, context): update internal state after observing a reward
- reset(): reset state for a new run
Interface
class BanditPolicy:
def __init__(self, K: int, context_dim: int = 0): ...
def reset(self): ...
def select_arm(self, t: int, context: np.ndarray | None = None) -> int: ...
def update(self, arm: int, reward: float, context: np.ndarray | None = None): ...
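A minimal policy conforming to this interface, for orientation only (a simple epsilon-greedy baseline, not a competitive solution; the eps and seed parameters are illustrative extras beyond the required signature):

```python
import numpy as np

class EpsilonGreedyPolicy:
    """Hypothetical baseline conforming to the BanditPolicy interface."""

    def __init__(self, K: int, context_dim: int = 0, eps: float = 0.05, seed: int = 0):
        self.K = K
        self.context_dim = context_dim
        self.eps = eps
        self._rng = np.random.default_rng(seed)
        self.reset()

    def reset(self):
        self.counts = np.zeros(self.K)
        self.sums = np.zeros(self.K)

    def select_arm(self, t: int, context=None) -> int:
        if t < self.K:                       # pull each arm once first
            return t
        if self._rng.random() < self.eps:    # explore uniformly with prob. eps
            return int(self._rng.integers(self.K))
        return int(np.argmax(self.sums / np.maximum(self.counts, 1)))

    def update(self, arm: int, reward: float, context=None):
        self.counts[arm] += 1
        self.sums[arm] += reward
```

This baseline ignores the context entirely; a competitive submission would exploit it in the contextual setting.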
Available utilities (in the FIXED section):
- kl_bernoulli(p, q): KL divergence between Bernoulli distributions
- kl_ucb_bound(mu_hat, n, t, c): computes the KL-UCB upper confidence bound
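These utilities live in the FIXED section; a plausible sketch of their behavior (the actual implementations may differ in details such as the exploration term) is:

```python
import numpy as np

def kl_bernoulli(p, q, eps=1e-15):
    """KL divergence KL(Ber(p) || Ber(q)), with p and q clamped away from 0 and 1."""
    p = min(max(p, eps), 1 - eps)
    q = min(max(q, eps), 1 - eps)
    return p * np.log(p / q) + (1 - p) * np.log((1 - p) / (1 - q))

def kl_ucb_bound(mu_hat, n, t, c=1.0, tol=1e-6):
    """Largest q >= mu_hat with n * KL(mu_hat, q) <= log(t) + c*log(log(t)), via bisection."""
    level = (np.log(max(t, 2)) + c * np.log(np.log(max(t, 3)))) / n
    lo, hi = mu_hat, 1.0
    while hi - lo > tol:
        mid = (lo + hi) / 2
        if kl_bernoulli(mu_hat, mid) <= level:
            lo = mid
        else:
            hi = mid
    return lo
```

The bound tightens toward mu_hat as n grows, which is what makes KL-UCB asymptotically optimal for Bernoulli rewards.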
Evaluation
Evaluated on three bandit settings (lower regret is better):
- Stochastic MAB: 10-armed Bernoulli bandit, T=10,000 rounds. Arms have fixed reward probabilities.
- Contextual: 5-armed linear contextual bandit with d=10 features, T=10,000 rounds. Expected reward is a linear function of the context.
- Non-stationary: 5-armed piecewise-stationary Bernoulli bandit with 4 abrupt changepoints, T=10,000 rounds. The best arm changes over time.
Metric: normalized cumulative regret = (cumulative regret) / T.
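A worked example of the metric, with hypothetical arm means and pull counts:

```python
import numpy as np

# Hypothetical run: true arm means and how often each arm was pulled over T rounds.
mu = np.array([0.1, 0.5, 0.9])       # best arm has mean 0.9
pulls = np.array([100, 200, 9700])   # pull counts summing to T
T = pulls.sum()

# Expected cumulative regret: per-arm gap to the best mean, weighted by pulls.
cumulative_regret = ((mu.max() - mu) * pulls).sum()
normalized_regret = cumulative_regret / T
print(round(float(normalized_regret), 4))  # → 0.016
```

Pulling suboptimal arms 300 times out of 10,000 here costs 160 expected reward, i.e. a normalized regret of 0.016, comparable to the scores in the test logs below.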
SMPyBandits/custom_bandit.py [EDITABLE — lines 261–321 only]
1: # Custom online bandit algorithm for MLS-Bench
2: #
3: # EDITABLE section: BanditPolicy class — the exploration-
Tool Input
1{
2 "op": "replace",
3 "filename": "SMPyBandits/custom_bandit.py",
4 "start_line": 261,
5 "end_line": 321,
6 "content": "class BanditPolicy:\n \"\"\"Hybrid KL-UCB + Linear Thompson Sampling with change detection.\n \n For contextual bandits (context_dim > 0): uses Linear Thompson Sampling (LinTS).\n For stochastic bandits: uses KL-UCB with sliding-window change detection.\n \"\"\"\n\n def __init__(self, K: int, context_dim: int = 0):\n self.K = K\n self.context_dim = context_dim\n self.counts = np.zeros(K, dtype=np.float64)\n self.rewards = np.zeros(K, dtype=np.float64)\n \n # For contextual bandits (LinTS)\n if context_dim > 0:\n self._lambda = 1.0 # regularization\n self._v2 = 0.25 # variance scaling\n # Sufficient statistics for each arm: B_inv, f, theta_hat\n self._B_inv = np.array([np.eye(context_dim) / self._lambda \n for _ in range(K)])\n self._f = np.zeros((K, context_dim), dtype=np.float64)\n self._theta_hat = np.zeros((K, context_dim), dtype=np.float64)\n self._rng = np.random.default_rng(42)\n \n # For non-stationary detection (sliding window)\n self._W = 800 # window size\n self._buf_arms = np.zeros(self._W, dtype=np.int32)\n self._buf_rewards = np.zeros(self._W, dtype=np.float64)\n self._buf_ptr = 0\n self._buf_full = False\n self._sw_counts = np.zeros(K, dtype=np.float64)\n self._sw_rewards = np.zeros(K, dtype=np.float64)\n \n # Change detection\n self._best_arm_history = []\n self._change_detected = False\n self._reset_threshold = 0.1 # threshold for mean drop\n \n # For KL-UCB\n self._c = 1.0 # exploration parameter\n\n def reset(self):\n \"\"\"Reset internal state for a new run.\"\"\"\n self.counts[:] = 0\n self.rewards[:] = 0\n if self.context_dim > 0:\n d = self.context_dim\n for a in range(self.K):\n self._B_inv[a] = np.eye(d) / self._lambda\n self._f[a] = np.zeros(d)\n self._theta_hat[a] = np.zeros(d)\n \n # Reset sliding window\n self._buf_ptr = 0\n self._buf_full = False\n self._sw_counts[:] = 0\n self._sw_rewards[:] = 0\n \n self._best_arm_history = []\n self._change_detected = False\n\n def select_arm(self, t: int, context: np.ndarray | None = 
None) -> int:\n \"\"\"Select which arm to pull at timestep t.\"\"\"\n # Initial round-robin exploration\n if t < self.K:\n return t\n \n # Contextual bandit: use LinTS\n if context is not None and self.context_dim > 0:\n return self._lints_select(context)\n \n # Non-stationary detection: if change detected, use sliding window stats\n if self._change_detected or self._buf_full:\n return self._sw_select(t)\n \n # Standard KL-UCB index\n return self._kl_ucb_select(t)\n\n def _kl_ucb_select(self, t: int) -> int:\n \"\"\"Select arm using KL-UCB index.\"\"\"\n best_arm = 0\n best_idx = -np.inf\n for a in range(self.K):\n if self.counts[a] == 0:\n return a # explore unexplored arm\n mu_hat = self.rewards[a] / self.counts[a]\n idx = kl_ucb_bound(mu_hat, int(self.counts[a]), t, self._c)\n if idx > best_idx:\n best_idx = idx\n best_arm = a\n return best_arm\n\n def _sw_select(self, t: int) -> int:\n \"\"\"Sliding-window KL-UCB selection.\"\"\"\n # If any arm not pulled in window, explore it\n unpulled = self._sw_counts == 0\n if unpulled.any():\n return int(np.argmax(unpulled))\n \n best_arm = 0\n best_idx = -np.inf\n for a in range(self.K):\n mu_hat = self._sw_rewards[a] / self._sw_counts[a]\n idx = kl_ucb_bound(mu_hat, int(self._sw_counts[a]), t, self._c)\n if idx > best_idx:\n best_idx = idx\n best_arm = a\n return best_arm\n\n def _lints_select(self, context: np.ndarray) -> int:\n \"\"\"Linear Thompson Sampling for contextual bandits.\"\"\"\n best_arm = 0\n best_val = -np.inf\n for a in range(self.K):\n # Sample theta from posterior N(theta_hat, v2 * B_inv)\n # Use Cholesky decomposition for sampling\n try:\n L = np.linalg.cholesky(self._B_inv[a] * self._v2)\n except np.linalg.LinAlgError:\n L = np.eye(self.context_dim) * np.sqrt(self._v2)\n z = self._rng.standard_normal(self.context_dim)\n theta_sample = self._theta_hat[a] + L @ z\n val = theta_sample @ context\n if val > best_val:\n best_val = val\n best_arm = a\n return best_arm\n\n def update(self, arm: int, reward: 
float, context: np.ndarray | None = None):\n \"\"\"Update internal state after observing a reward.\"\"\"\n # Update cumulative statistics\n self.counts[arm] += 1\n self.rewards[arm] += reward\n \n # Update sliding window buffer\n if self._buf_full:\n old_arm = int(self._buf_arms[self._buf_ptr])\n old_reward = self._buf_rewards[self._buf_ptr]\n self._sw_counts[old_arm] -= 1\n self._sw_rewards[old_arm] -= old_reward\n \n self._buf_arms[self._buf_ptr] = arm\n self._buf_rewards[self._buf_ptr] = reward\n self._sw_counts[arm] += 1\n self._sw_rewards[arm] += reward\n \n self._buf_ptr += 1\n if self._buf_ptr >= self._W:\n self._buf_ptr = 0\n self._buf_full = True\n \n # Update contextual statistics if needed\n if context is not None and self.context_dim > 0:\n self._update_lints(arm, reward, context)\n \n # Change detection (simple heuristic)\n self._detect_change(arm, reward)\n\n def _update_lints(self, arm: int, reward: float, context: np.ndarray):\n \"\"\"Update LinTS statistics via Sherman-Morrison.\"\"\"\n B_inv_a = self._B_inv[arm]\n f_a = self._f[arm]\n \n # Sherman-Morrison update: B_inv = B_inv - (B_inv x x^T B_inv) / (1 + x^T B_inv x)\n x = context.reshape(-1, 1)\n Bx = B_inv_a @ x\n denom = 1.0 + (x.T @ Bx).item()\n self._B_inv[arm] = B_inv_a - (Bx @ Bx.T) / denom\n \n # Update f and theta_hat\n self._f[arm] = f_a + reward * context\n self._theta_hat[arm] = self._B_inv[arm] @ self._f[arm]\n\n def _detect_change(self, arm: int, reward: float):\n \"\"\"Simple change detection based on best arm performance.\"\"\"\n if self.counts[arm] < 100:\n return\n \n # Track recent performance of current best arm\n best_arm = np.argmax(self.rewards / np.maximum(self.counts, 1e-10))\n if arm == best_arm:\n self._best_arm_history.append(reward)\n if len(self._best_arm_history) > 100:\n self._best_arm_history.pop(0)\n \n if len(self._best_arm_history) == 100:\n recent_mean = np.mean(self._best_arm_history[-50:])\n old_mean = np.mean(self._best_arm_history[:50])\n if old_mean - 
recent_mean > self._reset_threshold:\n self._change_detected = True\n # Reset sliding window stats to force fresh exploration\n self._sw_counts[:] = 0\n self._sw_rewards[:] = 0\n self._buf_full = False\n self._buf_ptr = 0"
7}OK: Replaced lines 261..321 with 195 line(s) in SMPyBandits/custom_bandit.py. Editable range: 261–455. [Current file: SMPyBandits/custom_bandit.py | editable: 261–455 | total: 529 lines] 258| # ===================================================================== 259| # EDITABLE: BanditPolicy 260| # ===================================================================== 261 class BanditPolicy: 262 """Hybrid KL-UCB + Linear Thompson Sampling with change detection. 263 264 For contextual bandits (context_dim > 0): uses Linear Thompson Sampling (LinTS). 265 For stochastic bandits: uses KL-UCB with sliding-window change detection. 266 """ 267 268 def __init__(self, K: int, context_dim: int = 0): 269 self.K = K 270 self.context_dim = context_dim 271 self.counts = np.zeros(K, dtype=np.float64) 272 self.rewards = np.zeros(K, dtype=np.float64) 273 274 # For
Tool Input
1{
2 "op": "replace",
3 "filename": "SMPyBandits/custom_bandit.py",
4 "start_line": 261,
5 "end_line": 455,
6 "content": "class BanditPolicy:\n \"\"\"Adaptive KL-UCB with Linear Contextual Extension (AKL-Lin).\n \n For contextual bandits: uses linear UCB with ridge regression.\n For stochastic bandits: uses KL-UCB with sliding window for non-stationarity.\n \"\"\"\n\n def __init__(self, K: int, context_dim: int = 0):\n self.K = K\n self.context_dim = context_dim\n self.counts = np.zeros(K)\n self.rewards = np.zeros(K)\n \n if context_dim > 0:\n # Linear UCB: ridge regression\n self._lambda = 1.0\n self._A = np.array([np.eye(context_dim) * self._lambda for _ in range(K)])\n self._b = np.zeros((K, context_dim))\n self._theta = np.zeros((K, context_dim))\n else:\n # Sliding window for non-stationary detection\n self._W = 800\n self._buf_arms = np.zeros(self._W, dtype=int)\n self._buf_rewards = np.zeros(self._W)\n self._buf_ptr = 0\n self._buf_full = False\n self._sw_counts = np.zeros(K)\n self._sw_rewards = np.zeros(K)\n\n def reset(self):\n self.counts[:] = 0\n self.rewards[:] = 0\n if self.context_dim > 0:\n d = self.context_dim\n for a in range(self.K):\n self._A[a] = np.eye(d) * self._lambda\n self._b[a] = np.zeros(d)\n self._theta[a] = np.zeros(d)\n else:\n self._buf_ptr = 0\n self._buf_full = False\n self._sw_counts[:] = 0\n self._sw_rewards[:] = 0\n\n def select_arm(self, t: int, context: np.ndarray | None = None) -> int:\n # Initial round-robin\n if t < self.K:\n return t\n \n if context is not None and self.context_dim > 0:\n return self._linucb_select(context, t)\n else:\n return self._klucb_select(t)\n\n def _linucb_select(self, context: np.ndarray, t: int) -> int:\n best_arm = 0\n best_val = -np.inf\n for a in range(self.K):\n theta_a = self._theta[a]\n # Ridge regression estimate\n A_inv = np.linalg.inv(self._A[a])\n theta = A_inv @ self._b[a]\n self._theta[a] = theta # cache\n # UCB: theta^T x + alpha * sqrt(x^T A_inv x)\n x = context\n mean = theta @ x\n var = x @ A_inv @ x\n alpha = np.sqrt(2 * np.log(t+1))\n val = mean + alpha * np.sqrt(var)\n if val > 
best_val:\n best_val = val\n best_arm = a\n return best_arm\n\n def _klucb_select(self, t: int) -> int:\n # Use sliding window if full\n if self._buf_full:\n counts = self._sw_counts\n rewards = self._sw_rewards\n else:\n counts = self.counts\n rewards = self.rewards\n \n best_arm = 0\n best_idx = -np.inf\n for a in range(self.K):\n if counts[a] == 0:\n return a\n mu_hat = rewards[a] / counts[a]\n idx = kl_ucb_bound(mu_hat, int(counts[a]), t, c=1.0)\n if idx > best_idx:\n best_idx = idx\n best_arm = a\n return best_arm\n\n def update(self, arm: int, reward: float, context: np.ndarray | None = None):\n self.counts[arm] += 1\n self.rewards[arm] += reward\n \n if context is not None and self.context_dim > 0:\n # Update ridge regression\n x = context.reshape(-1, 1)\n self._A[arm] += x @ x.T\n self._b[arm] += reward * context\n else:\n # Update sliding window\n if self._buf_full:\n old_arm = int(self._buf_arms[self._buf_ptr])\n old_reward = self._buf_rewards[self._buf_ptr]\n self._sw_counts[old_arm] -= 1\n self._sw_rewards[old_arm] -= old_reward\n \n self._buf_arms[self._buf_ptr] = arm\n self._buf_rewards[self._buf_ptr] = reward\n self._sw_counts[arm] += 1\n self._sw_rewards[arm] += reward\n \n self._buf_ptr += 1\n if self._buf_ptr >= self._W:\n self._buf_ptr = 0\n self._buf_full = True"
7}OK: Replaced lines 261..455 with 121 line(s) in SMPyBandits/custom_bandit.py. Editable range: 261–381. [Current file: SMPyBandits/custom_bandit.py | editable: 261–381 | total: 455 lines] 258| # ===================================================================== 259| # EDITABLE: BanditPolicy 260| # ===================================================================== 261 class BanditPolicy: 262 """Adaptive KL-UCB with Linear Contextual Extension (AKL-Lin). 263 264 For contextual bandits: uses linear UCB with ridge regression. 265 For stochastic bandits: uses KL-UCB with sliding window for non-stationarity. 266 """ 267 268 def __init__(self, K: int, context_dim: int = 0): 269 self.K = K 270 self.context_dim = context_dim 271 self.counts = np.zeros(K) 272 self.rewards = np.zeros(K) 273 274 if context_dim > 0: 275 # Linear UCB: ridge
[Test #1] (2 tests remaining; call submit(n=1) to submit this result) ### stochastic-mab (scripts/stochastic_mab.sh) Training metrics (last steps): TRAIN_METRICS step=6000 cumulative_regret=457.3000 normalized_regret=0.076217 TRAIN_METRICS step=7000 cumulative_regret=546.0000 normalized_regret=0.078000 TRAIN_METRICS step=8000 cumulative_regret=609.5500 normalized_regret=0.076194 TRAIN_METRICS step=9000 cumulative_regret=685.4500 normalized_regret=0.076161 TRAIN_METRICS step=10000 cumulative_regret=791.0500 normalized_regret=0.079105 Final metrics (stochastic-mab): cumulative_regret_stochastic_mab: 791.050000 normalized_regret_stochastic_mab: 0.079105 ### contextual (scripts/contextual.sh) Training metrics (last steps): TRAIN_METRICS step=6000 cumulative_regret=47.7226 normalized_regret=0.007954 TRAIN_METRICS step=7000 cumulative_regret=48.7761 normalized_regret=0.006968 TRAIN_METRICS step=8000 cumulative_regret=49.5759 normalized_regret=0.006197 TRAIN_METRICS step=9000 cumulative_reg
Tool Input
1{
2 "op": "replace",
3 "filename": "SMPyBandits/custom_bandit.py",
4 "start_line": 261,
5 "end_line": 381,
6 "content": "class BanditPolicy:\n \"\"\"Adaptive KL-UCB with Change Detection (AKL-CD).\n \n For contextual bandits: linear UCB (LinUCB).\n For stochastic bandits: KL-UCB with sliding window and change detection.\n \"\"\"\n\n def __init__(self, K: int, context_dim: int = 0):\n self.K = K\n self.context_dim = context_dim\n self.counts = np.zeros(K)\n self.rewards = np.zeros(K)\n \n if context_dim > 0:\n # LinUCB\n self._lambda = 1.0\n self._A = [np.eye(context_dim) * self._lambda for _ in range(K)]\n self._b = [np.zeros(context_dim) for _ in range(K)]\n else:\n # Sliding window\n self._W = 800\n self._buf_arms = np.zeros(self._W, dtype=int)\n self._buf_rewards = np.zeros(self._W)\n self._buf_ptr = 0\n self._buf_full = False\n self._sw_counts = np.zeros(K)\n self._sw_rewards = np.zeros(K)\n # Change detection\n self._best_arm = 0\n self._cusum = 0.0\n self._threshold = 5.0\n self._reset_flag = False\n\n def reset(self):\n self.counts[:] = 0\n self.rewards[:] = 0\n if self.context_dim > 0:\n d = self.context_dim\n for a in range(self.K):\n self._A[a] = np.eye(d) * self._lambda\n self._b[a] = np.zeros(d)\n else:\n self._buf_ptr = 0\n self._buf_full = False\n self._sw_counts[:] = 0\n self._sw_rewards[:] = 0\n self._best_arm = 0\n self._cusum = 0.0\n self._reset_flag = False\n\n def select_arm(self, t: int, context: np.ndarray | None = None) -> int:\n if t < self.K:\n return t\n \n if context is not None and self.context_dim > 0:\n return self._linucb_select(context, t)\n else:\n return self._klucb_select(t)\n\n def _linucb_select(self, context: np.ndarray, t: int) -> int:\n best_arm = 0\n best_val = -np.inf\n for a in range(self.K):\n A = self._A[a]\n b = self._b[a]\n # Solve ridge regression: theta = A^{-1} b\n try:\n theta = np.linalg.solve(A, b)\n except np.linalg.LinAlgError:\n theta = np.zeros(self.context_dim)\n mean = theta @ context\n var = context @ np.linalg.inv(A) @ context\n alpha = np.sqrt(2 * np.log(t+1))\n val = mean + alpha * np.sqrt(var)\n if val > 
best_val:\n best_val = val\n best_arm = a\n return best_arm\n\n def _klucb_select(self, t: int) -> int:\n # Use sliding window if change detected or buffer full\n if self._reset_flag or self._buf_full:\n counts = self._sw_counts\n rewards = self._sw_rewards\n else:\n counts = self.counts\n rewards = self.rewards\n \n best_arm = 0\n best_idx = -np.inf\n for a in range(self.K):\n if counts[a] == 0:\n return a\n mu_hat = rewards[a] / counts[a]\n idx = kl_ucb_bound(mu_hat, int(counts[a]), t, c=1.0)\n if idx > best_idx:\n best_idx = idx\n best_arm = a\n self._best_arm = best_arm\n return best_arm\n\n def update(self, arm: int, reward: float, context: np.ndarray | None = None):\n self.counts[arm] += 1\n self.rewards[arm] += reward\n \n if context is not None and self.context_dim > 0:\n x = context.reshape(-1, 1)\n self._A[arm] += x @ x.T\n self._b[arm] += reward * context\n else:\n # Update sliding window\n if self._buf_full:\n old_arm = int(self._buf_arms[self._buf_ptr])\n old_reward = self._buf_rewards[self._buf_ptr]\n self._sw_counts[old_arm] -= 1\n self._sw_rewards[old_arm] -= old_reward\n \n self._buf_arms[self._buf_ptr] = arm\n self._buf_rewards[self._buf_ptr] = reward\n self._sw_counts[arm] += 1\n self._sw_rewards[arm] += reward\n \n self._buf_ptr += 1\n if self._buf_ptr >= self._W:\n self._buf_ptr = 0\n self._buf_full = True\n \n # Change detection via CUSUM on best arm\n if arm == self._best_arm:\n mu = self.rewards[arm] / self.counts[arm]\n # Expected reward if stationary\n expected = mu\n deviation = reward - expected\n self._cusum = max(0, self._cusum + deviation)\n if self._cusum > self._threshold:\n self._reset_flag = True\n # Reset sliding window to clear old data\n self._sw_counts[:] = 0\n self._sw_rewards[:] = 0\n self._buf_full = False\n self._buf_ptr = 0\n self._cusum = 0.0"
7}OK: Replaced lines 261..381 with 142 line(s) in SMPyBandits/custom_bandit.py. Editable range: 261–402. [Current file: SMPyBandits/custom_bandit.py | editable: 261–402 | total: 476 lines] 258| # ===================================================================== 259| # EDITABLE: BanditPolicy 260| # ===================================================================== 261 class BanditPolicy: 262 """Adaptive KL-UCB with Change Detection (AKL-CD). 263 264 For contextual bandits: linear UCB (LinUCB). 265 For stochastic bandits: KL-UCB with sliding window and change detection. 266 """ 267 268 def __init__(self, K: int, context_dim: int = 0): 269 self.K = K 270 self.context_dim = context_dim 271 self.counts = np.zeros(K) 272 self.rewards = np.zeros(K) 273 274 if context_dim > 0: 275 # LinUCB 276 self._lambda = 1.0 277
[Test #2] (1 test remaining; call submit(n=2) to submit this result) ### stochastic-mab (scripts/stochastic_mab.sh) Training metrics (last steps): TRAIN_METRICS step=6000 cumulative_regret=885.4500 normalized_regret=0.147575 TRAIN_METRICS step=7000 cumulative_regret=1006.5000 normalized_regret=0.143786 TRAIN_METRICS step=8000 cumulative_regret=1138.1000 normalized_regret=0.142263 TRAIN_METRICS step=9000 cumulative_regret=1234.4000 normalized_regret=0.137156 TRAIN_METRICS step=10000 cumulative_regret=1399.6500 normalized_regret=0.139965 Final metrics (stochastic-mab): cumulative_regret_stochastic_mab: 1399.650000 normalized_regret_stochastic_mab: 0.139965 ### contextual (scripts/contextual.sh) Training metrics (last steps): TRAIN_METRICS step=6000 cumulative_regret=47.7226 normalized_regret=0.007954 TRAIN_METRICS step=7000 cumulative_regret=48.7761 normalized_regret=0.006968 TRAIN_METRICS step=8000 cumulative_regret=49.5759 normalized_regret=0.006197 TRAIN_METRICS step=9000 cumulative
Tool Input
1{
2 "op": "replace",
3 "filename": "SMPyBandits/custom_bandit.py",
4 "start_line": 261,
5 "end_line": 402,
6 "content": "class BanditPolicy:\n \"\"\"Exponentially Weighted KL-UCB with Linear UCB (EW-KL-UCB).\"\"\"\n \n def __init__(self, K: int, context_dim: int = 0):\n self.K = K\n self.context_dim = context_dim\n self.counts = np.zeros(K)\n self.rewards = np.zeros(K)\n # Discounted statistics for non-stationarity\n self._gamma = 0.995\n self._disc_counts = np.zeros(K)\n self._disc_rewards = np.zeros(K)\n \n if context_dim > 0:\n # LinUCB\n self._lambda = 1.0\n self._A = [np.eye(context_dim) * self._lambda for _ in range(K)]\n self._b = [np.zeros(context_dim) for _ in range(K)]\n\n def reset(self):\n self.counts[:] = 0\n self.rewards[:] = 0\n self._disc_counts[:] = 0\n self._disc_rewards[:] = 0\n if self.context_dim > 0:\n d = self.context_dim\n for a in range(self.K):\n self._A[a] = np.eye(d) * self._lambda\n self._b[a] = np.zeros(d)\n\n def select_arm(self, t: int, context: np.ndarray | None = None) -> int:\n if t < self.K:\n return t\n \n if context is not None and self.context_dim > 0:\n return self._linucb_select(context, t)\n else:\n return self._klucb_select(t)\n\n def _linucb_select(self, context: np.ndarray, t: int) -> int:\n best_arm = 0\n best_val = -np.inf\n for a in range(self.K):\n A = self._A[a]\n b = self._b[a]\n try:\n theta = np.linalg.solve(A, b)\n except np.linalg.LinAlgError:\n theta = np.zeros(self.context_dim)\n mean = theta @ context\n var = context @ np.linalg.inv(A) @ context\n alpha = np.sqrt(2 * np.log(t+1))\n val = mean + alpha * np.sqrt(var)\n if val > best_val:\n best_val = val\n best_arm = a\n return best_arm\n\n def _klucb_select(self, t: int) -> int:\n # Use discounted statistics after initial phase (t > 1000)\n if t > 1000:\n counts = self._disc_counts\n rewards = self._disc_rewards\n else:\n counts = self.counts\n rewards = self.rewards\n \n best_arm = 0\n best_idx = -np.inf\n for a in range(self.K):\n if counts[a] == 0:\n return a\n mu_hat = rewards[a] / counts[a]\n # Use effective sample size for confidence\n n_eff = counts[a] * (1 
- self._gamma) / (1 - self._gamma ** counts[a]) if self._gamma < 1 else counts[a]\n idx = kl_ucb_bound(mu_hat, int(n_eff), t, c=1.0)\n if idx > best_idx:\n best_idx = idx\n best_arm = a\n return best_arm\n\n def update(self, arm: int, reward: float, context: np.ndarray | None = None):\n self.counts[arm] += 1\n self.rewards[arm] += reward\n \n # Update discounted statistics\n self._disc_counts *= self._gamma\n self._disc_rewards *= self._gamma\n self._disc_counts[arm] += 1\n self._disc_rewards[arm] += reward\n \n if context is not None and self.context_dim > 0:\n x = context.reshape(-1, 1)\n self._A[arm] += x @ x.T\n self._b[arm] += reward * context"
7}OK: Replaced lines 261..402 with 95 line(s) in SMPyBandits/custom_bandit.py. Editable range: 261–355. [Current file: SMPyBandits/custom_bandit.py | editable: 261–355 | total: 429 lines] 258| # ===================================================================== 259| # EDITABLE: BanditPolicy 260| # ===================================================================== 261 class BanditPolicy: 262 """Exponentially Weighted KL-UCB with Linear UCB (EW-KL-UCB).""" 263 264 def __init__(self, K: int, context_dim: int = 0): 265 self.K = K 266 self.context_dim = context_dim 267 self.counts = np.zeros(K) 268 self.rewards = np.zeros(K) 269 # Discounted statistics for non-stationarity 270 self._gamma = 0.995 271 self._disc_counts = np.zeros(K) 272 self._disc_rewards = np.zeros(K) 273 274 if context_dim > 0: 275 # LinUCB 27
[Test #3] (0 tests remaining)
### stochastic-mab (scripts/stochastic_mab.sh)
/srv/home/bohanlyu/MLS-Bench/vendor/workspace/optimization-online-bandit/deepseek-reasoner_20260406_185106/SMPyBandits/custom_bandit.py:335: RuntimeWarning: divide by zero encountered in scalar divide
n_eff = counts[a] * (1 - self._gamma) / (1 - self._gamma ** counts[a]) if self._gamma < 1 else counts[a]
Traceback (most recent call last):
File "/srv/home/bohanlyu/MLS-Bench/vendor/workspace/optimization-online-bandit/deepseek-reasoner_20260406_185106/SMPyBandits/custom_bandit.py", line 429, in <module>
evaluate(args.env, seed=args.seed, output_dir=args.output_dir)
File "/srv/home/bohanlyu/MLS-Bench/vendor/workspace/optimization-online-bandit/deepseek-reasoner_20260406_185106/SMPyBandits/custom_bandit.py", line 394, in evaluate
result = run_bandit(env, policy, horizon)
File "/srv/home/bohanlyu/MLS-Bench/vendor/workspace/optimization-online-bandit/deepseek-reasoner_20260406_185106/SMPyBandits/cus