# Source: Hugging Face Spaces page (uploader avatar: "rkovashikawa's picture")
# Commit message: Initial Hugging Face Spaces deployment
# Commit: 48abd32
"""
Data Processing and Validation Module
This module provides comprehensive data processing capabilities for TimesFM,
including CSV loading, covariate preparation, and data validation.
Key Features:
- CSV data loading with flexible column configuration
- Automatic data type inference and conversion
- Covariates data preparation and validation
- Data structure formatting for TimesFM input requirements
- Support for dynamic and static, numerical and categorical covariates
"""
import pandas as pd
import numpy as np
import json
import logging
from typing import Dict, List, Union, Optional, Tuple, Any
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class DataProcessor:
    """
    Handles data loading, processing, and validation for TimesFM forecasting.

    This class provides methods to load CSV data, process covariates according
    to TimesFM requirements, and validate data structures before forecasting.

    Example:
        >>> processor = DataProcessor()
        >>> data = processor.load_csv_data("data.csv", data_definition)
        >>> inputs, covariates = processor.prepare_forecast_data(
        ...     data, context_len=100, horizon_len=24
        ... )
    """

    # Column roles recognised in a data definition, grouped by the conversion
    # applied to them in _apply_data_types().
    _NUMERIC_TYPES = ('target', 'dynamic_numerical', 'static_numerical')
    _CATEGORICAL_TYPES = ('dynamic_categorical', 'static_categorical')

    def __init__(self):
        """Initialize the DataProcessor with no data loaded."""
        self.data = None
        self.data_definition = None
        self.processed_data = None

    def load_csv_data(
        self,
        csv_file_path: str,
        data_definition: Union[str, Dict[str, str]]
    ) -> pd.DataFrame:
        """
        Load CSV data with proper column type conversion based on data definition.

        Rows containing any missing value are dropped before conversion.

        Args:
            csv_file_path: Path to the CSV file
            data_definition: Either JSON file path or dictionary defining column types

        Returns:
            A copy of the loaded and processed DataFrame

        Raises:
            FileNotFoundError: If CSV or JSON file not found
            ValueError: If the 'date' column is missing, no 'target' column is
                defined, or a defined column is absent from the CSV
        """
        logger.info(f"Loading CSV data from: {csv_file_path}")

        # Load data definition
        if isinstance(data_definition, str):
            # JSON is UTF-8 by specification; do not depend on the locale default.
            with open(data_definition, 'r', encoding='utf-8') as f:
                self.data_definition = json.load(f)
        else:
            # Copy so later changes by the caller do not leak into the processor.
            self.data_definition = data_definition.copy()
        logger.info(f"Data definition: {self.data_definition}")

        # Load CSV
        try:
            self.data = pd.read_csv(csv_file_path).dropna(axis=0)
        except FileNotFoundError as err:
            raise FileNotFoundError(f"CSV file not found: {csv_file_path}") from err
        logger.info(f"Loaded CSV with shape: {self.data.shape}")
        logger.info(f"Columns: {list(self.data.columns)}")

        # Validate that 'date' column exists
        if 'date' not in self.data.columns:
            raise ValueError("CSV file must contain a 'date' column as the first column")

        # Convert date column
        self.data['date'] = pd.to_datetime(self.data['date'])
        logger.info(f"Date range: {self.data['date'].min()} to {self.data['date'].max()}")

        # Apply data type conversions based on definition, then validate it
        self._apply_data_types()
        self._validate_data_definition()

        logger.info("✅ CSV data loaded and processed successfully")
        return self.data.copy()

    def _apply_data_types(self) -> None:
        """Apply appropriate data types based on the data definition.

        Numerical roles are coerced with ``pd.to_numeric(errors='coerce')`` (so
        unparseable cells become NaN); categorical roles are cast to ``str``.
        Columns missing from the CSV and unknown roles only produce a warning.
        """
        logger.info("Applying data type conversions...")
        for column, data_type in self.data_definition.items():
            if column not in self.data.columns:
                logger.warning(f"Column '{column}' in data definition not found in CSV")
                continue
            try:
                if data_type in self._NUMERIC_TYPES:
                    # Convert to float
                    self.data[column] = pd.to_numeric(self.data[column], errors='coerce')
                    logger.info(f"Converted '{column}' to numerical (float)")
                elif data_type in self._CATEGORICAL_TYPES:
                    # Convert to string
                    self.data[column] = self.data[column].astype(str)
                    logger.info(f"Converted '{column}' to categorical (string)")
                else:
                    logger.warning(f"Unknown data type '{data_type}' for column '{column}'")
            except Exception as e:
                # Log which column failed, then let the original error propagate.
                logger.error(f"Failed to convert column '{column}': {str(e)}")
                raise

    def _validate_data_definition(self) -> None:
        """Validate the data definition against the loaded data.

        Raises:
            ValueError: If no 'target' column is defined, or a defined column
                is missing from the loaded CSV.
        """
        logger.info("Validating data definition...")

        # Check for required data types
        target_columns = [col for col, dtype in self.data_definition.items() if dtype == 'target']
        if not target_columns:
            raise ValueError("Data definition must contain at least one 'target' column")
        if len(target_columns) > 1:
            logger.warning(f"Multiple target columns found: {target_columns}. Using first one for univariate forecasting.")

        # Validate column existence
        missing_columns = set(self.data_definition.keys()) - set(self.data.columns)
        if missing_columns:
            raise ValueError(f"Columns defined in data_definition but missing from CSV: {missing_columns}")

        # Check for data quality issues (coercion above may have introduced NaN)
        for column in target_columns:
            null_count = self.data[column].isnull().sum()
            if null_count:
                logger.warning(f"Target column '{column}' has {null_count} null values")

        logger.info("✅ Data definition validation passed")

    def prepare_forecast_data(
        self,
        data: pd.DataFrame,
        context_len: int,
        horizon_len: int,
        target_column: Optional[str] = None
    ) -> Tuple[List[float], Dict[str, Any]]:
        """
        Prepare data for TimesFM forecasting with covariates.

        The last ``context_len`` rows of ``data`` (in the order given) form the
        context window.

        Args:
            data: Input DataFrame
            context_len: Length of context window for forecasting (must be > 0)
            horizon_len: Length of forecast horizon
            target_column: Target column name (auto-detected if None)

        Returns:
            Tuple of (target_inputs, covariates_dict)

        Raises:
            ValueError: If no data definition is loaded, ``context_len`` is not
                positive, no target column can be found, or ``data`` has fewer
                than ``context_len`` rows
        """
        logger.info(f"Preparing forecast data (context: {context_len}, horizon: {horizon_len})")

        if self.data_definition is None:
            raise ValueError("No data definition loaded; call load_csv_data() first")
        # A non-positive context would yield an empty window here, and
        # `iloc[-0:]` in covariate preparation would select the whole frame.
        if context_len <= 0:
            raise ValueError(f"context_len must be a positive integer, got {context_len}")

        # Auto-detect target column if not specified
        if target_column is None:
            target_columns = [col for col, dtype in self.data_definition.items() if dtype == 'target']
            if not target_columns:
                raise ValueError("No target column found in data definition")
            target_column = target_columns[0]
        logger.info(f"Using target column: {target_column}")

        # Validate data length - only need context_len for the data
        if len(data) < context_len:
            raise ValueError(f"Insufficient data: need {context_len} points, have {len(data)}")

        # Prepare target inputs using the most recent context window
        context_start = len(data) - context_len
        context_end = len(data)
        target_inputs = data[target_column].values[context_start:context_end].tolist()
        lowest, highest = min(target_inputs), max(target_inputs)

        logger.info(f"Target data preparation:")
        logger.info(f" - Target column: {target_column}")
        logger.info(f" - Context start index: {context_start}")
        logger.info(f" - Context end index: {context_end}")
        logger.info(f" - Target inputs length: {len(target_inputs)}")
        logger.info(f" - Target range: {lowest:.2f} - {highest:.2f}")

        # Prepare covariates
        covariates = self._prepare_covariates(data, context_len, horizon_len)

        logger.info(f"✅ Prepared forecast data:")
        logger.info(f" Target inputs length: {len(target_inputs)}")
        logger.info(f" Target range: {lowest:.2f} - {highest:.2f}")
        logger.info(f" Covariates: {list(covariates.keys())}")
        return target_inputs, covariates

    def _prepare_covariates(
        self,
        data: pd.DataFrame,
        context_len: int,
        horizon_len: int
    ) -> Dict[str, Dict[str, List]]:
        """
        Prepare covariates data structure for TimesFM.

        Dynamic covariates are emitted as a single series of
        ``context_len + horizon_len`` values: the last ``context_len`` rows of
        ``data`` followed by the last known value repeated ``horizon_len``
        times. Static covariates take the value from the first row.

        Args:
            data: Input DataFrame
            context_len: Context window length
            horizon_len: Forecast horizon length

        Returns:
            Dictionary containing organized covariates; covariate groups with
            no entries are omitted
        """
        covariates = {
            'dynamic_numerical_covariates': {},
            'dynamic_categorical_covariates': {},
            'static_numerical_covariates': {},
            'static_categorical_covariates': {}
        }

        total_len = context_len + horizon_len
        logger.info(f"Covariate preparation debug:")
        logger.info(f" - Data length: {len(data)}")
        logger.info(f" - Context length: {context_len}")
        logger.info(f" - Horizon length: {horizon_len}")
        logger.info(f" - Total periods needed: {total_len}")
        logger.info(f" - Data date range: {data['date'].min()} to {data['date'].max()}")
        logger.info(f" - Context period: last {context_len} periods of data")
        logger.info(f" - Horizon period: {horizon_len} periods (padded with last known values)")

        for column, data_type in self.data_definition.items():
            if column == 'date' or data_type == 'target':
                continue

            if data_type in ('dynamic_numerical', 'dynamic_categorical'):
                is_categorical = data_type == 'dynamic_categorical'
                if len(data) < context_len:
                    logger.warning(f"Insufficient data for dynamic covariate '{column}': need {context_len} for context, have {len(data)}")
                    continue

                # Context: last context_len periods of available data
                series = data[column].astype(str) if is_categorical else data[column]
                context_values = series.iloc[-context_len:].tolist()

                # Horizon: no future values are known, so repeat the last one
                last_value = context_values[-1]
                horizon_values = [last_value] * horizon_len
                values = context_values + horizon_values

                if is_categorical:
                    covariates['dynamic_categorical_covariates'][column] = [values]
                    logger.info(f"Added dynamic categorical covariate '{column}': {len(values)} values")
                    padding_note = f"'{last_value}'"
                else:
                    covariates['dynamic_numerical_covariates'][column] = [values]
                    logger.info(f"Added dynamic numerical covariate '{column}': {len(values)} values")
                    padding_note = f"{last_value}"
                logger.info(f" - Context period: {len(context_values)} values (last {context_len} periods)")
                logger.info(f" - Horizon period: {len(horizon_values)} values (padded with {padding_note})")

            elif data_type == 'static_numerical':
                # Static numerical: single value per time series
                value = float(data[column].iloc[0])
                covariates['static_numerical_covariates'][column] = [value]
                logger.info(f"Added static numerical covariate '{column}': {value}")

            elif data_type == 'static_categorical':
                # Static categorical: single value per time series
                value = str(data[column].iloc[0])
                covariates['static_categorical_covariates'][column] = [value]
                logger.info(f"Added static categorical covariate '{column}': {value}")

        # Remove empty covariate types
        return {k: v for k, v in covariates.items() if v}

    def get_data_summary(self) -> Dict[str, Any]:
        """
        Get a summary of the loaded data.

        Returns:
            Dictionary containing data summary statistics. When no data is
            loaded only ``{"status": "No data loaded"}`` is returned. For an
            empty frame the date range start/end are ``None``.
        """
        if self.data is None:
            return {"status": "No data loaded"}

        definition = self.data_definition or {}

        # min()/max() of an empty datetime column is NaT, which has no .date().
        first_date = self.data['date'].min()
        last_date = self.data['date'].max()
        summary = {
            "status": "loaded",
            "shape": self.data.shape,
            "date_range": {
                "start": None if pd.isna(first_date) else str(first_date.date()),
                "end": None if pd.isna(last_date) else str(last_date.date()),
                "total_periods": len(self.data)
            },
            "columns": list(self.data.columns),
            "data_definition": self.data_definition
        }

        # Add column-specific statistics
        column_stats = {}
        for column in self.data.columns:
            if column == 'date':
                continue
            col_data = self.data[column]
            data_type = definition.get(column, 'unknown')
            null_count = int(col_data.isnull().sum())
            if data_type in self._NUMERIC_TYPES:
                has_values = not col_data.isnull().all()
                column_stats[column] = {
                    "type": data_type,
                    "dtype": str(col_data.dtype),
                    "min": float(col_data.min()) if has_values else None,
                    "max": float(col_data.max()) if has_values else None,
                    "mean": float(col_data.mean()) if has_values else None,
                    "null_count": null_count
                }
            else:
                column_stats[column] = {
                    "type": data_type,
                    "dtype": str(col_data.dtype),
                    "unique_values": int(col_data.nunique()),
                    "null_count": null_count,
                    "sample_values": col_data.dropna().unique()[:5].tolist()
                }
        summary["column_statistics"] = column_stats
        return summary

    def validate_forecast_inputs(
        self,
        inputs: List[float],
        covariates: Dict[str, Any],
        context_len: int,
        horizon_len: int
    ) -> bool:
        """
        Validate that forecast inputs are properly formatted for TimesFM.

        Args:
            inputs: Target time series inputs (Python or NumPy real scalars)
            covariates: Covariates dictionary
            context_len: Expected context length
            horizon_len: Expected horizon length

        Returns:
            True if validation passes

        Raises:
            ValueError: If validation fails
        """
        logger.info("Validating forecast inputs...")

        # Validate inputs length
        if len(inputs) != context_len:
            raise ValueError(f"Input length {len(inputs)} doesn't match context_len {context_len}")

        # Validate inputs are numeric. NumPy scalars such as np.float32 or
        # np.int64 are not instances of the built-in types, so accept them
        # explicitly.
        numeric_types = (int, float, np.integer, np.floating)
        if not all(isinstance(x, numeric_types) and not np.isnan(x) for x in inputs):
            raise ValueError("All inputs must be numeric and non-NaN")

        # Validate covariates structure
        total_len = context_len + horizon_len
        for cov_type, cov_dict in covariates.items():
            if cov_type in ('dynamic_numerical_covariates', 'dynamic_categorical_covariates'):
                for name, values_list in cov_dict.items():
                    if len(values_list) != 1:
                        raise ValueError(f"Dynamic covariate '{name}' must have exactly 1 time series")
                    if len(values_list[0]) != total_len:
                        raise ValueError(f"Dynamic covariate '{name}' must have {total_len} values, got {len(values_list[0])}")
            elif cov_type in ('static_numerical_covariates', 'static_categorical_covariates'):
                for name, values_list in cov_dict.items():
                    if len(values_list) != 1:
                        raise ValueError(f"Static covariate '{name}' must have exactly 1 value")

        logger.info("✅ Forecast inputs validation passed")
        return True

    def create_sample_data_definition(self, output_path: str) -> None:
        """
        Create a sample data definition JSON file.

        The definition is also printed to stdout.

        Args:
            output_path: Path where to save the sample JSON file
        """
        sample_definition = {
            "btc_price": "target",
            "eth_price": "dynamic_numerical",
            "vix_index": "dynamic_numerical",
            "sp500_price": "dynamic_numerical",
            "quarter": "dynamic_categorical",
            "asset_category": "static_categorical",
            "base_price": "static_numerical"
        }
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(sample_definition, f, indent=2)
        logger.info(f"Sample data definition saved to: {output_path}")
        print("Sample data definition structure:")
        print(json.dumps(sample_definition, indent=2))
