ADR 0010: ForecastNode — Predictive Analytics Integration
Status
Proposed (Phase 4.1 MVP)
Context
Jorvis currently excels at descriptive analytics (answering "what happened?"). The natural evolution is predictive analytics (answering "what will happen?"). This capability is documented in:
- ARCHITECTURE_TARGET_VISION.md §2.5 "Predictive Analytics"
- ROADMAP_v3_Graph.md Phase 4 "Predict Lane"
Time-series forecasting is the most requested predictive capability for business analytics:
- Sales forecasting
- Demand prediction
- Trend analysis
- Anomaly prediction
Decision
Implement predictive analytics via a ForecastNode in the graph orchestration pipeline, using a Python Sidecar pattern (similar to sqlglot-sidecar) for the MVP phase.
Architecture: Python Sidecar (Phase 4.1 MVP)
┌─────────────────────────────────────────────────────────────┐
│                       NestJS Backend                        │
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│   │DataExecution │───▶│ DataQuality  │───▶│ ForecastNode │  │
│   │    Node      │    │    Node      │    │              │  │
│   └──────────────┘    └──────────────┘    └──────┬───────┘  │
│                                                  │          │
│                               HTTP POST /forecast│          │
│                                                  │          │
└──────────────────────────────────────────────────┼──────────┘
                                                   ▼
┌─────────────────────────────────────────────────────────────┐
│                 forecast-sidecar (FastAPI)                  │
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│   │  Validator   │───▶│   Prophet    │───▶│  Formatter   │  │
│   │  (Min Data)  │    │    Engine    │    │  (Response)  │  │
│   └──────────────┘    └──────────────┘    └──────────────┘  │
└─────────────────────────────────────────────────────────────┘
Why Python Sidecar (not DB-Native ML)?
| Criterion | Python Sidecar | DB-Native (BigQuery ML) |
|---|---|---|
| Postgres support | ✅ Yes | ❌ No |
| Vendor lock-in | ✅ None | ⚠️ High |
| Prerequisites | None | Multi-Dialect support |
| Time to MVP | 1-2 weeks | 4-6 weeks |
| Existing pattern | ✅ sqlglot-sidecar | ❌ New |
Phase 4.2 will add DB-Native ML as an alternative strategy for BigQuery/Snowflake users.
Specification
1. forecast-sidecar API Contract
POST /forecast
Request:
{
"timestamps": ["2024-01-01", "2024-01-02", ...],
"values": [100, 105, 98, ...],
"periods": 30,
"frequency": "D",
"confidence_interval": 0.95,
"include_components": false
}
| Field | Type | Required | Description |
|---|---|---|---|
| timestamps | string[] | Yes | ISO 8601 date strings |
| values | number[] | Yes | Numeric time series values |
| periods | int | Yes | Number of future periods to forecast |
| frequency | enum | No | D (day), W (week), M (month), Y (year). Default: auto-detect |
| confidence_interval | float | No | 0.8, 0.9, 0.95. Default: 0.95 |
| include_components | bool | No | Include trend/seasonality breakdown. Default: false |
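The table says `frequency` defaults to auto-detection but does not say how the sidecar would detect it. One possible approach, shown here as a minimal standard-library sketch, is to classify the median gap between consecutive dates. The helper name `infer_frequency` and the day-count thresholds are assumptions made for illustration, not part of the specified contract.

```python
from datetime import date
from statistics import median
from typing import List, Optional


def infer_frequency(timestamps: List[str]) -> Optional[str]:
    """Guess D/W/M/Y from the median gap (in days) between sorted dates.

    Hypothetical helper: the thresholds are illustrative assumptions.
    Returns None when the spacing matches none of the four codes.
    """
    days = sorted(date.fromisoformat(t) for t in timestamps)
    if len(days) < 2:
        return None
    gaps = [(later - earlier).days for earlier, later in zip(days, days[1:])]
    gap = median(gaps)
    if gap == 1:
        return "D"
    if gap == 7:
        return "W"
    if 28 <= gap <= 31:
        return "M"
    if 365 <= gap <= 366:
        return "Y"
    return None


print(infer_frequency(["2024-01-01", "2024-01-02", "2024-01-03"]))
print(infer_frequency(["2024-01-01", "2024-02-01", "2024-03-01"]))
```

Using the median rather than the mean keeps a single missing day or week from changing the detected code.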
Response (Success):
{
"success": true,
"forecast": {
"timestamps": ["2024-02-01", "2024-02-02", ...],
"values": [112.5, 115.2, ...],
"lower": [105.1, 107.8, ...],
"upper": [119.9, 122.6, ...]
},
"metrics": {
"mape": 4.7,
"rmse": 12.3,
"mae": 9.8,
"data_points": 365,
"model": "prophet"
},
"components": null
}
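The three values in `metrics` are fit-quality measures computed against the training data. As a reference for how they are defined, the sketch below computes them in plain Python. The function name `fit_metrics` is hypothetical, and skipping zero actuals in MAPE is an assumption about how undefined terms should be handled.

```python
from math import sqrt
from typing import Dict, List


def fit_metrics(actual: List[float], predicted: List[float]) -> Dict[str, float]:
    """Compute MAPE (percent), RMSE and MAE for paired actual/predicted values."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("actual and predicted must be non-empty and equal length")
    errors = [a - p for a, p in zip(actual, predicted)]
    # MAPE is undefined where the actual value is zero, so those points are skipped.
    pct = [abs(e / a) for e, a in zip(errors, actual) if a != 0]
    return {
        "mape": 100 * sum(pct) / len(pct) if pct else float("nan"),
        "rmse": sqrt(sum(e * e for e in errors) / len(errors)),
        "mae": sum(abs(e) for e in errors) / len(errors),
    }


print(fit_metrics([100.0, 200.0], [110.0, 180.0]))
```

Because these numbers are measured on the same points the model was fitted to, they tend to look better than the error on genuinely unseen periods.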
Response (Error):
{
"success": false,
"error": {
"code": "INSUFFICIENT_DATA",
"message": "Need at least 10 data points for forecasting"
},
"forecast": null,
"metrics": null
}
Error Codes
| Code | Description |
|---|---|
| INSUFFICIENT_DATA | Fewer than 10 data points |
| LENGTH_MISMATCH | timestamps and values have different lengths |
| INVALID_TIMESTAMPS | Timestamps not parseable or not monotonic |
| INVALID_VALUES | Values contain non-numeric entries or are all null |
| FREQUENCY_MISMATCH | Detected frequency doesn't match the specified one |
| MODEL_FIT_FAILED | Prophet failed to fit (numerical issues) |
| TIMEOUT | Forecast took longer than 30s |
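To make the input-related codes concrete, here is a rough sketch of a pre-fit check that returns the first applicable code. It uses only the standard library. The function name `validation_error` is hypothetical, and reading "monotonic" as strictly increasing is an assumption.

```python
from datetime import date
from math import isfinite
from typing import List, Optional

MIN_DATA_POINTS = 10  # mirrors the documented default


def validation_error(timestamps: List[str], values: List[float]) -> Optional[str]:
    """Return the first matching error code from the table, or None if valid."""
    if len(timestamps) < MIN_DATA_POINTS or len(values) < MIN_DATA_POINTS:
        return "INSUFFICIENT_DATA"
    try:
        parsed = [date.fromisoformat(t) for t in timestamps]
    except ValueError:
        return "INVALID_TIMESTAMPS"
    # "Monotonic" is read here as strictly increasing (an assumption).
    if any(later <= earlier for earlier, later in zip(parsed, parsed[1:])):
        return "INVALID_TIMESTAMPS"
    if not all(isinstance(v, (int, float)) and isfinite(v) for v in values):
        return "INVALID_VALUES"
    return None


dates = [f"2024-01-{day:02d}" for day in range(1, 11)]
print(validation_error(dates, [float(i) for i in range(10)]))
```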
GET /health
{
"status": "ok",
"fingerprint": "sha256:abc123...",
"service_version": "0.1.0",
"prophet_version": "1.1.5",
"min_data_points": 10,
"max_forecast_periods": 365,
"timeout_seconds": 30
}
2. ForecastNode Graph Integration
Location in Graph Flow
DataExecutionNode → DataQualityNode → [ForecastNode] → VisualizationNode → SynthesisNode
                                             ↑
                                       (conditional)
ForecastNode activates when:
- JORVIS_FORECAST_ENABLED=true
- Query results contain time-series data (date column + numeric column)
- User question contains forecast intent keywords
Intent Detection (SplitterNode enhancement)
private hasForecastIntent(question: string): boolean {
const FORECAST_KEYWORDS = [
/\bforecast\b/i,
/\bpredict(ion)?\b/i,
/\btrend\b/i,
/\bnext\s+(week|month|quarter|year)\b/i,
/\bwill\s+(be|reach|hit)\b/i,
/\bproject(ion|ed)?\b/i,
/\bestimate\b/i,
/\bexpect(ed)?\b/i,
];
return FORECAST_KEYWORDS.some((re) => re.test(question));
}
ForecastNode Implementation
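// analytics-platform/src/ai/graph/nodes/forecast.node.ts
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';
// GraphNode and GraphState come from the project's graph module (import path not shown in this ADR)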
@Injectable()
export class ForecastNode implements GraphNode {
private readonly logger = new Logger(ForecastNode.name);
private readonly enabled: boolean;
private readonly sidecarUrl: string;
private readonly timeoutMs: number;
constructor(
private readonly configService: ConfigService,
private readonly httpService: HttpService,
) {
this.enabled = configService.get('JORVIS_FORECAST_ENABLED') === 'true';
this.sidecarUrl = configService.get('FORECAST_SIDECAR_URL', 'http://localhost:8001');
this.timeoutMs = parseInt(configService.get('FORECAST_TIMEOUT_MS', '30000'), 10);
}
async execute(state: GraphState): Promise<Partial<GraphState>> {
// 1. Feature flag check
if (!this.enabled) {
return { nextNode: 'VisualizationNode' };
}
// 2. Check if forecast was requested
const forecastRequested = state.metadata?.forecastIntent === true;
if (!forecastRequested) {
return { nextNode: 'VisualizationNode' };
}
// 3. Validate data suitability
const timeSeries = this.extractTimeSeries(state.queryResults);
if (!timeSeries) {
this.logger.warn('Data not suitable for forecasting');
return {
nextNode: 'VisualizationNode',
metadata: {
...state.metadata,
forecastSkipped: true,
forecastSkipReason: 'NOT_TIME_SERIES',
},
};
}
// 4. Call sidecar
const startTime = Date.now();
try {
const response = await this.callForecastSidecar(timeSeries);
const duration = Date.now() - startTime;
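// The response type allows null forecast/metrics, so guard before dereferencing them
if (!response.success || !response.forecast || !response.metrics) {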
this.logger.warn(`Forecast failed: ${response.error?.code}`);
return {
nextNode: 'VisualizationNode',
metadata: {
...state.metadata,
forecastAttempted: true,
forecastError: response.error?.code,
},
};
}
this.logger.log(
`Forecast generated: ${response.forecast.values.length} periods, ` +
`MAPE=${response.metrics.mape}%, duration=${duration}ms`
);
// 5. Enrich visualization payload
return {
nextNode: 'VisualizationNode',
visualizationPayload: this.enrichVisualization(
state.visualizationPayload,
response.forecast,
response.metrics
),
metadata: {
...state.metadata,
forecastGenerated: true,
forecastPeriods: response.forecast.values.length,
forecastMape: response.metrics.mape,
forecastDurationMs: duration,
},
};
} catch (error) {
this.logger.error(`Forecast sidecar error: ${(error as Error).message}`);
return {
nextNode: 'VisualizationNode',
metadata: {
...state.metadata,
forecastAttempted: true,
forecastError: 'SIDECAR_UNAVAILABLE',
},
};
}
}
private extractTimeSeries(
results: Array<Record<string, unknown>> | undefined
): TimeSeriesData | null {
if (!results || results.length < 10) return null;
const columns = Object.keys(results[0]);
const dateCol = columns.find((c) => this.isDateColumn(c, results));
// Skip the date column so it cannot also be picked as the value column
const valueCol = columns.find((c) => c !== dateCol && this.isNumericColumn(c, results));
if (!dateCol || !valueCol) return null;
return {
timestamps: results.map((r) => String(r[dateCol])),
values: results.map((r) => Number(r[valueCol])),
};
}
private isDateColumn(col: string, results: Array<Record<string, unknown>>): boolean {
const sample = results[0][col];
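// Only strings and Date objects qualify; bare numbers such as IDs can also parse as dates
if (typeof sample !== 'string' && !(sample instanceof Date)) return false;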
const parsed = new Date(String(sample));
return !isNaN(parsed.getTime());
}
private isNumericColumn(col: string, results: Array<Record<string, unknown>>): boolean {
return results.every((r) => typeof r[col] === 'number' || !isNaN(Number(r[col])));
}
private async callForecastSidecar(data: TimeSeriesData): Promise<ForecastResponse> {
const response = await firstValueFrom(
this.httpService.post<ForecastResponse>(
`${this.sidecarUrl}/forecast`,
{
timestamps: data.timestamps,
values: data.values,
periods: 30,
confidence_interval: 0.95,
},
{ timeout: this.timeoutMs }
)
);
return response.data;
}
private enrichVisualization(
existing: any,
forecast: ForecastData,
metrics: ForecastMetrics
): any {
return {
...existing,
forecast: {
timestamps: forecast.timestamps,
values: forecast.values,
lower: forecast.lower,
upper: forecast.upper,
confidence: 0.95,
},
forecastMetrics: {
mape: metrics.mape,
model: metrics.model,
},
};
}
}
interface TimeSeriesData {
timestamps: string[];
values: number[];
}
interface ForecastData {
timestamps: string[];
values: number[];
lower: number[];
upper: number[];
}
interface ForecastMetrics {
mape: number;
rmse: number;
mae: number;
data_points: number;
model: string;
}
interface ForecastResponse {
success: boolean;
forecast: ForecastData | null;
metrics: ForecastMetrics | null;
error?: { code: string; message: string };
}
3. VisualPayload Extension
// Extend VisualPayload type
export interface VisualPayload {
// ... existing fields ...
forecast?: {
readonly timestamps: string[];
readonly values: number[];
readonly lower: number[];
readonly upper: number[];
readonly confidence: number;
};
forecastMetrics?: {
readonly mape: number;
readonly model: string;
};
}
4. forecast-sidecar Implementation
# analytics-platform/forecast-sidecar/main.py
import os
from importlib.metadata import PackageNotFoundError, version
from typing import List, Optional

import numpy as np
import pandas as pd
from fastapi import FastAPI
from prophet import Prophet
from pydantic import BaseModel, Field

SERVICE_VERSION = '0.1.0'
MIN_DATA_POINTS = int(os.getenv('FORECAST_MIN_DATA_POINTS', '10'))
MAX_FORECAST_PERIODS = int(os.getenv('FORECAST_MAX_PERIODS', '365'))
TIMEOUT_SECONDS = int(os.getenv('FORECAST_TIMEOUT_SECONDS', '30'))


def _prophet_version() -> str:
    # The version is read from the installed package; the Prophet class has no __version__
    try:
        return version('prophet')
    except PackageNotFoundError:
        return 'unknown'


class ForecastRequest(BaseModel):
    # Length is checked in the handler so short input yields INSUFFICIENT_DATA, not a 422
    timestamps: List[str]
    values: List[float]
    periods: int = Field(default=30, ge=1, le=MAX_FORECAST_PERIODS)
    frequency: Optional[str] = Field(default=None, pattern='^[DWMY]$')
    confidence_interval: float = Field(default=0.95, ge=0.5, le=0.99)
    include_components: bool = Field(default=False)


class ForecastOutput(BaseModel):
    timestamps: List[str]
    values: List[float]
    lower: List[float]
    upper: List[float]


class ForecastMetrics(BaseModel):
    mape: float
    rmse: float
    mae: float
    data_points: int
    model: str = 'prophet'


class ForecastError(BaseModel):
    code: str
    message: str


class ForecastResponse(BaseModel):
    success: bool
    forecast: Optional[ForecastOutput] = None
    metrics: Optional[ForecastMetrics] = None
    error: Optional[ForecastError] = None
    components: Optional[dict] = None


app = FastAPI(
    title='Jorvis Forecast Sidecar',
    version=SERVICE_VERSION,
    description='Time-series forecasting using Prophet',
)


def _failure(code: str, message: str) -> ForecastResponse:
    # Build the documented error envelope
    return ForecastResponse(success=False, error=ForecastError(code=code, message=message))


@app.get('/health')
def health():
    return {
        'status': 'ok',
        'service_version': SERVICE_VERSION,
        'prophet_version': _prophet_version(),
        'min_data_points': MIN_DATA_POINTS,
        'max_forecast_periods': MAX_FORECAST_PERIODS,
        'timeout_seconds': TIMEOUT_SECONDS,
    }


@app.post('/forecast', response_model=ForecastResponse)
def forecast(request: ForecastRequest):
    # Validation
    if len(request.timestamps) != len(request.values):
        return _failure('LENGTH_MISMATCH', 'timestamps and values must have same length')
    if len(request.timestamps) < MIN_DATA_POINTS:
        return _failure('INSUFFICIENT_DATA', f'Need at least {MIN_DATA_POINTS} data points')
    try:
        ds = pd.to_datetime(request.timestamps)
    except (ValueError, TypeError):
        return _failure('INVALID_TIMESTAMPS', 'timestamps could not be parsed')

    try:
        # Prepare data for Prophet
        df = pd.DataFrame({'ds': ds, 'y': request.values})
        df = df.sort_values('ds').dropna().reset_index(drop=True)

        # Fit model
        model = Prophet(
            interval_width=request.confidence_interval,
            yearly_seasonality='auto',
            weekly_seasonality='auto',
            daily_seasonality='auto',
        )
        model.fit(df)

        # Use the requested frequency, else infer it from the data, else assume daily
        freq = request.frequency or pd.infer_freq(df['ds']) or 'D'
        future = model.make_future_dataframe(periods=request.periods, freq=freq)
        prediction = model.predict(future)

        # Extract forecast (only future periods)
        forecast_df = prediction.tail(request.periods)

        # Calculate metrics on training data
        actual = df['y'].to_numpy()
        fitted = prediction['yhat'].to_numpy()[:len(df)]
        errors = actual - fitted
        nonzero = actual != 0
        # MAPE is undefined for zero actuals, so those points are excluded;
        # when every actual is zero it is reported as 0.0 here
        mape = float(np.mean(np.abs(errors[nonzero] / actual[nonzero])) * 100) if nonzero.any() else 0.0
        rmse = float(np.sqrt(np.mean(errors ** 2)))
        mae = float(np.mean(np.abs(errors)))

        response = ForecastResponse(
            success=True,
            forecast=ForecastOutput(
                timestamps=forecast_df['ds'].dt.strftime('%Y-%m-%d').tolist(),
                values=forecast_df['yhat'].tolist(),
                lower=forecast_df['yhat_lower'].tolist(),
                upper=forecast_df['yhat_upper'].tolist(),
            ),
            metrics=ForecastMetrics(
                mape=round(mape, 2),
                rmse=round(rmse, 2),
                mae=round(mae, 2),
                data_points=len(df),
            ),
        )
        if request.include_components:
            response.components = {
                'trend': prediction['trend'].tolist(),
                'weekly': prediction.get('weekly', pd.Series(dtype=float)).tolist(),
                'yearly': prediction.get('yearly', pd.Series(dtype=float)).tolist(),
            }
        return response
    except Exception:
        return _failure('MODEL_FIT_FAILED', 'Forecasting model failed to fit data')
5. Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| JORVIS_FORECAST_ENABLED | false | Feature flag for ForecastNode |
| FORECAST_SIDECAR_URL | http://localhost:8001 | URL of forecast-sidecar |
| FORECAST_TIMEOUT_MS | 30000 | Timeout for sidecar calls (NestJS side) |
| FORECAST_TIMEOUT_SECONDS | 30 | Timeout reported by the sidecar |
| FORECAST_MIN_DATA_POINTS | 10 | Minimum data points required |
| FORECAST_MAX_PERIODS | 365 | Maximum forecast horizon |
Docker Compose Addition
# analytics-platform/deploy/docker-compose.yml
services:
  forecast-sidecar:
    build: ../forecast-sidecar
    ports:
      - "8001:8001"
    environment:
      - FORECAST_MIN_DATA_POINTS=10
      - FORECAST_MAX_PERIODS=365
      - FORECAST_TIMEOUT_SECONDS=30
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
      interval: 30s
      timeout: 10s
      retries: 3
Implementation Plan
Phase 4.1 Tasks (MVP)
| Task | Description | Estimate |
|---|---|---|
| 4.1.1 | Create forecast-sidecar directory structure | S |
| 4.1.2 | Implement /forecast and /health endpoints | M |
| 4.1.3 | Write sidecar tests (pytest) | M |
| 4.1.4 | Implement ForecastNode in NestJS | M |
| 4.1.5 | Update SplitterNode with forecast intent detection | S |
| 4.1.6 | Extend VisualPayload types | S |
| 4.1.7 | Register ForecastNode in orchestrator | S |
| 4.1.8 | Add Docker Compose configuration | S |
| 4.1.9 | E2E test with AdventureWorks time-series query | M |
| 4.1.10 | Documentation update | S |
S = Small (< 2 hours), M = Medium (2-4 hours)
Dependencies
- None for MVP (pure additive, feature-flagged)
- Phase 4.2 (DB-Native ML) depends on Multi-Dialect support
DoD / Evidence
- forecast-sidecar passes all pytest tests
- ForecastNode unit tests pass
- E2E test: "Forecast sales for next 30 days" with AdventureWorks data produces valid visualization
- Feature flag JORVIS_FORECAST_ENABLED=false by default (opt-in)
- CI pipeline includes sidecar build/test
- Release-gate PASS
Risks
| Risk | Mitigation |
|---|---|
| Prophet installation issues (cmdstan) | Use pre-built Docker image with Prophet |
| Latency > 5s for large datasets | Timeout + async fallback to SynthesisNode |
| Memory usage on large time series | Limit input size (10K points max) |
| False positive forecast intent | Conservative keyword matching |
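The memory mitigation names a 10K-point cap without saying what happens to longer series. One simple reading is to keep only the most recent points. The sketch below assumes that policy; rejecting the request or downsampling would be equally valid choices. The name `cap_series` is hypothetical.

```python
from typing import List, Tuple

MAX_INPUT_POINTS = 10_000  # the cap named in the risk table


def cap_series(
    timestamps: List[str], values: List[float], limit: int = MAX_INPUT_POINTS
) -> Tuple[List[str], List[float]]:
    """Keep only the most recent `limit` points of an ascending series."""
    if limit < 1:
        raise ValueError("limit must be positive")
    if len(timestamps) != len(values):
        raise ValueError("timestamps and values must have same length")
    return timestamps[-limit:], values[-limit:]


print(cap_series(["2024-01-01", "2024-01-02", "2024-01-03"], [1.0, 2.0, 3.0], limit=2))
```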
Future Enhancements (Phase 4.2+)
4.2.1 DB-Native ML Strategy
BigQuery ML ARIMA_PLUS, Snowflake Cortex for enterprise-scale forecasting.
- Requires: Multi-Dialect support (Phase 3)
- Benefit: Native integration, no data movement
4.2.2 Multi-variate Forecasting
Include external regressors (holidays, promotions, weather).
- Prophet supports add_regressor() for external factors
- Requires: Additional data sources in query
4.2.3 Anomaly Prediction
Confidence-based alerts when values fall outside prediction bounds.
- Use yhat_lower/yhat_upper for anomaly detection
- Integration with DataQualityNode
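The bounds check behind this idea is small enough to sketch directly. The helper below flags observed values that fall outside the forecast interval. Its name, `out_of_bounds`, is hypothetical, and it assumes the three series are already aligned by date.

```python
from typing import List


def out_of_bounds(actual: List[float], lower: List[float], upper: List[float]) -> List[int]:
    """Return indices where the observed value falls outside [lower, upper]."""
    if not (len(actual) == len(lower) == len(upper)):
        raise ValueError("all three series must have the same length")
    return [
        i
        for i, (value, low, high) in enumerate(zip(actual, lower, upper))
        if value < low or value > high
    ]


print(out_of_bounds([110.0, 130.0, 100.0], [105.1, 107.8, 101.0], [119.9, 122.6, 115.0]))
```

With a 95% interval, roughly one normal point in twenty is expected to land outside the bounds, so a single flagged index is weak evidence on its own.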
4.2.4 Model Caching
Reuse trained models for similar queries.
- Cache key: hash of (timestamps, values, frequency)
- TTL: 1 hour (configurable)
- Storage: Redis or in-memory LRU
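As an illustration of the cache key and TTL described above, the following sketch hashes the three named inputs and stores entries with an expiry time. `cache_key` and `TTLCache` are hypothetical names. The cache is a plain dictionary with no LRU eviction, so it stands in for the in-memory option only.

```python
import hashlib
import json
import time
from typing import Any, Dict, List, Optional, Tuple


def cache_key(timestamps: List[str], values: List[float], frequency: Optional[str]) -> str:
    """Stable SHA-256 over the three inputs named in the cache-key bullet."""
    payload = json.dumps([timestamps, values, frequency], separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class TTLCache:
    """Tiny in-memory cache with per-entry expiry (no size limit, no LRU eviction)."""

    def __init__(self, ttl_seconds: float = 3600.0) -> None:
        self.ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str, now: Optional[float] = None) -> Optional[Any]:
        now = time.monotonic() if now is None else now
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if now >= expires_at:
            # Expired entries are dropped lazily on lookup.
            del self._store[key]
            return None
        return value

    def put(self, key: str, value: Any, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self._store[key] = (now + self.ttl, value)


key = cache_key(["2024-01-01", "2024-01-02"], [100.0, 105.0], "D")
cache = TTLCache(ttl_seconds=3600)
cache.put(key, "fitted-model-placeholder")
print(key[:12], cache.get(key))
```

The sidecar code passes `confidence_interval` to the Prophet constructor, so a cache of fitted models would likely need that value in the key as well.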
4.2.5 Streaming Forecasts
Real-time updates via SSE for long-running forecasts.
- Benefit: Better UX for large datasets
- Requires: SSE infrastructure in NestJS
4.2.6 Trend Deviation Alerts (from a-Gemini)
Concept: Jorvis proactively monitors actual values against forecasts and sends alerts when significant deviations occur.
Architecture:
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Scheduler    │────▶│  AlertChecker   │────▶│  Notification   │
│     (cron)      │     │     Service     │     │     Service     │
└─────────────────┘     └────────┬────────┘     └────────┬────────┘
                                 │                       │
                                 ▼                       ▼
                        ┌─────────────────┐     ┌─────────────────┐
                        │  ForecastNode   │     │  Slack/Email/   │
                        │ (get forecast)  │     │     Webhook     │
                        └─────────────────┘     └─────────────────┘
Use Case:
"Attention! Sales for yesterday (€45,000) are 15% below forecast. Expected €53,000."
Implementation Sketch:
// analytics-platform/src/alerts/trend-deviation-checker.service.ts
@Injectable()
export class TrendDeviationCheckerService {
constructor(
private readonly configService: ConfigService,
private readonly forecastService: ForecastService,
private readonly dataService: DataService,
private readonly notificationService: NotificationService,
) {}
@Cron('0 8 * * *') // Daily at 8:00 AM
async checkDeviations(): Promise<void> {
// ConfigService.get is synchronous; fall back to an empty list when unset
const alerts = this.configService.get<AlertConfig[]>('TREND_ALERTS') ?? [];
for (const alert of alerts) {
const actual = await this.dataService.getLatestValue(alert.metric);
const forecast = await this.forecastService.getForecastForDate(
alert.metric,
new Date()
);
const deviation = Math.abs((actual - forecast.value) / forecast.value);
if (deviation > alert.threshold) {
await this.notificationService.send({
channel: alert.channel, // 'slack', 'email', 'webhook'
message: this.formatAlert(alert, actual, forecast, deviation),
severity: deviation > alert.criticalThreshold ? 'critical' : 'warning',
});
}
}
}
}
Configuration:
{
"TREND_ALERTS": [
{
"metric": "daily_sales",
"query": "SELECT date, SUM(amount) FROM sales GROUP BY date",
"threshold": 0.10,
"criticalThreshold": 0.25,
"channel": "slack",
"webhookUrl": "https://hooks.slack.com/..."
}
]
}
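To show how `threshold` and `criticalThreshold` interact, the sketch below applies the same relative-deviation rule as the service above to a single value. The function name `classify_deviation` is hypothetical, and raising an error on a zero forecast is an assumption, since the sketch above does not handle that case.

```python
from typing import Optional


def classify_deviation(
    actual: float, forecast: float, threshold: float, critical_threshold: float
) -> Optional[str]:
    """Return 'critical', 'warning', or None using the relative-deviation rule."""
    if forecast == 0:
        raise ValueError("relative deviation is undefined for a zero forecast")
    deviation = abs((actual - forecast) / forecast)
    if deviation <= threshold:
        return None
    return "critical" if deviation > critical_threshold else "warning"


# The use case above: 45,000 actual against 53,000 expected, thresholds 0.10 / 0.25
print(classify_deviation(45_000, 53_000, 0.10, 0.25))
```

With the example configuration, the 15% shortfall from the use case exceeds the 0.10 threshold but not the 0.25 critical threshold, so it is classified as a warning.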
Dependencies:
- ForecastNode (Phase 4.1): must be implemented first
- Notification Service: new service for alerts
- Scheduler: NestJS @nestjs/schedule
- Alert Configuration: JSON or database-backed
Risks:
- Alert fatigue (too many notifications)
- False positives during holidays/weekends
- Timezone handling for global teams
Mitigation:
- Configurable thresholds per metric
- Holiday calendar integration
- Alert cooldown periods
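Of the three mitigations, the cooldown is the easiest to pin down in code. A minimal sketch, assuming one cooldown window per metric and a caller that supplies the current time, might look like this. The class name `AlertCooldown` is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict


class AlertCooldown:
    """Suppress repeat alerts for the same metric within a cooldown window."""

    def __init__(self, window: timedelta) -> None:
        self.window = window
        self._last_sent: Dict[str, datetime] = {}

    def should_send(self, metric: str, now: datetime) -> bool:
        last = self._last_sent.get(metric)
        if last is not None and now - last < self.window:
            return False
        # Record the send time only when the alert is allowed through.
        self._last_sent[metric] = now
        return True


cooldown = AlertCooldown(timedelta(hours=24))
first_check = datetime(2024, 1, 1, 8, 0)
print(cooldown.should_send("daily_sales", first_check))
print(cooldown.should_send("daily_sales", first_check + timedelta(hours=1)))
```

Passing `now` explicitly keeps the helper deterministic in tests and leaves timezone handling to the caller.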
References
- Prophet Documentation
- BigQuery ML ARIMA_PLUS
- Snowflake Cortex ML Forecasting
- ARCHITECTURE_TARGET_VISION.md §2.5
- ROADMAP_v3_Graph.md Phase 4