Agent Conversation: google/gemini-3.1-pro-preview
Model: google/gemini-3.1-pro-preview
Total steps: 8
Messages: 17
User — Step 0
Task: ts-anomaly-detection
Time Series Anomaly Detection: Custom Model Design
Objective
Design and implement a custom deep learning model for unsupervised time series anomaly detection via reconstruction. Your code goes in the Model class in models/Custom.py. Three reference implementations (DLinear, TimesNet, PatchTST) are provided as read-only.
Evaluation
Trained and evaluated on three anomaly detection datasets:
- PSM (25 variables, server machine dataset)
- MSL (55 variables, Mars Science Laboratory)
- SMAP (25 variables, Soil Moisture Active Passive satellite)
All use seq_len=100, anomaly_ratio=1. Metric: F-score (higher is better).
Time-Series-Library/models/Custom.py [EDITABLE — entire file only]
1: import torch
2: import torch.nn as nn
3:
4:
5: class Model(nn.Module):
6: """
7: Custom model for time series anomaly detection.
8:
9: Forward signature: forward(x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None)
10: - x_enc: [batch, seq_len, enc_in] — input time series
11: - x_mark_enc: not used for anomaly detection (None)
12: - x_dec: not used for anomaly detection (None)
13: - x_mark_dec: not used for anomaly detection (None)
14:
15: Must return: [batch, seq_len, c_out] — reconstructed sequence
16: The framework computes MSE between input and output for anomaly scoring.
17: """
18:
19: def __init__(self, configs):
20: super(Model, self).__init__()
21: self.task_name = configs.task_name
22: self.seq_len = configs.seq_len
23: self.pred_len = configs.seq_len # For anomaly detection, pred_len = seq_len
24: self.enc_in = configs.enc_in
25: self.c_out = configs.c_out
26: # TODO: Define your model architecture here
27:
28: def anomaly_detection(self, x_enc):
29: """
30: Anomaly detection: reconstruct the input sequence.
31: Input: x_enc [batch, seq_len, enc_in]
32: Output: [batch, seq_len, c_out]
33: """
34: # TODO: Implement your reconstruction logic
35: return x_enc # Placeholder: identity reconstruction
36:
37: def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
38: if self.task_name == 'anomaly_detection':
39: dec_out = self.anomaly_detection(x_enc)
40: return dec_out
41: return None
Time-Series-Library/models/DLinear.py [READ-ONLY — do not edit]
1: import torch
2: import torch.nn as nn
3: import torch.nn.functional as F
4: from layers.Autoformer_EncDec import series_decomp
5:
6:
7: class Model(nn.Module):
8: """
9: Paper link: https://arxiv.org/pdf/2205.13504.pdf
10: """
11:
12: def __init__(self, configs, i…  [read-only file display truncated]
Assistant — Step 1: edit
Tool Input
1{
2 "content": "import torch\nimport torch.nn as nn\nimport torch.nn.functional as F\nimport math\n\nclass PositionalEmbedding(nn.Module):\n def __init__(self, d_model, max_len=5000):\n super(PositionalEmbedding, self).__init__()\n pe = torch.zeros(max_len, d_model).float()\n pe.require_grad = False\n position = torch.arange(0, max_len).float().unsqueeze(1)\n div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()\n pe[:, 0::2] = torch.sin(position * div_term)\n pe[:, 1::2] = torch.cos(position * div_term)\n pe = pe.unsqueeze(0)\n self.register_buffer('pe', pe)\n\n def forward(self, x):\n return self.pe[:, :x.size(1)]\n\nclass EncoderLayer(nn.Module):\n def __init__(self, d_model, n_heads, d_ff, dropout):\n super(EncoderLayer, self).__init__()\n self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)\n self.conv1 = nn.Conv1d(d_model, d_ff, 1)\n self.conv2 = nn.Conv1d(d_ff, d_model, 1)\n self.norm1 = nn.LayerNorm(d_model)\n self.norm2 = nn.LayerNorm(d_model)\n self.dropout = nn.Dropout(dropout)\n self.activation = F.gelu\n\n def forward(self, x):\n new_x, _ = self.self_attn(x, x, x)\n x = x + self.dropout(new_x)\n y = self.norm1(x)\n y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))\n y = self.dropout(self.conv2(y).transpose(-1, 1))\n return self.norm2(x + y)\n\nclass Model(nn.Module):\n def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n \n self.d_model = getattr(configs, 'd_model', 128)\n self.n_heads = getattr(configs, 'n_heads', 8)\n self.e_layers = getattr(configs, 'e_layers', 2)\n self.d_ff = getattr(configs, 'd_ff', 256)\n self.dropout = getattr(configs, 'dropout', 0.1)\n \n # Temporal Branch\n self.proj_t = nn.Linear(self.enc_in, self.d_model)\n self.pe_t = PositionalEmbedding(self.d_model, 
max_len=self.seq_len)\n self.encoder_t = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_t = nn.Linear(self.d_model, self.enc_in)\n \n # Variate Branch\n self.proj_v = nn.Linear(self.seq_len, self.d_model)\n self.pe_v = nn.Parameter(torch.randn(1, self.enc_in, self.d_model) * 0.02)\n self.encoder_v = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_v = nn.Linear(self.d_model, self.seq_len)\n \n if self.enc_in != self.c_out:\n self.final_proj = nn.Linear(self.enc_in, self.c_out)\n else:\n self.final_proj = nn.Identity()\n\n def anomaly_detection(self, x_enc):\n B, T, N = x_enc.shape\n \n mean = x_enc.mean(dim=1, keepdim=True).detach()\n var = x_enc.var(dim=1, keepdim=True, unbiased=False).detach()\n stdev = torch.sqrt(var + 1e-5)\n x = (x_enc - mean) / stdev\n \n # Denoising objective\n if self.training:\n x_corrupt = F.dropout(x, p=0.2, training=True)\n else:\n x_corrupt = x\n \n # -- Temporal Branch --\n e_t = self.proj_t(x_corrupt)\n e_t = e_t + self.pe_t(e_t)\n for layer in self.encoder_t:\n e_t = layer(e_t)\n out_t = self.out_t(e_t)\n \n # -- Variate Branch --\n x_v = x_corrupt.transpose(1, 2)\n e_v = self.proj_v(x_v)\n e_v = e_v + self.pe_v\n for layer in self.encoder_v:\n e_v = layer(e_v)\n out_v = self.out_v(e_v)\n out_v = out_v.transpose(1, 2)\n \n # Fusion\n x_pred = (out_t + out_v) * 0.5\n \n out = x_pred * stdev + mean\n out = self.final_proj(out)\n return out\n\n def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):\n if self.task_name == 'anomaly_detection':\n dec_out = self.anomaly_detection(x_enc)\n return dec_out\n return None\n",
3 "filename": "models/Custom.py",
4 "op": "replace",
5 "end_line": 41,
6 "start_line": 1
7}
Tool Result — Step 1
ERROR: Package 'models' is not in allowed packages
Assistant — Step 2: edit
Tool Input
1{
2 "filename": "Time-Series-Library/models/Custom.py",
3 "end_line": 41,
4 "op": "replace",
5 "content": "import torch\nimport torch.nn as nn\nimport torch.nn.functional as F\nimport math\n\nclass PositionalEmbedding(nn.Module):\n def __init__(self, d_model, max_len=5000):\n super(PositionalEmbedding, self).__init__()\n pe = torch.zeros(max_len, d_model).float()\n pe.require_grad = False\n position = torch.arange(0, max_len).float().unsqueeze(1)\n div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()\n pe[:, 0::2] = torch.sin(position * div_term)\n pe[:, 1::2] = torch.cos(position * div_term)\n pe = pe.unsqueeze(0)\n self.register_buffer('pe', pe)\n\n def forward(self, x):\n return self.pe[:, :x.size(1)]\n\nclass EncoderLayer(nn.Module):\n def __init__(self, d_model, n_heads, d_ff, dropout):\n super(EncoderLayer, self).__init__()\n self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)\n self.conv1 = nn.Conv1d(d_model, d_ff, 1)\n self.conv2 = nn.Conv1d(d_ff, d_model, 1)\n self.norm1 = nn.LayerNorm(d_model)\n self.norm2 = nn.LayerNorm(d_model)\n self.dropout = nn.Dropout(dropout)\n self.activation = F.gelu\n\n def forward(self, x):\n new_x, _ = self.self_attn(x, x, x)\n x = x + self.dropout(new_x)\n y = self.norm1(x)\n y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))\n y = self.dropout(self.conv2(y).transpose(-1, 1))\n return self.norm2(x + y)\n\nclass Model(nn.Module):\n \"\"\"\n Dual-Perspective Denoising Autoencoder for Time Series Anomaly Detection.\n \"\"\"\n def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n \n self.d_model = getattr(configs, 'd_model', 128)\n self.n_heads = getattr(configs, 'n_heads', 8)\n self.e_layers = getattr(configs, 'e_layers', 2)\n self.d_ff = getattr(configs, 'd_ff', 256)\n self.dropout = getattr(configs, 'dropout', 0.1)\n \n # Temporal Branch\n self.proj_t = 
nn.Linear(self.enc_in, self.d_model)\n self.pe_t = PositionalEmbedding(self.d_model, max_len=self.seq_len)\n self.encoder_t = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_t = nn.Linear(self.d_model, self.enc_in)\n \n # Variate Branch\n self.proj_v = nn.Linear(self.seq_len, self.d_model)\n self.pe_v = nn.Parameter(torch.randn(1, self.enc_in, self.d_model) * 0.02)\n self.encoder_v = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_v = nn.Linear(self.d_model, self.seq_len)\n \n if self.enc_in != self.c_out:\n self.final_proj = nn.Linear(self.enc_in, self.c_out)\n else:\n self.final_proj = nn.Identity()\n\n def anomaly_detection(self, x_enc):\n B, T, N = x_enc.shape\n \n mean = x_enc.mean(dim=1, keepdim=True).detach()\n var = x_enc.var(dim=1, keepdim=True, unbiased=False).detach()\n stdev = torch.sqrt(var + 1e-5)\n x = (x_enc - mean) / stdev\n \n # Denoising objective\n if self.training:\n x_corrupt = F.dropout(x, p=0.15, training=True)\n else:\n x_corrupt = x\n \n # -- Temporal Branch --\n e_t = self.proj_t(x_corrupt)\n e_t = e_t + self.pe_t(e_t)\n for layer in self.encoder_t:\n e_t = layer(e_t)\n out_t = self.out_t(e_t)\n \n # -- Variate Branch --\n x_v = x_corrupt.transpose(1, 2)\n e_v = self.proj_v(x_v)\n e_v = e_v + self.pe_v\n for layer in self.encoder_v:\n e_v = layer(e_v)\n out_v = self.out_v(e_v)\n out_v = out_v.transpose(1, 2)\n \n # Fusion\n x_pred = (out_t + out_v) * 0.5\n \n out = x_pred * stdev + mean\n out = self.final_proj(out)\n return out\n\n def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):\n if self.task_name == 'anomaly_detection':\n dec_out = self.anomaly_detection(x_enc)\n return dec_out\n return None\n",
6 "start_line": 1
7}
Tool Result — Step 2
OK: Replaced lines 1..41 with 122 line(s) in Time-Series-Library/models/Custom.py. Editable range: entire file.
[Current file: Time-Series-Library/models/Custom.py | editable: 1–122 | total: 122 lines]
1 import torch
2 import torch.nn as nn
3 import torch.nn.functional as F
4 import math
5
6 class PositionalEmbedding(nn.Module):
7 def __init__(self, d_model, max_len=5000):
8 super(PositionalEmbedding, self).__init__()
9 pe = torch.zeros(max_len, d_model).float()
10 pe.require_grad = False
11 position = torch.arange(0, max_len).float().unsqueeze(1)
12 div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
13 pe[:, 0::2] = torch.sin(position * div_term)
14 pe[:, 1::2] = torch.cos(position * div_term)
15 pe = pe.unsqueeze(0)
16 self.register_buffer('pe', pe)
17
18 def forward(s
Assistant — Step 3: test
Tool Result — Step 3
[Test #1] (2 tests remaining; call submit(n=1) to submit this result) ## Seed 42 ### PSM (scripts/psm.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.0134271 vali_loss=0.0043740 test_loss=0.0065438 TRAIN_METRICS epoch=2 train_loss=0.0095262 vali_loss=0.0055988 test_loss=0.0053275 TRAIN_METRICS epoch=3 train_loss=0.0080759 vali_loss=0.0036037 test_loss=0.0031424 Test results (PSM): F-score: 0.9352, Precision: 0.9897, Recall: 0.8864, Accuracy: 0.9659 ### MSL (scripts/msl.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.1354027 vali_loss=0.0079648 test_loss=0.0416327 TRAIN_METRICS epoch=2 train_loss=0.0939294 vali_loss=0.0086531 test_loss=0.0558232 TRAIN_METRICS epoch=3 train_loss=0.0832496 vali_loss=0.0070921 test_loss=0.0459616 Test results (MSL): F-score: 0.7383, Precision: 0.8706, Recall: 0.6409, Accuracy: 0.9521 ## Seed 123 ### PSM (scripts/psm.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.0130907 vali_l
Assistant — Step 4: edit
Tool Input
1{
2 "filename": "Time-Series-Library/models/Custom.py",
3 "start_line": 1,
4 "op": "replace",
5 "end_line": 122,
6 "content": "import torch\nimport torch.nn as nn\nimport torch.nn.functional as F\nimport math\n\nclass PositionalEmbedding(nn.Module):\n def __init__(self, d_model, max_len=5000):\n super(PositionalEmbedding, self).__init__()\n pe = torch.zeros(max_len, d_model).float()\n pe.require_grad = False\n position = torch.arange(0, max_len).float().unsqueeze(1)\n div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()\n pe[:, 0::2] = torch.sin(position * div_term)\n pe[:, 1::2] = torch.cos(position * div_term)\n pe = pe.unsqueeze(0)\n self.register_buffer('pe', pe)\n\n def forward(self, x):\n return self.pe[:, :x.size(1)]\n\nclass EncoderLayer(nn.Module):\n def __init__(self, d_model, n_heads, d_ff, dropout):\n super(EncoderLayer, self).__init__()\n self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)\n self.conv1 = nn.Conv1d(d_model, d_ff, 1)\n self.conv2 = nn.Conv1d(d_ff, d_model, 1)\n self.norm1 = nn.LayerNorm(d_model)\n self.norm2 = nn.LayerNorm(d_model)\n self.dropout = nn.Dropout(dropout)\n self.activation = F.gelu\n\n def forward(self, x):\n new_x, _ = self.self_attn(x, x, x)\n x = x + self.dropout(new_x)\n y = self.norm1(x)\n y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))\n y = self.dropout(self.conv2(y).transpose(-1, 1))\n return self.norm2(x + y)\n\nclass Model(nn.Module):\n \"\"\"\n Latent Block-Masked Dual-Perspective Autoencoder\n \"\"\"\n def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n \n self.d_model = getattr(configs, 'd_model', 128)\n self.n_heads = getattr(configs, 'n_heads', 8)\n self.e_layers = getattr(configs, 'e_layers', 2)\n self.d_ff = getattr(configs, 'd_ff', 256)\n self.dropout = getattr(configs, 'dropout', 0.1)\n \n # Masking hyperparameters\n self.patch_len = 10\n self.mask_p_t = 
0.25\n self.mask_p_v = 0.25\n \n # Temporal Branch\n self.proj_t = nn.Linear(self.enc_in, self.d_model)\n self.pe_t = PositionalEmbedding(self.d_model, max_len=self.seq_len)\n self.mask_token_t = nn.Parameter(torch.zeros(1, 1, self.d_model))\n self.encoder_t = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_t = nn.Linear(self.d_model, self.enc_in)\n \n # Variate Branch\n self.proj_v = nn.Linear(self.seq_len, self.d_model)\n self.pe_v = nn.Parameter(torch.randn(1, self.enc_in, self.d_model) * 0.02)\n self.mask_token_v = nn.Parameter(torch.zeros(1, 1, self.d_model))\n self.encoder_v = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_v = nn.Linear(self.d_model, self.seq_len)\n \n # Init mask tokens\n nn.init.trunc_normal_(self.mask_token_t, std=0.02)\n nn.init.trunc_normal_(self.mask_token_v, std=0.02)\n \n if self.enc_in != self.c_out:\n self.final_proj = nn.Linear(self.enc_in, self.c_out)\n else:\n self.final_proj = nn.Identity()\n\n def anomaly_detection(self, x_enc):\n B, T, N = x_enc.shape\n \n mean = x_enc.mean(dim=1, keepdim=True).detach()\n var = x_enc.var(dim=1, keepdim=True, unbiased=False).detach()\n stdev = torch.sqrt(var + 1e-5)\n x = (x_enc - mean) / stdev\n \n # ---------------- Temporal Branch ----------------\n e_t = self.proj_t(x) # [B, T, d_model]\n if self.training:\n num_patches = T // self.patch_len\n if num_patches > 0:\n mask_t = (torch.rand(B, num_patches, 1, device=x.device) < self.mask_p_t).float()\n mask_t = mask_t.repeat_interleave(self.patch_len, dim=1)\n pad = T - mask_t.shape[1]\n if pad > 0:\n mask_t = torch.cat([mask_t, torch.zeros(B, pad, 1, device=x.device)], dim=1)\n e_t = e_t * (1 - mask_t) + self.mask_token_t * mask_t\n \n e_t = e_t + self.pe_t(e_t)\n for layer in self.encoder_t:\n e_t = layer(e_t)\n out_t = self.out_t(e_t)\n \n # ---------------- Variate Branch 
----------------\n x_v = x.transpose(1, 2) # [B, N, T]\n e_v = self.proj_v(x_v) # [B, N, d_model]\n if self.training:\n mask_v = (torch.rand(B, N, 1, device=x.device) < self.mask_p_v).float()\n e_v = e_v * (1 - mask_v) + self.mask_token_v * mask_v\n \n e_v = e_v + self.pe_v\n for layer in self.encoder_v:\n e_v = layer(e_v)\n out_v = self.out_v(e_v)\n out_v = out_v.transpose(1, 2) # [B, T, N]\n \n # ---------------- Fusion ----------------\n x_pred = (out_t + out_v) * 0.5\n \n out = x_pred * stdev + mean\n out = self.final_proj(out)\n return out\n\n def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):\n if self.task_name == 'anomaly_detection':\n dec_out = self.anomaly_detection(x_enc)\n return dec_out\n return None"
7}
Tool Result — Step 4
OK: Replaced lines 1..122 with 141 line(s) in Time-Series-Library/models/Custom.py. Editable range: entire file.
[Current file: Time-Series-Library/models/Custom.py | editable: 1–141 | total: 141 lines]
1 import torch
2 import torch.nn as nn
3 import torch.nn.functional as F
4 import math
5
6 class PositionalEmbedding(nn.Module):
7 def __init__(self, d_model, max_len=5000):
8 super(PositionalEmbedding, self).__init__()
9 pe = torch.zeros(max_len, d_model).float()
10 pe.require_grad = False
11 position = torch.arange(0, max_len).float().unsqueeze(1)
12 div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
13 pe[:, 0::2] = torch.sin(position * div_term)
14 pe[:, 1::2] = torch.cos(position * div_term)
15 pe = pe.unsqueeze(0)
16 self.register_buffer('pe', pe)
17
18 def forward(
Assistant — Step 5: test
Tool Result — Step 5
[Test #2] (1 test remaining; call submit(n=2) to submit this result)
### PSM (scripts/psm.sh)
Training metrics (last epochs):
TRAIN_METRICS epoch=1 train_loss=0.0183616 vali_loss=0.0023354 test_loss=0.0027719
TRAIN_METRICS epoch=2 train_loss=0.0126146 vali_loss=0.0028398 test_loss=0.0033257
TRAIN_METRICS epoch=3 train_loss=0.0099696 vali_loss=0.0020056 test_loss=0.0022212
Test results (PSM):
F-score: 0.9528, Precision: 0.9868, Recall: 0.9211, Accuracy: 0.9747
### MSL (scripts/msl.sh)
Training metrics (last epochs):
TRAIN_METRICS epoch=1 train_loss=0.1492890 vali_loss=0.0129056 test_loss=0.0546097
TRAIN_METRICS epoch=2 train_loss=0.0909167 vali_loss=0.0204159 test_loss=0.0476133
TRAIN_METRICS epoch=3 train_loss=0.0841907 vali_loss=0.0152254 test_loss=0.0556832
Test results (MSL):
F-score: 0.7583, Precision: 0.8830, Recall: 0.6645, Accuracy: 0.9553
[Leaderboard] Results saved: {'f_score_PSM': 0.9528, 'precision_PSM': 0.9868, 'recall_PSM': 0.9211, 'elapsed_PSM': 468, 'f_score_MSL':
Assistant — Step 6: edit
Tool Input
1{
2 "filename": "Time-Series-Library/models/Custom.py",
3 "start_line": 444,
4 "op": "replace",
5 "end_line": 136,
6 "content": " def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n \n self.d_model = getattr(configs, 'd_model', 128)\n self.n_heads = getattr(configs, 'n_heads', 8)\n self.e_layers = getattr(configs, 'e_layers', 2)\n self.d_ff = getattr(configs, 'd_ff', 256)\n self.dropout = getattr(configs, 'dropout', 0.1)\n \n # Masking hyperparameters\n self.patch_len = 10\n self.mask_p_t = 0.4\n self.mask_p_v = 0.4\n \n # Temporal Branch\n self.proj_t = nn.Linear(self.enc_in, self.d_model)\n self.pe_t = PositionalEmbedding(self.d_model, max_len=self.seq_len)\n self.mask_token_t = nn.Parameter(torch.zeros(1, 1, self.d_model))\n self.encoder_t = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_t = nn.Linear(self.d_model, self.enc_in)\n \n # Variate Branch\n self.proj_v = nn.Linear(self.seq_len, self.d_model)\n self.pe_v = nn.Parameter(torch.randn(1, self.enc_in, self.d_model) * 0.02)\n self.mask_token_v = nn.Parameter(torch.zeros(1, 1, self.d_model))\n self.encoder_v = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_v = nn.Linear(self.d_model, self.seq_len)\n \n # Fusion weights per variate\n self.alpha = nn.Parameter(torch.ones(1, 1, self.enc_in) * 0.5)\n \n # Init mask tokens\n nn.init.trunc_normal_(self.mask_token_t, std=0.02)\n nn.init.trunc_normal_(self.mask_token_v, std=0.02)\n \n if self.enc_in != self.c_out:\n self.final_proj = nn.Linear(self.enc_in, self.c_out)\n else:\n self.final_proj = nn.Identity()\n\n def anomaly_detection(self, x_enc):\n B, T, N = x_enc.shape\n \n mean = x_enc.mean(dim=1, keepdim=True).detach()\n var = x_enc.var(dim=1, keepdim=True, unbiased=False).detach()\n stdev = torch.sqrt(var + 1e-5)\n x = (x_enc - mean) / 
stdev\n \n # ---------------- Temporal Branch ----------------\n e_t = self.proj_t(x) # [B, T, d_model]\n if self.training:\n num_patches = T // self.patch_len\n if num_patches > 0:\n mask_t = (torch.rand(B, num_patches, 1, device=x.device) < self.mask_p_t).float()\n mask_t = mask_t.repeat_interleave(self.patch_len, dim=1)\n pad = T - mask_t.shape[1]\n if pad > 0:\n mask_t = torch.cat([mask_t, torch.zeros(B, pad, 1, device=x.device)], dim=1)\n e_t = e_t * (1 - mask_t) + self.mask_token_t * mask_t\n \n e_t = e_t + self.pe_t(e_t)\n for layer in self.encoder_t:\n e_t = layer(e_t)\n out_t = self.out_t(e_t)\n \n # ---------------- Variate Branch ----------------\n x_v = x.transpose(1, 2) # [B, N, T]\n e_v = self.proj_v(x_v) # [B, N, d_model]\n if self.training:\n mask_v = (torch.rand(B, N, 1, device=x.device) < self.mask_p_v).float()\n e_v = e_v * (1 - mask_v) + self.mask_token_v * mask_v\n \n e_v = e_v + self.pe_v\n for layer in self.encoder_v:\n e_v = layer(e_v)\n out_v = self.out_v(e_v)\n out_v = out_v.transpose(1, 2) # [B, T, N]\n \n # ---------------- Fusion ----------------\n x_pred = out_t * self.alpha + out_v * (1.0 - self.alpha)\n \n out = x_pred * stdev + mean\n out = self.final_proj(out)\n return out"
7}
Tool Result — Step 6
ERROR: Invalid line range 444..136 (file has 141 lines)
[Current file: Time-Series-Library/models/Custom.py | editable: 1–141 | total: 141 lines]
1 import torch
2 import torch.nn as nn
3 import torch.nn.functional as F
4 import math
5
6 class PositionalEmbedding(nn.Module):
7 def __init__(self, d_model, max_len=5000):
8 super(PositionalEmbedding, self).__init__()
9 pe = torch.zeros(max_len, d_model).float()
10 pe.require_grad = False
11 position = torch.arange(0, max_len).float().unsqueeze(1)
12 div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
13 pe[:, 0::2] = torch.sin(position * div_term)
14 pe[:, 1::2] = torch.cos(position * div_term)
15 pe = pe.unsqueeze(0)
16 self.register_buffer('pe', pe)
17
18 def forward(self, x):
19 return self.pe[:, :x.size(1)]
Assistant — Step 7: edit
Tool Input
1{
2 "op": "replace",
3 "end_line": 135,
4 "start_line": 44,
5 "filename": "Time-Series-Library/models/Custom.py",
6 "content": " def __init__(self, configs):\n super(Model, self).__init__()\n self.task_name = configs.task_name\n self.seq_len = configs.seq_len\n self.pred_len = configs.seq_len\n self.enc_in = configs.enc_in\n self.c_out = configs.c_out\n \n self.d_model = getattr(configs, 'd_model', 128)\n self.n_heads = getattr(configs, 'n_heads', 8)\n self.e_layers = getattr(configs, 'e_layers', 2)\n self.d_ff = getattr(configs, 'd_ff', 256)\n self.dropout = getattr(configs, 'dropout', 0.1)\n \n # Masking hyperparameters\n self.patch_len = 10\n self.mask_p_t = 0.4\n self.mask_p_v = 0.4\n \n # Temporal Branch\n self.proj_t = nn.Linear(self.enc_in, self.d_model)\n self.pe_t = PositionalEmbedding(self.d_model, max_len=self.seq_len)\n self.mask_token_t = nn.Parameter(torch.zeros(1, 1, self.d_model))\n self.encoder_t = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_t = nn.Linear(self.d_model, self.enc_in)\n \n # Variate Branch\n self.proj_v = nn.Linear(self.seq_len, self.d_model)\n self.pe_v = nn.Parameter(torch.randn(1, self.enc_in, self.d_model) * 0.02)\n self.mask_token_v = nn.Parameter(torch.zeros(1, 1, self.d_model))\n self.encoder_v = nn.ModuleList([\n EncoderLayer(self.d_model, self.n_heads, self.d_ff, self.dropout)\n for _ in range(self.e_layers)\n ])\n self.out_v = nn.Linear(self.d_model, self.seq_len)\n \n # Fusion weights per variate\n self.alpha = nn.Parameter(torch.ones(1, 1, self.enc_in) * 0.5)\n \n # Init mask tokens\n nn.init.trunc_normal_(self.mask_token_t, std=0.02)\n nn.init.trunc_normal_(self.mask_token_v, std=0.02)\n \n if self.enc_in != self.c_out:\n self.final_proj = nn.Linear(self.enc_in, self.c_out)\n else:\n self.final_proj = nn.Identity()\n\n def anomaly_detection(self, x_enc):\n B, T, N = x_enc.shape\n \n mean = x_enc.mean(dim=1, keepdim=True).detach()\n var = x_enc.var(dim=1, keepdim=True, unbiased=False).detach()\n stdev = torch.sqrt(var + 1e-5)\n x = (x_enc - mean) / 
stdev\n \n # ---------------- Temporal Branch ----------------\n e_t = self.proj_t(x) # [B, T, d_model]\n if self.training:\n num_patches = T // self.patch_len\n if num_patches > 0:\n mask_t = (torch.rand(B, num_patches, 1, device=x.device) < self.mask_p_t).float()\n mask_t = mask_t.repeat_interleave(self.patch_len, dim=1)\n pad = T - mask_t.shape[1]\n if pad > 0:\n mask_t = torch.cat([mask_t, torch.zeros(B, pad, 1, device=x.device)], dim=1)\n e_t = e_t * (1 - mask_t) + self.mask_token_t * mask_t\n \n e_t = e_t + self.pe_t(e_t)\n for layer in self.encoder_t:\n e_t = layer(e_t)\n out_t = self.out_t(e_t)\n \n # ---------------- Variate Branch ----------------\n x_v = x.transpose(1, 2) # [B, N, T]\n e_v = self.proj_v(x_v) # [B, N, d_model]\n if self.training:\n mask_v = (torch.rand(B, N, 1, device=x.device) < self.mask_p_v).float()\n e_v = e_v * (1 - mask_v) + self.mask_token_v * mask_v\n \n e_v = e_v + self.pe_v\n for layer in self.encoder_v:\n e_v = layer(e_v)\n out_v = self.out_v(e_v)\n out_v = out_v.transpose(1, 2) # [B, T, N]\n \n # ---------------- Fusion ----------------\n x_pred = out_t * self.alpha + out_v * (1.0 - self.alpha)\n \n out = x_pred * stdev + mean\n out = self.final_proj(out)\n return out"
7}
Tool Result — Step 7
OK: Replaced lines 44..135 with 95 line(s) in Time-Series-Library/models/Custom.py. Editable range: entire file.
[Current file: Time-Series-Library/models/Custom.py | editable: 1–144 | total: 144 lines]
1 import torch
2 import torch.nn as nn
3 import torch.nn.functional as F
4 import math
5
6 class PositionalEmbedding(nn.Module):
7 def __init__(self, d_model, max_len=5000):
8 super(PositionalEmbedding, self).__init__()
9 pe = torch.zeros(max_len, d_model).float()
10 pe.require_grad = False
11 position = torch.arange(0, max_len).float().unsqueeze(1)
12 div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
13 pe[:, 0::2] = torch.sin(position * div_term)
14 pe[:, 1::2] = torch.cos(position * div_term)
15 pe = pe.unsqueeze(0)
16 self.register_buffer('pe', pe)
17
18 def forward(
Assistant — Step 8: test
Tool Result — Step 8
[Test #3] (0 tests remaining) ## Seed 42 ### PSM (scripts/psm.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.0282243 vali_loss=0.0052857 test_loss=0.0055203 TRAIN_METRICS epoch=2 train_loss=0.0198345 vali_loss=0.0047195 test_loss=0.0049557 TRAIN_METRICS epoch=3 train_loss=0.0162616 vali_loss=0.0039652 test_loss=0.0041921 Test results (PSM): F-score: 0.9512, Precision: 0.9854, Recall: 0.9193, Accuracy: 0.9738 ### MSL (scripts/msl.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.2168707 vali_loss=0.0308854 test_loss=0.1031658 TRAIN_METRICS epoch=2 train_loss=0.1513655 vali_loss=0.0349131 test_loss=0.0996974 TRAIN_METRICS epoch=3 train_loss=0.1452134 vali_loss=0.0389029 test_loss=0.1443038 Test results (MSL): F-score: 0.7660, Precision: 0.8818, Recall: 0.6771, Accuracy: 0.9564 ### SMAP (scripts/smap.sh) Training metrics (last epochs): TRAIN_METRICS epoch=1 train_loss=0.2853115 vali_loss=0.0300938 test_loss=0.0640166 TRAIN_METRICS epoc