Agent Conversation: deepseek-reasoner
Task: optimization-multi-objective
Multi-Objective Optimization: Custom Evolutionary Strategy Design
Research Question
Design a novel multi-objective evolutionary algorithm (MOEA) strategy that achieves better convergence, diversity, and spread on standard benchmark problems than classic approaches like NSGA-II, MOEA/D, and SPEA2.
Background
Multi-objective optimization aims to find a set of Pareto-optimal solutions that represent the best trade-offs among conflicting objectives. Evolutionary algorithms are the dominant approach, differing primarily in three components:
- Parent selection: How to choose individuals for mating (e.g., tournament with crowding distance, reference-vector-based).
- Variation: How to produce offspring via crossover and mutation operators.
- Environmental selection (survival): How to prune the combined parent+offspring pool back to population size (e.g., non-dominated sorting + crowding, decomposition into subproblems, indicator-based selection).
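The dominance relation underlying non-dominated sorting is simple to state; a minimal, self-contained sketch (minimization, with illustrative helper names that are not from the task's codebase) of the dominance test and the first-front extraction that environmental selection builds on:

```python
def dominates(a, b):
    """a Pareto-dominates b (minimization): a is no worse in every
    objective and strictly better in at least one."""
    return all(x <= y for x, y in zip(a, b)) and any(x < y for x, y in zip(a, b))

def nondominated(points):
    """First non-dominated front of a combined parent+offspring pool:
    the points that no other point in the pool dominates."""
    return [p for p in points if not any(dominates(q, p) for q in points)]

pool = [(0.1, 0.9), (0.5, 0.5), (0.9, 0.1), (0.6, 0.6), (0.2, 1.0)]
front = nondominated(pool)
# (0.6, 0.6) is dominated by (0.5, 0.5); (0.2, 1.0) by (0.1, 0.9).
```

Survival schemes differ mainly in how they rank the remaining fronts and break ties within a front (crowding, decomposition fitness, or indicators).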
Classic algorithms include:
- NSGA-II (Deb et al., 2002): Non-dominated sorting + crowding distance for diversity.
- MOEA/D (Zhang & Li, 2007): Decomposes the problem into scalar subproblems using weight vectors.
- SPEA2 (Zitzler et al., 2001): Strength-based fitness with density estimation via k-nearest neighbors.
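To make the decomposition idea behind MOEA/D concrete, here is a short sketch of the Tchebycheff scalarization it commonly uses: each weight vector defines one scalar subproblem g(x | w, z*) to minimize. The helper name and toy values are illustrative, not from the task's codebase:

```python
import numpy as np

def tchebycheff(f, weight, z_star):
    """Tchebycheff scalarization g(x | w, z*) = max_j w_j * |f_j - z*_j|,
    turning the multi-objective problem into one scalar subproblem."""
    return float(np.max(np.asarray(weight) * np.abs(np.asarray(f) - np.asarray(z_star))))

z_star = [0.0, 0.0]   # ideal point: best value seen so far per objective
w = [0.5, 0.5]        # one subproblem's weight vector
g_a = tchebycheff([0.2, 0.8], w, z_star)  # 0.4
g_b = tchebycheff([0.5, 0.5], w, z_star)  # 0.25: this subproblem prefers b
```

Minimizing g over many well-spread weight vectors yields solutions spread across the front.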
State-of-the-art methods include:
- NSGA-III (Deb & Jain, 2014): Reference-point-based selection for many-objective problems.
- RVEA (Cheng et al., 2016): Angle-penalized distance with adaptive reference vectors.
- AGE-MOEA (Panichella, 2019): Adaptive geometry estimation for survival selection.
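A simplified sketch of the angle-penalized distance (APD) behind RVEA may help here. Note this is an approximation under stated assumptions, not the published formula: it omits RVEA's scaling of the penalty by objective count and by reference-vector spacing, and assumes objectives are already normalized:

```python
import numpy as np

def apd(F, V, gen, max_gen, alpha=2.0):
    """Angle-penalized distance for normalized minimization objectives F (n x m)
    and unit reference vectors V (k x m). The angle penalty grows over the run,
    shifting pressure from convergence (early) toward diversity (late)."""
    norms = np.maximum(np.linalg.norm(F, axis=1, keepdims=True), 1e-12)
    angles = np.arccos(np.clip((F / norms) @ V.T, -1.0, 1.0))
    assoc = angles.argmin(axis=1)             # closest reference vector
    theta = angles[np.arange(len(F)), assoc]  # angle to that vector
    gamma = alpha * (gen / max_gen) ** 2      # penalty schedule
    return norms.ravel() * (1.0 + gamma * theta), assoc

F = np.array([[1.0, 0.0], [0.6, 0.8]])
V = np.array([[1.0, 0.0], [0.0, 1.0]])
d, assoc = apd(F, V, gen=0, max_gen=200)  # at gen 0 the penalty vanishes
```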
There is active research into strategies that combine ideas across these paradigms, adapt to problem geometry, or use novel diversity maintenance mechanisms.
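One common way to adapt to problem geometry (used, for example, in AGE-MOEA-style methods) is to fit an Lp "sphere" through the normalized front and read off the exponent p. A hedged sketch, assuming the front point passed in is already normalized to [0, 1] per objective:

```python
import numpy as np

def estimate_p(point, iters=40):
    """Bisect for the exponent p with sum(point_j ** p) = 1, i.e. the Lp
    sphere through a normalized front point: p ~ 1 linear front, p ~ 2
    spherical, p < 1 strongly convex."""
    point = np.maximum(np.asarray(point, dtype=float), 1e-8)
    lo, hi = 0.1, 20.0
    for _ in range(iters):
        mid = 0.5 * (lo + hi)
        if np.sum(point ** mid) > 1.0:
            lo = mid   # sum too large -> exponent too small (entries < 1)
        else:
            hi = mid
    return 0.5 * (lo + hi)

# A point on the linear front f1 + f2 = 1 gives p ~ 1;
# a point on the unit circle gives p ~ 2.
```

The estimate can then steer the choice of selection scheme or the spacing of reference vectors.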
Task
Implement a custom multi-objective evolutionary strategy by modifying the CustomMOEA class in deap/custom_moea.py. You should implement the select, vary, survive, and optionally on_generation methods. The algorithm must work for both 2-objective and 3-objective problems.
Interface
class CustomMOEA:
    def __init__(self, pop_size, n_obj, n_var, bounds, cx_eta=20.0, mut_eta=20.0, mut_prob=None):
        """Initialize the MOEA with problem parameters."""

    def select(self, population: list, k: int) -> list:
        """Select k parents from the population for mating.
        Returns: list of k selected individuals."""

    def vary(self, parents: list) -> list:
        """Apply crossover and mutation to produce offspring.
        Returns: list of offspring (fitness invalidated)."""

    def survive(self, population: list, offspring: list) -> list:
        """Environmental selection: choose pop_size individuals from combined pool.
        Returns: list of pop_size individuals for next generation."""

    def on_generation(self, gen: int, population: list):
        """Optional per-generation callback for adaptive strategies."""
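The evaluation harness is not shown, but presumably it wires these four methods into a mu+lambda loop along the following lines. The stub below is a deliberately trivial stand-in (uniform random selection, Gaussian mutation, best-sum survival: all illustrative, none of it the required algorithm) meant only to show the calling contract, including that vary must return offspring with invalidated fitness:

```python
import random

class StubMOEA:
    """Trivial stand-in for the CustomMOEA interface. Individuals are dicts
    with a genome 'x' and a fitness 'fit' (None means invalidated)."""
    def __init__(self, pop_size, n_obj, n_var, bounds):
        self.pop_size, self.n_var, self.bounds = pop_size, n_var, bounds
    def select(self, population, k):
        # Illustrative only: uniform random parent selection with replacement.
        return [dict(random.choice(population)) for _ in range(k)]
    def vary(self, parents):
        lo, hi = self.bounds
        return [{"x": [min(hi, max(lo, v + random.gauss(0.0, 0.05))) for v in p["x"]],
                 "fit": None}  # fitness invalidated after variation
                for p in parents]
    def survive(self, population, offspring):
        # Illustrative only: keep the pop_size best by summed objectives.
        pool = population + offspring
        pool.sort(key=lambda ind: sum(ind["fit"]))
        return pool[:self.pop_size]
    def on_generation(self, gen, population):
        pass  # hook for adaptive strategies

def evaluate(ind):  # toy bi-objective: minimize x^2 and (x - 1)^2
    x = ind["x"][0]
    ind["fit"] = (x * x, (x - 1.0) ** 2)

random.seed(1)
moea = StubMOEA(pop_size=10, n_obj=2, n_var=1, bounds=(-5.0, 5.0))
pop = [{"x": [random.uniform(-5.0, 5.0)], "fit": None} for _ in range(10)]
for ind in pop:
    evaluate(ind)
for gen in range(30):
    offspring = moea.vary(moea.select(pop, 10))
    for ind in offspring:
        evaluate(ind)  # only offspring need (re-)evaluation
    pop = moea.survive(pop, offspring)
    moea.on_generation(gen, pop)
```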
Ind
Tool Input
1{
2 "op": "replace",
3 "filename": "deap/custom_moea.py",
4 "start_line": 297,
5 "end_line": 441,
6 "content": "class CustomMOEA:\n \"\"\"Adaptive Hybrid MOEA (AH-MOEA): Combines decomposition, dominance, and geometry adaptation.\n\n Novel contributions:\n 1. Adaptive mating selection balancing archive diversity and decomposition convergence.\n 2. Dual survival: decomposition-based selection for convergence + archive for diversity.\n 3. Geometry-aware reference vector adaptation for different front shapes.\n 4. Hybrid variation with occasional differential evolution for exploration.\n \"\"\"\n\n def __init__(\n self,\n pop_size: int,\n n_obj: int,\n n_var: int,\n bounds: Tuple[float, float],\n cx_eta: float = 20.0,\n mut_eta: float = 20.0,\n mut_prob: Optional[float] = None,\n ):\n self.pop_size = pop_size\n self.n_obj = n_obj\n self.n_var = n_var\n self.bounds = bounds\n self.cx_eta = cx_eta\n self.mut_eta = mut_eta\n self.mut_prob = mut_prob if mut_prob is not None else 1.0 / n_var\n \n # Algorithm hyperparameters (hardcoded as per instructions)\n self.de_ratio = 0.3 # probability of using DE instead of SBX\n self.archive_size = pop_size # external archive size\n self.T = 20 # neighborhood size for decomposition\n self.delta = 0.9 # probability of selecting from neighborhood\n self.alpha = 2.0 # penalty parameter for angle-penalized distance\n self.fr = 0.1 # frequency of reference vector adaptation\n \n # Generate reference vectors\n self._generate_reference_vectors()\n \n # Compute neighborhoods based on vector similarity\n self._compute_neighborhoods()\n \n # State variables\n self.archive = [] # external archive of non-dominated solutions\n self.z_star = None # ideal point\n self.z_nad = None # nadir point (estimated)\n self.current_gen = 0\n \n # Adaptive selection parameter\n self.beta = 0.5 # balance between archive and population selection\n\n def _generate_reference_vectors(self):\n \"\"\"Generate uniformly distributed reference vectors.\"\"\"\n if self.n_obj == 2:\n # For 2 objectives: equally spaced weight vectors\n p = self.pop_size - 1\n 
self.ref_vectors = np.array(tools.uniform_reference_points(self.n_obj, p=p))\n else:\n # For 3+ objectives: use DEAP's uniform reference points\n self.ref_vectors = np.array(tools.uniform_reference_points(self.n_obj, p=12))\n \n # Normalize to unit length\n norms = np.linalg.norm(self.ref_vectors, axis=1, keepdims=True)\n norms[norms < 1e-12] = 1e-12\n self.ref_vectors = self.ref_vectors / norms\n \n # Store initial vectors for scaling\n self.ref_vectors_initial = self.ref_vectors.copy()\n\n def _compute_neighborhoods(self):\n \"\"\"Compute T-nearest neighborhoods based on angular distance.\"\"\"\n from scipy.spatial.distance import cdist\n # Cosine distance between reference vectors\n dist_matrix = cdist(self.ref_vectors, self.ref_vectors, metric='cosine')\n self.neighbors = []\n for i in range(len(self.ref_vectors)):\n idx = np.argsort(dist_matrix[i])[:self.T]\n self.neighbors.append(idx.tolist())\n\n def _tchebycheff(self, fitness_values, weight, z_star):\n \"\"\"Tchebycheff scalarization.\"\"\"\n return max(weight[j] * abs(fitness_values[j] - z_star[j])\n for j in range(self.n_obj))\n\n def _angle_penalized_distance(self, fitness_values):\n \"\"\"Compute angle-penalized distance for RVEA-inspired selection.\"\"\"\n if self.z_star is None or self.z_nad is None:\n return np.zeros(len(fitness_values))\n \n F = np.array(fitness_values)\n n = len(F)\n \n # Translate and normalize\n F_translated = F - self.z_star\n # Avoid division by zero\n scale = self.z_nad - self.z_star\n scale[scale < 1e-12] = 1.0\n F_normalized = F_translated / scale\n \n # Compute angles to reference vectors\n F_norms = np.linalg.norm(F_normalized, axis=1, keepdims=True)\n F_norms[F_norms < 1e-12] = 1e-12\n F_unit = F_normalized / F_norms\n \n # Cosine similarity with reference vectors\n cos_angles = F_unit @ self.ref_vectors.T\n cos_angles = np.clip(cos_angles, -1.0, 1.0)\n angles = np.arccos(cos_angles)\n \n # Associate each individual with closest reference vector\n associations = 
np.argmin(angles, axis=1)\n min_angles = angles[np.arange(n), associations]\n \n # Convergence distance (L2 norm)\n convergence = F_norms.flatten()\n \n # Angle penalty that increases over generations\n max_gen = 400 # approximate max generations\n gamma = self.alpha * (self.current_gen / max_gen) ** 2\n \n # APD = convergence * (1 + gamma * angle)\n apd = convergence * (1.0 + gamma * min_angles)\n return apd, associations\n\n def _estimate_geometry(self, front_values):\n \"\"\"Estimate front geometry parameter p (Lp-norm).\"\"\"\n if len(front_values) < 2:\n return 2.0 # default spherical\n \n F = np.array(front_values)\n # Normalize\n z_min = np.min(F, axis=0)\n z_max = np.max(F, axis=0)\n scale = z_max - z_min\n scale[scale < 1e-12] = 1.0\n F_norm = (F - z_min) / scale\n \n # Sample a point in the middle of the front\n median_idx = len(F_norm) // 2\n median_point = np.sort(F_norm, axis=0)[median_idx]\n median_point = np.maximum(median_point, 1e-8)\n \n # Binary search for p where sum(median_point^p) ≈ 1\n p_low, p_high = 0.1, 20.0\n for _ in range(30):\n p_mid = (p_low + p_high) / 2\n lp_val = np.sum(median_point ** p_mid)\n if lp_val > 1.0:\n p_low = p_mid\n else:\n p_high = p_mid\n p = (p_low + p_high) / 2\n return max(0.5, min(p, 10.0))\n\n def select(self, population: list, k: int) -> list:\n \"\"\"Adaptive mating selection balancing archive diversity and decomposition convergence.\n \n With probability beta, select from archive (diversity focus).\n Otherwise, select using decomposition-based tournament.\n \"\"\"\n # Update ideal and nadir points\n self._update_reference_points(population + self.archive)\n \n # Ensure archive is not empty\n if not self.archive:\n self.archive = get_nondominated(population)\n \n # Adaptive beta: increase archive usage if diversity is low\n if len(self.archive) > 10:\n # Measure diversity using crowding distance\n compute_crowding_distance(self.archive[:min(50, len(self.archive))])\n avg_crowding = 
np.mean([ind.fitness.crowding_dist for ind in self.archive[:50] \n if hasattr(ind.fitness, 'crowding_dist')])\n # Adjust beta based on diversity\n self.beta = max(0.2, min(0.8, 0.5 * (1.0 + avg_crowding)))\n \n selected = []\n for _ in range(k):\n # Choose pool based on beta\n if random.random() < self.beta:\n pool = self.archive\n else:\n pool = population\n \n if len(pool) < 2:\n selected.append(deepcopy(random.choice(population)))\n continue\n \n # Binary tournament with decomposition fitness\n i1, i2 = random.sample(range(len(pool)), 2)\n a, b = pool[i1], pool[i2]\n \n # Compute decomposition fitness for each\n if self.z_star is not None:\n # Find closest reference vector for each individual\n a_vals = np.array(a.fitness.values)\n b_vals = np.array(b.fitness.values)\n \n # Normalize\n a_norm = (a_vals - self.z_star) / (self.z_nad - self.z_star + 1e-12)\n b_norm = (b_vals - self.z_star) / (self.z_nad - self.z_star + 1e-12)\n \n # Angles to reference vectors\n cos_a = a_norm @ self.ref_vectors.T / (np.linalg.norm(a_norm) + 1e-12)\n cos_b = b_norm @ self.ref_vectors.T / (np.linalg.norm(b_norm) + 1e-12)\n idx_a = np.argmax(cos_a)\n idx_b = np.argmax(cos_b)\n \n # Tchebycheff scalarization\n g_a = self._tchebycheff(a.fitness.values, self.ref_vectors[idx_a], self.z_star)\n g_b = self._tchebycheff(b.fitness.values, self.ref_vectors[idx_b], self.z_star)\n \n if g_a < g_b:\n selected.append(deepcopy(a))\n elif g_b < g_a:\n selected.append(deepcopy(b))\n else:\n # Tie-break by crowding distance if available\n if hasattr(a.fitness, 'crowding_dist') and hasattr(b.fitness, 'crowding_dist'):\n if a.fitness.crowding_dist > b.fitness.crowding_dist:\n selected.append(deepcopy(a))\n else:\n selected.append(deepcopy(b))\n else:\n selected.append(deepcopy(random.choice([a, b])))\n else:\n # Fallback to dominance tournament\n if a.fitness.dominates(b.fitness):\n selected.append(deepcopy(a))\n elif b.fitness.dominates(a.fitness):\n selected.append(deepcopy(b))\n else:\n 
selected.append(deepcopy(random.choice([a, b])))\n \n return selected\n\n def vary(self, parents: list) -> list:\n \"\"\"Hybrid variation: SBX crossover + polynomial mutation + occasional DE.\n \n With \n... (truncated)"
7}
OK: Replaced lines 297..441 with 474 line(s) in deap/custom_moea.py. Editable range: 297–770. [Current file: deap/custom_moea.py | editable: 297–770 | total: 919 lines]
294| # ================================================================
295|
296|
297 class CustomMOEA:
298     """Adaptive Hybrid MOEA (AH-MOEA): Combines decomposition, dominance, and geometry adaptation.
299
300     Novel contributions:
301     1. Adaptive mating selection balancing archive diversity and decomposition convergence.
302     2. Dual survival: decomposition-based selection for convergence + archive for diversity.
303     3. Geometry-aware reference vector adaptation for different front shapes.
304     4. Hybrid variation with occasional differential evolution for exploration.
305     """
306
307     def __init__(
308         self,
309         pop_size: int,
310         n_obj: int,
311         n_var: int,
312         bounds: Tup
[Test #1] (2 tests remaining; call submit(n=1) to submit this result)

## Seed 42

### zdt1 (scripts/zdt1.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=0.860897 igd=0.009058 spread=0.330355 front_size=99
TRAIN_METRICS gen=140 hv=0.862020 igd=0.008528 spread=0.374336 front_size=98
TRAIN_METRICS gen=160 hv=0.862188 igd=0.008472 spread=0.377132 front_size=96
TRAIN_METRICS gen=180 hv=0.869116 igd=0.005552 spread=0.368806 front_size=100
TRAIN_METRICS gen=200 hv=0.862560 igd=0.008784 spread=0.420037 front_size=97
Test results (zdt1): hv: 0.862560 igd: 0.008784 spread: 0.420037

### zdt3 (scripts/zdt3.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=1.303577 igd=0.010491 spread=0.540827 front_size=73
TRAIN_METRICS gen=140 hv=1.301860 igd=0.012297 spread=0.636962 front_size=75
TRAIN_METRICS gen=160 hv=1.303662 igd=0.011451 spread=0.553550 front_size=72
TRAIN_METRICS gen=180 hv=1.304018 igd=0.011917 spread=0.636737 front_size=73
TRAIN_METRICS gen=20
Tool Input
1{
2 "op": "replace",
3 "filename": "deap/custom_moea.py",
4 "start_line": 297,
5 "end_line": 770,
6 "content": "class CustomMOEA:\n \"\"\"AGED-MOEA: Adaptive Geometry Estimation and Decomposition for MOEA.\n\n Novel contributions:\n 1. Geometry-aware selection: switches between decomposition and dominance\n based on estimated front shape (Lp-norm parameter).\n 2. Adaptive reference vectors: adjust weight distribution based on geometry.\n 3. Dual archive: maintains both convergence and diversity archives.\n 4. Smart variation: uses DE only when diversity is low.\n \"\"\"\n\n def __init__(\n self,\n pop_size: int,\n n_obj: int,\n n_var: int,\n bounds: Tuple[float, float],\n cx_eta: float = 20.0,\n mut_eta: float = 20.0,\n mut_prob: Optional[float] = None,\n ):\n self.pop_size = pop_size\n self.n_obj = n_obj\n self.n_var = n_var\n self.bounds = bounds\n self.cx_eta = cx_eta\n self.mut_eta = mut_eta\n self.mut_prob = mut_prob if mut_prob is not None else 1.0 / n_var\n \n # Hyperparameters\n self.de_prob = 0.2 # DE probability (lower than before)\n self.convergence_archive_size = pop_size // 2\n self.diversity_archive_size = pop_size // 2\n self.geometry_estimation_freq = 10 # generations\n \n # State\n self.convergence_archive = []\n self.diversity_archive = []\n self.z_star = None\n self.z_nad = None\n self.current_gen = 0\n self.estimated_p = 2.0 # default spherical\n self.selection_mode = 'hybrid' # 'decomposition', 'dominance', or 'hybrid'\n \n # Generate reference vectors\n self._generate_reference_vectors()\n \n # Performance tracking\n self.diversity_history = []\n\n def _generate_reference_vectors(self):\n \"\"\"Generate reference vectors based on number of objectives.\"\"\"\n if self.n_obj == 2:\n # For 2 objectives: equally spaced weight vectors\n p = self.pop_size - 1\n self.ref_vectors = np.array(tools.uniform_reference_points(self.n_obj, p=p))\n else:\n # For 3+ objectives: use DEAP's uniform reference points\n self.ref_vectors = np.array(tools.uniform_reference_points(self.n_obj, p=12))\n \n # Normalize\n norms = np.linalg.norm(self.ref_vectors, 
axis=1, keepdims=True)\n norms[norms < 1e-12] = 1e-12\n self.ref_vectors = self.ref_vectors / norms\n\n def _estimate_geometry(self, front_values):\n \"\"\"Estimate Lp-norm parameter of Pareto front.\n Returns p where lower p (~1) indicates linear front, p~2 spherical.\"\"\"\n if len(front_values) < 3:\n return 2.0\n \n F = np.array(front_values)\n # Normalize to [0,1] per objective\n z_min = np.min(F, axis=0)\n z_max = np.max(F, axis=0)\n scale = z_max - z_min\n scale[scale < 1e-12] = 1.0\n F_norm = (F - z_min) / scale\n \n # Sample several points from the front\n n_samples = min(10, len(F_norm))\n indices = np.linspace(0, len(F_norm)-1, n_samples, dtype=int)\n sample_points = F_norm[indices]\n \n # For each sample, find p that satisfies sum(|x_i|^p) ≈ 1\n p_estimates = []\n for point in sample_points:\n point = np.maximum(point, 1e-8)\n # Binary search for p\n p_low, p_high = 0.5, 10.0\n for _ in range(20):\n p_mid = (p_low + p_high) / 2\n lp_val = np.sum(point ** p_mid)\n if lp_val > 1.0:\n p_low = p_mid\n else:\n p_high = p_mid\n p_estimates.append((p_low + p_high) / 2)\n \n return np.median(p_estimates)\n\n def _update_reference_points(self, individuals):\n \"\"\"Update ideal and nadir points.\"\"\"\n if not individuals:\n return\n \n fitness_values = [ind.fitness.values for ind in individuals if ind.fitness.valid]\n if not fitness_values:\n return\n \n F = np.array(fitness_values)\n \n # Ideal point\n if self.z_star is None:\n self.z_star = np.min(F, axis=0)\n else:\n self.z_star = np.minimum(self.z_star, np.min(F, axis=0))\n \n # Nadir point from non-dominated solutions\n nd_front = get_nondominated(individuals)\n if nd_front:\n nd_values = np.array([ind.fitness.values for ind in nd_front])\n self.z_nad = np.max(nd_values, axis=0)\n else:\n self.z_nad = np.max(F, axis=0)\n \n # Ensure nadir > ideal\n for j in range(self.n_obj):\n if self.z_nad[j] <= self.z_star[j]:\n self.z_nad[j] = self.z_star[j] + 1e-6\n\n def _update_archives(self, individuals):\n 
\"\"\"Update convergence and diversity archives.\"\"\"\n nd_front = get_nondominated(individuals)\n if not nd_front:\n return\n \n # Convergence archive: keep best solutions per reference vector\n if self.z_star is not None and self.z_nad is not None:\n # Normalize fitness values\n nd_values = np.array([ind.fitness.values for ind in nd_front])\n F_norm = (nd_values - self.z_star) / (self.z_nad - self.z_star + 1e-12)\n \n # Associate each solution with closest reference vector\n cos_sim = F_norm @ self.ref_vectors.T\n cos_sim = cos_sim / (np.linalg.norm(F_norm, axis=1, keepdims=True) + 1e-12)\n associations = np.argmax(cos_sim, axis=1)\n \n # Keep best per reference vector (closest to ideal)\n convergence_candidates = []\n for v in range(len(self.ref_vectors)):\n mask = np.where(associations == v)[0]\n if len(mask) > 0:\n # Select solution with smallest distance to ideal\n distances = np.linalg.norm(F_norm[mask], axis=1)\n best_idx = mask[np.argmin(distances)]\n convergence_candidates.append(nd_front[best_idx])\n \n # Limit size\n if len(convergence_candidates) > self.convergence_archive_size:\n # Sort by distance to ideal\n conv_values = np.array([ind.fitness.values for ind in convergence_candidates])\n conv_dist = np.linalg.norm((conv_values - self.z_star) / (self.z_nad - self.z_star + 1e-12), axis=1)\n sorted_idx = np.argsort(conv_dist)\n self.convergence_archive = [convergence_candidates[i] for i in \n sorted_idx[:self.convergence_archive_size]]\n else:\n self.convergence_archive = [deepcopy(ind) for ind in convergence_candidates]\n \n # Diversity archive: maintain diverse solutions\n self.diversity_archive = [deepcopy(ind) for ind in nd_front]\n if len(self.diversity_archive) > self.diversity_archive_size:\n compute_crowding_distance(self.diversity_archive)\n self.diversity_archive.sort(key=lambda x: x.fitness.crowding_dist, reverse=True)\n self.diversity_archive = self.diversity_archive[:self.diversity_archive_size]\n\n def _compute_diversity(self, 
population):\n \"\"\"Compute population diversity metric.\"\"\"\n if len(population) < 2:\n return 0.0\n \n fitness_values = np.array([ind.fitness.values for ind in population])\n # Average nearest neighbor distance\n from scipy.spatial.distance import cdist\n dist_matrix = cdist(fitness_values, fitness_values)\n np.fill_diagonal(dist_matrix, np.inf)\n min_dists = np.min(dist_matrix, axis=1)\n return np.mean(min_dists)\n\n def select(self, population: list, k: int) -> list:\n \"\"\"Geometry-aware selection.\n \n If p < 1.5 (linear front): use decomposition-based selection.\n If p > 1.8 (spherical front): use decomposition-based selection.\n Otherwise: hybrid selection mixing decomposition and dominance.\n \"\"\"\n self._update_reference_points(population)\n \n # Update geometry estimation periodically\n if self.current_gen % self.geometry_estimation_freq == 0:\n nd_front = get_nondominated(population)\n if len(nd_front) >= 3:\n front_values = np.array([ind.fitness.values for ind in nd_front])\n self.estimated_p = self._estimate_geometry(front_values)\n \n # Determine selection mode\n if self.estimated_p < 1.3:\n self.selection_mode = 'decomposition' # linear front\n elif self.estimated_p > 1.7:\n self.selection_mode = 'decomposition' # spherical front\n else:\n self.selection_mode = 'hybrid' # intermediate/disconnected\n \n # Update archives\n self._update_archives(population)\n \n # Choose selection pool based on mode\n if self.selection_mode == 'decomposition' or not self.diversity_archive:\n pool = population\n elif self.selection_mode == 'dominance':\n pool = self.diversity_archive if self.diversity_archive else population\n else: # hybrid\n # Mix convergence and diversity archives\n pool = self.convergence_archive + self.diversity_archive\n if not pool:\n pool = population\n \n # Binary tournament selection\n selected = []\n for _ in range(k):\n if len(pool) < 2:\n selected.append(deepcopy(random.choice(population)))\n continue\n \n i1, i2 = 
random.sample(range(len(pool)), 2)\n a, b = pool[i1], pool[i2]\n \n # Selection criteria depends on mode\n if self.selection_mode == 'decomposition' and self.z_star is not None:\n # Decomposition-based: prefer smaller Tchebyc\n... (truncated)"
7}
OK: Replaced lines 297..770 with 462 line(s) in deap/custom_moea.py. Editable range: 297–758. [Current file: deap/custom_moea.py | editable: 297–758 | total: 907 lines]
294| # ================================================================
295|
296|
297 class CustomMOEA:
298     """AGED-MOEA: Adaptive Geometry Estimation and Decomposition for MOEA.
299
300     Novel contributions:
301     1. Geometry-aware selection: switches between decomposition and dominance
302        based on estimated front shape (Lp-norm parameter).
303     2. Adaptive reference vectors: adjust weight distribution based on geometry.
304     3. Dual archive: maintains both convergence and diversity archives.
305     4. Smart variation: uses DE only when diversity is low.
306     """
307
308     def __init__(
309         self,
310         pop_size: int,
311         n_obj: int,
312         n_var: int,
313         bounds: Tuple[float,
[Test #2] (1 test remaining; call submit(n=2) to submit this result)

### zdt1 (scripts/zdt1.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=0.756900 igd=0.157408 spread=0.517127 front_size=88
TRAIN_METRICS gen=140 hv=0.745312 igd=0.178481 spread=0.420923 front_size=91
TRAIN_METRICS gen=160 hv=0.745037 igd=0.178501 spread=0.364183 front_size=86
TRAIN_METRICS gen=180 hv=0.764978 igd=0.150259 spread=0.478419 front_size=89
TRAIN_METRICS gen=200 hv=0.772851 igd=0.134499 spread=0.432505 front_size=85
Test results (zdt1): hv: 0.772851 igd: 0.134499 spread: 0.432505

### zdt3 (scripts/zdt3.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=1.284060 igd=0.018733 spread=0.671488 front_size=64
TRAIN_METRICS gen=140 hv=1.295185 igd=0.013641 spread=0.622479 front_size=72
TRAIN_METRICS gen=160 hv=1.312078 igd=0.009921 spread=0.568075 front_size=70
TRAIN_METRICS gen=180 hv=1.310079 igd=0.011021 spread=0.590134 front_size=70
TRAIN_METRICS gen=200 hv=1.306876 i
Tool Input
1{
2 "op": "replace",
3 "filename": "deap/custom_moea.py",
4 "start_line": 297,
5 "end_line": 758,
6 "content": "class CustomMOEA:\n \"\"\"ARV-MOEA: Adaptive Reference Vector based MOEA with Archive.\n \n Combines RVEA's angle-penalized distance with an external archive.\n Novel aspects:\n 1. Dual selection: decomposition for convergence, archive for diversity.\n 2. Adaptive penalty parameter based on front convergence.\n 3. Archive injection to maintain diversity in disconnected regions.\n 4. Simplified yet effective variation (SBX + polynomial mutation).\n \"\"\"\n\n def __init__(\n self,\n pop_size: int,\n n_obj: int,\n n_var: int,\n bounds: Tuple[float, float],\n cx_eta: float = 20.0,\n mut_eta: float = 20.0,\n mut_prob: Optional[float] = None,\n ):\n self.pop_size = pop_size\n self.n_obj = n_obj\n self.n_var = n_var\n self.bounds = bounds\n self.cx_eta = cx_eta\n self.mut_eta = mut_eta\n self.mut_prob = mut_prob if mut_prob is not None else 1.0 / n_var\n \n # Hyperparameters\n self.archive_size = pop_size\n self.alpha = 2.0 # penalty parameter for APD\n self.selection_ratio = 0.7 # probability of selecting from population vs archive\n \n # Generate reference vectors\n self._generate_reference_vectors()\n \n # State\n self.archive = []\n self.z_star = None\n self.z_nad = None\n self.current_gen = 0\n \n def _generate_reference_vectors(self):\n \"\"\"Generate uniformly distributed reference vectors.\"\"\"\n if self.n_obj == 2:\n # For 2 objectives: equally spaced weight vectors\n p = self.pop_size - 1\n self.ref_vectors = np.array(tools.uniform_reference_points(self.n_obj, p=p))\n else:\n # For 3+ objectives: use DEAP's uniform reference points\n self.ref_vectors = np.array(tools.uniform_reference_points(self.n_obj, p=12))\n \n # Normalize to unit length\n norms = np.linalg.norm(self.ref_vectors, axis=1, keepdims=True)\n norms[norms < 1e-12] = 1e-12\n self.ref_vectors = self.ref_vectors / norms\n \n def _update_reference_points(self, individuals):\n \"\"\"Update ideal and nadir points.\"\"\"\n if not individuals:\n return\n \n fitness_values = 
[ind.fitness.values for ind in individuals if ind.fitness.valid]\n if not fitness_values:\n return\n \n F = np.array(fitness_values)\n \n # Ideal point\n if self.z_star is None:\n self.z_star = np.min(F, axis=0)\n else:\n self.z_star = np.minimum(self.z_star, np.min(F, axis=0))\n \n # Nadir point from non-dominated solutions\n nd_front = get_nondominated(individuals)\n if nd_front:\n nd_values = np.array([ind.fitness.values for ind in nd_front])\n self.z_nad = np.max(nd_values, axis=0)\n else:\n self.z_nad = np.max(F, axis=0)\n \n # Ensure nadir > ideal\n for j in range(self.n_obj):\n if self.z_nad[j] <= self.z_star[j]:\n self.z_nad[j] = self.z_star[j] + 1e-6\n \n def _update_archive(self, individuals):\n \"\"\"Update external archive with non-dominated solutions.\"\"\"\n nd_front = get_nondominated(individuals)\n if not nd_front:\n return\n \n # Merge with archive\n candidate = nd_front + self.archive\n candidate_nd = get_nondominated(candidate)\n \n # Limit size using crowding distance\n if len(candidate_nd) <= self.archive_size:\n self.archive = [deepcopy(ind) for ind in candidate_nd]\n else:\n compute_crowding_distance(candidate_nd)\n candidate_nd.sort(key=lambda x: x.fitness.crowding_dist, reverse=True)\n self.archive = [deepcopy(ind) for ind in candidate_nd[:self.archive_size]]\n \n def _angle_penalized_distance(self, fitness_values):\n \"\"\"Compute angle-penalized distance (APD) for selection.\"\"\"\n if self.z_star is None or self.z_nad is None:\n return np.zeros(len(fitness_values)), np.zeros(len(fitness_values), dtype=int)\n \n F = np.array(fitness_values)\n n = len(F)\n \n # Translate and normalize\n F_translated = F - self.z_star\n scale = self.z_nad - self.z_star\n scale[scale < 1e-12] = 1.0\n F_normalized = F_translated / scale\n \n # Compute angles to reference vectors\n F_norms = np.linalg.norm(F_normalized, axis=1, keepdims=True)\n F_norms[F_norms < 1e-12] = 1e-12\n F_unit = F_normalized / F_norms\n \n cos_angles = F_unit @ self.ref_vectors.T\n 
cos_angles = np.clip(cos_angles, -1.0, 1.0)\n angles = np.arccos(cos_angles)\n \n # Associate each individual with closest reference vector\n associations = np.argmin(angles, axis=1)\n min_angles = angles[np.arange(n), associations]\n \n # Convergence distance (L2 norm)\n convergence = F_norms.flatten()\n \n # Adaptive penalty: increase as generations progress\n max_gen = 400\n gamma = self.alpha * (self.current_gen / max_gen) ** 2\n \n # APD = convergence * (1 + gamma * angle)\n apd = convergence * (1.0 + gamma * min_angles)\n return apd, associations\n \n def select(self, population: list, k: int) -> list:\n \"\"\"Dual selection: with probability selection_ratio, choose from population\n using decomposition fitness; otherwise from archive for diversity.\"\"\"\n self._update_reference_points(population + self.archive)\n self._update_archive(population)\n \n # Ensure archive is not empty\n if not self.archive:\n self.archive = get_nondominated(population)\n \n selected = []\n for _ in range(k):\n # Choose pool\n if random.random() < self.selection_ratio:\n pool = population\n else:\n pool = self.archive\n \n if len(pool) < 2:\n selected.append(deepcopy(random.choice(population)))\n continue\n \n # Binary tournament based on decomposition fitness\n i1, i2 = random.sample(range(len(pool)), 2)\n a, b = pool[i1], pool[i2]\n \n if self.z_star is not None:\n # Compute APD for each\n fitness_list = [a.fitness.values, b.fitness.values]\n apd, _ = self._angle_penalized_distance(fitness_list)\n if apd[0] < apd[1]:\n selected.append(deepcopy(a))\n elif apd[1] < apd[0]:\n selected.append(deepcopy(b))\n else:\n # Tie-break by crowding distance if available\n if (hasattr(a.fitness, 'crowding_dist') and \n hasattr(b.fitness, 'crowding_dist')):\n if a.fitness.crowding_dist > b.fitness.crowding_dist:\n selected.append(deepcopy(a))\n else:\n selected.append(deepcopy(b))\n else:\n selected.append(deepcopy(random.choice([a, b])))\n else:\n # Fallback to dominance\n if 
a.fitness.dominates(b.fitness):\n selected.append(deepcopy(a))\n elif b.fitness.dominates(a.fitness):\n selected.append(deepcopy(b))\n else:\n selected.append(deepcopy(random.choice([a, b])))\n \n return selected\n \n def vary(self, parents: list) -> list:\n \"\"\"Standard SBX crossover + polynomial mutation.\"\"\"\n offspring = [deepcopy(ind) for ind in parents]\n lo, hi = self.bounds\n \n # Pairwise crossover\n for i in range(0, len(offspring) - 1, 2):\n if random.random() < 0.9:\n tools.cxSimulatedBinaryBounded(\n offspring[i], offspring[i + 1],\n eta=self.cx_eta, low=lo, up=hi,\n )\n del offspring[i].fitness.values\n del offspring[i + 1].fitness.values\n \n # Mutation\n for ind in offspring:\n if random.random() < 1.0:\n tools.mutPolynomialBounded(\n ind, eta=self.mut_eta, low=lo, up=hi, indpb=self.mut_prob,\n )\n del ind.fitness.values\n \n return offspring\n \n def survive(self, population: list, offspring: list) -> list:\n \"\"\"RVEA-inspired survival with archive injection.\n \n Uses angle-penalized distance to select individuals, but also\n injects archive members to maintain diversity in disconnected regions.\n \"\"\"\n combined = population + offspring\n valid = [ind for ind in combined if ind.fitness.valid]\n \n if not valid:\n return population[:self.pop_size]\n \n self._update_reference_points(valid)\n self._update_archive(valid)\n \n # Compute APD for all individuals\n fitness_values = [ind.fitness.values for ind in valid]\n apd, associations = self._angle_penalized_distance(fitness_values)\n \n # Select one individual per reference vector (lowest APD)\n selected_indices = set()\n n_ref = len(self.ref_vectors)\n \n for v in range(n_ref):\n mask = np.where(associations == v)[0]\n if len(mask) > 0:\n best_idx = mask[np.argmin(apd[mask])]\n selected_indices.add(best_idx)\n \n # If not enough, fill with best remaining by APD\n if len(selected_indices) < self.pop_size:\n remaining = [i for i in range(len(valid)) if i not in selected_indices]\n 
remaining.sort(key=lambda i: apd[i])\n for i in remaining:\n select\n... (truncated)"
7}
OK: Replaced lines 297..758 with 294 line(s) in deap/custom_moea.py. Editable range: 297–590. [Current file: deap/custom_moea.py | editable: 297–590 | total: 739 lines]
294| # ================================================================
295|
296|
297 class CustomMOEA:
298     """ARV-MOEA: Adaptive Reference Vector based MOEA with Archive.
299
300     Combines RVEA's angle-penalized distance with an external archive.
301     Novel aspects:
302     1. Dual selection: decomposition for convergence, archive for diversity.
303     2. Adaptive penalty parameter based on front convergence.
304     3. Archive injection to maintain diversity in disconnected regions.
305     4. Simplified yet effective variation (SBX + polynomial mutation).
306     """
307
308     def __init__(
309         self,
310         pop_size: int,
311         n_obj: int,
312         n_var: int,
313         bounds: Tuple[float, float
[Test #3] (0 tests remaining)

## Seed 42

### zdt1 (scripts/zdt1.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=0.779242 igd=0.119538 spread=0.651390 front_size=72
TRAIN_METRICS gen=140 hv=0.724591 igd=0.202625 spread=0.573024 front_size=78
TRAIN_METRICS gen=160 hv=0.755815 igd=0.160691 spread=0.462179 front_size=90
TRAIN_METRICS gen=180 hv=0.759528 igd=0.157880 spread=0.477062 front_size=88
TRAIN_METRICS gen=200 hv=0.787568 igd=0.116766 spread=0.529487 front_size=89
Test results (zdt1):
  hv: 0.787568
  igd: 0.116766
  spread: 0.529487

### zdt3 (scripts/zdt3.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=1.017651 igd=0.237348 spread=0.634405 front_size=62
TRAIN_METRICS gen=140 hv=1.031169 igd=0.229367 spread=0.608566 front_size=70
TRAIN_METRICS gen=160 hv=1.078413 igd=0.200276 spread=0.612846 front_size=68
TRAIN_METRICS gen=180 hv=1.083101 igd=0.196032 spread=0.545567 front_size=66
TRAIN_METRICS gen=200 hv=1.083365 igd=0.195934 spread=0.50345
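The `igd` column in these metrics is the inverted generational distance: the mean distance from each point of a reference Pareto front to its nearest point in the approximation, so lower is better. A minimal sketch of how such a value is computed; the `igd` helper here is illustrative, not the benchmark harness's actual implementation:

```python
import numpy as np

def igd(reference_front, approx_front):
    """Inverted Generational Distance: for every reference-front point,
    find its nearest neighbour in the approximation; average those gaps."""
    R = np.asarray(reference_front, dtype=float)
    A = np.asarray(approx_front, dtype=float)
    d = np.linalg.norm(R[:, None, :] - A[None, :, :], axis=2)  # (|R|, |A|) pairwise
    return d.min(axis=1).mean()
```

Because every reference point is matched, IGD penalizes both poor convergence and gaps in coverage, which is why it complements hypervolume in the logs above.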
Tool Input
1{
2 "op": "replace",
3 "filename": "deap/custom_moea.py",
4 "start_line": 297,
5 "end_line": 441,
6 "content": "class CustomMOEA:\n \"\"\"Adaptive Reference Vector Guided Evolutionary Algorithm with Decomposition and Non-dominated Sorting (ARV-NS).\"\"\"\n\n def __init__(\n self,\n pop_size: int,\n n_obj: int,\n n_var: int,\n bounds: Tuple[float, float],\n cx_eta: float = 20.0,\n mut_eta: float = 20.0,\n mut_prob: Optional[float] = None,\n ):\n self.pop_size = pop_size\n self.n_obj = n_obj\n self.n_var = n_var\n self.bounds = bounds\n self.cx_eta = cx_eta\n self.mut_eta = mut_eta\n self.mut_prob = mut_prob if mut_prob is not None else 1.0 / n_var\n\n # Adaptive parameters\n self.alpha = 2.0 # penalty parameter for angle-penalized distance\n self.fr = 0.1 # frequency of reference vector adaptation\n self.p_cross = 0.9 # crossover probability (adaptive)\n self.p_mut = 1.0 # mutation probability (adaptive)\n \n # Generate reference vectors (unit vectors)\n self.ref_vectors = self._generate_reference_vectors(pop_size, n_obj)\n self.ref_vectors_initial = self.ref_vectors.copy()\n \n # Ideal and nadir points\n self.z_star = None # ideal point (min values)\n self.z_nadir = None # nadir point (max values)\n \n # Archive for elite solutions\n self.archive = []\n self.archive_size = pop_size // 2\n \n # Geometry estimation\n self.geometry_p = 1.0 # Lp norm parameter, estimated\n \n def _generate_reference_vectors(self, n, n_obj):\n \"\"\"Generate uniformly distributed unit reference vectors.\"\"\"\n if n_obj == 2:\n # For 2 objectives, generate evenly spaced vectors\n angles = np.linspace(0, np.pi/2, n)\n vectors = np.column_stack([np.cos(angles), np.sin(angles)])\n else:\n # For 3+ objectives, use DEAP's uniform reference points\n ref_points = tools.uniform_reference_points(n_obj, p=12)\n vectors = np.array(ref_points)\n \n # Normalize to unit length\n norms = np.linalg.norm(vectors, axis=1, keepdims=True)\n norms[norms < 1e-12] = 1e-12\n vectors = vectors / norms\n return vectors\n \n def _estimate_geometry(self, front_values):\n \"\"\"Estimate the geometry parameter p 
of the Pareto front.\n \n Uses the relationship between Lp-norm and front shape:\n p=1: linear front (like DTLZ1)\n p=2: spherical front (like DTLZ2)\n p->inf: rectangular front\n \"\"\"\n if len(front_values) < 2:\n return 1.0\n \n F = np.array(front_values)\n \n # Normalize objectives\n z_min = np.min(F, axis=0)\n z_max = np.max(F, axis=0)\n scale = z_max - z_min\n scale[scale < 1e-12] = 1.0\n F_norm = (F - z_min) / scale\n \n # Avoid zero values\n F_norm = np.maximum(F_norm, 1e-8)\n \n # Use median point to estimate p\n median_idx = len(F_norm) // 2\n median_point = F_norm[median_idx]\n \n # Binary search for p that satisfies sum(median_point^p) ≈ 1\n p_low, p_high = 0.1, 20.0\n for _ in range(30):\n p_mid = (p_low + p_high) / 2\n lp_val = np.sum(median_point ** p_mid)\n if lp_val > 1.0:\n p_low = p_mid\n else:\n p_high = p_mid\n \n p = (p_low + p_high) / 2\n return max(0.1, min(p, 20.0))\n \n def _associate(self, fitness_values):\n \"\"\"Associate each solution to its closest reference vector.\n \n Returns:\n associations: index of closest reference vector for each solution\n angles: angle to closest reference vector\n \"\"\"\n F = np.array(fitness_values)\n n = len(F)\n n_ref = len(self.ref_vectors)\n \n if n == 0:\n return np.array([]), np.array([])\n \n # Translate objectives (subtract ideal point)\n if self.z_star is None:\n self.z_star = np.min(F, axis=0)\n F_translated = F - self.z_star + 1e-12\n \n # Normalize translated fitness vectors\n F_norms = np.linalg.norm(F_translated, axis=1, keepdims=True)\n F_norms[F_norms < 1e-12] = 1e-12\n F_normalized = F_translated / F_norms\n \n # Compute cosine similarity with reference vectors\n cos_angles = F_normalized @ self.ref_vectors.T # (n, n_ref)\n cos_angles = np.clip(cos_angles, -1.0, 1.0)\n angles = np.arccos(cos_angles) # (n, n_ref)\n \n # Find closest reference vector for each solution\n associations = np.argmin(angles, axis=1)\n min_angles = angles[np.arange(n), associations]\n \n return associations, 
min_angles\n \n def _compute_apd(self, fitness_values, associations, angles, gen, max_gen):\n \"\"\"Compute angle-penalized distance for each solution.\"\"\"\n F = np.array(fitness_values)\n \n # Translate objectives\n if self.z_star is None:\n self.z_star = np.min(F, axis=0)\n F_translated = F - self.z_star + 1e-12\n \n # Convergence distance (norm)\n convergence = np.linalg.norm(F_translated, axis=1)\n \n # Angle penalty that increases over generations\n gamma = self.alpha * (gen / max(max_gen, 1)) ** 2\n \n # APD = convergence * (1 + gamma * angle)\n apd = convergence * (1.0 + gamma * angles)\n return apd\n \n def _decomposition_score(self, fitness_values, weight_vector):\n \"\"\"Compute decomposition score using Tchebycheff scalarization.\"\"\"\n if self.z_star is None:\n self.z_star = np.min(fitness_values, axis=0)\n \n # Normalize objectives\n if self.z_nadir is None:\n self.z_nadir = np.max(fitness_values, axis=0)\n \n # Avoid division by zero\n norm_range = self.z_nadir - self.z_star\n norm_range[norm_range < 1e-12] = 1.0\n \n F_norm = (fitness_values - self.z_star) / norm_range\n \n # Tchebycheff scalarization\n return np.max(weight_vector * F_norm, axis=1)\n \n def select(self, population: list, k: int) -> list:\n \"\"\"Hybrid selection: tournament based on dominance rank and decomposition score.\"\"\"\n # First, compute non-dominated fronts\n fronts = tools.sortNondominated(population, len(population), first_front_only=False)\n \n # Assign rank to each individual\n for rank, front in enumerate(fronts):\n for ind in front:\n ind._rank = rank\n \n # Generate weight vectors for decomposition (same as reference vectors)\n weights = self.ref_vectors\n \n # Compute decomposition score for each individual\n fitness_values = np.array([ind.fitness.values for ind in population])\n for i, ind in enumerate(population):\n # Find closest weight vector\n F_norm = (fitness_values[i] - self.z_star) if self.z_star is not None else fitness_values[i]\n if self.z_star is not 
None and self.z_nadir is not None:\n norm_range = self.z_nadir - self.z_star\n norm_range[norm_range < 1e-12] = 1.0\n F_norm = F_norm / norm_range\n \n # Cosine similarity with all weight vectors\n cos_sim = F_norm @ weights.T\n closest_idx = np.argmax(cos_sim)\n weight = weights[closest_idx]\n \n # Tchebycheff scalarization\n techeby = np.max(weight * F_norm)\n ind._decomp_score = techeby\n \n # Binary tournament selection\n selected = []\n for _ in range(k):\n i1, i2 = random.sample(range(len(population)), 2)\n a, b = population[i1], population[i2]\n \n # Compare by rank first\n if a._rank < b._rank:\n selected.append(deepcopy(a))\n elif b._rank < a._rank:\n selected.append(deepcopy(b))\n else:\n # Equal rank: compare decomposition score (lower is better)\n if a._decomp_score < b._decomp_score:\n selected.append(deepcopy(a))\n elif b._decomp_score < a._decomp_score:\n selected.append(deepcopy(b))\n else:\n selected.append(deepcopy(random.choice([a, b])))\n \n return selected\n \n def vary(self, parents: list) -> list:\n \"\"\"Adaptive variation with differential evolution crossover and polynomial mutation.\"\"\"\n offspring = [deepcopy(ind) for ind in parents]\n lo, hi = self.bounds\n \n # Adaptive crossover probability based on population diversity\n # If geometry p is close to 2 (spherical), use higher crossover for exploration\n # If p is far from 2, use more mutation\n if self.geometry_p > 1.5 and self.geometry_p < 2.5:\n # Spherical front: favor crossover\n p_cross = 0.9\n p_mut = 0.8\n else:\n # Non-spherical front: favor mutation\n p_cross = 0.7\n p_mut = 1.0\n \n # Pairwise SBX crossover\n for i in range(0, len(offspring) - 1, 2):\n if random.random() < p_cross:\n tools.cxSimulatedBinaryBounded(\n offspring[i], offspring[i + 1],\n eta=self.cx_eta, low=lo, up=hi,\n )\n del offspring[i].fitness.values\n del offspring[i + 1].fitness.values\n \n # Differential evolution operator for additional exploration\n # Apply to a subset of offspring\n de_count = 
int(len(offspring) * 0.3)\n for _ in range(de_count):\n idx = random.randint(0, len(offspring)-1)\n if len(parents) >= 3:\n a, b, c = random.sample(parents, 3)\n child = deepcopy(offspring[id\n... (truncated)"
7}
OK: Replaced lines 297..441 with 401 line(s) in deap/custom_moea.py. Editable range: 297–697. [Current file: deap/custom_moea.py | editable: 297–697 | total: 846 lines]
294| # ================================================================
295|
296|
297 class CustomMOEA:
298     """Adaptive Reference Vector Guided Evolutionary Algorithm with Decomposition and Non-dominated Sorting (ARV-NS)."""
299
300     def __init__(
301         self,
302         pop_size: int,
303         n_obj: int,
304         n_var: int,
305         bounds: Tuple[float, float],
306         cx_eta: float = 20.0,
307         mut_eta: float = 20.0,
308         mut_prob: Optional[float] = None,
309     ):
310         self.pop_size = pop_size
311         self.n_obj = n_obj
312         self.n_var = n_var
313         self.bounds = bounds
314         self.cx_eta = cx_eta
315         self.mut_eta = mut_eta
316         self.mut_pr
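This edit introduces differential evolution parameters (`F = 0.5`, `CR = 0.9`) alongside SBX. A self-contained sketch of the DE/rand/1 trial-vector construction with the same per-gene crossover test and bounds clamping; the `de_rand_1` name is hypothetical, and the transcript's version operates on DEAP individuals rather than plain lists:

```python
import random

def de_rand_1(parents, F=0.5, CR=0.9, bounds=(0.0, 1.0), rng=random):
    """DE/rand/1: pick three distinct parents a, b, c; per gene, with
    probability CR, replace a[j] by a[j] + F*(b[j] - c[j]) clamped to bounds."""
    lo, hi = bounds
    a, b, c = rng.sample(parents, 3)
    trial = list(a)  # start from the base parent
    for j in range(len(trial)):
        if rng.random() < CR:
            trial[j] = min(hi, max(lo, a[j] + F * (b[j] - c[j])))
    return trial
```

The difference vector `b - c` scales step sizes to the population's current spread, which is the exploratory behaviour the edit pairs with SBX's more local recombination.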
[Test #1] (2 tests remaining; call submit(n=1) to submit this result)

### zdt1 (scripts/zdt1.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=0.863017 igd=0.008517 spread=0.551657 front_size=100
TRAIN_METRICS gen=140 hv=0.865758 igd=0.006317 spread=0.478386 front_size=100
TRAIN_METRICS gen=160 hv=0.867265 igd=0.005561 spread=0.466313 front_size=100
TRAIN_METRICS gen=180 hv=0.867750 igd=0.005342 spread=0.454128 front_size=100
TRAIN_METRICS gen=200 hv=0.868058 igd=0.005269 spread=0.441928 front_size=100
Test results (zdt1):
  hv: 0.868058
  igd: 0.005269
  spread: 0.441928

### zdt3 (scripts/zdt3.sh)
Training progress (last generations):
TRAIN_METRICS gen=120 hv=1.319946 igd=0.008808 spread=0.966956 front_size=100
TRAIN_METRICS gen=140 hv=1.323417 igd=0.007738 spread=0.851517 front_size=100
TRAIN_METRICS gen=160 hv=1.324431 igd=0.007770 spread=0.832172 front_size=100
TRAIN_METRICS gen=180 hv=1.325198 igd=0.007385 spread=0.794150 front_size=100
TRAIN_METRICS gen=200 hv=
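The ARV-NS edit also estimates the Pareto front's shape by binary-searching the Lp exponent p for which a normalized front point satisfies sum(point**p) ≈ 1 (p=1 linear, p=2 spherical). Extracted as a standalone sketch; the `estimate_front_geometry` name is hypothetical, while the search interval and iteration count follow the transcript's `_estimate_geometry`:

```python
import numpy as np

def estimate_front_geometry(point, iters=30):
    """Binary-search the exponent p with sum(point**p) ~= 1 for a
    normalized front point with components in (0, 1).
    sum(x**p) is strictly decreasing in p, so bisection applies."""
    point = np.maximum(np.asarray(point, dtype=float), 1e-8)
    p_low, p_high = 0.1, 20.0
    for _ in range(iters):
        p_mid = 0.5 * (p_low + p_high)
        if np.sum(point ** p_mid) > 1.0:  # still above the unit Lp surface
            p_low = p_mid
        else:
            p_high = p_mid
    return 0.5 * (p_low + p_high)
```

Thirty bisections over [0.1, 20] pin p to well under 1e-6, far tighter than the downstream adaptation needs.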
Tool Input
1{
2 "op": "replace",
3 "filename": "deap/custom_moea.py",
4 "start_line": 297,
5 "end_line": 441,
6 "content": "class CustomMOEA:\n \"\"\"Hybrid Adaptive Reference Vector Evolutionary Algorithm with Dominance Sorting (HARVEA-DS).\n \n Combines decomposition-based diversity (reference vectors) with dominance-based\n convergence (non-dominated sorting). Features:\n - Adaptive reference vectors that adjust to Pareto front geometry\n - Non-dominated sorting for convergence pressure\n - Angle-penalized distance for diversity maintenance\n - External archive for elitism\n - Differential evolution variation operator for exploration\n \"\"\"\n\n def __init__(\n self,\n pop_size: int,\n n_obj: int,\n n_var: int,\n bounds: Tuple[float, float],\n cx_eta: float = 20.0,\n mut_eta: float = 20.0,\n mut_prob: Optional[float] = None,\n ):\n self.pop_size = pop_size\n self.n_obj = n_obj\n self.n_var = n_var\n self.bounds = bounds\n self.cx_eta = cx_eta\n self.mut_eta = mut_eta\n self.mut_prob = mut_prob if mut_prob is not None else 1.0 / n_var\n \n # Parameters for adaptive reference vectors\n self.alpha = 2.0 # Penalty parameter for angle-penalized distance\n self.fr = 0.1 # Frequency of reference vector adaptation\n self.p_ref = 12 # Divisions for reference point generation (for n_obj=3)\n \n # Generate reference vectors\n self.ref_vectors = self._generate_reference_vectors()\n self.initial_ref_vectors = self.ref_vectors.copy()\n \n # Ideal point and nadir point for normalization\n self.z_star = None # ideal (min) point\n self.z_nad = None # nadir (max) point\n \n # External archive for elitism\n self.archive = []\n self.archive_size = pop_size // 2\n \n # Differential evolution parameters\n self.F = 0.5 # scaling factor\n self.CR = 0.9 # crossover rate\n \n # Track generation for adaptation\n self.current_gen = 0\n \n def _generate_reference_vectors(self):\n \"\"\"Generate uniformly distributed reference vectors.\"\"\"\n if self.n_obj == 2:\n # For 2 objectives, generate evenly spaced weight vectors\n p = self.pop_size - 1\n weights = np.array([(i/p, 1.0 - i/p) for i in 
range(p + 1)])\n else:\n # For 3+ objectives, use DEAP's uniform reference points\n ref_points = tools.uniform_reference_points(self.n_obj, p=self.p_ref)\n weights = np.array(ref_points)\n \n # Normalize to unit length\n norms = np.linalg.norm(weights, axis=1, keepdims=True)\n norms[norms < 1e-12] = 1e-12\n return weights / norms\n \n def _update_ideal_nadir(self, population):\n \"\"\"Update ideal and nadir points from population.\"\"\"\n if not population:\n return\n \n fitness_values = np.array([ind.fitness.values for ind in population if ind.fitness.valid])\n if len(fitness_values) == 0:\n return\n \n if self.z_star is None:\n self.z_star = np.min(fitness_values, axis=0)\n self.z_nad = np.max(fitness_values, axis=0)\n else:\n self.z_star = np.minimum(self.z_star, np.min(fitness_values, axis=0))\n self.z_nad = np.maximum(self.z_nad, np.max(fitness_values, axis=0))\n \n def _normalize_fitness(self, fitness_values):\n \"\"\"Normalize fitness values using ideal and nadir points.\"\"\"\n if self.z_star is None or self.z_nad is None:\n return fitness_values\n \n # Avoid division by zero\n scale = self.z_nad - self.z_star\n scale[scale < 1e-12] = 1.0\n \n return (fitness_values - self.z_star) / scale\n \n def _angle_penalized_distance(self, fitness_values):\n \"\"\"Compute angle-penalized distance for each individual.\"\"\"\n F = np.array(fitness_values)\n n = len(F)\n n_ref = len(self.ref_vectors)\n \n if n == 0:\n return np.array([]), np.array([])\n \n # Normalize fitness values\n F_norm = self._normalize_fitness(F)\n \n # Compute angles between each individual and each reference vector\n F_norms = np.linalg.norm(F_norm, axis=1, keepdims=True)\n F_norms[F_norms < 1e-12] = 1e-12\n F_unit = F_norm / F_norms\n \n # Cosine similarity\n cos_angles = F_unit @ self.ref_vectors.T # (n, n_ref)\n cos_angles = np.clip(cos_angles, -1.0, 1.0)\n angles = np.arccos(cos_angles) # (n, n_ref)\n \n # Associate each individual with closest reference vector\n associations = 
np.argmin(angles, axis=1) # (n,)\n min_angles = angles[np.arange(n), associations] # (n,)\n \n # Compute convergence (distance along reference vector)\n convergence = F_norms.flatten()\n \n # Angle penalty that increases over generations\n max_gen = 400 # approximate max generations\n gamma = self.alpha * (self.current_gen / max(max_gen, 1)) ** 2\n \n # APD = convergence * (1 + gamma * angle)\n apd = convergence * (1.0 + gamma * min_angles)\n \n return apd, associations\n \n def _differential_evolution_variation(self, parents):\n \"\"\"Apply differential evolution mutation and crossover.\"\"\"\n offspring = []\n lo, hi = self.bounds\n \n for i in range(len(parents)):\n # Select three distinct parents\n idxs = random.sample(range(len(parents)), 3)\n a, b, c = parents[idxs[0]], parents[idxs[1]], parents[idxs[2]]\n \n # Create trial vector\n trial = deepcopy(a)\n for j in range(self.n_var):\n if random.random() < self.CR:\n # DE/rand/1 mutation\n mutated = a[j] + self.F * (b[j] - c[j])\n # Ensure bounds\n trial[j] = max(lo, min(hi, mutated))\n \n # Polynomial mutation with lower probability\n if random.random() < 0.5:\n tools.mutPolynomialBounded(\n trial, eta=self.mut_eta, low=lo, up=hi, indpb=self.mut_prob\n )\n \n del trial.fitness.values\n offspring.append(trial)\n \n return offspring\n \n def select(self, population: list, k: int) -> list:\n \"\"\"Hybrid selection: tournament based on dominance rank and APD.\n \n Uses non-dominated sorting to assign ranks, then selects individuals\n with better rank and diversity (APD).\n \"\"\"\n # Update ideal/nadir points\n self._update_ideal_nadir(population)\n \n # Non-dominated sorting\n fronts = tools.sortNondominated(population, len(population), first_front_only=False)\n \n # Assign ranks\n for rank, front in enumerate(fronts):\n for ind in front:\n ind._rank = rank\n \n # Compute APD for diversity\n fitness_values = [ind.fitness.values for ind in population if ind.fitness.valid]\n if len(fitness_values) > 0:\n apd, _ = 
self._angle_penalized_distance(fitness_values)\n for i, ind in enumerate(population):\n if ind.fitness.valid:\n ind._apd = apd[i]\n else:\n ind._apd = float('inf')\n else:\n for ind in population:\n ind._apd = float('inf')\n \n # Binary tournament selection\n selected = []\n for _ in range(k):\n i1, i2 = random.sample(range(len(population)), 2)\n a, b = population[i1], population[i2]\n \n # Compare by rank first, then by APD (lower APD is better)\n if a._rank < b._rank:\n selected.append(deepcopy(a))\n elif b._rank < a._rank:\n selected.append(deepcopy(b))\n else:\n if a._apd < b._apd:\n selected.append(deepcopy(a))\n else:\n selected.append(deepcopy(b))\n \n return selected\n \n def vary(self, parents: list) -> list:\n \"\"\"Hybrid variation: SBX crossover + differential evolution.\n \n Uses SBX crossover for exploitation and DE mutation for exploration.\n \"\"\"\n offspring = []\n lo, hi = self.bounds\n \n # Use DE for half the offspring\n n_de = len(parents) // 2\n if n_de > 0:\n de_offspring = self._differential_evolution_variation(parents[:n_de])\n offspring.extend(de_offspring)\n \n # Use SBX + polynomial mutation for the rest\n sbx_parents = parents[n_de:] if n_de > 0 else parents\n sbx_offspring = [deepcopy(ind) for ind in sbx_parents]\n \n for i in range(0, len(sbx_offspring) - 1, 2):\n if random.random() < 0.9: # SBX crossover probability\n tools.cxSimulatedBinaryBounded(\n sbx_offspring[i], sbx_offspring[i + 1],\n eta=self.cx_eta, low=lo, up=hi,\n )\n del sbx_offspring[i].fitness.values\n del sbx_offspring[i + 1].fitness.values\n \n for ind in sbx_offspring:\n if random.random() < 1.0: # Mutation probability\n tools.mutPolynomialBounded(\n ind, eta=self.mut_eta, low=lo, up=hi, indpb=self.mut_prob,\n )\n del ind.fitness.values\n \n offspring.extend(sbx_offspring)\n return offspring\n \n def survive(self, population: list, offspring: list) -> list:\n \"\"\"Hybrid survival: dominance sorting + reference vector assignment.\n \n 1. 
Combine population and offspring\n 2. Perform non-dominated sorting\n 3. Fill next generation with individuals from best fronts\n 4. For the last front, use reference vector assignment and APD\n to mainta\n... (truncated)"
7}
OK: Replaced lines 297..441 with 362 line(s) in deap/custom_moea.py. Editable range: 297–658. [Current file: deap/custom_moea.py | editable: 297–658 | total: 807 lines]
294| # ================================================================
295|
296|
297 class CustomMOEA:
298     """Hybrid Adaptive Reference Vector Evolutionary Algorithm with Dominance Sorting (HARVEA-DS).
299
300     Combines decomposition-based diversity (reference vectors) with dominance-based
301     convergence (non-dominated sorting). Features:
302     - Adaptive reference vectors that adjust to Pareto front geometry
303     - Non-dominated sorting for convergence pressure
304     - Angle-penalized distance for diversity maintenance
305     - External archive for elitism
306     - Differential evolution variation operator for exploration
307     """
308
309     def __init__(
310         self,
311         pop_size: int,
312         n
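The HARVEA-DS `survive()` described in this edit fills the next generation front by front and truncates the last, partially fitting front by diversity score. A simplified sketch that keeps only the APD-based truncation; the `survive_by_fronts` helper and its `apd_of` callback are hypothetical, and the transcript's version additionally re-associates the last front with reference vectors:

```python
def survive_by_fronts(fronts, pop_size, apd_of):
    """NSGA-style environmental selection: accept whole non-dominated
    fronts while they fit; cut the first oversized front by ascending
    APD (lower APD = better convergence/diversity trade-off)."""
    survivors = []
    for front in fronts:
        if len(survivors) + len(front) <= pop_size:
            survivors.extend(front)  # whole front fits
        else:
            slots = pop_size - len(survivors)
            survivors.extend(sorted(front, key=apd_of)[:slots])
            break
    return survivors
```

Using APD instead of crowding distance for the truncation step is what couples the dominance-based filling with the reference-vector machinery above.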