Agent Conversation: gpt-5.4-pro

Model: gpt-5.4-pro
Total steps: 2
Messages: 5
User · Step 0

Task: ts-short-term-forecast

Short-Term Time Series Forecasting: Custom Model Design

Objective

Design and implement a custom deep learning model for univariate short-term time series forecasting on the M4 dataset. Your code goes in the Model class in models/Custom.py. Three reference implementations (DLinear, TimesNet, PatchTST) are provided as read-only.

Evaluation

Trained and evaluated on three M4 seasonal patterns:

  • Monthly (pred_len=18, seq_len=104)
  • Quarterly (pred_len=8, seq_len=52)
  • Yearly (pred_len=6, seq_len=42)

All use enc_in=1, features=M, loss=SMAPE. Metric: SMAPE (lower is better).
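For reference, SMAPE is the symmetric percentage error averaged over the forecast horizon, on a 0–200 scale. A minimal sketch of the metric (the library's own implementation may differ in masking and epsilon handling):

    import torch

    def smape(pred: torch.Tensor, true: torch.Tensor, eps: float = 1e-8) -> torch.Tensor:
        # Symmetric MAPE on a 0-200 percent scale; lower is better.
        return 200.0 * torch.mean((pred - true).abs() / (pred.abs() + true.abs() + eps))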

Time-Series-Library/models/Custom.py [EDITABLE — entire file only]

     1: import torch
     2: import torch.nn as nn
     3: 
     4: 
     5: class Model(nn.Module):
     6:     """
     7:     Custom model for short-term time series forecasting (M4 dataset).
     8: 
     9:     Forward signature: forward(x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None)
    10:     - x_enc: [batch, seq_len, enc_in] — input time series (enc_in=1 for M4)
    11:     - x_mark_enc: [batch, seq_len, time_features] — time feature encoding
    12:     - x_dec: [batch, label_len+pred_len, dec_in] — decoder input
    13:     - x_mark_dec: [batch, label_len+pred_len, time_features] — decoder time features
    14: 
    15:     Must return: [batch, pred_len, c_out] for forecasting
    16:     """
    17: 
    18:     def __init__(self, configs):
    19:         super(Model, self).__init__()
    20:         self.task_name = configs.task_name
    21:         self.seq_len = configs.seq_len
    22:         self.pred_len = configs.pred_len
    23:         self.enc_in = configs.enc_in
    24:         self.c_out = configs.c_out
    25:         # TODO: Define your model architecture here
    26: 
    27:     def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
    28:         """
    29:         Forecasting: given input sequence, predict future values.
    30:         Input: x_enc [batch, seq_len, 1]
    31:         Output: [batch, pred_len, 1]
    32:         """
    33:         # TODO: Implement your forecasting logic
    34:         batch_size = x_enc.shape[0]
    35:         return torch.zeros(batch_size, self.pred_len, self.c_out).to(x_enc.device)
    36: 
    37:     def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
    38:         if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
    39:             dec_out = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
    40:             return dec_out[:, -self.pred_len:, :]
    41:         return None

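As a quick sanity check before training, the skeleton above can be exercised end to end with a dummy config. A minimal smoke test, assuming a plain SimpleNamespace stands in for the library's argparse config (illustrative, not library API):

    from types import SimpleNamespace
    import torch
    from models.Custom import Model

    cfg = SimpleNamespace(task_name='short_term_forecast', seq_len=104,
                          pred_len=18, enc_in=1, c_out=1)
    model = Model(cfg)
    x_enc = torch.randn(4, cfg.seq_len, cfg.enc_in)
    out = model(x_enc, None, None, None)   # the skeleton ignores marks and decoder inputs
    assert out.shape == (4, cfg.pred_len, cfg.c_out)
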
Time-Series-Library/models/DLinear.py [READ-ONLY — do not edit]

     1: import torch
     2: import torch.nn as nn
     3: import torch.nn.functional as F
     4: from layers.Autoformer_EncDec import series_decomp
     5: 
     6: 
     7: class Model(nn.Module):
     8:     """
     9:     Paper link: https://a
Assistant · Step 1 · edit
Tool Input
1  {
2  "op": "replace",
3  "filename": "Time-Series-Library/models/Custom.py",
4  "content": "import math\nimport torch\nimport torch.nn as nn\nimport torch.nn.functional as F\n\n\nclass Model(nn.Module):\n    \"\"\"\n    AutoPhaseTrend: a decomposition-based short-term forecaster with two analytic priors\n    and learnable correction heads.\n\n    Key ideas:\n    1) sample-adaptive multi-scale moving-average trend extraction,\n    2) confidence-shrunk trend extrapolation via weighted least-squares,\n    3) autocorrelation-weighted phase averaging over candidate lags for seasonal extrapolation,\n    4) zero-initialized linear correction heads that learn residual errors on top of the priors.\n    \"\"\"\n\n    def __init__(self, configs):\n        super(Model, self).__init__()\n        self.task_name = configs.task_name\n        self.seq_len = configs.seq_len\n        self.pred_len = configs.pred_len\n        self.enc_in = configs.enc_in\n        self.c_out = configs.c_out\n        self.eps = 1e-5\n\n        self.kernels = self._build_kernels(self.seq_len, self.pred_len)\n        self.candidate_lags = self._build_candidate_lags(self.seq_len, self.pred_len)\n        self.top_k = min(5, len(self.candidate_lags))\n\n        phase_maps = torch.stack([self._build_phase_map(lag) for lag in self.candidate_lags], dim=0)\n        lag_prior = torch.tensor(\n            [1.0 - math.exp(-float(lag) / 3.0) for lag in self.candidate_lags],\n            dtype=torch.float32,\n        )\n        trend_weights = torch.pow(torch.tensor(0.97, dtype=torch.float32),\n                                  torch.arange(self.seq_len - 1, -1, -1, dtype=torch.float32))\n\n        self.register_buffer('phase_maps', phase_maps)          # [num_lags, pred_len, seq_len]\n        self.register_buffer('lag_prior', lag_prior)            # [num_lags]\n        self.register_buffer('time_index', torch.arange(self.seq_len, dtype=torch.float32))\n        self.register_buffer('future_index', torch.arange(self.seq_len, self.seq_len + self.pred_len, dtype=torch.float32))\n        self.register_buffer('trend_weights', trend_weights)\n\n        self.kernel_gate = nn.Sequential(\n            nn.Linear(6, 16),\n            nn.GELU(),\n            nn.Linear(16, len(self.kernels))\n        )\n\n        self.refine_trend = nn.Linear(self.seq_len, self.pred_len)\n        self.refine_season = nn.Linear(self.seq_len, self.pred_len)\n\n        # Positive branch scalings with initialization at 1.0 via 2 * sigmoid(0).\n        self.trend_scale_param = nn.Parameter(torch.tensor(0.0))\n        self.season_scale_param = nn.Parameter(torch.tensor(0.0))\n        self.lag_scale_param = nn.Parameter(torch.zeros(len(self.candidate_lags)))\n\n        self._reset_parameters()\n\n    def _reset_parameters(self):\n        nn.init.zeros_(self.refine_trend.weight)\n        nn.init.zeros_(self.refine_trend.bias)\n        nn.init.zeros_(self.refine_season.weight)\n        nn.init.zeros_(self.refine_season.bias)\n\n        # Start from uniform kernel mixing.\n        nn.init.zeros_(self.kernel_gate[-1].weight)\n        nn.init.zeros_(self.kernel_gate[-1].bias)\n\n    @staticmethod\n    def _make_odd(k, upper):\n        k = max(3, int(k))\n        upper = max(3, int(upper))\n        if upper % 2 == 0:\n            upper -= 1\n        k = min(k, upper)\n        if k % 2 == 0:\n            k = max(3, k - 1)\n        return k\n\n    def _build_kernels(self, seq_len, pred_len):\n        raw = [3, pred_len + 1, seq_len // 3]\n        kernels = []\n        for k in raw:\n            k = self._make_odd(k, seq_len)\n            if k not in kernels:\n  
              kernels.append(k)\n        while len(kernels) < 3:\n            nxt = self._make_odd(kernels[-1] + 2, seq_len)\n            if nxt not in kernels:\n                kernels.append(nxt)\n            else:\n                break\n        return kernels\n\n    def _build_candidate_lags(self, seq_len, pred_len):\n        max_lag = min(seq_len - 1, max(seq_len // 2, pred_len * 3))\n        return list(range(1, max_lag + 1))\n\n    def _build_phase_map(self, lag, decay=0.75):\n        phase_map = torch.zeros(self.pred_len, self.seq_len, dtype=torch.float32)\n        for h in range(self.pred_len):\n            future_idx = self.seq_len + h\n            aligned = []\n            weights = []\n            for t in range(self.seq_len):\n                delta = future_idx - t\n                if delta > 0 and delta % lag == 0:\n                    cycles_back = delta // lag\n                    aligned.append(t)\n                    weights.append(decay ** max(cycles_back - 1, 0))\n            if aligned:\n                w = torch.tensor(weights, dtype=torch.float32)\n                w = w / (w.sum() + self.eps)\n                phase_map[h, torch.tensor(aligned, dtype=torch.long)] = w\n        return phase_map\n\n    def _moving_average(self, x, kernel_size):\n        pad = (kernel_size - 1) // 2\n        x_t = x.permute(0, 2, 1)\n        x_t = F.pad(x_t, (pad, pad), mode='replicate')\n        x_t = F.avg_pool1d(x_t, kernel_size=kernel_size, stride=1)\n        return x_t.permute(0, 2, 1)\n\n    def _series_stats(self, x):\n        # Reduce multivariate signals to shared scalar descriptors; M4 uses enc_in=1.\n        def reduce_mean(v):\n            return v.mean(dim=(1, 2))\n\n        diff = x[:, 1:, :] - x[:, :-1, :] if x.size(1) > 1 else torch.zeros_like(x)\n        diff2 = diff[:, 1:, :] - diff[:, :-1, :] if diff.size(1) > 1 else torch.zeros_like(diff[:, :1, :])\n        recent = min(x.size(1), max(4, self.pred_len))\n\n        stats = [\n            reduce_mean(x),\n            torch.sqrt(x.pow(2).mean(dim=(1, 2)) + self.eps),\n            reduce_mean(diff.abs()) if diff.numel() > 0 else torch.zeros(x.size(0), device=x.device),\n            reduce_mean(diff[:, -min(diff.size(1), recent):, :].abs()) if diff.size(1) > 0 else torch.zeros(x.size(0), device=x.device),\n            reduce_mean(diff2.abs()) if diff2.numel() > 0 else torch.zeros(x.size(0), device=x.device),\n            reduce_mean(x[:, -recent:, :]) - reduce_mean(x[:, :recent, :]),\n        ]\n        return torch.stack(stats, dim=-1)\n\n    def _multi_scale_decompose(self, x):\n        trend_stack = torch.stack([self._moving_average(x, k) for k in self.kernels], dim=-1)\n        kernel_logits = self.kernel_gate(self._series_stats(x))\n        kernel_weights = torch.softmax(kernel_logits, dim=-1)\n        trend = (trend_stack * kernel_weights[:, None, None, :]).sum(dim=-1)\n        seasonal = x - trend\n        return trend, seasonal\n\n    def _trend_extrapolate(self, trend):\n        # trend: [B, T, C]\n        y = trend.permute(0, 2, 1)  # [B, C, T]\n        t = self.time_index.to(trend.device)\n        future = self.future_index.to(trend.device)\n        w = self.trend_weights.to(trend.device)\n\n        sw = w.sum()\n        sx = (w * t).sum()\n        sxx = (w * t * t).sum()\n        denom = sw * sxx - sx * sx + self.eps\n\n        w_view = w.view(1, 1, -1)\n        sy = (y * w_view).sum(dim=-1)\n        sxy = (y * (w * t).view(1, 1, -1)).sum(dim=-1)\n\n        slope = (sw * sxy - sx * sy) / denom\n\n        mean_y 
= sy / sw\n        intercept = (sy - slope * sx) / sw\n        fit = intercept.unsqueeze(-1) + slope.unsqueeze(-1) * t.view(1, 1, -1)\n        fit_error = ((y - fit).pow(2) * w_view).sum(dim=-1) / sw\n        var_y = ((y - mean_y.unsqueeze(-1)).pow(2) * w_view).sum(dim=-1) / sw + self.eps\n        confidence = var_y / (var_y + fit_error + self.eps)\n        slope = slope * confidence\n\n        # Anchor extrapolation at the last observed trend level and only extrapolate the slope.\n        horizon_steps = (future - t[-1]).view(1, 1, -1)\n        last_level = y[:, :, -1:].expand(-1, -1, self.pred_len)\n        pred = last_level + slope.unsqueeze(-1) * horizon_steps\n        return pred.permute(0, 2, 1)\n\n    def _phase_extrapolate(self, seasonal):\n        # seasonal: [B, T, C]\n        x = seasonal - seasonal.mean(dim=1, keepdim=True)\n        bsz, _, channels = x.shape\n\n        corr_list = []\n        for lag in self.candidate_lags:\n            left = x[:, lag:, :]\n            right = x[:, :-lag, :]\n            num = (left * right).mean(dim=(1, 2))\n            den = torch.sqrt(left.pow(2).mean(dim=(1, 2)) * right.pow(2).mean(dim=(1, 2)) + self.eps)\n            corr_list.append(num / den)\n        corr = torch.stack(corr_list, dim=-1)  # [B, num_lags]\n\n        lag_scale = 1.0 + 0.5 * torch.tanh(self.lag_scale_param).unsqueeze(0)\n        scores = torch.relu(corr) * self.lag_prior.unsqueeze(0) * lag_scale\n\n        top_vals, top_idx = torch.topk(scores, k=self.top_k, dim=-1)\n\n        # Build phase templates for every lag, then gather the strongest few.\n        all_templates = torch.einsum('bct,lht->blch', x.permute(0, 2, 1), self.phase_maps)\n        all_templates = all_templates.permute(0, 1, 3, 2)  # [B, num_lags, pred_len, C]\n        gather_idx = top_idx[:, :, None, None].expand(-1, -1, self.pred_len, channels)\n        selected_templates = torch.gather(all_templates, dim=1, index=gather_idx)\n\n        weights = torch.softmax(top_vals * 6.0, dim=-1)\n        seasonal_pred = (selected_templates * weights[:, :, None, None]).sum(dim=1)\n\n        best_corr = torch.gather(corr, dim=1, index=top_idx[:, :1]).clamp_min(0.0)\n        season_gate = torch.sigmoid(8.0 * (best_corr - 0.10)).unsqueeze(-1)\n        return seasonal_pred * season_gate\n\n    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):\n        \"\"\"\n        Forecasting: given input sequence, predict future values.\n        Input: x_enc [batch, seq_len, enc_in]\n        Output: [batch, pred_len, c_out]\n        \"\"\"\n        anchor = x_enc[:, -1:, :]\n        scale = torch.sqrt(torch.mean((x_enc - anchor).pow(2), dim=1, keepdim=True) + self.eps)\n        scale = scale.clamp_min(1e-3)\n        x = (x_enc - anchor) / scale\n\n        trend, seasonal = self._multi_scale_decompose(x)\n\n        trend_prior = self._trend_extrapolate(trend)\n        season_prior = self._phase_extrapolate(seasonal)\n\n        trend_refine = self.refine_trend(trend.permute(0, 2, 1)).permute(0, 2, 1)\n        season_refine = self.refine_season(seasonal.permute(0, 2, 1)).permute(0, 2, 1)\n\n        trend_scale = 2.0 * torch.sigmoid(self.t\n... (truncated)",
5  "after_line": 41,
6  "start_line": 1,
7  "end_line": 41
8  }
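
The trend prior in this submission is an exponentially weighted least-squares line fit whose slope is shrunk toward zero when the fit explains little of the variance, then extrapolated from the last observed trend level. A single-series sketch of the same closed-form arithmetic (constants match the submission; the standalone function name is illustrative):

    import torch

    def trend_extrapolate(y: torch.Tensor, horizon: int, decay: float = 0.97) -> torch.Tensor:
        # y: [T] trend values -> [horizon] extrapolated values.
        T = y.numel()
        t = torch.arange(T, dtype=torch.float32)
        w = decay ** torch.arange(T - 1, -1, -1, dtype=torch.float32)  # recent points weigh more
        sw, sx = w.sum(), (w * t).sum()
        sxx, sy, sxy = (w * t * t).sum(), (w * y).sum(), (w * t * y).sum()
        slope = (sw * sxy - sx * sy) / (sw * sxx - sx * sx + 1e-5)
        intercept = (sy - slope * sx) / sw
        fit = intercept + slope * t
        fit_error = (w * (y - fit) ** 2).sum() / sw
        var_y = (w * (y - sy / sw) ** 2).sum() / sw + 1e-5
        slope = slope * var_y / (var_y + fit_error + 1e-5)  # confidence shrinkage
        steps = torch.arange(1, horizon + 1, dtype=torch.float32)
        return y[-1] + slope * steps  # anchor at the last observed level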
Tool Result · Step 1
OK: Replaced lines 1..41 with 248 line(s) in Time-Series-Library/models/Custom.py. Editable range: entire file.

[Current file: Time-Series-Library/models/Custom.py | editable: 1–248 | total: 248 lines]
     1  import math
     2  import torch
     3  import torch.nn as nn
     4  import torch.nn.functional as F
     5  
     6  
     7  class Model(nn.Module):
     8      """
     9      AutoPhaseTrend: a decomposition-based short-term forecaster with two analytic priors
    10      and learnable correction heads.
    11  
    12      Key ideas:
    13      1) sample-adaptive multi-scale moving-average trend extraction,
    14      2) confidence-shrunk trend extrapolation via weighted least-squares,
    15      3) autocorrelation-weighted phase averaging over candidate lags for seasonal extrapolation,
    16      4) zero-initialized linear correction heads that learn residual errors on top of the priors.
    17      """
    18  
    19      def __init__(self, configs):
    20          super(Model, self).__init__()
... (truncated)
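
The seasonal prior hinges on precomputed per-lag phase maps: for each future step, the history points that sit a whole number of cycles back are averaged with a per-cycle decay, and the resulting per-lag templates are later blended by autocorrelation score. A minimal sketch mirroring _build_phase_map from the tool input (decay=0.75 and eps=1e-5 as in the submission):

    import torch

    def build_phase_map(seq_len: int, pred_len: int, lag: int,
                        decay: float = 0.75, eps: float = 1e-5) -> torch.Tensor:
        # Returns [pred_len, seq_len]: row h holds normalized weights over the
        # history points that are an integer number of lag cycles before step h.
        pm = torch.zeros(pred_len, seq_len)
        for h in range(pred_len):
            for t in range(seq_len):
                delta = seq_len + h - t
                if delta > 0 and delta % lag == 0:
                    pm[h, t] = decay ** max(delta // lag - 1, 0)
            if pm[h].sum() > 0:
                pm[h] /= pm[h].sum() + eps
        return pm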
Assistant · Step 2 · test
Tool Result · Step 2
[Test #1] (2 tests remaining; call submit(n=1) to submit this result)

### m4_monthly (scripts/m4_monthly.sh)
Training metrics (last epochs):
TRAIN_METRICS epoch=6 train_loss=7.9097877 vali_loss=13.0523475
TRAIN_METRICS epoch=7 train_loss=7.8721694 vali_loss=13.0436300
TRAIN_METRICS epoch=8 train_loss=7.8878031 vali_loss=13.0330842
TRAIN_METRICS epoch=9 train_loss=7.8874104 vali_loss=13.0316344
TRAIN_METRICS epoch=10 train_loss=7.8774570 vali_loss=13.0317768
Test results (m4_monthly):
  SMAPE: 13.0316, MAPE: 0.1571

### m4_quarterly (scripts/m4_quarterly.sh)
Training metrics (last epochs):
TRAIN_METRICS epoch=6 train_loss=7.3060970 vali_loss=10.5095915
TRAIN_METRICS epoch=7 train_loss=7.2327723 vali_loss=10.5077496
TRAIN_METRICS epoch=8 train_loss=7.2568031 vali_loss=10.5088777
TRAIN_METRICS epoch=9 train_loss=7.2866153 vali_loss=10.5102748
TRAIN_METRICS epoch=10 train_loss=7.3193247 vali_loss=10.5093887
Test results (m4_quarterly):
  SMAPE: 10.5077, MAPE: