Agent Conversation: gpt-5.4-pro
Model: gpt-5.4-pro
Total steps: 5
Messages: 14
User — Step 0
Task: ts-anomaly-detection
Time Series Anomaly Detection: Custom Model Design
Objective
Design and implement a custom deep learning model for unsupervised time series anomaly detection via reconstruction. Your code goes in the Model class in models/Custom.py. Three reference implementations (DLinear, TimesNet, PatchTST) are provided as read-only.
Evaluation
Trained and evaluated on three anomaly detection datasets:
- PSM (25 variables, server machine dataset)
- MSL (55 variables, Mars Science Laboratory)
- SMAP (25 variables, Soil Moisture Active Passive satellite)
All use seq_len=100, anomaly_ratio=1. Metric: F-score (higher is better).
Time-Series-Library/models/Custom.py [EDITABLE — entire file only]
1: import torch
2: import torch.nn as nn
3:
4:
5: class Model(nn.Module):
6: """
7: Custom model for time series anomaly detection.
8:
9: Forward signature: forward(x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None)
10: - x_enc: [batch, seq_len, enc_in] — input time series
11: - x_mark_enc: not used for anomaly detection (None)
12: - x_dec: not used for anomaly detection (None)
13: - x_mark_dec: not used for anomaly detection (None)
14:
15: Must return: [batch, seq_len, c_out] — reconstructed sequence
16: The framework computes MSE between input and output for anomaly scoring.
17: """
18:
19: def __init__(self, configs):
20: super(Model, self).__init__()
21: self.task_name = configs.task_name
22: self.seq_len = configs.seq_len
23: self.pred_len = configs.seq_len # For anomaly detection, pred_len = seq_len
24: self.enc_in = configs.enc_in
25: self.c_out = configs.c_out
26: # TODO: Define your model architecture here
27:
28: def anomaly_detection(self, x_enc):
29: """
30: Anomaly detection: reconstruct the input sequence.
31: Input: x_enc [batch, seq_len, enc_in]
32: Output: [batch, seq_len, c_out]
33: """
34: # TODO: Implement your reconstruction logic
35: return x_enc # Placeholder: identity reconstruction
36:
37: def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
38: if self.task_name == 'anomaly_detection':
39: dec_out = self.anomaly_detection(x_enc)
40: return dec_out
41: return None
Time-Series-Library/models/DLinear.py [READ-ONLY — do not edit]
1: import torch
2: import torch.nn as nn
3: import torch.nn.functional as F
4: from layers.Autoformer_EncDec import series_decomp
5:
6:
7: class Model(nn.Module):
8: """
9: Paper link: https://arxiv.org/pdf/2205.13504.pdf
10: """
11:
12: def __init__(self, configs, i… [quote truncated by export]
Assistant — Step 1: edit
Tool Input
1{
2 "op": "replace",
3 "filename": "Time-Series-Library/models/Custom.py",
4 "content": "import torch\nimport torch.nn as nn\nimport torch.nn.functional as F\n\nfrom layers.Embed import PatchEmbedding\nfrom layers.SelfAttention_Family import AttentionLayer, FullAttention\nfrom layers.Transformer_EncDec import Encoder, EncoderLayer\n\n\nclass Transpose(nn.Module):\n def __init__(self, *dims, contiguous=False):\n super().__init__()\n self.dims = dims\n self.contiguous = contiguous\n\n def forward(self, x):\n x = x.transpose(*self.dims)\n return x.contiguous() if self.contiguous else x\n\n\nclass FlattenHead(nn.Module):\n def __init__(self, n_vars, nf, target_window, head_dropout=0.0):\n super().__init__()\n self.n_vars = n_vars\n self.flatten = nn.Flatten(start_dim=-2)\n self.linear = nn.Linear(nf, target_window)\n self.dropout = nn.Dropout(head_dropout)\n\n def forward(self, x):\n x = self.flatten(x)\n x = self.linear(x)\n x = self.dropout(x)\n return x\n\n\nclass MovingAverage(nn.Module):\n def __init__(self, kernel_size):\n super().__init__()\n self.kernel_size = kernel_size\n self.pad = (kernel_size - 1) // 2\n\n def forward(self, x):\n # x: [B, L, C]\n x = x.transpose(1, 2)\n x = F.pad(x, (self.pad, self.pad), mode='replicate')\n x = F.avg_pool1d(x, kernel_size=self.kernel_size, stride=1)\n return x.transpose(1, 2)\n\n\nclass CrossChannelBottleneck(nn.Module):\n \"\"\"\n Low-rank cross-variable projector. It starts from zero contribution so the\n model first behaves like a pure temporal reconstructor, then learns to use\n cross-channel structure only when helpful.\n \"\"\"\n\n def __init__(self, channels, rank):\n super().__init__()\n self.norm = nn.LayerNorm(channels)\n self.compress = nn.Linear(channels, rank, bias=False)\n self.expand = nn.Linear(rank, channels, bias=False)\n self.act = nn.GELU()\n nn.init.zeros_(self.expand.weight)\n\n def forward(self, x):\n return self.expand(self.act(self.compress(self.norm(x))))\n\n\nclass ResidualPatchDecoder(nn.Module):\n \"\"\"\n Patch-based decoder applied only to the residual stream. 
This keeps the\n trend path simple while giving the anomaly-sensitive residual path a strong\n contextual model.\n \"\"\"\n\n def __init__(self, seq_len, enc_in, d_model, n_heads, e_layers, d_ff, dropout, factor, activation):\n super().__init__()\n patch_len = 16\n stride = 8\n padding = stride\n\n self.patch_embedding = PatchEmbedding(d_model, patch_len, stride, padding, dropout)\n self.encoder = Encoder(\n [\n EncoderLayer(\n AttentionLayer(\n FullAttention(False, factor, attention_dropout=dropout, output_attention=False),\n d_model,\n n_heads,\n ),\n d_model,\n d_ff,\n dropout=dropout,\n activation=activation,\n )\n for _ in range(e_layers)\n ],\n norm_layer=nn.Sequential(\n Transpose(1, 2),\n nn.BatchNorm1d(d_model),\n Transpose(1, 2),\n ),\n )\n\n self.head_nf = d_model * int((seq_len - patch_len) / stride + 2)\n self.head = FlattenHead(enc_in, self.head_nf, seq_len, head_dropout=dropout)\n self.out_refine = nn.Linear(seq_len, seq_len)\n self._init_identity(self.out_refine)\n\n @staticmethod\n def _init_identity(layer):\n with torch.no_grad():\n nn.init.eye_(layer.weight)\n if layer.bias is not None:\n layer.bias.zero_()\n\n def forward(self, x):\n # x: [B, L, C]\n x = x.permute(0, 2, 1)\n enc_out, n_vars = self.patch_embedding(x)\n enc_out, _ = self.encoder(enc_out)\n enc_out = torch.reshape(enc_out, (-1, n_vars, enc_out.shape[-2], enc_out.shape[-1]))\n enc_out = enc_out.permute(0, 1, 3, 2)\n out = self.head(enc_out).permute(0, 2, 1)\n out = self.out_refine(out.transpose(1, 2)).transpose(1, 2)\n return out\n\n\nclass Model(nn.Module):\n \"\"\"\n Context-Suppressed Residual Patch Autoencoder (CSR-PAE).\n\n Core idea:\n 1) Split each window into a multi-scale trend context and a residual.\n 2) Compute a surprise gate from the residual magnitude / derivative.\n High-surprise points are softly suppressed before the encoder sees them.\n 3) Reconstruct only the residual with a patch Transformer, while trend is\n reconstructed by a light temporal linear map.\n 4) 
Apply the same surprise gate again when fusing residual reconstruction,\n so anomalous residuals are not copied back through the decoder.\n\n This directly targets a common failure mode of reconstruction models:\n learning an identity shortcut that reconstructs anomalies too well.\n \"\"\"\n\n def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n\n d_model = getattr(configs, 'd_model', 128)\n n_heads = getattr(configs, 'n_heads', 4)\n e_layers = getattr(configs, 'e_layers', 3)\n d_ff = getattr(configs, 'd_ff', 4 * d_model)\n dropout = getattr(configs, 'dropout', 0.1)\n factor = getattr(configs, 'factor', 3)\n activation = getattr(configs, 'activation', 'gelu')\n\n kernels = self._build_kernels(self.seq_len)\n self.context_filters = nn.ModuleList([MovingAverage(k) for k in kernels])\n\n cross_rank = max(4, min(16, max(1, self.enc_in // 2)))\n self.cross_channel = CrossChannelBottleneck(self.enc_in, cross_rank)\n self.residual_decoder = ResidualPatchDecoder(\n self.seq_len,\n self.enc_in,\n d_model,\n n_heads,\n e_layers,\n d_ff,\n dropout,\n factor,\n activation,\n )\n\n self.trend_refine = nn.Linear(self.seq_len, self.seq_len)\n self._init_identity(self.trend_refine)\n\n # Hard-coded algorithmic hyperparameters.\n self.gate_center = 1.60\n self.gate_temperature = 2.25\n self.detail_floor = 0.10\n self.residual_mask_ratio = 0.20\n\n @staticmethod\n def _init_identity(layer):\n with torch.no_grad():\n nn.init.eye_(layer.weight)\n if layer.bias is not None:\n layer.bias.zero_()\n\n @staticmethod\n def _build_kernels(seq_len):\n kernels = []\n max_kernel = seq_len if seq_len % 2 == 1 else seq_len - 1\n for k in [5, 11, 21, 41]:\n k = min(k, max_kernel)\n if k % 2 == 0:\n k -= 1\n if k >= 3 and k not in kernels:\n kernels.append(k)\n return kernels if len(kernels) > 0 else [3]\n\n def 
_normalize(self, x):\n means = x.mean(1, keepdim=True).detach()\n x = x - means\n stdev = torch.sqrt(torch.var(x, dim=1, keepdim=True, unbiased=False) + 1e-5)\n x = x / stdev\n return x, means, stdev\n\n def _multi_scale_context(self, x):\n context = 0.0\n for filt in self.context_filters:\n context = context + filt(x)\n return context / len(self.context_filters)\n\n def _surprise_gate(self, x, context):\n residual = x - context\n\n res_scale = torch.sqrt(residual.pow(2).mean(dim=1, keepdim=True) + 1e-4)\n res_score = residual.abs() / res_scale\n\n delta = torch.zeros_like(x)\n delta[:, 1:, :] = x[:, 1:, :] - x[:, :-1, :]\n delta_scale = torch.sqrt(delta.pow(2).mean(dim=1, keepdim=True) + 1e-4)\n delta_score = delta.abs() / delta_scale\n\n score = 0.75 * res_score + 0.25 * delta_score\n gate = torch.sigmoid(self.gate_temperature * (self.gate_center - score))\n return residual, gate\n\n def anomaly_detection(self, x_enc):\n x, means, stdev = self._normalize(x_enc)\n\n context = self._multi_scale_context(x)\n residual, gate = self._surprise_gate(x, context)\n detail_gate = self.detail_floor + (1.0 - self.detail_floor) * gate\n\n residual_input = detail_gate * residual\n if self.training and self.residual_mask_ratio > 0:\n keep_mask = (\n torch.rand(residual_input.shape[0], residual_input.shape[1], 1, device=residual_input.device)\n > self.residual_mask_ratio\n ).type_as(residual_input)\n residual_input = residual_input * keep_mask\n\n residual_input = residual_input + self.cross_channel(residual_input)\n\n trend_recon = self.trend_refine(context.transpose(1, 2)).transpose(1, 2)\n residual_recon = self.residual_decoder(residual_input)\n\n dec_out = trend_recon + detail_gate * residual_recon\n dec_out = dec_out * stdev + means\n return dec_out\n\n def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):\n if self.task_name == 'anomaly_detection':\n return self.anomaly_detection(x_enc)\n return None\n",
5 "after_line": 41,
6 "start_line": 1,
7 "end_line": 41
8}
Tool Result — Step 1
OK: Replaced lines 1..41 with 262 line(s) in Time-Series-Library/models/Custom.py. Editable range: entire file.
[Current file: Time-Series-Library/models/Custom.py | editable: 1–262 | total: 262 lines]
1 import torch
2 import torch.nn as nn
3 import torch.nn.functional as F
4
5 from layers.Embed import PatchEmbedding
6 from layers.SelfAttention_Family import AttentionLayer, FullAttention
7 from layers.Transformer_EncDec import Encoder, EncoderLayer
8
9
10 class Transpose(nn.Module):
11 def __init__(self, *dims, contiguous=False):
12 super().__init__()
13 self.dims = dims
14 self.contiguous = contiguous
15
16 def forward(self, x):
17 x = x.transpose(*self.dims)
18 return x.contiguous() if self.contiguous else x
19
20
21 class FlattenHead(nn.Module):
22 def __init__(self, n_vars, nf, target_window, head_dropout=0
Assistant — Step 2: test
Tool Result — Step 2
[Test #1] (2 tests remaining; call submit(n=1) to submit this result) ## Seed 42 ### PSM (scripts/psm.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.0333182 vali_loss=0.0242580 test_loss=0.0313696 TRAIN_METRICS epoch=2 train_loss=0.0292517 vali_loss=0.0226084 test_loss=0.0279147 TRAIN_METRICS epoch=3 train_loss=0.0270065 vali_loss=0.0220191 test_loss=0.0263242 Test results (PSM): F-score: 0.9675, Precision: 0.9854, Recall: 0.9502, Accuracy: 0.9823 ### MSL (scripts/msl.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.4434422 vali_loss=0.2836906 test_loss=0.8357677 TRAIN_METRICS epoch=2 train_loss=0.3342082 vali_loss=0.1368401 test_loss=0.4619433 TRAIN_METRICS epoch=3 train_loss=0.2344106 vali_loss=0.0951600 test_loss=0.3362262 Test results (MSL): F-score: 0.7782, Precision: 0.8795, Recall: 0.6979, Accuracy: 0.9581 ## Seed 123 ### PSM (scripts/psm.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.0334320 vali_l
Assistant — Step 3: edit
Tool Input
1{
2 "op": "replace",
3 "filename": "Time-Series-Library/models/Custom.py",
4 "content": "import torch\nimport torch.nn as nn\nimport torch.nn.functional as F\n\nfrom layers.Embed import PatchEmbedding\nfrom layers.SelfAttention_Family import AttentionLayer, FullAttention\nfrom layers.Transformer_EncDec import Encoder, EncoderLayer\n\n\nclass Transpose(nn.Module):\n def __init__(self, *dims, contiguous=False):\n super().__init__()\n self.dims = dims\n self.contiguous = contiguous\n\n def forward(self, x):\n x = x.transpose(*self.dims)\n return x.contiguous() if self.contiguous else x\n\n\nclass FlattenHead(nn.Module):\n def __init__(self, n_vars, nf, target_window, head_dropout=0.0):\n super().__init__()\n self.n_vars = n_vars\n self.flatten = nn.Flatten(start_dim=-2)\n self.linear = nn.Linear(nf, target_window)\n self.dropout = nn.Dropout(head_dropout)\n\n def forward(self, x):\n x = self.flatten(x)\n x = self.linear(x)\n x = self.dropout(x)\n return x\n\n\nclass MovingAverage(nn.Module):\n def __init__(self, kernel_size):\n super().__init__()\n self.kernel_size = kernel_size\n self.pad = (kernel_size - 1) // 2\n\n def forward(self, x):\n # x: [B, L, C]\n x = x.transpose(1, 2)\n x = F.pad(x, (self.pad, self.pad), mode='replicate')\n x = F.avg_pool1d(x, kernel_size=self.kernel_size, stride=1)\n return x.transpose(1, 2)\n\n\nclass CrossChannelBottleneck(nn.Module):\n \"\"\"\n Low-rank cross-variable projector. It starts from zero contribution so the\n model first behaves like a pure temporal reconstructor, then learns to use\n cross-channel structure only when helpful.\n \"\"\"\n\n def __init__(self, channels, rank):\n super().__init__()\n self.norm = nn.LayerNorm(channels)\n self.compress = nn.Linear(channels, rank, bias=False)\n self.expand = nn.Linear(rank, channels, bias=False)\n self.act = nn.GELU()\n nn.init.zeros_(self.expand.weight)\n\n def forward(self, x):\n return self.expand(self.act(self.compress(self.norm(x))))\n\n\nclass ResidualPatchDecoder(nn.Module):\n \"\"\"\n Patch-based decoder applied only to the residual stream. 
This keeps the\n trend path simple while giving the anomaly-sensitive residual path a strong\n contextual model.\n \"\"\"\n\n def __init__(self, seq_len, enc_in, d_model, n_heads, e_layers, d_ff, dropout, factor, activation):\n super().__init__()\n patch_len = 16\n stride = 8\n padding = stride\n\n self.patch_embedding = PatchEmbedding(d_model, patch_len, stride, padding, dropout)\n self.encoder = Encoder(\n [\n EncoderLayer(\n AttentionLayer(\n FullAttention(False, factor, attention_dropout=dropout, output_attention=False),\n d_model,\n n_heads,\n ),\n d_model,\n d_ff,\n dropout=dropout,\n activation=activation,\n )\n for _ in range(e_layers)\n ],\n norm_layer=nn.Sequential(\n Transpose(1, 2),\n nn.BatchNorm1d(d_model),\n Transpose(1, 2),\n ),\n )\n\n self.head_nf = d_model * int((seq_len - patch_len) / stride + 2)\n self.head = FlattenHead(enc_in, self.head_nf, seq_len, head_dropout=dropout)\n self.out_refine = nn.Linear(seq_len, seq_len)\n self._init_identity(self.out_refine)\n\n @staticmethod\n def _init_identity(layer):\n with torch.no_grad():\n nn.init.eye_(layer.weight)\n if layer.bias is not None:\n layer.bias.zero_()\n\n def forward(self, x):\n # x: [B, L, C]\n x = x.permute(0, 2, 1)\n enc_out, n_vars = self.patch_embedding(x)\n enc_out, _ = self.encoder(enc_out)\n enc_out = torch.reshape(enc_out, (-1, n_vars, enc_out.shape[-2], enc_out.shape[-1]))\n enc_out = enc_out.permute(0, 1, 3, 2)\n out = self.head(enc_out).permute(0, 2, 1)\n out = self.out_refine(out.transpose(1, 2)).transpose(1, 2)\n return out\n\n\nclass Model(nn.Module):\n \"\"\"\n Consensus-Suppressed Residual Patch Autoencoder (CSR-PAE).\n\n Main contribution:\n - Build several temporal contexts at different scales.\n - Estimate a scale-consensus score: when scales disagree, fall back to the\n smooth long-scale context instead of trusting short-term detail.\n - Estimate both point-wise and time-shared surprise gates. 
The time-shared\n gate couples channels so a suspicious event in one variable suppresses\n detail reconstruction for the whole timestamp, which matches benchmark\n labels that are timestamp-level rather than channel-level.\n - Reconstruct only the consensus-filtered residual with a patch encoder.\n\n This reduces the tendency of reconstruction models to copy collective or\n contextual anomalies back into the output.\n \"\"\"\n\n def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n\n d_model = getattr(configs, 'd_model', 128)\n n_heads = getattr(configs, 'n_heads', 4)\n e_layers = getattr(configs, 'e_layers', 3)\n d_ff = getattr(configs, 'd_ff', 4 * d_model)\n dropout = getattr(configs, 'dropout', 0.1)\n factor = getattr(configs, 'factor', 3)\n activation = getattr(configs, 'activation', 'gelu')\n\n kernels = self._build_kernels(self.seq_len)\n self.context_filters = nn.ModuleList([MovingAverage(k) for k in kernels])\n\n cross_rank = max(4, min(16, max(1, self.enc_in // 2)))\n self.cross_channel = CrossChannelBottleneck(self.enc_in, cross_rank)\n self.residual_decoder = ResidualPatchDecoder(\n self.seq_len,\n self.enc_in,\n d_model,\n n_heads,\n e_layers,\n d_ff,\n dropout,\n factor,\n activation,\n )\n\n self.trend_refine = nn.Linear(self.seq_len, self.seq_len)\n self._init_identity(self.trend_refine)\n\n # Hard-coded algorithmic hyperparameters.\n self.gate_center = 1.60\n self.gate_temperature = 2.25\n self.time_gate_center = 1.45\n self.time_gate_temperature = 2.00\n self.scale_gate_center = 0.90\n self.scale_gate_temperature = 1.75\n self.detail_floor = 0.10\n self.residual_mask_ratio = 0.20\n\n @staticmethod\n def _init_identity(layer):\n with torch.no_grad():\n nn.init.eye_(layer.weight)\n if layer.bias is not None:\n layer.bias.zero_()\n\n @staticmethod\n def _build_kernels(seq_len):\n 
kernels = []\n max_kernel = seq_len if seq_len % 2 == 1 else seq_len - 1\n for k in [5, 11, 21, 41]:\n k = min(k, max_kernel)\n if k % 2 == 0:\n k -= 1\n if k >= 3 and k not in kernels:\n kernels.append(k)\n return kernels if len(kernels) > 0 else [3]\n\n def _normalize(self, x):\n means = x.mean(1, keepdim=True).detach()\n x = x - means\n stdev = torch.sqrt(torch.var(x, dim=1, keepdim=True, unbiased=False) + 1e-5)\n x = x / stdev\n return x, means, stdev\n\n def _multi_scale_context(self, x):\n contexts = torch.stack([filt(x) for filt in self.context_filters], dim=0)\n context_mean = contexts.mean(dim=0)\n\n sorted_contexts, _ = torch.sort(contexts, dim=0)\n mid = sorted_contexts.shape[0] // 2\n if sorted_contexts.shape[0] % 2 == 1:\n context_median = sorted_contexts[mid]\n else:\n context_median = 0.5 * (sorted_contexts[mid - 1] + sorted_contexts[mid])\n\n robust_context = 0.5 * (context_mean + context_median)\n dispersion = torch.sqrt(((contexts - robust_context.unsqueeze(0)) ** 2).mean(dim=0) + 1e-4)\n dispersion_scale = torch.sqrt(dispersion.pow(2).mean(dim=1, keepdim=True) + 1e-4)\n dispersion_score = dispersion / dispersion_scale\n\n consensus = torch.sigmoid(\n self.scale_gate_temperature\n * (self.scale_gate_center - dispersion_score.mean(dim=-1, keepdim=True))\n )\n\n long_context = contexts[-1]\n context = consensus * robust_context + (1.0 - consensus) * long_context\n return context, dispersion_score, consensus, long_context\n\n def _surprise_gate(self, x, context, dispersion_score):\n residual = x - context\n\n res_scale = torch.sqrt(residual.pow(2).mean(dim=1, keepdim=True) + 1e-4)\n res_score = residual.abs() / res_scale\n\n delta = torch.zeros_like(x)\n delta[:, 1:, :] = x[:, 1:, :] - x[:, :-1, :]\n delta_scale = torch.sqrt(delta.pow(2).mean(dim=1, keepdim=True) + 1e-4)\n delta_score = delta.abs() / delta_scale\n\n score = 0.55 * res_score + 0.15 * delta_score + 0.30 * dispersion_score\n point_gate = torch.sigmoid(self.gate_temperature * 
(self.gate_center - score))\n\n time_score = 0.5 * score.mean(dim=-1, keepdim=True) + 0.5 * score.max(dim=-1, keepdim=True).values\n time_gate = torch.sigmoid(self.time_gate_temperature * (self.time_gate_center - time_score))\n return residual, point_gate, time_gate\n\n def anomaly_detection(self, x_enc):\n x, means, stdev = self._normalize(x_enc)\n\n context, dispersion_score, consensus, long_context = self._multi_scale_context(x)\n residual, point_gate, time_gate = self._surprise_gate(x, context, dispersion_score)\n detail_gate = self.detail_floor + (1.0 - self.detail_floor) * point_gate * time_gate * consensus\n\n residual_input = detail_gate * residual\n if self.training and self.residual_mask_ratio > 0:\n keep_mask = (\n torch.rand(residual_input.shape[0], re\n... (truncated)",
5 "after_line": 262,
6 "start_line": 1,
7 "end_line": 262
8}
Tool Result — Step 3
OK: Replaced lines 1..262 with 290 line(s) in Time-Series-Library/models/Custom.py. Editable range: entire file.
[Current file: Time-Series-Library/models/Custom.py | editable: 1–290 | total: 290 lines]
1 import torch
2 import torch.nn as nn
3 import torch.nn.functional as F
4
5 from layers.Embed import PatchEmbedding
6 from layers.SelfAttention_Family import AttentionLayer, FullAttention
7 from layers.Transformer_EncDec import Encoder, EncoderLayer
8
9
10 class Transpose(nn.Module):
11 def __init__(self, *dims, contiguous=False):
12 super().__init__()
13 self.dims = dims
14 self.contiguous = contiguous
15
16 def forward(self, x):
17 x = x.transpose(*self.dims)
18 return x.contiguous() if self.contiguous else x
19
20
21 class FlattenHead(nn.Module):
22 def __init__(self, n_vars, nf, target_window, head_dropout=
Assistant — Step 4: test
Tool Result — Step 4
[Test #2] (1 test remaining; call submit(n=2) to submit this result)
### PSM (scripts/psm.sh)
Training metrics (last epochs):
TRAIN_METRICS epoch=1 train_loss=0.0416654 vali_loss=0.0315534 test_loss=0.0332721
TRAIN_METRICS epoch=2 train_loss=0.0356363 vali_loss=0.0270190 test_loss=0.0233252
TRAIN_METRICS epoch=3 train_loss=0.0325716 vali_loss=0.0261002 test_loss=0.0213421
Test results (PSM):
F-score: 0.9685, Precision: 0.9890, Recall: 0.9488, Accuracy: 0.9829
### MSL (scripts/msl.sh)
Training metrics (last epochs):
TRAIN_METRICS epoch=1 train_loss=0.4009801 vali_loss=0.0998189 test_loss=0.3081117
TRAIN_METRICS epoch=2 train_loss=0.2187662 vali_loss=0.0557677 test_loss=0.2029034
TRAIN_METRICS epoch=3 train_loss=0.1831391 vali_loss=0.0422521 test_loss=0.1599643
Test results (MSL):
F-score: 0.7549, Precision: 0.8732, Recall: 0.6649, Accuracy: 0.9545
[Leaderboard] Results saved: {'f_score_PSM': 0.9685, 'precision_PSM': 0.989, 'recall_PSM': 0.9488, 'elapsed_PSM': 1024, 'f_score_MSL':
Assistant — Step 5: submit
Tool Input
1{
2 "n": 1
3}
Tool Result — Step 5
[submit] Submitting result from test #1 as final. ## Seed 42 ### PSM (scripts/psm.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.0333182 vali_loss=0.0242580 test_loss=0.0313696 TRAIN_METRICS epoch=2 train_loss=0.0292517 vali_loss=0.0226084 test_loss=0.0279147 TRAIN_METRICS epoch=3 train_loss=0.0270065 vali_loss=0.0220191 test_loss=0.0263242 Test results (PSM): F-score: 0.9675, Precision: 0.9854, Recall: 0.9502, Accuracy: 0.9823 ### MSL (scripts/msl.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.4434422 vali_loss=0.2836906 test_loss=0.8357677 TRAIN_METRICS epoch=2 train_loss=0.3342082 vali_loss=0.1368401 test_loss=0.4619433 TRAIN_METRICS epoch=3 train_loss=0.2344106 vali_loss=0.0951600 test_loss=0.3362262 Test results (MSL): F-score: 0.7782, Precision: 0.8795, Recall: 0.6979, Accuracy: 0.9581 ### SMAP (scripts/smap.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.6518705 vali_loss=0.1892521 test_loss=0.390146
Assistant — Step 1: edit (step counter restarts here; this appears to begin a second conversation branch in the same export)
Tool Input
1{
2 "op": "replace",
3 "filename": "Time-Series-Library/models/Custom.py",
4 "content": "import torch\nimport torch.nn as nn\nimport torch.nn.functional as F\n\n\nclass PastConv1d(nn.Module):\n \"\"\"Causal 1D convolution with optional exclusion of the current timestep.\"\"\"\n\n def __init__(self, in_channels, out_channels, kernel_size, dilation=1, groups=1,\n bias=True, exclude_current=False):\n super().__init__()\n self.kernel_size = kernel_size\n self.dilation = dilation\n self.exclude_current = exclude_current\n self.conv = nn.Conv1d(\n in_channels,\n out_channels,\n kernel_size=kernel_size,\n dilation=dilation,\n padding=0,\n groups=groups,\n bias=bias,\n )\n\n def forward(self, x):\n # x: [B, C, L]\n if self.exclude_current:\n shift = self.dilation\n x = F.pad(x, (shift, 0), mode='replicate')[:, :, :-shift]\n pad = (self.kernel_size - 1) * self.dilation\n x = F.pad(x, (pad, 0), mode='replicate')\n return self.conv(x)\n\n\nclass DirectionalConv1d(nn.Module):\n \"\"\"Past-only or future-only convolution.\"\"\"\n\n def __init__(self, in_channels, out_channels, kernel_size, dilation=1, groups=1,\n bias=True, exclude_current=False, direction='past'):\n super().__init__()\n self.direction = direction\n self.past_conv = PastConv1d(\n in_channels=in_channels,\n out_channels=out_channels,\n kernel_size=kernel_size,\n dilation=dilation,\n groups=groups,\n bias=bias,\n exclude_current=exclude_current,\n )\n\n def forward(self, x):\n if self.direction == 'past':\n return self.past_conv(x)\n x = torch.flip(x, dims=[-1])\n x = self.past_conv(x)\n return torch.flip(x, dims=[-1])\n\n\nclass DirectionalBlock(nn.Module):\n \"\"\"Directional context block. 
Safe residual connections because its inputs are already blind.\"\"\"\n\n def __init__(self, hidden_dim, ff_dim, dilation, dropout, direction):\n super().__init__()\n self.depthwise = DirectionalConv1d(\n hidden_dim,\n hidden_dim,\n kernel_size=3,\n dilation=dilation,\n groups=hidden_dim,\n bias=False,\n exclude_current=False,\n direction=direction,\n )\n self.norm = nn.GroupNorm(1, hidden_dim)\n self.pw1 = nn.Conv1d(hidden_dim, ff_dim, kernel_size=1)\n self.pw2 = nn.Conv1d(ff_dim, hidden_dim, kernel_size=1)\n self.dropout = nn.Dropout(dropout)\n self.se = nn.Sequential(\n nn.AdaptiveAvgPool1d(1),\n nn.Conv1d(hidden_dim, hidden_dim, kernel_size=1),\n nn.Sigmoid(),\n )\n\n def forward(self, x):\n y = self.depthwise(x)\n y = self.norm(y)\n y = F.gelu(y)\n y = self.pw1(y)\n y = F.gelu(y)\n y = self.dropout(y)\n y = self.pw2(y)\n y = y * self.se(y)\n y = self.dropout(y)\n return x + y\n\n\nclass Model(nn.Module):\n \"\"\"\n Consistency-Gated Bidirectional Blind Reconstructor (CGBR).\n\n Key idea:\n 1) Build a multi-scale leave-one-out context baseline, so each point is first predicted\n from neighboring context instead of copied from itself.\n 2) Predict only the residual with past-only and future-only directional branches,\n preventing direct identity leakage.\n 3) Use a consistency gate that suppresses residual reconstruction on context-inconsistent\n points, increasing anomaly contrast at test time.\n \"\"\"\n\n def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n\n hidden_dim = getattr(configs, 'd_model', 64)\n ff_dim = getattr(configs, 'd_ff', hidden_dim * 2)\n dropout = getattr(configs, 'dropout', 0.1)\n\n self.context_kernels = [3, 7, 15, 31]\n self.scale_selector = nn.Conv1d(\n len(self.context_kernels) * self.enc_in,\n len(self.context_kernels),\n kernel_size=1,\n )\n\n self.trend_time_mix 
= nn.Linear(self.seq_len, self.seq_len)\n self.trend_channel_mix = nn.Conv1d(self.enc_in, self.enc_in, kernel_size=1)\n self.trend_to_hidden = nn.Conv1d(self.enc_in, hidden_dim, kernel_size=1)\n\n self.past_in = DirectionalConv1d(\n self.enc_in,\n hidden_dim,\n kernel_size=5,\n dilation=1,\n exclude_current=True,\n direction='past',\n )\n self.future_in = DirectionalConv1d(\n self.enc_in,\n hidden_dim,\n kernel_size=5,\n dilation=1,\n exclude_current=True,\n direction='future',\n )\n\n dilations = [1, 2, 4, 8]\n self.past_blocks = nn.ModuleList([\n DirectionalBlock(hidden_dim, ff_dim, dilation=d, dropout=dropout, direction='past')\n for d in dilations\n ])\n self.future_blocks = nn.ModuleList([\n DirectionalBlock(hidden_dim, ff_dim, dilation=d, dropout=dropout, direction='future')\n for d in dilations\n ])\n\n self.dir_gate = nn.Conv1d(hidden_dim * 2, hidden_dim, kernel_size=1)\n self.fuse = nn.Sequential(\n nn.Conv1d(hidden_dim * 3, hidden_dim, kernel_size=1),\n nn.GELU(),\n nn.GroupNorm(1, hidden_dim),\n nn.Dropout(dropout),\n )\n self.out_proj = nn.Conv1d(hidden_dim, self.c_out, kernel_size=1)\n\n # Hard-coded algorithmic hyperparameters.\n self.mask_seed_prob = 0.04\n self.mask_channel_prob = 0.25\n self.mask_span = 9\n self.gate_beta = 2.0\n self.min_gate = 0.05\n\n self._init_parameters()\n\n def _init_parameters(self):\n nn.init.eye_(self.trend_time_mix.weight)\n nn.init.zeros_(self.trend_time_mix.bias)\n\n with torch.no_grad():\n self.trend_channel_mix.weight.zero_()\n diag = min(self.trend_channel_mix.weight.size(0), self.trend_channel_mix.weight.size(1))\n for i in range(diag):\n self.trend_channel_mix.weight[i, i, 0] = 1.0\n if self.trend_channel_mix.bias is not None:\n self.trend_channel_mix.bias.zero_()\n\n def _leave_one_out_mean(self, x, kernel_size):\n # x: [B, C, L]\n pad = kernel_size // 2\n pooled = F.avg_pool1d(F.pad(x, (pad, pad), mode='replicate'), kernel_size=kernel_size, stride=1)\n return (pooled * kernel_size - x) / float(kernel_size - 
1)\n\n def _build_multiscale_context(self, x):\n contexts = [self._leave_one_out_mean(x, k) for k in self.context_kernels]\n context_cat = torch.cat(contexts, dim=1)\n scale_logits = self.scale_selector(context_cat)\n scale_weight = torch.softmax(scale_logits, dim=1)\n\n fused = 0.0\n for i, ctx in enumerate(contexts):\n fused = fused + ctx * scale_weight[:, i:i + 1, :]\n return fused\n\n def _corrupt_residual(self, residual):\n if (not self.training) or self.mask_seed_prob <= 0:\n return residual\n\n bsz, channels, length = residual.shape\n seed = (torch.rand(bsz, 1, length, device=residual.device) < self.mask_seed_prob).float()\n span = F.max_pool1d(seed, kernel_size=self.mask_span, stride=1, padding=self.mask_span // 2)\n span = (span > 0).to(residual.dtype)\n channel_mask = (torch.rand(bsz, channels, 1, device=residual.device) < self.mask_channel_prob).to(residual.dtype)\n mask = span * channel_mask\n return residual * (1.0 - mask)\n\n def anomaly_detection(self, x_enc):\n \"\"\"\n Input: x_enc [B, L, C]\n Output: [B, L, C]\n \"\"\"\n means = x_enc.mean(1, keepdim=True).detach()\n x = x_enc - means\n stdev = torch.sqrt(torch.var(x, dim=1, keepdim=True, unbiased=False) + 1e-5).detach()\n x = x / stdev\n\n x = x.transpose(1, 2) # [B, C, L]\n\n # Blind multi-scale baseline.\n trend = self._build_multiscale_context(x)\n trend = self.trend_time_mix(trend)\n trend = self.trend_channel_mix(trend)\n\n # Residual denoising / imputation with directional blind branches.\n residual = x - trend\n residual_in = self._corrupt_residual(residual)\n\n past = F.gelu(self.past_in(residual_in))\n future = F.gelu(self.future_in(residual_in))\n\n for block in self.past_blocks:\n past = block(past)\n for block in self.future_blocks:\n future = block(future)\n\n direction_gate = torch.sigmoid(self.dir_gate(torch.cat([past, future], dim=1)))\n directional_context = direction_gate * past + (1.0 - direction_gate) * future\n branch_disagreement = torch.abs(past - future)\n trend_hidden = 
F.gelu(self.trend_to_hidden(trend))\n\n fused = self.fuse(torch.cat([directional_context, branch_disagreement, trend_hidden], dim=1))\n residual_hat = self.out_proj(fused)\n\n # Deterministic anti-copy gate: context-inconsistent points get less residual fitting.\n discrepancy = (x - trend).pow(2).detach()\n trust = self.min_gate + (1.0 - self.min_gate) * torch.exp(-self.gate_beta * discrepancy)\n\n recon = trend + trust * residual_hat\n recon = recon.transpose(1, 2)\n recon = recon * stdev + means\n return recon\n\n def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):\n if self.task_name == 'anomaly_detection':\n return self.anomaly_detection(x_enc)\n return None\n",
5 "after_line": 0,
6 "start_line": 1,
7 "end_line": 41
8}
Tool Result — Step 1
OK: Replaced lines 1..41 with 269 line(s) in Time-Series-Library/models/Custom.py. Editable range: entire file.
[Current file: Time-Series-Library/models/Custom.py | editable: 1–269 | total: 269 lines]
1 import torch
2 import torch.nn as nn
3 import torch.nn.functional as F
4
5
6 class PastConv1d(nn.Module):
7 """Causal 1D convolution with optional exclusion of the current timestep."""
8
9 def __init__(self, in_channels, out_channels, kernel_size, dilation=1, groups=1,
10 bias=True, exclude_current=False):
11 super().__init__()
12 self.kernel_size = kernel_size
13 self.dilation = dilation
14 self.exclude_current = exclude_current
15 self.conv = nn.Conv1d(
16 in_channels,
17 out_channels,
18 kernel_size=kernel_size,
19 dilation=dilation,
20 padding=0,
Assistant — Step 2: test