Agent Conversation: deepseek-reasoner
Task: optimization-hyperparameter-search
Hyperparameter Optimization: Custom Search Strategy Design
Research Question
Design a novel hyperparameter optimization (HPO) strategy that achieves better final validation scores and faster convergence than standard approaches like Random Search, TPE, Hyperband, and their combinations (BOHB, DEHB).
Background
Hyperparameter optimization is a fundamental problem in machine learning: given a model and dataset, find the hyperparameter configuration that maximizes validation performance within a limited evaluation budget. This is a black-box optimization problem where each function evaluation (training + validation) is expensive.
Classic strategies include:
- Random Search: Samples configurations uniformly. Simple but surprisingly effective, especially when some hyperparameters are more important than others (Bergstra & Bengio, 2012).
- TPE (Tree-structured Parzen Estimator): Models p(x|y<y*) and p(x|y>=y*) using kernel density estimation and maximizes their ratio (Bergstra et al., 2011).
- Hyperband: Uses multi-fidelity evaluation (early stopping) with successive halving to allocate resources to promising configurations (Li et al., 2017).
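The density-ratio idea behind TPE can be sketched in a few lines. Everything below (the function name, the fixed bandwidth, the 1-D search space) is an illustrative simplification, not the benchmark's API:

```python
import numpy as np

def tpe_suggest_1d(xs, ys, rng, gamma=0.25, n_candidates=24, bw=0.1):
    """Pick the candidate maximizing the good/bad KDE log-ratio (TPE-style).

    xs: observed 1-D hyperparameter values in [0, 1]; ys: scores (higher better).
    """
    xs, ys = np.asarray(xs, float), np.asarray(ys, float)
    n_good = max(1, int(gamma * len(ys)))
    order = np.argsort(ys)
    good, bad = xs[order[-n_good:]], xs[order[:-n_good]]
    if bad.size == 0:
        bad = good

    def log_kde(x, samples):
        # Gaussian KDE with a fixed bandwidth; log-density at x
        return np.log(np.mean(np.exp(-0.5 * ((x - samples) / bw) ** 2)) + 1e-30)

    cands = rng.rand(n_candidates)
    ratios = [log_kde(c, good) - log_kde(c, bad) for c in cands]
    return float(cands[int(np.argmax(ratios))])
```

Configurations dense among good observations but sparse among bad ones win; this is the same split-then-ratio acquisition the strategies below reuse.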
State-of-the-art methods build on or combine these ideas:
- BOHB: Replaces random sampling in Hyperband with TPE-guided suggestions (Falkner et al., 2018).
- DEHB: Uses Differential Evolution within Hyperband's multi-fidelity framework (Awad et al., 2021).
- CMA-ES: Adapts a full covariance matrix of a Gaussian distribution for efficient continuous optimization (Hansen & Ostermeier, 2001).
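CMA-ES's core loop (sample from an adapted Gaussian, rank, recombine) can be sketched without the full evolution-path machinery. This toy `cma_mean_step` (name and simplifications mine) only recombines the mean and omits covariance and step-size adaptation:

```python
import numpy as np

def cma_mean_step(mean, sigma, C, objective, rng, lam=8):
    """One simplified CMA-ES generation (maximization): sample lam offspring
    from N(mean, sigma^2 * C), keep the best mu, return the recombined mean."""
    d = mean.size
    L = np.linalg.cholesky(C + 1e-12 * np.eye(d))  # sample via Cholesky factor
    xs = mean + sigma * rng.randn(lam, d) @ L.T
    scores = np.array([objective(x) for x in xs])
    mu = lam // 2
    best = xs[np.argsort(scores)[::-1][:mu]]       # best mu, best-first
    w = np.log(mu + 0.5) - np.log(np.arange(1, mu + 1))
    w /= w.sum()                                   # log-rank weights, best gets most
    return w @ best
```

Iterating this drifts the mean toward the optimum; the full algorithm additionally adapts C and sigma from the same ranked samples.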
There is ongoing research into strategies that better adapt to the optimization landscape, leverage multi-fidelity evaluations more effectively, or combine model-based search with evolutionary approaches.
Task
Implement a custom HPO strategy by modifying the CustomHPOStrategy class in scikit-learn/custom_hpo.py. Implement both the __init__ and suggest methods. The class is called repeatedly in a sequential loop where each call proposes one configuration to evaluate.
Interface
class CustomHPOStrategy:
    def __init__(self, seed: int = 42):
        """Initialize the strategy with a random seed."""
        self.seed = seed
        self.rng = np.random.RandomState(seed)

    def suggest(
        self,
        space: SearchSpace,
        history: List[Trial],
        budget_left: int,
    ) -> Tuple[Dict[str, Any], float]:
        """Propose the next configuration to evaluate.

        Args:
            space: SearchSpace with .params (list of HParam), .dim,
                .sample_uniform(rng), .clip(config)
            history: list of Trial(config, score, budget) from past evals
            budget_left: remaining budget in full-fidelity units

        Returns:
            config: dict mapping hyperparameter names to values
            fidelity: float in (0, 1] for multi-fidelity evaluation
        """
The search space provides .params, .dim, .sample_uniform(rng), and .clip(config), as listed in the suggest docstring.
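The sequential loop that drives suggest can be sketched as follows; `Trial`, `run_hpo`, and the toy objects in the usage are hypothetical stand-ins consistent with the interface, not the benchmark's actual harness:

```python
import numpy as np
from collections import namedtuple

Trial = namedtuple("Trial", "config score budget")

def run_hpo(strategy, space, objective, total_budget=20.0):
    """Sequential driver: each evaluation at fidelity f costs f budget units."""
    history, spent = [], 0.0
    while spent < total_budget:
        budget_left = int(total_budget - spent)  # full-fidelity units remaining
        config, fidelity = strategy.suggest(space, history, budget_left)
        score = objective(config, fidelity)      # train + validate (expensive)
        history.append(Trial(config, score, fidelity))
        spent += fidelity
    return max(history, key=lambda t: t.score)   # best trial found
```

Because low-fidelity trials cost less than one unit, a multi-fidelity strategy gets more suggest calls out of the same budget than one that always evaluates at fidelity 1.0.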
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 255,
5 "end_line": 326,
6 "content": "class CustomHPOStrategy:\n \"\"\"Adaptive Density-Guided Evolution with Multi-Fidelity Scheduling (ADEM).\"\"\"\n\n def __init__(self, seed: int = 42):\n self.seed = seed\n self.rng = np.random.RandomState(seed)\n # Parameters\n self.n_startup = 10 # random evaluations before modeling\n self.gamma = 0.25 # fraction of good observations for KDE\n self.de_mutation_factor = 0.5\n self.de_crossover_prob = 0.5\n self.exploration_weight = 2.0 # beta for distance bonus\n self.population_size = 20 # keep top N vectors as population\n self.n_candidates = 30 # candidates to evaluate acquisition\n # State\n self._all_vectors = [] # list of encoded vectors\n self._all_scores = [] # corresponding scores\n self._all_fidelities = [] # fidelities used\n\n def _encode(self, config, space):\n \"\"\"Encode a config to a numeric vector in [0,1].\"\"\"\n vec = []\n for p in space.params:\n val = config[p.name]\n if p.type == \"categorical\":\n idx = p.choices.index(val)\n vec.append(idx / max(len(p.choices) - 1, 1))\n elif p.type in (\"float\", \"int\"):\n if p.log_scale:\n v = (np.log(val) - np.log(p.low)) / (np.log(p.high) - np.log(p.low))\n else:\n v = (val - p.low) / (p.high - p.low)\n vec.append(float(np.clip(v, 0, 1)))\n return np.array(vec)\n\n def _decode(self, vec, space):\n \"\"\"Decode vector back to config.\"\"\"\n config = {}\n for i, p in enumerate(space.params):\n v = float(np.clip(vec[i], 0, 1))\n if p.type == \"categorical\":\n idx = int(round(v * max(len(p.choices) - 1, 1)))\n idx = min(idx, len(p.choices) - 1)\n config[p.name] = p.choices[idx]\n elif p.type == \"float\":\n if p.log_scale:\n config[p.name] = float(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low))))\n else:\n config[p.name] = float(p.low + v * (p.high - p.low))\n elif p.type == \"int\":\n if p.log_scale:\n config[p.name] = int(round(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low)))))\n else:\n config[p.name] = int(round(p.low + v * (p.high - p.low)))\n return 
config\n\n def _kde_logpdf(self, x, samples, bw):\n \"\"\"Gaussian KDE log-density at x.\"\"\"\n if len(samples) == 0:\n return -np.inf\n diffs = x[None, :] - samples\n return float(np.log(np.mean(np.exp(-0.5 * np.sum(diffs**2 / bw**2, axis=1))) + 1e-30))\n\n def _min_distance(self, x, vectors):\n \"\"\"Minimum Euclidean distance to existing vectors.\"\"\"\n if len(vectors) == 0:\n return 0.0\n return float(np.min(np.linalg.norm(vectors - x, axis=1)))\n\n def _de_mutate(self, target_idx, population_vectors):\n \"\"\"DE/rand/1/bin mutation and crossover.\"\"\"\n n = len(population_vectors)\n if n < 4:\n return population_vectors[target_idx] + self.rng.randn(len(population_vectors[0])) * 0.1\n idxs = list(range(n))\n idxs.remove(target_idx)\n a, b, c = self.rng.choice(idxs, 3, replace=False)\n mutant = population_vectors[a] + self.de_mutation_factor * (population_vectors[b] - population_vectors[c])\n mutant = np.clip(mutant, 0, 1)\n # Crossover\n dim = len(mutant)\n cross_mask = self.rng.rand(dim) < self.de_crossover_prob\n j_rand = self.rng.randint(dim)\n cross_mask[j_rand] = True\n trial = np.where(cross_mask, mutant, population_vectors[target_idx])\n return trial\n\n def suggest(\n self,\n space: SearchSpace,\n history: List[Trial],\n budget_left: int,\n ) -> Tuple[Dict[str, Any], float]:\n # Update internal state from history\n if len(self._all_vectors) != len(history):\n self._all_vectors = []\n self._all_scores = []\n self._all_fidelities = []\n for t in history:\n self._all_vectors.append(self._encode(t.config, space))\n self._all_scores.append(t.score)\n self._all_fidelities.append(t.budget)\n\n # Phase 1: Random startup\n if len(history) < self.n_startup:\n config = space.sample_uniform(self.rng)\n # Start with low fidelity for exploration\n fidelity = 0.33 if len(history) < self.n_startup // 2 else 0.66\n return config, fidelity\n\n # Prepare data\n vecs = np.array(self._all_vectors)\n scores = np.array(self._all_scores)\n n_obs = len(scores)\n\n # Split 
good/bad for KDE\n n_good = max(1, int(self.gamma * n_obs))\n threshold = np.sort(scores)[-n_good]\n good_vecs = vecs[scores >= threshold]\n bad_vecs = vecs[scores < threshold]\n if len(bad_vecs) == 0:\n bad_vecs = good_vecs.copy()\n\n # Bandwidth using Silverman's rule of thumb (simplified)\n bw_good = max(0.05, np.std(good_vecs, axis=0).mean() + 1e-6)\n bw_bad = max(0.05, np.std(bad_vecs, axis=0).mean() + 1e-6)\n\n # Build population: top vectors by score\n pop_size = min(self.population_size, n_obs)\n top_indices = np.argsort(scores)[-pop_size:]\n population = vecs[top_indices]\n\n # Generate candidates: mixture of DE mutations and random uniform\n candidates = []\n for _ in range(self.n_candidates):\n if self.rng.rand() < 0.7 and len(population) >= 2:\n # DE mutation\n target = self.rng.randint(len(population))\n cand_vec = self._de_mutate(target, population)\n else:\n # Random uniform in [0,1]^dim\n cand_vec = self.rng.rand(space.dim)\n candidates.append(cand_vec)\n\n # Score candidates with acquisition function\n best_acq = -np.inf\n best_vec = None\n for cand_vec in candidates:\n # KDE ratio (expected improvement)\n log_l = self._kde_logpdf(cand_vec, good_vecs, bw_good)\n log_g = self._kde_logpdf(cand_vec, bad_vecs, bw_bad)\n ei = log_l - log_g\n # Exploration bonus: distance to nearest evaluated point\n dist = self._min_distance(cand_vec, vecs)\n # Adaptive exploration weight: decrease with budget spent\n spent = len(history) / (len(history) + budget_left)\n beta = self.exploration_weight * (1.0 - spent) # more exploration early\n acq = ei + beta * dist\n if acq > best_acq:\n best_acq = acq\n best_vec = cand_vec\n\n # Determine fidelity: higher for promising candidates, lower for exploratory\n # Use EI value as proxy for promise\n if best_acq > 0.5: # arbitrary threshold\n fidelity = 1.0\n elif best_acq > -1.0:\n fidelity = 0.66\n else:\n fidelity = 0.33\n\n # Ensure fidelity not too low when budget is limited\n if budget_left < 5:\n fidelity = min(1.0, 
fidelity * 1.5)\n\n # Decode vector to config\n config = self._decode(best_vec, space)\n return config, fidelity"
7}
OK: Replaced lines 255..326 with 180 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–434. [Current file: scikit-learn/custom_hpo.py | editable: 255–434 | total: 563 lines]
252| # ================================================================
253|
254|
255  class CustomHPOStrategy:
256      """Adaptive Density-Guided Evolution with Multi-Fidelity Scheduling (ADEM)."""
257
258      def __init__(self, seed: int = 42):
259          self.seed = seed
260          self.rng = np.random.RandomState(seed)
261          # Parameters
262          self.n_startup = 10  # random evaluations before modeling
263          self.gamma = 0.25  # fraction of good observations for KDE
264          self.de_mutation_factor = 0.5
265          self.de_crossover_prob = 0.5
266          self.exploration_weight = 2.0  # beta for distance bonus
267          self.population_size = 20  # keep top N vectors as population
268          self.n_candidates
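The ADEM strategy above maps every hyperparameter into the unit cube before modeling. A standalone sketch of its log-scale float mapping (helper names mine), which must round-trip exactly:

```python
import numpy as np

def encode_float(val, low, high, log_scale=False):
    """Map a float hyperparameter into [0, 1] (log-spaced if requested)."""
    if log_scale:
        v = (np.log(val) - np.log(low)) / (np.log(high) - np.log(low))
    else:
        v = (val - low) / (high - low)
    return float(np.clip(v, 0.0, 1.0))

def decode_float(v, low, high, log_scale=False):
    """Inverse of encode_float for v in [0, 1]."""
    v = float(np.clip(v, 0.0, 1.0))
    if log_scale:
        return float(np.exp(np.log(low) + v * (np.log(high) - np.log(low))))
    return float(low + v * (high - low))
```

Log-scaling matters for parameters like learning rates: a learning rate of 1e-3 in the range [1e-5, 1e-1] lands at the center of the unit interval, rather than near zero as it would under a linear mapping.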
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 255,
5 "end_line": 326,
6 "content": "class CustomHPOStrategy:\n \"\"\"Model-Guided Differential Evolution with Adaptive Fidelity (MG-DEAF).\"\"\"\n\n def __init__(self, seed: int = 42):\n self.seed = seed\n self.rng = np.random.RandomState(seed)\n # Parameters\n self.population_size = 20\n self.n_startup = 10 # random before modelling\n self.gamma = 0.3 # fraction of good samples for KDE\n self.mutation_factor = 0.5\n self.crossover_prob = 0.7\n self.exploration_prob = 0.2 # chance to do random exploration\n # Internal state\n self.population = [] # list of (vector, score, fidelity) where vector is encoded\n self.all_trials = [] # list of (vector, score, fidelity)\n self.best_score = -np.inf\n self.best_vector = None\n self.best_fidelity = 1.0\n\n def _encode(self, config, space):\n \"\"\"Encode a config to a numeric vector in [0,1].\"\"\"\n vec = []\n for p in space.params:\n val = config[p.name]\n if p.type == \"categorical\":\n idx = p.choices.index(val)\n vec.append(idx / max(len(p.choices) - 1, 1))\n elif p.type in (\"float\", \"int\"):\n if p.log_scale:\n v = (np.log(val) - np.log(p.low)) / (np.log(p.high) - np.log(p.low))\n else:\n v = (val - p.low) / (p.high - p.low)\n vec.append(float(np.clip(v, 0, 1)))\n return np.array(vec)\n\n def _decode(self, vec, space):\n \"\"\"Decode vector back to config.\"\"\"\n config = {}\n for i, p in enumerate(space.params):\n v = float(np.clip(vec[i], 0, 1))\n if p.type == \"categorical\":\n idx = int(round(v * max(len(p.choices) - 1, 1)))\n idx = min(idx, len(p.choices) - 1)\n config[p.name] = p.choices[idx]\n elif p.type == \"float\":\n if p.log_scale:\n config[p.name] = float(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low))))\n else:\n config[p.name] = float(p.low + v * (p.high - p.low))\n elif p.type == \"int\":\n if p.log_scale:\n config[p.name] = int(round(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low)))))\n else:\n config[p.name] = int(round(p.low + v * (p.high - p.low)))\n return config\n\n def 
_kde_logpdf(self, x, samples, bw):\n \"\"\"Gaussian KDE log-density at x.\"\"\"\n if len(samples) == 0:\n return -np.inf\n diffs = x[None, :] - samples\n return float(np.log(\n np.mean(np.exp(-0.5 * np.sum(diffs**2 / bw**2, axis=1))) + 1e-30\n ))\n\n def _sample_from_model(self, space):\n \"\"\"Sample config using TPE-like model.\"\"\"\n if len(self.all_trials) < self.n_startup:\n return space.sample_uniform(self.rng)\n\n vecs = np.array([t[0] for t in self.all_trials])\n scores = np.array([t[1] for t in self.all_trials])\n n_good = max(1, int(self.gamma * len(scores)))\n threshold = np.sort(scores)[-n_good]\n\n good = vecs[scores >= threshold]\n bad = vecs[scores < threshold]\n if len(bad) == 0:\n bad = good.copy()\n\n bw_good = max(0.05, good.std() + 1e-6)\n bw_bad = max(0.05, bad.std() + 1e-6)\n\n best_ei = -np.inf\n best_cfg = None\n for _ in range(24):\n cfg = space.sample_uniform(self.rng)\n x = self._encode(cfg, space)\n log_l = self._kde_logpdf(x, good, bw_good)\n log_g = self._kde_logpdf(x, bad, bw_bad)\n ei = log_l - log_g\n if ei > best_ei:\n best_ei = ei\n best_cfg = cfg\n return best_cfg\n\n def _de_mutate(self, target_idx, population_vecs):\n \"\"\"DE/rand/1/bin mutation and crossover.\"\"\"\n n = len(population_vecs)\n if n < 4:\n return population_vecs[target_idx] + self.rng.randn(len(population_vecs[0])) * 0.1\n\n idxs = list(range(n))\n idxs.remove(target_idx)\n a, b, c = self.rng.choice(idxs, 3, replace=False)\n mutant = population_vecs[a] + self.mutation_factor * (population_vecs[b] - population_vecs[c])\n mutant = np.clip(mutant, 0, 1)\n\n dim = len(mutant)\n cross_mask = self.rng.rand(dim) < self.crossover_prob\n j_rand = self.rng.randint(dim)\n cross_mask[j_rand] = True\n trial = np.where(cross_mask, mutant, population_vecs[target_idx])\n return trial\n\n def _select_fidelity(self, budget_left, is_exploration):\n \"\"\"Choose fidelity based on remaining budget and phase.\"\"\"\n # Early budget: low fidelity for exploration\n # Later budget: 
higher fidelity for exploitation\n # Simple schedule: start with 0.25, increase linearly to 1.0 as budget depletes\n # But also adapt: if exploring, use lower fidelity; if exploiting, higher.\n if is_exploration:\n base = 0.25\n else:\n base = 0.5\n # Increase fidelity as budget decreases (more refinement)\n # budget_left is integer remaining full-fidelity evaluations\n # total budget unknown; we can approximate using history length?\n # Use a simple heuristic: fidelity = max(base, 1.0 - 0.5 * (budget_left / 50))\n # But budget_left varies per benchmark. Use 0.25 for first half, 0.5 for second half, 1.0 for last few.\n # Simpler: if budget_left > 30: fidelity = 0.25, elif budget_left > 15: 0.5, else 0.75\n # However we need to be careful with min fidelity 0.1.\n if budget_left > 30:\n fidelity = 0.25\n elif budget_left > 15:\n fidelity = 0.5\n else:\n fidelity = 0.75\n return max(0.1, min(1.0, fidelity))\n\n def suggest(\n self,\n space: SearchSpace,\n history: List[Trial],\n budget_left: int,\n ) -> Tuple[Dict[str, Any], float]:\n \"\"\"Propose the next configuration to evaluate.\"\"\"\n # Update internal state from history\n if len(self.all_trials) < len(history):\n for trial in history[len(self.all_trials):]:\n vec = self._encode(trial.config, space)\n self.all_trials.append((vec, trial.score, trial.budget))\n # Update population (keep recent ones)\n self.population.append((vec, trial.score, trial.budget))\n if len(self.population) > self.population_size:\n self.population.pop(0)\n if trial.score > self.best_score:\n self.best_score = trial.score\n self.best_vector = vec\n self.best_fidelity = trial.budget\n\n # Decide exploration vs exploitation\n explore = False\n if len(history) < self.n_startup:\n explore = True # random exploration\n else:\n # Occasionally explore\n if self.rng.rand() < self.exploration_prob:\n explore = True\n\n if explore:\n # Random or model-guided exploration\n if self.rng.rand() < 0.5 or len(self.all_trials) < self.n_startup:\n config 
= space.sample_uniform(self.rng)\n else:\n config = self._sample_from_model(space)\n fidelity = self._select_fidelity(budget_left, is_exploration=True)\n else:\n # Exploitation: use DE to generate candidate\n if len(self.population) >= 4:\n # Select a target index (worst performer)\n pop_vecs = [p[0] for p in self.population]\n scores = [p[1] for p in self.population]\n target_idx = np.argmin(scores)\n trial_vec = self._de_mutate(target_idx, pop_vecs)\n config = self._decode(trial_vec, space)\n else:\n # Fallback to model-guided\n config = self._sample_from_model(space)\n fidelity = self._select_fidelity(budget_left, is_exploration=False)\n\n # Ensure config is valid\n config = space.clip(config)\n return config, fidelity"
7}
OK: Replaced lines 255..326 with 197 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–451. [Current file: scikit-learn/custom_hpo.py | editable: 255–451 | total: 580 lines]
252| # ================================================================
253|
254|
255  class CustomHPOStrategy:
256      """Model-Guided Differential Evolution with Adaptive Fidelity (MG-DEAF)."""
257
258      def __init__(self, seed: int = 42):
259          self.seed = seed
260          self.rng = np.random.RandomState(seed)
261          # Parameters
262          self.population_size = 20
263          self.n_startup = 10  # random before modelling
264          self.gamma = 0.3  # fraction of good samples for KDE
265          self.mutation_factor = 0.5
266          self.crossover_prob = 0.7
267          self.exploration_prob = 0.2  # chance to do random exploration
268          # Internal state
269          self.population = []  # list of (vector,
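The MG-DEAF exploitation step uses the classic DE/rand/1/bin operator; a minimal standalone version mirroring the `_de_mutate` logic above (function name mine):

```python
import numpy as np

def de_rand_1_bin(pop, target_idx, rng, F=0.5, CR=0.7):
    """DE/rand/1/bin: mutant = a + F*(b - c), binomial crossover with target.

    pop: (n, dim) array of encoded vectors in [0, 1]; returns one trial vector.
    """
    n, dim = pop.shape
    idxs = [i for i in range(n) if i != target_idx]
    a, b, c = pop[rng.choice(idxs, 3, replace=False)]
    mutant = np.clip(a + F * (b - c), 0.0, 1.0)
    mask = rng.rand(dim) < CR
    mask[rng.randint(dim)] = True  # guarantee at least one gene from the mutant
    return np.where(mask, mutant, pop[target_idx])
```

Forcing one gene from the mutant (the `j_rand` trick) ensures the trial never degenerates into an exact copy of the target, which would waste an evaluation.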
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 255,
5 "end_line": 326,
6 "content": "class CustomHPOStrategy:\n \"\"\"Multi-Fidelity Covariance Matrix Adaptation with Local Surrogate (MF-CMA-LSO).\"\"\"\n\n def __init__(self, seed: int = 42):\n self.seed = seed\n self.rng = np.random.RandomState(seed)\n # Multi-fidelity parameters\n self.eta = 3.0 # halving rate for fidelity scaling\n self.min_fidelity = 0.1 # minimum fidelity fraction\n # CMA-ES parameters\n self.sigma = 0.3 # initial step size\n self.mean = None # mean vector in [0,1]^d\n self.C = None # covariance matrix\n self.pc = None # evolution path for C\n self.ps = None # evolution path for sigma\n # Population parameters\n self.lambda_ = None # population size (offspring)\n self.mu = None # parent number\n self.weights = None # recombination weights\n self.mu_eff = None # effective mu\n # Adaptive parameters\n self.cc = None # time constant for C\n self.c1 = None # learning rate for rank-one update of C\n self.cmu = None # learning rate for rank-mu update of C\n self.cs = None # learning rate for sigma adaptation\n self.damps = None # damping for sigma adaptation\n # Surrogate model parameters\n self.surrogate_n_warmup = 5 # number of random evaluations before surrogate\n self.surrogate_active = False # whether surrogate is active\n self.local_surrogate_radius = 0.2 # radius for local region\n self.surrogate_exploit_prob = 0.3 # probability of using surrogate exploitation\n # Internal state\n self.population = [] # list of (config_vec, config_dict, fidelity, score)\n self.all_trials = [] # list of (vec, score, fidelity)\n self.fidelity_levels = [] # list of fidelity levels to explore\n self.current_fidelity_idx = 0\n self.initialized = False\n self.budget_spent = 0\n\n def _encode(self, config, space):\n \"\"\"Encode a config to a normalized vector in [0,1]^d.\"\"\"\n vec = []\n for p in space.params:\n val = config[p.name]\n if p.type == \"categorical\":\n idx = p.choices.index(val)\n vec.append(idx / max(len(p.choices) - 1, 1))\n elif p.type in (\"float\", \"int\"):\n if 
p.log_scale:\n v = (np.log(val) - np.log(p.low)) / (np.log(p.high) - np.log(p.low))\n else:\n v = (val - p.low) / (p.high - p.low)\n vec.append(float(np.clip(v, 0, 1)))\n return np.array(vec)\n\n def _decode(self, vec, space):\n \"\"\"Decode a normalized vector back to a config dict.\"\"\"\n config = {}\n for i, p in enumerate(space.params):\n v = float(np.clip(vec[i], 0, 1))\n if p.type == \"categorical\":\n idx = int(round(v * max(len(p.choices) - 1, 1)))\n idx = min(idx, len(p.choices) - 1)\n config[p.name] = p.choices[idx]\n elif p.type == \"float\":\n if p.log_scale:\n config[p.name] = float(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low))))\n else:\n config[p.name] = float(p.low + v * (p.high - p.low))\n elif p.type == \"int\":\n if p.log_scale:\n config[p.name] = int(round(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low)))))\n else:\n config[p.name] = int(round(p.low + v * (p.high - p.low)))\n return config\n\n def _init_cma(self, dim):\n \"\"\"Initialize CMA-ES parameters.\"\"\"\n # Population size\n self.lambda_ = 4 + int(3 * np.log(dim))\n self.mu = self.lambda_ // 2\n # Recombination weights\n weights = np.log(self.mu + 0.5) - np.log(np.arange(1, self.mu + 1))\n self.weights = weights / weights.sum()\n self.mu_eff = 1.0 / np.sum(self.weights ** 2)\n # Adaptation parameters\n self.cc = (4 + self.mu_eff / dim) / (dim + 4 + 2 * self.mu_eff / dim)\n self.cs = (self.mu_eff + 2) / (dim + self.mu_eff + 5)\n self.c1 = 2 / ((dim + 1.3) ** 2 + self.mu_eff)\n self.cmu = min(1 - self.c1, 2 * (self.mu_eff - 2 + 1 / self.mu_eff) / ((dim + 2) ** 2 + self.mu_eff))\n self.damps = 1 + 2 * max(0, np.sqrt((self.mu_eff - 1) / (dim + 1)) - 1) + self.cs\n # Initialize evolution paths\n self.mean = np.full(dim, 0.5) # start at center of hypercube\n self.pc = np.zeros(dim)\n self.ps = np.zeros(dim)\n self.C = np.eye(dim)\n # Initialize fidelity levels (geometric progression)\n max_fidelity = 1.0\n min_fidelity = self.min_fidelity\n fid = max_fidelity\n 
while fid >= min_fidelity:\n self.fidelity_levels.append(fid)\n fid /= self.eta\n self.fidelity_levels = sorted(self.fidelity_levels) # ascending\n self.current_fidelity_idx = 0\n\n def _sample_from_cma(self, space):\n \"\"\"Sample a new candidate using CMA-ES distribution.\"\"\"\n dim = space.dim\n # Eigendecomposition of C for sampling\n try:\n # Add small regularization for numerical stability\n C_reg = self.C + np.eye(dim) * 1e-8\n L = np.linalg.cholesky(C_reg)\n z = self.rng.randn(dim)\n vec = self.mean + self.sigma * L.dot(z)\n except np.linalg.LinAlgError:\n # Fallback to isotropic Gaussian\n vec = self.mean + self.sigma * self.rng.randn(dim)\n # Clip to [0,1] bounds\n vec = np.clip(vec, 0, 1)\n config = self._decode(vec, space)\n return config, vec\n\n def _update_cma(self, population):\n \"\"\"Update CMA-ES state given evaluated population.\"\"\"\n # population: list of (vec, score, fidelity)\n if len(population) < self.mu:\n return\n # Sort by score descending\n sorted_pop = sorted(population, key=lambda x: x[1], reverse=True)\n # Select top mu\n selected = sorted_pop[:self.mu]\n # Update mean\n old_mean = self.mean.copy()\n self.mean = np.zeros_like(self.mean)\n for i, (vec, score, fid) in enumerate(selected):\n self.mean += self.weights[i] * vec\n # Update evolution paths\n y = (self.mean - old_mean) / self.sigma\n self.ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mu_eff) * y / np.sqrt(np.dot(y, y) + 1e-20)\n # Update covariance matrix\n hs = 1.0 if np.linalg.norm(self.ps) / np.sqrt(1 - (1 - self.cs) ** (2 * len(population))) < 1.4 + 2 / (space.dim + 1) else 0\n self.pc = (1 - self.cc) * self.pc + hs * np.sqrt(self.cc * (2 - self.cc) * self.mu_eff) * y\n # Rank-one update\n delta_c1 = np.outer(self.pc, self.pc)\n # Rank-mu update\n delta_cmu = np.zeros_like(self.C)\n for i in range(self.mu):\n w = self.weights[i]\n z = (selected[i][0] - old_mean) / self.sigma\n delta_cmu += w * np.outer(z, z)\n # Combine updates\n self.C = (1 - 
self.c1 - self.cmu) * self.C + self.c1 * delta_c1 + self.cmu * delta_cmu\n # Update step size\n self.sigma *= np.exp((self.cs / self.damps) * (np.linalg.norm(self.ps) / np.sqrt(dim) - 1))\n\n def _build_local_surrogate(self, space, center_vec, radius):\n \"\"\"Build a simple local KDE surrogate model around a center.\"\"\"\n # Collect points within radius\n local_points = []\n local_scores = []\n for vec, score, fid in self.all_trials:\n dist = np.linalg.norm(vec - center_vec)\n if dist <= radius:\n local_points.append(vec)\n local_scores.append(score)\n if len(local_points) < 2:\n return None\n # Simple weighted average with inverse distance weighting\n def surrogate(x):\n weights = []\n for v, s in zip(local_points, local_scores):\n d = np.linalg.norm(x - v) + 1e-8\n weights.append(1.0 / d)\n weights = np.array(weights)\n weights /= weights.sum()\n return np.dot(weights, local_scores)\n return surrogate\n\n def _suggest_from_surrogate(self, space, center_vec, radius):\n \"\"\"Suggest a candidate using local surrogate optimization.\"\"\"\n surrogate = self._build_local_surrogate(space, center_vec, radius)\n if surrogate is None:\n return None\n # Generate random candidates in local region and pick best predicted\n best_pred = -np.inf\n best_vec = None\n for _ in range(20):\n offset = self.rng.randn(space.dim) * radius * 0.5\n vec = center_vec + offset\n vec = np.clip(vec, 0, 1)\n pred = surrogate(vec)\n if pred > best_pred:\n best_pred = pred\n best_vec = vec\n if best_vec is not None:\n config = self._decode(best_vec, space)\n return config\n return None\n\n def suggest(\n self,\n space: SearchSpace,\n history: List[Trial],\n budget_left: int,\n ) -> Tuple[Dict[str, Any], float]:\n # Update internal trial records from history\n if len(history) > len(self.all_trials):\n for i in range(len(self.all_trials), len(history)):\n trial = history[i]\n vec = self._encode(trial.config, space)\n self.all_trials.append((vec, trial.score, trial.budget))\n self.budget_spent += 
trial.budget\n\n # Initialize on first call\n if not self.initialized:\n self._init_cma(space.dim)\n self.initialized = True\n # First suggestion: random uniform at lowest fidelity\n config = space.sample_uniform(self.rng)\n fidelity = self.fidelity_levels[0]\n return config, fidelity\n\n # Decide exploration vs exploitation\n use_surrogate = False\n \n... (truncated)"
7}
OK: Replaced lines 255..326 with 261 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–515. [Current file: scikit-learn/custom_hpo.py | editable: 255–515 | total: 644 lines]
252| # ================================================================
253|
254|
255  class CustomHPOStrategy:
256      """Multi-Fidelity Covariance Matrix Adaptation with Local Surrogate (MF-CMA-LSO)."""
257
258      def __init__(self, seed: int = 42):
259          self.seed = seed
260          self.rng = np.random.RandomState(seed)
261          # Multi-fidelity parameters
262          self.eta = 3.0  # halving rate for fidelity scaling
263          self.min_fidelity = 0.1  # minimum fidelity fraction
264          # CMA-ES parameters
265          self.sigma = 0.3  # initial step size
266          self.mean = None  # mean vector in [0,1]^d
267          self.C = None  # covariance matrix
268          self.pc = None  # evolution path for C
269
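The "local surrogate" in MF-CMA-LSO above is plain inverse-distance weighting over trials near a center point; a standalone sketch (names mine) of that predictor:

```python
import numpy as np

def idw_surrogate(points, scores, radius, center):
    """Return an inverse-distance-weighted predictor fit on trials within
    `radius` of `center`, or None if fewer than two are in range."""
    points, scores = np.asarray(points, float), np.asarray(scores, float)
    mask = np.linalg.norm(points - center, axis=1) <= radius
    local_pts, local_scr = points[mask], scores[mask]
    if len(local_pts) < 2:
        return None

    def predict(x):
        d = np.linalg.norm(local_pts - x, axis=1) + 1e-8  # avoid div-by-zero
        w = 1.0 / d
        return float(np.dot(w / w.sum(), local_scr))

    return predict
```

IDW is cheap and needs no fitting, but it only interpolates: predictions are always bounded by the local scores, so it can rank nearby candidates yet never extrapolate beyond the best observed value.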
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 381,
5 "end_line": 413,
6 "content": " def _update_cma(self, space):\n \"\"\"Update CMA-ES state given evaluated population.\"\"\"\n # Use all_trials as population\n population = self.all_trials\n if len(population) < self.mu:\n return\n # Sort by score descending\n sorted_pop = sorted(population, key=lambda x: x[1], reverse=True)\n # Select top mu\n selected = sorted_pop[:self.mu]\n # Update mean\n old_mean = self.mean.copy()\n self.mean = np.zeros_like(self.mean)\n for i, (vec, score, fid) in enumerate(selected):\n self.mean += self.weights[i] * vec\n # Update evolution paths\n y = (self.mean - old_mean) / self.sigma\n self.ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mu_eff) * y / np.sqrt(np.dot(y, y) + 1e-20)\n # Update covariance matrix\n hs = 1.0 if np.linalg.norm(self.ps) / np.sqrt(1 - (1 - self.cs) ** (2 * len(population))) < 1.4 + 2 / (space.dim + 1) else 0\n self.pc = (1 - self.cc) * self.pc + hs * np.sqrt(self.cc * (2 - self.cc) * self.mu_eff) * y\n # Rank-one update\n delta_c1 = np.outer(self.pc, self.pc)\n # Rank-mu update\n delta_cmu = np.zeros_like(self.C)\n for i in range(self.mu):\n w = self.weights[i]\n z = (selected[i][0] - old_mean) / self.sigma\n delta_cmu += w * np.outer(z, z)\n # Combine updates\n self.C = (1 - self.c1 - self.cmu) * self.C + self.c1 * delta_c1 + self.cmu * delta_cmu\n # Update step size\n dim = space.dim\n self.sigma *= np.exp((self.cs / self.damps) * (np.linalg.norm(self.ps) / np.sqrt(dim) - 1))"
7}
OK: Replaced lines 381..413 with 34 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–516. [Current file: scikit-learn/custom_hpo.py | editable: 255–516 | total: 645 lines]
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 381,
5 "end_line": 414,
6 "content": " def _update_cma(self, space):\n \"\"\"Update CMA-ES state given evaluated population.\"\"\"\n # Use all_trials as population\n population = self.all_trials\n if len(population) < self.mu:\n return\n # Sort by score descending\n sorted_pop = sorted(population, key=lambda x: x[1], reverse=True)\n # Select top mu\n selected = sorted_pop[:self.mu]\n # Update mean\n old_mean = self.mean.copy()\n self.mean = np.zeros_like(self.mean)\n for i, (vec, score, fid) in enumerate(selected):\n self.mean += self.weights[i] * vec\n # Update evolution paths\n y = (self.mean - old_mean) / self.sigma\n self.ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mu_eff) * y / np.sqrt(np.dot(y, y) + 1e-20)\n # Update covariance matrix\n hs = 1.0 if np.linalg.norm(self.ps) / np.sqrt(1 - (1 - self.cs) ** (2 * len(population))) < 1.4 + 2 / (space.dim + 1) else 0\n self.pc = (1 - self.cc) * self.pc + hs * np.sqrt(self.cc * (2 - self.cc) * self.mu_eff) * y\n # Rank-one update\n delta_c1 = np.outer(self.pc, self.pc)\n # Rank-mu update\n delta_cmu = np.zeros_like(self.C)\n for i in range(self.mu):\n w = self.weights[i]\n z = (selected[i][0] - old_mean) / self.sigma\n delta_cmu += w * np.outer(z, z)\n # Combine updates\n self.C = (1 - self.c1 - self.cmu) * self.C + self.c1 * delta_c1 + self.cmu * delta_cmu\n # Update step size\n dim = space.dim\n self.sigma *= np.exp((self.cs / self.damps) * (np.linalg.norm(self.ps) / np.sqrt(dim) - 1))"
7}
OK: Replaced lines 381..414 with 34 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–516. [Current file: scikit-learn/custom_hpo.py | editable: 255–516 | total: 645 lines]
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 381,
5 "end_line": 516,
6 "content": " def _update_cma(self, space):\n \"\"\"Update CMA-ES state given evaluated population.\"\"\"\n # Use all_trials as population\n population = self.all_trials\n if len(population) < self.mu:\n return\n # Sort by score descending\n sorted_pop = sorted(population, key=lambda x: x[1], reverse=True)\n # Select top mu\n selected = sorted_pop[:self.mu]\n # Update mean\n old_mean = self.mean.copy()\n self.mean = np.zeros_like(self.mean)\n for i, (vec, score, fid) in enumerate(selected):\n self.mean += self.weights[i] * vec\n # Update evolution paths\n y = (self.mean - old_mean) / self.sigma\n self.ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mu_eff) * y / np.sqrt(np.dot(y, y) + 1e-20)\n # Update covariance matrix\n hs = 1.0 if np.linalg.norm(self.ps) / np.sqrt(1 - (1 - self.cs) ** (2 * len(population))) < 1.4 + 2 / (space.dim + 1) else 0\n self.pc = (1 - self.cc) * self.pc + hs * np.sqrt(self.cc * (2 - self.cc) * self.mu_eff) * y\n # Rank-one update\n delta_c1 = np.outer(self.pc, self.pc)\n # Rank-mu update\n delta_cmu = np.zeros_like(self.C)\n for i in range(self.mu):\n w = self.weights[i]\n z = (selected[i][0] - old_mean) / self.sigma\n delta_cmu += w * np.outer(z, z)\n # Combine updates\n self.C = (1 - self.c1 - self.cmu) * self.C + self.c1 * delta_c1 + self.cmu * delta_cmu\n # Update step size\n dim = space.dim\n self.sigma *= np.exp((self.cs / self.damps) * (np.linalg.norm(self.ps) / np.sqrt(dim) - 1))\n\n def _build_local_surrogate(self, space, center_vec, radius):\n \"\"\"Build a simple local KDE surrogate model around a center.\"\"\"\n # Collect points within radius\n local_points = []\n local_scores = []\n for vec, score, fid in self.all_trials:\n dist = np.linalg.norm(vec - center_vec)\n if dist <= radius:\n local_points.append(vec)\n local_scores.append(score)\n if len(local_points) < 2:\n return None\n # Simple weighted average with inverse distance weighting\n def surrogate(x):\n weights = []\n for v, s in 
zip(local_points, local_scores):\n d = np.linalg.norm(x - v) + 1e-8\n weights.append(1.0 / d)\n weights = np.array(weights)\n weights /= weights.sum()\n return np.dot(weights, local_scores)\n return surrogate\n\n def _suggest_from_surrogate(self, space, center_vec, radius):\n \"\"\"Suggest a candidate using local surrogate optimization.\"\"\"\n surrogate = self._build_local_surrogate(space, center_vec, radius)\n if surrogate is None:\n return None\n # Generate random candidates in local region and pick best predicted\n best_pred = -np.inf\n best_vec = None\n for _ in range(20):\n offset = self.rng.randn(space.dim) * radius * 0.5\n vec = center_vec + offset\n vec = np.clip(vec, 0, 1)\n pred = surrogate(vec)\n if pred > best_pred:\n best_pred = pred\n best_vec = vec\n if best_vec is not None:\n config = self._decode(best_vec, space)\n return config\n return None\n\n def suggest(\n self,\n space: SearchSpace,\n history: List[Trial],\n budget_left: int,\n ) -> Tuple[Dict[str, Any], float]:\n # Update internal trial records from history\n if len(history) > len(self.all_trials):\n for i in range(len(self.all_trials), len(history)):\n trial = history[i]\n vec = self._encode(trial.config, space)\n self.all_trials.append((vec, trial.score, trial.budget))\n self.budget_spent += trial.budget\n\n # Initialize on first call\n if not self.initialized:\n self._init_cma(space.dim)\n self.initialized = True\n # First suggestion: random uniform at lowest fidelity\n config = space.sample_uniform(self.rng)\n fidelity = self.fidelity_levels[0]\n return config, fidelity\n\n # Update CMA-ES state periodically (every lambda evaluations)\n if len(self.all_trials) % max(1, self.lambda_ // 2) == 0:\n self._update_cma(space)\n\n # Decide exploration vs exploitation\n use_surrogate = False\n if len(self.all_trials) >= self.surrogate_n_warmup:\n # Increase exploitation probability as budget shrinks\n exploit_prob = self.surrogate_exploit_prob * (1.0 - budget_left / (self.budget_spent + 
budget_left))\n if self.rng.rand() < exploit_prob:\n # Find best point so far\n best_idx = np.argmax([t[1] for t in self.all_trials])\n best_vec = self.all_trials[best_idx][0]\n surrogate_config = self._suggest_from_surrogate(space, best_vec, self.local_surrogate_radius)\n if surrogate_config is not None:\n config = surrogate_config\n use_surrogate = True\n\n if not use_surrogate:\n # Use CMA-ES to sample new candidate\n config, vec = self._sample_from_cma(space)\n else:\n vec = self._encode(config, space)\n\n # Determine fidelity level: adaptive based on remaining budget and progress\n # Start with low fidelity, increase as budget decreases\n if len(self.all_trials) < 5:\n fidelity = self.fidelity_levels[0] # start low\n else:\n # Gradually increase fidelity as we spend budget\n spent_ratio = self.budget_spent / (self.budget_spent + budget_left)\n # Map to fidelity index: more spent → higher fidelity\n idx = int(spent_ratio * (len(self.fidelity_levels) - 1))\n idx = min(idx, len(self.fidelity_levels) - 1)\n fidelity = self.fidelity_levels[idx]\n\n # Ensure we don't exceed remaining budget\n if fidelity > budget_left:\n fidelity = max(self.min_fidelity, budget_left * 0.9)\n\n return config, fidelity"
7}OK: Replaced lines 381..516 with 143 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–523. [Current file: scikit-learn/custom_hpo.py | editable: 255–523 | total: 652 lines] 252| # ================================================================ 253| 254| 255 class CustomHPOStrategy: 256 """Multi-Fidelity Covariance Matrix Adaptation with Local Surrogate (MF-CMA-LSO).""" 257 258 def __init__(self, seed: int = 42): 259 self.seed = seed 260 self.rng = np.random.RandomState(seed) 261 # Multi-fidelity parameters 262 self.eta = 3.0 # halving rate for fidelity scaling 263 self.min_fidelity = 0.1 # minimum fidelity fraction 264 # CMA-ES parameters 265 self.sigma = 0.3 # initial step size 266 self.mean = None # mean vector in [0,1]^d 267 self.C = None # covariance matrix 268 self.pc = None # evolution path for C 269
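The `_build_local_surrogate` helper in the block above is plain inverse-distance weighting over nearby trials. A minimal standalone sketch of the same idea (the name `idw_predict` is hypothetical, not part of the file):

```python
import numpy as np

def idw_predict(points, scores, x, eps=1e-8):
    # Inverse-distance-weighted average: trials near x dominate the
    # prediction, as in the local surrogate used by MF-CMA-LSO above.
    points = np.asarray(points, dtype=float)
    scores = np.asarray(scores, dtype=float)
    d = np.linalg.norm(points - x, axis=1) + eps
    w = 1.0 / d
    w /= w.sum()
    return float(w @ scores)
```

By construction, a query equidistant from two observations predicts their mean, and a query on top of an observation returns (almost exactly) its score.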
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 255,
5 "end_line": 523,
6 "content": "class CustomHPOStrategy:\n \"\"\"Adaptive Multi-Fidelity Bayesian Evolution (AMFBE).\"\"\"\n\n def __init__(self, seed: int = 42):\n self.seed = seed\n self.rng = np.random.RandomState(seed)\n # Hyperband parameters\n self.eta = 3.0 # halving rate\n self.min_fidelity = 0.1\n # TPE parameters\n self.gamma = 0.25 # fraction for good observations\n self.n_ei_candidates = 24\n # Evolution parameters\n self.mutation_factor = 0.5\n self.crossover_prob = 0.5\n # Internal state\n self.all_trials = [] # (vec, score, fidelity)\n self.brackets = [] # list of dict with configs, fidelities, scores\n self.queue = [] # (config, fidelity) to suggest next\n self.initialized = False\n self.budget_spent = 0\n self.total_budget = None\n\n def _encode(self, config, space):\n \"\"\"Encode config to normalized vector in [0,1]^d.\"\"\"\n vec = []\n for p in space.params:\n val = config[p.name]\n if p.type == \"categorical\":\n idx = p.choices.index(val)\n vec.append(idx / max(len(p.choices) - 1, 1))\n elif p.type in (\"float\", \"int\"):\n if p.log_scale:\n v = (np.log(val) - np.log(p.low)) / (np.log(p.high) - np.log(p.low))\n else:\n v = (val - p.low) / (p.high - p.low)\n vec.append(float(np.clip(v, 0, 1)))\n return np.array(vec)\n\n def _decode(self, vec, space):\n \"\"\"Decode normalized vector back to config.\"\"\"\n config = {}\n for i, p in enumerate(space.params):\n v = float(np.clip(vec[i], 0, 1))\n if p.type == \"categorical\":\n idx = int(round(v * max(len(p.choices) - 1, 1)))\n idx = min(idx, len(p.choices) - 1)\n config[p.name] = p.choices[idx]\n elif p.type == \"float\":\n if p.log_scale:\n config[p.name] = float(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low))))\n else:\n config[p.name] = float(p.low + v * (p.high - p.low))\n elif p.type == \"int\":\n if p.log_scale:\n config[p.name] = int(round(np.exp(\n np.log(p.low) + v * (np.log(p.high) - np.log(p.low)))))\n else:\n config[p.name] = int(round(p.low + v * (p.high - p.low)))\n return config\n\n 
def _sample_from_tpe(self, space):\n \"\"\"Sample a configuration using TPE acquisition.\"\"\"\n if len(self.all_trials) < 10:\n return space.sample_uniform(self.rng)\n vecs = np.array([t[0] for t in self.all_trials])\n scores = np.array([t[1] for t in self.all_trials])\n n_good = max(1, int(self.gamma * len(scores)))\n threshold = np.sort(scores)[-n_good]\n good = vecs[scores >= threshold]\n bad = vecs[scores < threshold]\n if len(bad) == 0:\n bad = good.copy()\n # Bandwidth using Scott's rule\n bw_good = max(0.05, good.std() + 1e-6)\n bw_bad = max(0.05, bad.std() + 1e-6)\n best_ei = -np.inf\n best_cfg = None\n for _ in range(self.n_ei_candidates):\n cfg = space.sample_uniform(self.rng)\n x = self._encode(cfg, space)\n # Simple Gaussian KDE log-density\n log_l = np.log(np.mean(np.exp(-0.5 * np.sum((x - good)**2 / bw_good**2, axis=1))) + 1e-30)\n log_g = np.log(np.mean(np.exp(-0.5 * np.sum((x - bad)**2 / bw_bad**2, axis=1))) + 1e-30)\n ei = log_l - log_g\n if ei > best_ei:\n best_ei = ei\n best_cfg = cfg\n return best_cfg if best_cfg is not None else space.sample_uniform(self.rng)\n\n def _differential_evolution(self, space, population_vecs):\n \"\"\"Generate a new candidate via DE/rand/1/bin.\"\"\"\n n = len(population_vecs)\n if n < 4:\n # Not enough diversity, return random\n return space.sample_uniform(self.rng)\n idxs = list(range(n))\n target = self.rng.choice(idxs)\n idxs.remove(target)\n a, b, c = self.rng.choice(idxs, 3, replace=False)\n mutant = population_vecs[a] + self.mutation_factor * (population_vecs[b] - population_vecs[c])\n mutant = np.clip(mutant, 0, 1)\n # Crossover\n dim = len(mutant)\n cross_mask = self.rng.rand(dim) < self.crossover_prob\n j_rand = self.rng.randint(dim)\n cross_mask[j_rand] = True\n trial = np.where(cross_mask, mutant, population_vecs[target])\n config = self._decode(trial, space)\n return config\n\n def _initialize_brackets(self, space, total_budget):\n \"\"\"Set up Hyperband brackets.\"\"\"\n s_max = max(0, 
int(np.floor(np.log(total_budget) / np.log(self.eta))))\n s_max = min(s_max, 4) # cap brackets\n for s in range(s_max, -1, -1):\n n = int(np.ceil((s_max + 1) / (s + 1)) * self.eta ** s)\n n = min(n, total_budget)\n r = max(1.0 / self.eta ** s, self.min_fidelity)\n # Generate configs: mixture of random and model-based\n configs = []\n for i in range(n):\n if i < n // 2:\n configs.append(space.sample_uniform(self.rng))\n else:\n configs.append(self._sample_from_tpe(space))\n self.brackets.append({\n \"configs\": configs,\n \"fidelity\": r,\n \"scores\": [None] * len(configs),\n \"round\": 0,\n \"s\": s,\n })\n # Queue low-fidelity evaluations\n for cfg in configs:\n self.queue.append((cfg, r))\n\n def _advance_bracket(self, bracket):\n \"\"\"Promote top configurations to higher fidelity.\"\"\"\n configs = bracket[\"configs\"]\n scores = bracket[\"scores\"]\n paired = [(s, c) for s, c in zip(scores, configs) if s is not None]\n if not paired:\n return\n paired.sort(key=lambda x: x[0], reverse=True)\n n_keep = max(1, len(paired) // self.eta)\n survivors = paired[:n_keep]\n new_fidelity = min(bracket[\"fidelity\"] * self.eta, 1.0)\n # Update bracket\n bracket[\"configs\"] = [c for _, c in survivors]\n bracket[\"scores\"] = [None] * len(survivors)\n bracket[\"fidelity\"] = new_fidelity\n bracket[\"round\"] += 1\n # Queue higher-fidelity evaluations\n for cfg in bracket[\"configs\"]:\n self.queue.append((cfg, new_fidelity))\n # Also generate new configurations via evolution for diversity\n if len(survivors) >= 4:\n # Create population vectors from survivors\n survivor_vecs = [self._encode(c, space) for _, c in survivors]\n for _ in range(min(2, n_keep)):\n new_cfg = self._differential_evolution(space, survivor_vecs)\n self.queue.append((new_cfg, new_fidelity))\n\n def suggest(\n self,\n space: SearchSpace,\n history: List[Trial],\n budget_left: int,\n ) -> Tuple[Dict[str, Any], float]:\n # Update all_trials and budget_spent\n if len(history) > len(self.all_trials):\n for 
i in range(len(self.all_trials), len(history)):\n trial = history[i]\n vec = self._encode(trial.config, space)\n self.all_trials.append((vec, trial.score, trial.budget))\n self.budget_spent += trial.budget\n\n # Initialize brackets on first call\n if not self.initialized:\n self.total_budget = self.budget_spent + budget_left\n self._initialize_brackets(space, self.total_budget)\n self.initialized = True\n\n # Update bracket scores from the latest trial\n if history and self.brackets:\n last_trial = history[-1]\n for bracket in self.brackets:\n for i, cfg in enumerate(bracket[\"configs\"]):\n if bracket[\"scores\"][i] is None and cfg == last_trial.config:\n bracket[\"scores\"][i] = last_trial.score\n # If all scores in this bracket are filled, advance\n if all(s is not None for s in bracket[\"scores\"]):\n self._advance_bracket(bracket, space)\n\n # If queue is empty, generate new configurations via TPE\n if not self.queue:\n # Use TPE to sample a new configuration\n config = self._sample_from_tpe(space)\n # Determine fidelity: adaptive based on remaining budget\n spent_ratio = self.budget_spent / (self.budget_spent + budget_left)\n # Choose fidelity: higher spent ratio → higher fidelity\n if spent_ratio < 0.3:\n fidelity = self.min_fidelity\n elif spent_ratio < 0.6:\n fidelity = 0.33\n else:\n fidelity = 1.0\n # Ensure we don't exceed remaining budget\n if fidelity > budget_left:\n fidelity = max(self.min_fidelity, budget_left * 0.9)\n return config, fidelity\n\n # Pop from queue\n config, fidelity = self.queue.pop(0)\n # Ensure fidelity does not exceed budget\n if fidelity > budget_left:\n fidelity = max(self.min_fidelity, budget_left * 0.9)\n return config, fidelity"
7}OK: Replaced lines 255..523 with 220 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–474. [Current file: scikit-learn/custom_hpo.py | editable: 255–474 | total: 603 lines] 252| # ================================================================ 253| 254| 255 class CustomHPOStrategy: 256 """Adaptive Multi-Fidelity Bayesian Evolution (AMFBE).""" 257 258 def __init__(self, seed: int = 42): 259 self.seed = seed 260 self.rng = np.random.RandomState(seed) 261 # Hyperband parameters 262 self.eta = 3.0 # halving rate 263 self.min_fidelity = 0.1 264 # TPE parameters 265 self.gamma = 0.25 # fraction for good observations 266 self.n_ei_candidates = 24 267 # Evolution parameters 268 self.mutation_factor = 0.5 269 self.crossover_prob = 0.5 270 # Internal state 271 self.all_trials = [] # (vec, sco
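The TPE step in `_sample_from_tpe` above hinges on splitting the observed trials at the top-γ quantile before fitting the two densities. A minimal standalone sketch of that threshold rule (the name `split_good_bad` is hypothetical):

```python
import numpy as np

def split_good_bad(scores, gamma=0.25):
    # Top gamma fraction (at least one point) counts as "good",
    # mirroring the threshold computation in _sample_from_tpe.
    scores = np.asarray(scores, dtype=float)
    n_good = max(1, int(gamma * len(scores)))
    threshold = np.sort(scores)[-n_good]
    good_mask = scores >= threshold
    return good_mask, ~good_mask

good, bad = split_good_bad([0.1, 0.9, 0.5, 0.7], gamma=0.25)
```

With four observations and γ = 0.25, exactly the single best score (0.9) lands in the "good" set; candidates are then ranked by the density ratio of the two groups.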
Tool Input
1{
2 "op": "replace",
3 "filename": "scikit-learn/custom_hpo.py",
4 "start_line": 393,
5 "end_line": 418,
6 "content": " def _advance_bracket(self, bracket, space):\n \"\"\"Promote top configurations to higher fidelity.\"\"\"\n configs = bracket[\"configs\"]\n scores = bracket[\"scores\"]\n paired = [(s, c) for s, c in zip(scores, configs) if s is not None]\n if not paired:\n return\n paired.sort(key=lambda x: x[0], reverse=True)\n n_keep = max(1, len(paired) // self.eta)\n survivors = paired[:n_keep]\n new_fidelity = min(bracket[\"fidelity\"] * self.eta, 1.0)\n # Update bracket\n bracket[\"configs\"] = [c for _, c in survivors]\n bracket[\"scores\"] = [None] * len(survivors)\n bracket[\"fidelity\"] = new_fidelity\n bracket[\"round\"] += 1\n # Queue higher-fidelity evaluations\n for cfg in bracket[\"configs\"]:\n self.queue.append((cfg, new_fidelity))\n # Also generate new configurations via evolution for diversity\n if len(survivors) >= 4:\n # Create population vectors from survivors\n survivor_vecs = [self._encode(c, space) for _, c in survivors]\n for _ in range(min(2, n_keep)):\n new_cfg = self._differential_evolution(space, survivor_vecs)\n self.queue.append((new_cfg, new_fidelity))"
7}OK: Replaced lines 393..418 with 26 line(s) in scikit-learn/custom_hpo.py. Editable range: 255–474. [Current file: scikit-learn/custom_hpo.py | editable: 255–474 | total: 603 lines] 252| # ================================================================ 253| 254| 255 class CustomHPOStrategy: 256 """Adaptive Multi-Fidelity Bayesian Evolution (AMFBE).""" 257 258 def __init__(self, seed: int = 42): 259 self.seed = seed 260 self.rng = np.random.RandomState(seed) 261 # Hyperband parameters 262 self.eta = 3.0 # halving rate 263 self.min_fidelity = 0.1 264 # TPE parameters 265 self.gamma = 0.25 # fraction for good observations 266 self.n_ei_candidates = 24 267 # Evolution parameters 268 self.mutation_factor = 0.5 269 self.crossover_prob = 0.5 270 # Internal state 271 self.all_trials = [] # (vec, scor
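The promotion rule in `_advance_bracket` is standard successive halving: keep the best 1/η of the configurations and raise their fidelity by a factor of η, capped at full fidelity. A sketch with an integer η so the slice bound is valid (names hypothetical):

```python
import numpy as np

def promote(configs, scores, fidelity, eta=3):
    # Keep the best len(configs) // eta entries (sorted by score,
    # descending), and multiply fidelity by eta, capped at 1.0.
    order = np.argsort(scores)[::-1]
    n_keep = max(1, len(configs) // eta)
    survivors = [configs[i] for i in order[:n_keep]]
    return survivors, min(fidelity * eta, 1.0)

survivors, fid = promote(list("abcdef"), [0.2, 0.9, 0.4, 0.8, 0.1, 0.5], 1 / 9)
```

Six configs at fidelity 1/9 shrink to the two top scorers ("b" and "d") at fidelity 1/3.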
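When its queue is empty, `suggest` falls back to a staged fidelity schedule keyed on the fraction of budget already spent. Extracted as a standalone sketch (the 0.3 / 0.6 breakpoints and the 0.9 clamp come straight from the code above; the function name is hypothetical):

```python
def fidelity_schedule(spent, left, min_fid=0.1):
    # Low fidelity early, medium mid-run, full fidelity late;
    # the final clamp keeps the suggestion within the remaining budget.
    ratio = spent / (spent + left)
    if ratio < 0.3:
        fid = min_fid
    elif ratio < 0.6:
        fid = 0.33
    else:
        fid = 1.0
    if fid > left:
        fid = max(min_fid, left * 0.9)
    return fid
```

Early in the run this returns the minimum fidelity; near exhaustion it asks for full fidelity but is clamped to 90% of whatever budget remains.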