ADR 0010: ForecastNode — Predictive Analytics Integration

Status

Proposed (Phase 4.1 MVP)

Context

Jorvis currently excels at descriptive analytics (answering "what happened?"). The natural evolution is predictive analytics (answering "what will happen?"). This capability is documented in:

  • ARCHITECTURE_TARGET_VISION.md §2.5 "Predictive Analytics"
  • ROADMAP_v3_Graph.md Phase 4 "Predict Lane"

Time-series forecasting is the most requested predictive capability for business analytics. Typical use cases:

  • Sales forecasting
  • Demand prediction
  • Trend analysis
  • Anomaly prediction

Decision

Implement predictive analytics via a ForecastNode in the graph orchestration pipeline, using a Python Sidecar pattern (similar to sqlglot-sidecar) for the MVP phase.

Architecture: Python Sidecar (Phase 4.1 MVP)

┌─────────────────────────────────────────────────────────────┐
│                     NestJS Backend                          │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │DataExecution │───▶│ DataQuality  │───▶│ ForecastNode │  │
│  │    Node      │    │    Node      │    │              │  │
│  └──────────────┘    └──────────────┘    └──────┬───────┘  │
│                                                  │          │
│                                          HTTP POST /forecast│
│                                                  │          │
└──────────────────────────────────────────────────┼──────────┘
                                                   │
                                                   ▼
┌─────────────────────────────────────────────────────────────┐
│                  forecast-sidecar (FastAPI)                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │  Validator   │───▶│   Prophet    │───▶│  Formatter   │  │
│  │  (Min Data)  │    │   Engine     │    │  (Response)  │  │
│  └──────────────┘    └──────────────┘    └──────────────┘  │
└─────────────────────────────────────────────────────────────┘

Why Python Sidecar (not DB-Native ML)?

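| Criterion        | Python Sidecar     | DB-Native (BigQuery ML) |
|------------------|--------------------|-------------------------|
| Postgres support | ✅ Yes             | ❌ No                   |
| Vendor lock-in   | ✅ None            | ⚠️ High                 |
| Prerequisites    | None               | Multi-Dialect support   |
| Time to MVP      | 1-2 weeks          | 4-6 weeks               |
| Existing pattern | ✅ sqlglot-sidecar | ❌ New                  |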

Phase 4.2 will add DB-Native ML as an alternative strategy for BigQuery/Snowflake users.

Specification

1. forecast-sidecar API Contract

POST /forecast

Request:

{
  "timestamps": ["2024-01-01", "2024-01-02", ...],
  "values": [100, 105, 98, ...],
  "periods": 30,
  "frequency": "D",
  "confidence_interval": 0.95,
  "include_components": false
}
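
| Field               | Type     | Required | Description                                                      |
|---------------------|----------|----------|------------------------------------------------------------------|
| timestamps          | string[] | Yes      | ISO 8601 date strings                                            |
| values              | number[] | Yes      | Numeric time series values                                       |
| periods             | int      | Yes      | Number of future periods to forecast                             |
| frequency           | enum     | No       | D (day), W (week), M (month), Y (year). Default: auto-detect     |
| confidence_interval | float    | No       | 0.8, 0.9, 0.95. Default: 0.95                                    |
| include_components  | bool     | No       | Include trend/seasonality breakdown. Default: false              |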

Response (Success):

{
  "success": true,
  "forecast": {
    "timestamps": ["2024-02-01", "2024-02-02", ...],
    "values": [112.5, 115.2, ...],
    "lower": [105.1, 107.8, ...],
    "upper": [119.9, 122.6, ...]
  },
  "metrics": {
    "mape": 4.7,
    "rmse": 12.3,
    "mae": 9.8,
    "data_points": 365,
    "model": "prophet"
  },
  "components": null
}

Response (Error):

{
  "success": false,
  "error": {
    "code": "INSUFFICIENT_DATA",
    "message": "Need at least 10 data points for forecasting"
  },
  "forecast": null,
  "metrics": null
}
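
A caller has to branch on the success flag before touching forecast, since both forecast and metrics are null in the error envelope. The sketch below shows one way a Python client might unwrap the response. ForecastFailed, unwrap_forecast, and the "UNKNOWN" fallback code are hypothetical names introduced for illustration and are not part of the contract.

```python
from typing import Any, Dict, List, Tuple


class ForecastFailed(Exception):
    """Raised when the sidecar reports success=false (hypothetical helper)."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code


def unwrap_forecast(payload: Dict[str, Any]) -> List[Tuple[str, float, float, float]]:
    """Return (timestamp, value, lower, upper) rows from a /forecast response."""
    if not payload.get("success"):
        error = payload.get("error") or {}
        # "UNKNOWN" is a placeholder for envelopes that carry no error code.
        raise ForecastFailed(error.get("code", "UNKNOWN"), error.get("message", ""))

    forecast = payload["forecast"]
    columns = [forecast[key] for key in ("timestamps", "values", "lower", "upper")]
    # The four arrays are parallel; a length mismatch means a malformed response.
    if len({len(column) for column in columns}) != 1:
        raise ValueError("forecast arrays must all have the same length")
    return list(zip(*columns))
```

Zipping the parallel arrays into rows makes it harder to pair a value with the wrong bound when the result is passed on to charting code.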

Error Codes

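| Code               | Description                                          |
|--------------------|------------------------------------------------------|
| INSUFFICIENT_DATA  | Fewer than 10 data points                            |
| LENGTH_MISMATCH    | timestamps and values have different lengths         |
| INVALID_TIMESTAMPS | Timestamps not parseable or not monotonic            |
| INVALID_VALUES     | Values contain non-numeric entries or are all null   |
| FREQUENCY_MISMATCH | Detected frequency doesn't match the specified one   |
| MODEL_FIT_FAILED   | Prophet failed to fit (numerical issues)             |
| TIMEOUT            | Forecast took longer than 30s                        |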
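
The input-related codes can be checked before any model is fitted. The following is a minimal sketch of such a pre-check, not the sidecar's actual validator. validate_series is a hypothetical helper, "monotonic" is read as strictly increasing (an assumption), and LENGTH_MISMATCH is the code the sidecar implementation in section 4 returns for arrays of unequal length.

```python
import math
from datetime import datetime
from typing import Optional, Sequence


def validate_series(
    timestamps: Sequence[str],
    values: Sequence[object],
    min_points: int = 10,
) -> Optional[str]:
    """Return an error code for unusable input, or None if the input looks fine."""
    if len(timestamps) != len(values):
        return "LENGTH_MISMATCH"
    if len(timestamps) < min_points:
        return "INSUFFICIENT_DATA"

    try:
        parsed = [datetime.fromisoformat(ts) for ts in timestamps]
        # Assumption: "monotonic" means strictly increasing timestamps.
        ordered = all(a < b for a, b in zip(parsed, parsed[1:]))
    except (TypeError, ValueError):
        # Covers unparseable strings and naive/aware datetime mixes.
        return "INVALID_TIMESTAMPS"
    if not ordered:
        return "INVALID_TIMESTAMPS"

    def is_number(value: object) -> bool:
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and math.isfinite(value)
        )

    present = [v for v in values if v is not None]
    # All-null input and any non-numeric entry are both rejected.
    if not present or not all(is_number(v) for v in present):
        return "INVALID_VALUES"
    return None
```

FREQUENCY_MISMATCH, MODEL_FIT_FAILED, and TIMEOUT depend on the model run itself and are left out of this pre-check.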

GET /health

{
  "status": "ok",
  "fingerprint": "sha256:abc123...",
  "service_version": "0.1.0",
  "prophet_version": "1.1.5",
  "min_data_points": 10,
  "max_forecast_periods": 365,
  "timeout_seconds": 30
}

2. ForecastNode Graph Integration

Location in Graph Flow

DataExecutionNode → DataQualityNode → [ForecastNode] → VisualizationNode → SynthesisNode
                                     (conditional)

ForecastNode activates when:

  1. JORVIS_FORECAST_ENABLED=true
  2. Query results contain time-series data (date column + numeric column)
  3. User question contains forecast intent keywords
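
Condition 2 is the least obvious of the three, because permissive date parsing can mistake a numeric column (for example a year or an ID) for a date column. The sketch below illustrates one stricter detection rule in Python. It is an illustration only, the production check lives in the NestJS node, and find_time_series_columns is a hypothetical name.

```python
from datetime import date, datetime
from numbers import Real
from typing import Any, Dict, List, Optional, Tuple


def _is_date_like(value: Any) -> bool:
    if isinstance(value, (date, datetime)):
        return True
    if not isinstance(value, str):
        return False  # bare numbers such as 2024 are not treated as dates
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False


def _is_numeric(value: Any) -> bool:
    # bool is a subclass of int, so exclude it explicitly.
    return isinstance(value, Real) and not isinstance(value, bool)


def find_time_series_columns(rows: List[Dict[str, Any]]) -> Optional[Tuple[str, str]]:
    """Return (date_column, value_column), or None if rows are not a time series."""
    if not rows:
        return None
    columns = list(rows[0].keys())
    date_col = next(
        (c for c in columns if all(_is_date_like(r.get(c)) for r in rows)), None
    )
    if date_col is None:
        return None
    # The value column must be a different column from the date column.
    value_col = next(
        (c for c in columns if c != date_col and all(_is_numeric(r.get(c)) for r in rows)),
        None,
    )
    return (date_col, value_col) if value_col else None
```

Checking every row rather than only the first trades a little CPU time for fewer false activations on mixed-type columns.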

Intent Detection (SplitterNode enhancement)

private hasForecastIntent(question: string): boolean {
  const FORECAST_KEYWORDS = [
    /\bforecast\b/i,
    /\bpredict(ion)?\b/i,
    /\btrend\b/i,
    /\bnext\s+(week|month|quarter|year)\b/i,
    /\bwill\s+(be|reach|hit)\b/i,
    /\bproject(ion|ed)?\b/i,
    /\bestimate\b/i,
    /\bexpect(ed)?\b/i,
  ];
  return FORECAST_KEYWORDS.some((re) => re.test(question));
}

ForecastNode Implementation

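// analytics-platform/src/ai/graph/nodes/forecast.node.ts

import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';
// GraphNode and GraphState are project types; their import path is omitted here.
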
@Injectable()
export class ForecastNode implements GraphNode {
  private readonly logger = new Logger(ForecastNode.name);
  private readonly enabled: boolean;
  private readonly sidecarUrl: string;
  private readonly timeoutMs: number;

  constructor(
    private readonly configService: ConfigService,
    private readonly httpService: HttpService,
  ) {
    this.enabled = configService.get('JORVIS_FORECAST_ENABLED') === 'true';
    this.sidecarUrl = configService.get('FORECAST_SIDECAR_URL', 'http://localhost:8001');
    this.timeoutMs = parseInt(configService.get('FORECAST_TIMEOUT_MS', '30000'), 10);
  }

  async execute(state: GraphState): Promise<Partial<GraphState>> {
    // 1. Feature flag check
    if (!this.enabled) {
      return { nextNode: 'VisualizationNode' };
    }

    // 2. Check if forecast was requested
    const forecastRequested = state.metadata?.forecastIntent === true;
    if (!forecastRequested) {
      return { nextNode: 'VisualizationNode' };
    }

    // 3. Validate data suitability
    const timeSeries = this.extractTimeSeries(state.queryResults);
    if (!timeSeries) {
      this.logger.warn('Data not suitable for forecasting');
      return {
        nextNode: 'VisualizationNode',
        metadata: {
          ...state.metadata,
          forecastSkipped: true,
          forecastSkipReason: 'NOT_TIME_SERIES',
        },
      };
    }

    // 4. Call sidecar
    const startTime = Date.now();
    try {
      const response = await this.callForecastSidecar(timeSeries);
      const duration = Date.now() - startTime;

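      // forecast and metrics are nullable in the contract, so guard them too;
      // this also narrows both fields for the code below.
      if (!response.success || !response.forecast || !response.metrics) {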
        this.logger.warn(`Forecast failed: ${response.error?.code}`);
        return {
          nextNode: 'VisualizationNode',
          metadata: {
            ...state.metadata,
            forecastAttempted: true,
            forecastError: response.error?.code,
          },
        };
      }

      this.logger.log(
        `Forecast generated: ${response.forecast.values.length} periods, ` +
        `MAPE=${response.metrics.mape}%, duration=${duration}ms`
      );

      // 5. Enrich visualization payload
      return {
        nextNode: 'VisualizationNode',
        visualizationPayload: this.enrichVisualization(
          state.visualizationPayload,
          response.forecast,
          response.metrics
        ),
        metadata: {
          ...state.metadata,
          forecastGenerated: true,
          forecastPeriods: response.forecast.values.length,
          forecastMape: response.metrics.mape,
          forecastDurationMs: duration,
        },
      };
    } catch (error) {
      this.logger.error(`Forecast sidecar error: ${(error as Error).message}`);
      return {
        nextNode: 'VisualizationNode',
        metadata: {
          ...state.metadata,
          forecastAttempted: true,
          forecastError: 'SIDECAR_UNAVAILABLE',
        },
      };
    }
  }

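  private extractTimeSeries(
    results: Array<Record<string, unknown>> | undefined
  ): TimeSeriesData | null {
    // Mirror the sidecar's minimum of 10 data points.
    if (!results || results.length < 10) return null;

    const columns = Object.keys(results[0]);
    const dateCol = columns.find((c) => this.isDateColumn(c, results));
    // Exclude the date column so it is never picked as the value column.
    const valueCol = columns.find(
      (c) => c !== dateCol && this.isNumericColumn(c, results)
    );

    if (!dateCol || !valueCol) return null;

    return {
      timestamps: results.map((r) => String(r[dateCol])),
      values: results.map((r) => Number(r[valueCol])),
    };
  }

  private isDateColumn(col: string, results: Array<Record<string, unknown>>): boolean {
    const sample = results[0][col];
    if (sample instanceof Date) return !isNaN(sample.getTime());
    // Only non-numeric strings are parsed: new Date('100') is a valid date,
    // so bare numbers and numeric strings must not count as timestamps.
    if (typeof sample !== 'string' || sample.trim() === '' || !isNaN(Number(sample))) {
      return false;
    }
    return !isNaN(new Date(sample).getTime());
  }

  private isNumericColumn(col: string, results: Array<Record<string, unknown>>): boolean {
    // Number(null) and Number('') both evaluate to 0, so reject them explicitly.
    return results.every((r) => {
      const v = r[col];
      if (typeof v === 'number') return Number.isFinite(v);
      return typeof v === 'string' && v.trim() !== '' && !isNaN(Number(v));
    });
  }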

  private async callForecastSidecar(data: TimeSeriesData): Promise<ForecastResponse> {
    const response = await firstValueFrom(
      this.httpService.post<ForecastResponse>(
        `${this.sidecarUrl}/forecast`,
        {
          timestamps: data.timestamps,
          values: data.values,
          periods: 30,
          confidence_interval: 0.95,
        },
        { timeout: this.timeoutMs }
      )
    );
    return response.data;
  }

  private enrichVisualization(
    existing: any,
    forecast: ForecastData,
    metrics: ForecastMetrics
  ): any {
    return {
      ...existing,
      forecast: {
        timestamps: forecast.timestamps,
        values: forecast.values,
        lower: forecast.lower,
        upper: forecast.upper,
        confidence: 0.95,
      },
      forecastMetrics: {
        mape: metrics.mape,
        model: metrics.model,
      },
    };
  }
}

interface TimeSeriesData {
  timestamps: string[];
  values: number[];
}

interface ForecastData {
  timestamps: string[];
  values: number[];
  lower: number[];
  upper: number[];
}

interface ForecastMetrics {
  mape: number;
  rmse: number;
  mae: number;
  data_points: number;
  model: string;
}

interface ForecastResponse {
  success: boolean;
  forecast: ForecastData | null;
  metrics: ForecastMetrics | null;
  error?: { code: string; message: string };
}

3. VisualPayload Extension

// Extend VisualPayload type
export interface VisualPayload {
  // ... existing fields ...

  forecast?: {
    readonly timestamps: string[];
    readonly values: number[];
    readonly lower: number[];
    readonly upper: number[];
    readonly confidence: number;
  };
  forecastMetrics?: {
    readonly mape: number;
    readonly model: string;
  };
}

4. forecast-sidecar Implementation

# analytics-platform/forecast-sidecar/main.py

import os
from typing import List, Optional
from fastapi import FastAPI
from pydantic import BaseModel, Field
from prophet import Prophet
import pandas as pd
import numpy as np

SERVICE_VERSION = '0.1.0'
MIN_DATA_POINTS = int(os.getenv('FORECAST_MIN_DATA_POINTS', '10'))
MAX_FORECAST_PERIODS = int(os.getenv('FORECAST_MAX_PERIODS', '365'))
TIMEOUT_SECONDS = int(os.getenv('FORECAST_TIMEOUT_SECONDS', '30'))


class ForecastRequest(BaseModel):
    # Length is checked in the handler so that short input yields the documented
    # INSUFFICIENT_DATA response rather than a 422 validation error.
    timestamps: List[str]
    values: List[float]
    periods: int = Field(default=30, ge=1, le=MAX_FORECAST_PERIODS)
    frequency: Optional[str] = Field(default=None, pattern='^[DWMY]$')
    confidence_interval: float = Field(default=0.95, ge=0.5, le=0.99)
    include_components: bool = Field(default=False)


class ForecastOutput(BaseModel):
    timestamps: List[str]
    values: List[float]
    lower: List[float]
    upper: List[float]


class ForecastMetrics(BaseModel):
    mape: float
    rmse: float
    mae: float
    data_points: int
    model: str = 'prophet'


class ForecastError(BaseModel):
    code: str
    message: str


class ForecastResponse(BaseModel):
    success: bool
    forecast: Optional[ForecastOutput] = None
    metrics: Optional[ForecastMetrics] = None
    error: Optional[ForecastError] = None
    components: Optional[dict] = None


app = FastAPI(
    title='Jorvis Forecast Sidecar',
    version=SERVICE_VERSION,
    description='Time-series forecasting using Prophet',
)


@app.get('/health')
def health():
    # The version is exposed by the prophet package, not by the Prophet class.
    import prophet

    return {
        'status': 'ok',
        'service_version': SERVICE_VERSION,
        'prophet_version': getattr(prophet, '__version__', 'unknown'),
        'min_data_points': MIN_DATA_POINTS,
        'max_forecast_periods': MAX_FORECAST_PERIODS,
        'timeout_seconds': TIMEOUT_SECONDS,
    }


@app.post('/forecast', response_model=ForecastResponse)
def forecast(request: ForecastRequest):
    # Validation
    if len(request.timestamps) != len(request.values):
        return ForecastResponse(
            success=False,
            error=ForecastError(
                code='LENGTH_MISMATCH',
                message='timestamps and values must have same length'
            )
        )

    if len(request.timestamps) < MIN_DATA_POINTS:
        return ForecastResponse(
            success=False,
            error=ForecastError(
                code='INSUFFICIENT_DATA',
                message=f'Need at least {MIN_DATA_POINTS} data points'
            )
        )

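    # Parse timestamps up front so that bad input maps to INVALID_TIMESTAMPS
    # instead of being reported as a model failure.
    try:
        parsed_ts = pd.to_datetime(request.timestamps)
    except (ValueError, TypeError):
        return ForecastResponse(
            success=False,
            error=ForecastError(
                code='INVALID_TIMESTAMPS',
                message='timestamps must be parseable ISO 8601 date strings'
            )
        )

    try:
        # Prepare data for Prophet
        df = pd.DataFrame({
            'ds': parsed_ts,
            'y': request.values
        })
        df = df.sort_values('ds').dropna()
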
        # Fit model
        model = Prophet(
            interval_width=request.confidence_interval,
            yearly_seasonality='auto',
            weekly_seasonality='auto',
            daily_seasonality='auto',
        )
        model.fit(df)

        # Generate forecast
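        # Use the requested frequency, else try to infer it from the data
        # (the documented default is auto-detect), else fall back to daily.
        future = model.make_future_dataframe(
            periods=request.periods,
            freq=request.frequency or pd.infer_freq(df['ds']) or 'D'
        )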
        prediction = model.predict(future)

        # Extract forecast (only future periods)
        forecast_df = prediction.tail(request.periods)

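        # Calculate in-sample metrics on the training data
        train_pred = prediction.head(len(df))
        actual = df['y'].values
        residuals = actual - train_pred['yhat'].values
        # MAPE is undefined where the actual value is 0, so skip those points
        # (reported as 0.0 if every actual value is 0).
        nonzero = actual != 0
        mape = float(np.mean(np.abs(residuals[nonzero] / actual[nonzero])) * 100) if nonzero.any() else 0.0
        rmse = float(np.sqrt(np.mean(residuals ** 2)))
        mae = float(np.mean(np.abs(residuals)))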

        response = ForecastResponse(
            success=True,
            forecast=ForecastOutput(
                timestamps=forecast_df['ds'].dt.strftime('%Y-%m-%d').tolist(),
                values=forecast_df['yhat'].tolist(),
                lower=forecast_df['yhat_lower'].tolist(),
                upper=forecast_df['yhat_upper'].tolist(),
            ),
            metrics=ForecastMetrics(
                mape=round(mape, 2),
                rmse=round(rmse, 2),
                mae=round(mae, 2),
                data_points=len(df),
            ),
        )

        if request.include_components:
            response.components = {
                'trend': prediction['trend'].tolist(),
                'weekly': prediction.get('weekly', pd.Series()).tolist(),
                'yearly': prediction.get('yearly', pd.Series()).tolist(),
            }

        return response

    except Exception as e:
        return ForecastResponse(
            success=False,
            error=ForecastError(
                code='MODEL_FIT_FAILED',
                message='Forecasting model failed to fit data'
            )
        )
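
The three metrics in the response are in-sample errors: they compare the fitted values with the training data, not with held-out data. As a worked illustration, the sketch below computes them with the standard library only. fit_metrics is a hypothetical helper, and skipping zero actuals in MAPE is one possible convention rather than a requirement of the contract.

```python
import math
from typing import Dict, Optional, Sequence


def fit_metrics(
    actual: Sequence[float], predicted: Sequence[float]
) -> Dict[str, Optional[float]]:
    """Compute MAPE (in percent), RMSE, and MAE for paired observations."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("actual and predicted must be non-empty and equally long")

    errors = [a - p for a, p in zip(actual, predicted)]
    mae = sum(abs(e) for e in errors) / len(errors)
    rmse = math.sqrt(sum(e * e for e in errors) / len(errors))

    # MAPE divides by the actual value, so points where it is 0 are skipped.
    ratios = [abs(e / a) for e, a in zip(errors, actual) if a != 0]
    mape = 100 * sum(ratios) / len(ratios) if ratios else None
    return {"mape": mape, "rmse": rmse, "mae": mae}
```

For actual values [100, 200, 0] and predictions [110, 190, 5], the errors are -10, 10, and -5. MAE is 25/3 (about 8.33), RMSE is the square root of 75 (about 8.66), and MAPE averages 10% and 5% over the two non-zero points, giving 7.5%.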

5. Configuration

Environment Variables

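| Variable                 | Default               | Description                                     |
|--------------------------|-----------------------|-------------------------------------------------|
| JORVIS_FORECAST_ENABLED  | false                 | Feature flag for ForecastNode                   |
| FORECAST_SIDECAR_URL     | http://localhost:8001 | URL of forecast-sidecar                         |
| FORECAST_TIMEOUT_MS      | 30000                 | Timeout for sidecar calls, in milliseconds      |
| FORECAST_MIN_DATA_POINTS | 10                    | Minimum data points required                    |
| FORECAST_MAX_PERIODS     | 365                   | Maximum forecast horizon, in periods            |
| FORECAST_TIMEOUT_SECONDS | 30                    | Sidecar-side forecast timeout, in seconds       |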

Docker Compose Addition

# analytics-platform/deploy/docker-compose.yml
services:
  forecast-sidecar:
    build: ../forecast-sidecar
    ports:
      - "8001:8001"
    environment:
      - FORECAST_MIN_DATA_POINTS=10
      - FORECAST_MAX_PERIODS=365
      - FORECAST_TIMEOUT_SECONDS=30
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
      interval: 30s
      timeout: 10s
      retries: 3

Implementation Plan

Phase 4.1 Tasks (MVP)

| Task   | Description                                        | Estimate |
|--------|----------------------------------------------------|----------|
| 4.1.1  | Create forecast-sidecar directory structure        | S        |
| 4.1.2  | Implement /forecast and /health endpoints          | M        |
| 4.1.3  | Write sidecar tests (pytest)                       | M        |
| 4.1.4  | Implement ForecastNode in NestJS                   | M        |
| 4.1.5  | Update SplitterNode with forecast intent detection | S        |
| 4.1.6  | Extend VisualPayload types                         | S        |
| 4.1.7  | Register ForecastNode in orchestrator              | S        |
| 4.1.8  | Add Docker Compose configuration                   | S        |
| 4.1.9  | E2E test with AdventureWorks time-series query     | M        |
| 4.1.10 | Documentation update                               | S        |

S = Small (< 2 hours), M = Medium (2-4 hours)

Dependencies

  • None for MVP (pure additive, feature-flagged)
  • Phase 4.2 (DB-Native ML) depends on Multi-Dialect support

DoD / Evidence

  1. forecast-sidecar passes all pytest tests
  2. ForecastNode unit tests pass
  3. E2E test: "Forecast sales for next 30 days" with AdventureWorks data produces valid visualization
  4. Feature flag JORVIS_FORECAST_ENABLED=false by default (opt-in)
  5. CI pipeline includes sidecar build/test
  6. Release-gate PASS

Risks

| Risk                                  | Mitigation                                |
|---------------------------------------|-------------------------------------------|
| Prophet installation issues (cmdstan) | Use pre-built Docker image with Prophet   |
| Latency > 5s for large datasets       | Timeout + async fallback to SynthesisNode |
| Memory usage on large time series     | Limit input size (10K points max)         |
| False positive forecast intent        | Conservative keyword matching             |

Future Enhancements (Phase 4.2+)

4.2.1 DB-Native ML Strategy

BigQuery ML ARIMA_PLUS, Snowflake Cortex for enterprise-scale forecasting.

  • Requires: Multi-Dialect support (Phase 3)
  • Benefit: Native integration, no data movement

4.2.2 Multi-variate Forecasting

Include external regressors (holidays, promotions, weather).

  • Prophet supports add_regressor() for external factors
  • Requires: Additional data sources in query

4.2.3 Anomaly Prediction

Confidence-based alerts when values fall outside prediction bounds.

  • Use yhat_lower / yhat_upper for anomaly detection
  • Integration with DataQualityNode
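
The bounds check itself is small. The sketch below flags observed values that fall outside the forecast interval, assuming the actuals and the bounds are already aligned on the same timestamps. find_out_of_bounds is a hypothetical helper, and lower and upper stand for the yhat_lower and yhat_upper series.

```python
from typing import List, Sequence, Tuple


def find_out_of_bounds(
    timestamps: Sequence[str],
    actual: Sequence[float],
    lower: Sequence[float],
    upper: Sequence[float],
) -> List[Tuple[str, float, str]]:
    """Return (timestamp, value, direction) for points outside [lower, upper]."""
    if not (len(timestamps) == len(actual) == len(lower) == len(upper)):
        raise ValueError("all input sequences must have the same length")

    flagged = []
    for ts, value, lo, hi in zip(timestamps, actual, lower, upper):
        if value < lo:
            flagged.append((ts, value, "below"))
        elif value > hi:
            flagged.append((ts, value, "above"))
    return flagged
```

With a 95% interval, roughly one point in twenty is expected to fall outside the bounds by chance, so a single flagged point is weak evidence of an anomaly on its own.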

4.2.4 Model Caching

Reuse trained models for similar queries.

  • Cache key: hash of (timestamps, values, frequency)
  • TTL: 1 hour (configurable)
  • Storage: Redis or in-memory LRU
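
A minimal sketch of the in-memory variant follows, under the assumptions listed above: a SHA-256 key over (timestamps, values, frequency) and a one-hour TTL. forecast_cache_key and TTLCache are hypothetical names, and the clock parameter exists only to make expiry testable.

```python
import hashlib
import json
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Sequence


def forecast_cache_key(
    timestamps: Sequence[str], values: Sequence[float], frequency: Optional[str]
) -> str:
    """Hash the forecast inputs into a stable hex digest."""
    payload = json.dumps(
        {"timestamps": list(timestamps), "values": list(values), "frequency": frequency},
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class TTLCache:
    """In-memory LRU cache whose entries expire after ttl_seconds."""

    def __init__(
        self,
        max_entries: int = 128,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._entries: "OrderedDict[str, tuple]" = OrderedDict()
        self._max_entries = max_entries
        self._ttl = ttl_seconds
        self._clock = clock

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at > self._ttl:
            del self._entries[key]  # expired
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key: str, value: Any) -> None:
        self._entries[key] = (self._clock(), value)
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict least recently used
```

Note that the key is sensitive to number formatting: 100 and 100.0 serialize differently, so inputs would need to be normalized before hashing if both forms can occur.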

4.2.5 Streaming Forecasts

Real-time updates via SSE for long-running forecasts.

  • Benefit: Better UX for large datasets
  • Requires: SSE infrastructure in NestJS

4.2.6 Trend Deviation Alerts (from a-Gemini)

Concept: Jorvis proactively monitors actual values against forecasts and sends alerts when significant deviations occur.

Architecture:

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Scheduler      │────▶│  AlertChecker   │────▶│  Notification   │
│  (cron)         │     │  Service        │     │  Service        │
└─────────────────┘     └────────┬────────┘     └────────┬────────┘
                                 │                       │
                                 ▼                       ▼
                        ┌─────────────────┐     ┌─────────────────┐
                        │  ForecastNode   │     │  Slack/Email/   │
                        │  (get forecast) │     │  Webhook        │
                        └─────────────────┘     └─────────────────┘

Use Case:

"Attention! Sales for yesterday (€45,000) are 15% below forecast. Expected €53,000."

Implementation Sketch:

// analytics-platform/src/alerts/trend-deviation-checker.service.ts

@Injectable()
export class TrendDeviationCheckerService {
  constructor(
    private readonly configService: ConfigService, // required by checkDeviations()
    private readonly forecastService: ForecastService,
    private readonly dataService: DataService,
    private readonly notificationService: NotificationService,
  ) {}

  @Cron('0 8 * * *') // Daily at 8:00 AM
  async checkDeviations(): Promise<void> {
    // ConfigService.get is synchronous; default to no alerts when unset.
    const alerts = this.configService.get<AlertConfig[]>('TREND_ALERTS') ?? [];

    for (const alert of alerts) {
      const actual = await this.dataService.getLatestValue(alert.metric);
      const forecast = await this.forecastService.getForecastForDate(
        alert.metric,
        new Date()
      );

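      // Relative deviation is undefined for a zero forecast; skip such metrics.
      if (forecast.value === 0) continue;
      const deviation = Math.abs((actual - forecast.value) / forecast.value);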

      if (deviation > alert.threshold) {
        await this.notificationService.send({
          channel: alert.channel, // 'slack', 'email', 'webhook'
          message: this.formatAlert(alert, actual, forecast, deviation),
          severity: deviation > alert.criticalThreshold ? 'critical' : 'warning',
        });
      }
    }
  }
}

Configuration:

{
  "TREND_ALERTS": [
    {
      "metric": "daily_sales",
      "query": "SELECT date, SUM(amount) FROM sales GROUP BY date",
      "threshold": 0.10,
      "criticalThreshold": 0.25,
      "channel": "slack",
      "webhookUrl": "https://hooks.slack.com/..."
    }
  ]
}
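
To make the two thresholds concrete, the sketch below applies them to the numbers from the use case. It illustrates the arithmetic only and is not the service implementation. classify_deviation is a hypothetical helper, and returning None for a zero forecast is an assumption.

```python
from typing import Optional


def classify_deviation(
    actual: float,
    expected: float,
    threshold: float,
    critical_threshold: float,
) -> Optional[str]:
    """Return 'critical', 'warning', or None for an actual vs. forecast value."""
    if expected == 0:
        return None  # relative deviation is undefined for a zero forecast
    deviation = abs((actual - expected) / expected)
    if deviation > critical_threshold:
        return "critical"
    if deviation > threshold:
        return "warning"
    return None
```

With actual sales of 45,000 against a forecast of 53,000, the deviation is 8,000 / 53,000, or about 15.1%. That exceeds the 0.10 threshold but not the 0.25 critical threshold, so the configuration above would produce a warning.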

Dependencies:

  • ForecastNode (Phase 4.1) — must be implemented first
  • Notification Service — new service for alerts
  • Scheduler — NestJS @nestjs/schedule
  • Alert Configuration — JSON or database-backed

Risks:

  • Alert fatigue (too many notifications)
  • False positives during holidays/weekends
  • Timezone handling for global teams

Mitigation:

  • Configurable thresholds per metric
  • Holiday calendar integration
  • Alert cooldown periods

References