def prepare_visualization_data(
    processed_data: pd.DataFrame,
    target_inputs: Union[List[float], List[List[float]]],
    target_column: str,
    context_len: int,
    horizon_len: int,
    extended_data: Optional[pd.DataFrame] = None
) -> Dict[str, Any]:
    """
    Centralized function to prepare visualization data from processed data.

    This function creates the visualization data structure used by both
    the webapp and notebook for consistent data handling.

    Args:
        processed_data: Processed DataFrame with date column
        target_inputs: Target input data used for forecasting (flattenable to a single series)
        target_column: Name of the target column
        context_len: Context length used for forecasting
        horizon_len: Horizon length for forecasting
        extended_data: Optional DataFrame that also covers the horizon period;
            when given, actual future values are read from it instead of from
            ``processed_data``. It is sorted by 'date' before use.

    Returns:
        Dictionary containing visualization data with keys:
        - 'historical_data': Context window used for forecasting (chronologically ordered)
        - 'dates_historical': Corresponding historical dates (ISO strings)
        - 'dates_future': Future dates aligned with the forecast horizon (ISO strings);
          empty when no historical date exists to extrapolate from
        - 'target_name': Name of the target column
        - 'actual_future': Actual values for the forecast horizon; empty unless
          at least ``horizon_len`` actual future points are available
    """
    if processed_data.empty:
        return {
            'historical_data': [],
            'dates_historical': [],
            'dates_future': [],
            'target_name': target_column,
            'actual_future': []
        }

    # Work on a chronologically sorted copy to ensure alignment
    df = processed_data.dropna(axis=0).sort_values('date').reset_index(drop=True)

    # Flatten target inputs (they may arrive as List[List[float]] or List[float])
    if isinstance(target_inputs, (list, tuple)) and target_inputs:
        first = target_inputs[0]
        is_nested = isinstance(first, (list, tuple, np.ndarray))
        target_inputs_flat = list(first) if is_nested else list(target_inputs)
    else:
        target_inputs_flat = []

    # Respect the actual context length used
    context_len_effective = len(target_inputs_flat) or context_len

    # Rows backing the context window: the last context_len_effective rows,
    # or everything when the frame is shorter than that.
    if len(df) >= context_len_effective:
        context_rows = df.iloc[-context_len_effective:]
    else:
        context_rows = df
    dates_historical = context_rows['date'].tolist()

    if target_inputs_flat:
        # Use target_inputs as historical data so the plot shows exactly what
        # was fed to the forecaster.
        historical_slice = list(map(float, target_inputs_flat))
        logger.info(f"Using target_inputs for historical data to ensure forecasting alignment")
    else:
        # Fallback to data-based extraction if target_inputs not available
        historical_slice = context_rows[target_column].astype(float).tolist()
        logger.info(f"Using data-based extraction for historical data")

    logger.info(f"Visualization data preparation:")
    logger.info(f" - Processed data shape: {df.shape}")
    logger.info(f" - Target column: {target_column}")
    logger.info(f" - Context length effective: {context_len_effective}")
    logger.info(f" - Historical slice length: {len(historical_slice)}")
    logger.info(f" - Target inputs flat length: {len(target_inputs_flat)}")
    logger.info(f" - Dates historical length: {len(dates_historical)}")
    logger.info(f" - Historical data range: {min(historical_slice) if historical_slice else 'N/A'} to {max(historical_slice) if historical_slice else 'N/A'}")
    if dates_historical:
        logger.info(f" - First historical date: {dates_historical[0]}")
        logger.info(f" - Last historical date: {dates_historical[-1]}")

    # Extract actual future values when present (useful for overlaying actuals).
    # Use extended_data if available (includes horizon period), otherwise df.
    # extended_data is sorted here because the "first horizon_len rows after
    # the context end" are only meaningful in chronological order.
    if extended_data is not None:
        data_for_future_extraction = extended_data.sort_values('date', kind='stable')
    else:
        data_for_future_extraction = df

    future_slice: List[float] = []
    dates_future: List[Any] = []
    if len(data_for_future_extraction) > context_len_effective and dates_historical:
        # Everything strictly after the context end date counts as future
        last_historical_date = dates_historical[-1]
        future_mask = data_for_future_extraction['date'] > last_historical_date
        future_data = data_for_future_extraction[future_mask]
        if len(future_data) > 0:
            # Take only the first horizon_len periods of future data
            future_slice = future_data[target_column].iloc[:horizon_len].astype(float).tolist()
            dates_future = future_data['date'].iloc[:horizon_len].tolist()
            logger.info(f"Actual future values extracted:")
            logger.info(f" - Data for extraction length: {len(data_for_future_extraction)}")
            logger.info(f" - Context length effective: {context_len_effective}")
            logger.info(f" - Last historical date (context end): {last_historical_date}")
            logger.info(f" - Future data available: {len(future_data)} periods")
            logger.info(f" - Future slice length: {len(future_slice)}")
            logger.info(f" - Future dates length: {len(dates_future)}")
            if future_slice and dates_future:
                logger.info(f" - Future values range: {min(future_slice):.4f} to {max(future_slice):.4f}")
                logger.info(f" - First future date: {dates_future[0]}")
                logger.info(f" - Last future date: {dates_future[-1]}")
        else:
            logger.info("No actual future values available - no data after last historical date")
    else:
        logger.info("No actual future values available - data doesn't extend beyond context period")

    if len(dates_future) < horizon_len:
        # Not enough actual future rows to cover the horizon: discard them and
        # generate the future dates instead.
        future_slice = []
        if dates_historical:
            inferred_delta: Optional[pd.Timedelta] = None
            if len(dates_historical) >= 2:
                inferred_delta = dates_historical[-1] - dates_historical[-2]
            last_date = dates_historical[-1]
            if hasattr(last_date, 'to_pydatetime'):
                last_date = last_date.to_pydatetime()
            elif isinstance(last_date, np.datetime64):
                last_date = pd.to_datetime(last_date).to_pydatetime()
            # Fall back to a daily step when spacing cannot be inferred
            # (single point, or duplicate trailing dates).
            if isinstance(inferred_delta, pd.Timedelta) and inferred_delta != pd.Timedelta(0):
                step = inferred_delta
            else:
                step = timedelta(days=1)
            dates_future = [last_date + step * (i + 1) for i in range(horizon_len)]
        else:
            # Every row was dropped as incomplete, so there is no anchor date
            # to extrapolate from.
            dates_future = []
            logger.warning("Cannot generate future dates - no historical dates available")

    def _as_text(value: Any) -> str:
        """Render a date-like value as an ISO string when possible."""
        return value.isoformat() if hasattr(value, 'isoformat') else str(value)

    return {
        'historical_data': historical_slice,
        'dates_historical': [_as_text(d) for d in dates_historical],
        'dates_future': [_as_text(d) for d in dates_future],
        'target_name': target_column,
        'actual_future': future_slice
    }