"""
Data Processing and Validation Module
This module provides comprehensive data processing capabilities for TimesFM,
including CSV loading, covariate preparation, and data validation.
Key Features:
- CSV data loading with flexible column configuration
- Automatic data type inference and conversion
- Covariate data preparation and validation
- Data structure formatting for TimesFM input requirements
- Support for dynamic and static, numerical and categorical covariates
"""
import pandas as pd
import numpy as np
import json
import logging
from typing import Dict, List, Union, Optional, Tuple, Any
from datetime import timedelta
logger = logging.getLogger(__name__)
class DataProcessor:
"""
Handles data loading, processing, and validation for TimesFM forecasting.
This class provides methods to load CSV data, process covariates according
to TimesFM requirements, and validate data structures before forecasting.
Example:
>>> processor = DataProcessor()
>>> data = processor.load_csv_data("data.csv", data_definition)
>>> inputs, covariates = processor.prepare_forecast_data(
... data, context_len=100, horizon_len=24
... )
"""
def __init__(self):
"""Initialize the DataProcessor."""
self.data = None
self.data_definition = None
self.processed_data = None
def load_csv_data(
self,
csv_file_path: str,
data_definition: Union[str, Dict[str, str]]
) -> pd.DataFrame:
"""
Load CSV data with proper column type conversion based on data definition.
Args:
csv_file_path: Path to the CSV file
data_definition: Either JSON file path or dictionary defining column types
Returns:
Loaded and processed DataFrame
Raises:
FileNotFoundError: If CSV or JSON file not found
ValueError: If data definition is invalid
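Example (illustrative; the file name and columns are hypothetical):
    >>> processor = DataProcessor()
    >>> df = processor.load_csv_data(
    ...     "sales.csv",
    ...     {"sales": "target", "region": "static_categorical"}
    ... )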
"""
logger.info(f"Loading CSV data from: {csv_file_path}")
# Load data definition
if isinstance(data_definition, str):
with open(data_definition, 'r') as f:
self.data_definition = json.load(f)
else:
self.data_definition = data_definition.copy()
logger.info(f"Data definition: {self.data_definition}")
# Load CSV
try:
self.data = pd.read_csv(csv_file_path).dropna(axis=0)
logger.info(f"Loaded CSV with shape: {self.data.shape}")
logger.info(f"Columns: {list(self.data.columns)}")
except FileNotFoundError:
raise FileNotFoundError(f"CSV file not found: {csv_file_path}")
# Validate that 'date' column exists
if 'date' not in self.data.columns:
raise ValueError("CSV file must contain a 'date' column as the first column")
# Convert date column
self.data['date'] = pd.to_datetime(self.data['date'])
logger.info(f"Date range: {self.data['date'].min()} to {self.data['date'].max()}")
# Apply data type conversions based on definition
self._apply_data_types()
# Validate data definition
self._validate_data_definition()
logger.info("β
CSV data loaded and processed successfully")
return self.data.copy()
def _apply_data_types(self) -> None:
"""Apply appropriate data types based on the data definition."""
logger.info("Applying data type conversions...")
for column, data_type in self.data_definition.items():
if column not in self.data.columns:
logger.warning(f"Column '{column}' in data definition not found in CSV")
continue
try:
if data_type in ['target', 'dynamic_numerical', 'static_numerical']:
# Convert to float
self.data[column] = pd.to_numeric(self.data[column], errors='coerce')
logger.info(f"Converted '{column}' to numerical (float)")
elif data_type in ['dynamic_categorical', 'static_categorical']:
# Convert to string
self.data[column] = self.data[column].astype(str)
logger.info(f"Converted '{column}' to categorical (string)")
else:
logger.warning(f"Unknown data type '{data_type}' for column '{column}'")
except Exception as e:
logger.error(f"Failed to convert column '{column}': {str(e)}")
raise
def _validate_data_definition(self) -> None:
"""Validate the data definition against the loaded data."""
logger.info("Validating data definition...")
# Check for required data types
target_columns = [col for col, dtype in self.data_definition.items() if dtype == 'target']
if not target_columns:
raise ValueError("Data definition must contain at least one 'target' column")
if len(target_columns) > 1:
logger.warning(f"Multiple target columns found: {target_columns}. Using first one for univariate forecasting.")
# Validate column existence
missing_columns = set(self.data_definition.keys()) - set(self.data.columns)
if missing_columns:
raise ValueError(f"Columns defined in data_definition but missing from CSV: {missing_columns}")
# Check for data quality issues
for column in target_columns:
if self.data[column].isnull().any():
null_count = self.data[column].isnull().sum()
logger.warning(f"Target column '{column}' has {null_count} null values")
logger.info("β
Data definition validation passed")
def prepare_forecast_data(
self,
data: pd.DataFrame,
context_len: int,
horizon_len: int,
target_column: Optional[str] = None
) -> Tuple[List[float], Dict[str, Any]]:
"""
Prepare data for TimesFM forecasting with covariates.
Args:
data: Input DataFrame
context_len: Length of context window for forecasting
horizon_len: Length of forecast horizon
target_column: Target column name (auto-detected if None)
Returns:
Tuple of (target_inputs, covariates_dict)
Raises:
ValueError: If insufficient data or invalid configuration
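Example (illustrative; assumes `df` was returned by `load_csv_data` and
contains a hypothetical 'sales' target column):
    >>> inputs, covariates = processor.prepare_forecast_data(
    ...     df, context_len=100, horizon_len=24, target_column="sales"
    ... )
    >>> len(inputs)
    100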
"""
logger.info(f"Preparing forecast data (context: {context_len}, horizon: {horizon_len})")
# Auto-detect target column if not specified
if target_column is None:
target_columns = [col for col, dtype in self.data_definition.items() if dtype == 'target']
if not target_columns:
raise ValueError("No target column found in data definition")
target_column = target_columns[0]
logger.info(f"Using target column: {target_column}")
# Validate data length - only need context_len for the data
if len(data) < context_len:
raise ValueError(f"Insufficient data: need {context_len} points, have {len(data)}")
# Prepare target inputs using the most recent context window
target_series = data[target_column].values
context_start = max(0, len(data) - context_len)
context_end = len(data) # Use last context_len periods
target_inputs = target_series[context_start:context_end].tolist()
logger.info(f"Target data preparation:")
logger.info(f" - Target column: {target_column}")
logger.info(f" - Context start index: {context_start}")
logger.info(f" - Context end index: {context_end}")
logger.info(f" - Target inputs length: {len(target_inputs)}")
logger.info(f" - Target range: {min(target_inputs):.2f} - {max(target_inputs):.2f}")
# Prepare covariates
covariates = self._prepare_covariates(data, context_len, horizon_len)
logger.info(f"β
Prepared forecast data:")
logger.info(f" Target inputs length: {len(target_inputs)}")
logger.info(f" Target range: {min(target_inputs):.2f} - {max(target_inputs):.2f}")
logger.info(f" Covariates: {list(covariates.keys())}")
return target_inputs, covariates
def _prepare_covariates(
self,
data: pd.DataFrame,
context_len: int,
horizon_len: int
) -> Dict[str, Dict[str, List]]:
"""
Prepare covariates data structure for TimesFM.
Args:
data: Input DataFrame
context_len: Context window length
horizon_len: Forecast horizon length
Returns:
Dictionary containing organized covariates
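Example of the returned structure (illustrative values, assuming
context_len=3 and horizon_len=2):
    {
        "dynamic_numerical_covariates": {"eth_price": [[1.0, 1.1, 1.2, 1.2, 1.2]]},
        "static_categorical_covariates": {"asset_category": ["crypto"]}
    }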
"""
covariates = {
'dynamic_numerical_covariates': {},
'dynamic_categorical_covariates': {},
'static_numerical_covariates': {},
'static_categorical_covariates': {}
}
# For dynamic covariates, we need context_len + horizon_len total periods
# Context period: last context_len periods of available data
# Horizon period: horizon_len periods (padded with last known values)
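# Illustrative example (hypothetical numbers): with context_len=3, horizon_len=2
# and observed covariate values [10.0, 11.0, 12.0], the series passed to TimesFM
# becomes [[10.0, 11.0, 12.0, 12.0, 12.0]] -- the last observation is repeated
# across the horizon.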
total_len = context_len + horizon_len
logger.info(f"Covariate preparation debug:")
logger.info(f" - Data length: {len(data)}")
logger.info(f" - Context length: {context_len}")
logger.info(f" - Horizon length: {horizon_len}")
logger.info(f" - Total periods needed: {total_len}")
logger.info(f" - Data date range: {data['date'].min()} to {data['date'].max()}")
logger.info(f" - Context period: last {context_len} periods of data")
logger.info(f" - Horizon period: {horizon_len} periods (padded with last known values)")
for column, data_type in self.data_definition.items():
if column == 'date' or data_type == 'target':
continue
if data_type == 'dynamic_numerical':
# Dynamic numerical: need context + horizon values
# Context: last context_len periods of available data
# Horizon: horizon_len periods (padded with last known value)
if len(data) < context_len:
logger.warning(f"Insufficient data for dynamic covariate '{column}': need {context_len} for context, have {len(data)}")
continue
# Get context values (last context_len periods)
context_values = data[column].iloc[-context_len:].tolist()
# Get horizon values (pad with last known value)
last_value = context_values[-1]
horizon_values = [last_value] * horizon_len
# Combine context + horizon
values = context_values + horizon_values
covariates['dynamic_numerical_covariates'][column] = [values]
logger.info(f"Added dynamic numerical covariate '{column}': {len(values)} values")
logger.info(f" - Context period: {len(context_values)} values (last {context_len} periods)")
logger.info(f" - Horizon period: {len(horizon_values)} values (padded with {last_value})")
elif data_type == 'dynamic_categorical':
# Dynamic categorical: need context + horizon values
# Context: last context_len periods of available data
# Horizon: horizon_len periods (padded with last known value)
if len(data) < context_len:
logger.warning(f"Insufficient data for dynamic covariate '{column}': need {context_len} for context, have {len(data)}")
continue
# Get context values (last context_len periods)
context_values = data[column].astype(str).iloc[-context_len:].tolist()
# Get horizon values (pad with last known value)
last_value = context_values[-1]
horizon_values = [last_value] * horizon_len
# Combine context + horizon
values = context_values + horizon_values
covariates['dynamic_categorical_covariates'][column] = [values]
logger.info(f"Added dynamic categorical covariate '{column}': {len(values)} values")
logger.info(f" - Context period: {len(context_values)} values (last {context_len} periods)")
logger.info(f" - Horizon period: {len(horizon_values)} values (padded with '{last_value}')")
elif data_type == 'static_numerical':
# Static numerical: single value per time series
value = float(data[column].iloc[0])
covariates['static_numerical_covariates'][column] = [value]
logger.info(f"Added static numerical covariate '{column}': {value}")
elif data_type == 'static_categorical':
# Static categorical: single value per time series
value = str(data[column].iloc[0])
covariates['static_categorical_covariates'][column] = [value]
logger.info(f"Added static categorical covariate '{column}': {value}")
# Remove empty covariate types
covariates = {k: v for k, v in covariates.items() if v}
return covariates
def get_data_summary(self) -> Dict[str, Any]:
"""
Get a summary of the loaded data.
Returns:
Dictionary containing data summary statistics
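Example of the returned structure (illustrative, abbreviated):
    {
        "status": "loaded",
        "shape": (365, 4),
        "date_range": {"start": "2023-01-01", "end": "2023-12-31", "total_periods": 365},
        "columns": ["date", "btc_price", "eth_price", "quarter"],
        "data_definition": {...},
        "column_statistics": {...}
    }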
"""
if self.data is None:
return {"status": "No data loaded"}
summary = {
"status": "loaded",
"shape": self.data.shape,
"date_range": {
"start": str(self.data['date'].min().date()),
"end": str(self.data['date'].max().date()),
"total_periods": len(self.data)
},
"columns": list(self.data.columns),
"data_definition": self.data_definition
}
# Add column-specific statistics
column_stats = {}
for column in self.data.columns:
if column == 'date':
continue
col_data = self.data[column]
data_type = self.data_definition.get(column, 'unknown')
if data_type in ['target', 'dynamic_numerical', 'static_numerical']:
column_stats[column] = {
"type": data_type,
"dtype": str(col_data.dtype),
"min": float(col_data.min()) if not col_data.isnull().all() else None,
"max": float(col_data.max()) if not col_data.isnull().all() else None,
"mean": float(col_data.mean()) if not col_data.isnull().all() else None,
"null_count": int(col_data.isnull().sum())
}
else:
column_stats[column] = {
"type": data_type,
"dtype": str(col_data.dtype),
"unique_values": int(col_data.nunique()),
"null_count": int(col_data.isnull().sum()),
"sample_values": col_data.dropna().unique()[:5].tolist()
}
summary["column_statistics"] = column_stats
return summary
def validate_forecast_inputs(
self,
inputs: List[float],
covariates: Dict[str, Any],
context_len: int,
horizon_len: int
) -> bool:
"""
Validate that forecast inputs are properly formatted for TimesFM.
Args:
inputs: Target time series inputs
covariates: Covariates dictionary
context_len: Expected context length
horizon_len: Expected horizon length
Returns:
True if validation passes
Raises:
ValueError: If validation fails
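Example (illustrative; a 3-point context and a 2-step horizon):
    >>> processor.validate_forecast_inputs(
    ...     inputs=[1.0, 2.0, 3.0],
    ...     covariates={
    ...         "dynamic_numerical_covariates": {"eth_price": [[1.0, 1.1, 1.2, 1.2, 1.2]]}
    ...     },
    ...     context_len=3,
    ...     horizon_len=2,
    ... )
    True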
"""
logger.info("Validating forecast inputs...")
# Validate inputs length
if len(inputs) != context_len:
raise ValueError(f"Input length {len(inputs)} doesn't match context_len {context_len}")
# Validate inputs are numeric
if not all(isinstance(x, (int, float)) and not np.isnan(x) for x in inputs):
raise ValueError("All inputs must be numeric and non-NaN")
# Validate covariates structure
total_len = context_len + horizon_len
for cov_type, cov_dict in covariates.items():
if cov_type in ['dynamic_numerical_covariates', 'dynamic_categorical_covariates']:
for name, values_list in cov_dict.items():
if len(values_list) != 1:
raise ValueError(f"Dynamic covariate '{name}' must have exactly 1 time series")
if len(values_list[0]) != total_len:
raise ValueError(f"Dynamic covariate '{name}' must have {total_len} values, got {len(values_list[0])}")
elif cov_type in ['static_numerical_covariates', 'static_categorical_covariates']:
for name, values_list in cov_dict.items():
if len(values_list) != 1:
raise ValueError(f"Static covariate '{name}' must have exactly 1 value")
logger.info("β
Forecast inputs validation passed")
return True
def create_sample_data_definition(self, output_path: str) -> None:
"""
Create a sample data definition JSON file.
Args:
output_path: Path where to save the sample JSON file
"""
sample_definition = {
"btc_price": "target",
"eth_price": "dynamic_numerical",
"vix_index": "dynamic_numerical",
"sp500_price": "dynamic_numerical",
"quarter": "dynamic_categorical",
"asset_category": "static_categorical",
"base_price": "static_numerical"
}
with open(output_path, 'w') as f:
json.dump(sample_definition, f, indent=2)
logger.info(f"Sample data definition saved to: {output_path}")
print(f"Sample data definition structure:")
print(json.dumps(sample_definition, indent=2))
def prepare_visualization_data(
processed_data: pd.DataFrame,
target_inputs: Union[List[float], List[List[float]]],
target_column: str,
context_len: int,
horizon_len: int,
extended_data: Optional[pd.DataFrame] = None
) -> Dict[str, Any]:
"""
Centralized function to prepare visualization data from processed data.
This function creates the visualization data structure used by both
the webapp and notebook for consistent data handling.
Args:
processed_data: Processed DataFrame with date column
target_inputs: Target input data used for forecasting (flattenable to a single series)
target_column: Name of the target column
context_len: Context length used for forecasting
horizon_len: Horizon length for forecasting
extended_data: Optional DataFrame that extends beyond the context window, used to extract actual values for the forecast horizon
Returns:
Dictionary containing visualization data with keys:
- 'historical_data': Context window used for forecasting (chronologically ordered)
- 'dates_historical': Corresponding historical dates
- 'dates_future': Future dates aligned with the forecast horizon
- 'target_name': Name of the target column
- 'actual_future': Optional actual values for the forecast horizon (if available)
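Example of the returned structure (illustrative; daily frequency assumed):
    {
        "historical_data": [101.2, 99.8, 103.4],
        "dates_historical": ["2024-01-01T00:00:00", "2024-01-02T00:00:00", "2024-01-03T00:00:00"],
        "dates_future": ["2024-01-04T00:00:00", "2024-01-05T00:00:00"],
        "target_name": "btc_price",
        "actual_future": []
    }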
"""
if processed_data.empty:
return {
'historical_data': [],
'dates_historical': [],
'dates_future': [],
'target_name': target_column,
'actual_future': []
}
# Work on a chronologically sorted copy to ensure alignment
df = processed_data.dropna(axis=0).sort_values('date').reset_index(drop=True)
# Flatten target inputs (they may arrive as List[List[float]] or List[float])
if isinstance(target_inputs, (list, tuple)) and target_inputs:
if isinstance(target_inputs[0], (list, tuple, np.ndarray)):
target_inputs_flat = list(target_inputs[0])
else:
target_inputs_flat = list(target_inputs)
else:
target_inputs_flat = []
# Respect the actual context length used
context_len_effective = len(target_inputs_flat) or context_len
available_len = len(df)
# Use target_inputs as historical data to ensure exact alignment with forecasting
# This guarantees that the historical data in visualization matches what was used for forecasting
if target_inputs_flat:
historical_slice = list(map(float, target_inputs_flat))
# For dates, we need to find the corresponding dates for the target_inputs
# Since target_inputs represents the last context_len periods used for forecasting,
# we need to find the dates that correspond to those exact data points
if len(df) >= context_len_effective:
# Get the dates for the last context_len periods (same as target_inputs)
dates_historical = df['date'].iloc[-context_len_effective:].tolist()
else:
# If we don't have enough data, use what we have
dates_historical = df['date'].tolist()
logger.info(f"Using target_inputs for historical data to ensure forecasting alignment")
else:
# Fallback to data-based extraction if target_inputs not available
if len(df) >= context_len_effective:
historical_slice = df[target_column].iloc[-context_len_effective:].astype(float).tolist()
dates_historical = df['date'].iloc[-context_len_effective:].tolist()
else:
historical_slice = df[target_column].astype(float).tolist()
dates_historical = df['date'].tolist()
logger.info(f"Using data-based extraction for historical data")
logger.info(f"Visualization data preparation:")
logger.info(f" - Processed data shape: {df.shape}")
logger.info(f" - Target column: {target_column}")
logger.info(f" - Context length effective: {context_len_effective}")
logger.info(f" - Historical slice length: {len(historical_slice)}")
logger.info(f" - Target inputs flat length: {len(target_inputs_flat)}")
logger.info(f" - Dates historical length: {len(dates_historical)}")
logger.info(f" - Historical data range: {min(historical_slice) if historical_slice else 'N/A'} to {max(historical_slice) if historical_slice else 'N/A'}")
if dates_historical:
logger.info(f" - First historical date: {dates_historical[0]}")
logger.info(f" - Last historical date: {dates_historical[-1]}")
# Extract actual future values when the data extends beyond the context window
# (useful for overlaying actuals against the forecast). Actual future values start
# immediately after the last historical date; if no such data exists, future dates
# are generated further below.
# Use extended_data if available (it includes the horizon period), otherwise use df
data_for_future_extraction = extended_data if extended_data is not None else df
if len(data_for_future_extraction) > context_len_effective and dates_historical:
# Find the last historical date (this is the context end date)
last_historical_date = dates_historical[-1]
# Find data points that come after the last historical date
future_mask = data_for_future_extraction['date'] > last_historical_date
future_data = data_for_future_extraction[future_mask]
if len(future_data) > 0:
# Take only the first horizon_len periods of future data
future_slice = future_data[target_column].iloc[:horizon_len].astype(float).tolist()
dates_future = future_data['date'].iloc[:horizon_len].tolist()
logger.info(f"Actual future values extracted:")
logger.info(f" - Data for extraction length: {len(data_for_future_extraction)}")
logger.info(f" - Context length effective: {context_len_effective}")
logger.info(f" - Last historical date (context end): {last_historical_date}")
logger.info(f" - Future data available: {len(future_data)} periods")
logger.info(f" - Future slice length: {len(future_slice)}")
logger.info(f" - Future dates length: {len(dates_future)}")
if future_slice and dates_future:
logger.info(f" - Future values range: {min(future_slice):.4f} to {max(future_slice):.4f}")
logger.info(f" - First future date: {dates_future[0]}")
logger.info(f" - Last future date: {dates_future[-1]}")
else:
future_slice = []
dates_future = []
logger.info("No actual future values available - no data after last historical date")
else:
# No actual future values available
future_slice = []
dates_future = []
logger.info("No actual future values available - data doesn't extend beyond context period")
if len(dates_future) < horizon_len:
# Generate future dates if the dataset stops at the forecast boundary
inferred_delta: Optional[pd.Timedelta] = None
if len(dates_historical) >= 2:
inferred_delta = dates_historical[-1] - dates_historical[-2]
last_date = dates_historical[-1]
if hasattr(last_date, 'to_pydatetime'):
last_date = last_date.to_pydatetime()
elif isinstance(last_date, np.datetime64):
last_date = pd.to_datetime(last_date).to_pydatetime()
step = inferred_delta if isinstance(inferred_delta, pd.Timedelta) and inferred_delta != pd.Timedelta(0) else timedelta(days=1)
dates_future = [last_date + step * (i + 1) for i in range(horizon_len)]
future_slice = [] # No actual future data in this case
visualization_data = {
'historical_data': historical_slice,
'dates_historical': [d.isoformat() if hasattr(d, 'isoformat') else str(d) for d in dates_historical],
'dates_future': [d.isoformat() if hasattr(d, 'isoformat') else str(d) for d in dates_future],
'target_name': target_column,
'actual_future': future_slice
}
return visualization_data
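# ---------------------------------------------------------------------------
# Minimal end-to-end usage sketch (only runs when the module is executed
# directly). The file name "data.csv" and the data definition below are
# hypothetical placeholders, not part of the shipped pipeline.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    processor = DataProcessor()
    definition = {"btc_price": "target", "eth_price": "dynamic_numerical"}
    df = processor.load_csv_data("data.csv", definition)
    inputs, covariates = processor.prepare_forecast_data(
        df, context_len=100, horizon_len=24
    )
    processor.validate_forecast_inputs(inputs, covariates, context_len=100, horizon_len=24)
    viz = prepare_visualization_data(
        df, inputs, target_column="btc_price", context_len=100, horizon_len=24
    )
    print(processor.get_data_summary())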