# Copyright (c) 2017-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
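"""Tests for fbprophet.diagnostics.

Exercises simulated historical forecasts, cross validation, performance
metrics, and prophet_copy against the bundled data.csv.
"""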

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import itertools

import numpy as np
import pandas as pd

# fb-block 1 start
import os
from unittest import TestCase

from fbprophet import Prophet
from fbprophet import diagnostics

DATA_all = pd.read_csv(
    os.path.join(os.path.dirname(__file__), 'data.csv'), parse_dates=['ds']
)
DATA = DATA_all.head(100)
# fb-block 1 end

# fb-block 2


class TestDiagnostics(TestCase):

    def __init__(self, *args, **kwargs):
        super(TestDiagnostics, self).__init__(*args, **kwargs)
        # Use the first 100 records in data.csv
        self.__df = DATA

    def test_simulated_historical_forecasts(self):
        m = Prophet()
        m.fit(self.__df)
        k = 2
        for p in [1, 10]:
            for h in [1, 3]:
                period = '{} days'.format(p)
                horizon = '{} days'.format(h)
                df_shf = diagnostics.simulated_historical_forecasts(
                    m, horizon=horizon, k=k, period=period)
                # All cutoff dates should precede their ds dates
                self.assertTrue((df_shf['cutoff'] < df_shf['ds']).all())
                # The number of unique cutoffs should equal k
                self.assertEqual(len(np.unique(df_shf['cutoff'])), k)
                # The maximum forecast horizon should equal the requested horizon
                self.assertEqual(
                    max(df_shf['ds'] - df_shf['cutoff']),
                    pd.Timedelta(horizon),
                )
                # Consecutive cutoffs should be spaced at least `period` apart
                dc = df_shf['cutoff'].diff()
                dc = dc[dc > pd.Timedelta(0)].min()
                self.assertTrue(dc >= pd.Timedelta(period))
                # Each y in df_shf should match the y in self.__df with the same ds
                df_merged = pd.merge(df_shf, self.__df, 'left', on='ds')
                self.assertAlmostEqual(
                    np.sum((df_merged['y_x'] - df_merged['y_y']) ** 2), 0.0)

    def test_simulated_historical_forecasts_logistic(self):
        m = Prophet(growth='logistic')
        df = self.__df.copy()
        df['cap'] = 40
        m.fit(df)
        df_shf = diagnostics.simulated_historical_forecasts(
            m, horizon='3 days', k=2, period='3 days')
        # All cutoff dates should precede their ds dates
        self.assertTrue((df_shf['cutoff'] < df_shf['ds']).all())
        # The number of unique cutoffs should equal k
        self.assertEqual(len(np.unique(df_shf['cutoff'])), 2)
        # Each y in df_shf should match the y in df with the same ds
        df_merged = pd.merge(df_shf, df, 'left', on='ds')
        self.assertAlmostEqual(
            np.sum((df_merged['y_x'] - df_merged['y_y']) ** 2), 0.0)

    def test_simulated_historical_forecasts_extra_regressors(self):
        m = Prophet()
        m.add_seasonality(name='monthly', period=30.5, fourier_order=5)
        m.add_regressor('extra')
        df = self.__df.copy()
        df['cap'] = 40
        df['extra'] = range(df.shape[0])
        m.fit(df)
        df_shf = diagnostics.simulated_historical_forecasts(
            m, horizon='3 days', k=2, period='3 days')
        # All cutoff dates should precede their ds dates
        self.assertTrue((df_shf['cutoff'] < df_shf['ds']).all())
        # The number of unique cutoffs should equal k
        self.assertEqual(len(np.unique(df_shf['cutoff'])), 2)
        # Each y in df_shf should match the y in df with the same ds
        df_merged = pd.merge(df_shf, df, 'left', on='ds')
        self.assertAlmostEqual(
            np.sum((df_merged['y_x'] - df_merged['y_y']) ** 2), 0.0)

    def test_simulated_historical_forecasts_default_value_check(self):
        m = Prophet()
        m.fit(self.__df)
        # Default value of period should be equal to 0.5 * horizon
        df_shf1 = diagnostics.simulated_historical_forecasts(
            m, horizon='10 days', k=1)
        df_shf2 = diagnostics.simulated_historical_forecasts(
            m, horizon='10 days', k=1, period='5 days')
        self.assertAlmostEqual(
            ((df_shf1['y'] - df_shf2['y']) ** 2).sum(), 0.0)
        self.assertAlmostEqual(
            ((df_shf1['yhat'] - df_shf2['yhat']) ** 2).sum(), 0.0)

    def test_cross_validation(self):
        m = Prophet()
        m.fit(self.__df)
        # Expected horizon, period spacing, and number of cutoff points (k)
        horizon = pd.Timedelta('4 days')
        period = pd.Timedelta('10 days')
        k = 5
        df_cv = diagnostics.cross_validation(
            m, horizon='4 days', period='10 days', initial='90 days')
        # The number of unique cutoffs should equal k
        self.assertEqual(len(np.unique(df_cv['cutoff'])), k)
        # The maximum forecast horizon should equal the requested horizon
        self.assertEqual(max(df_cv['ds'] - df_cv['cutoff']), horizon)
        # Consecutive cutoffs should be spaced at least `period` apart
        dc = df_cv['cutoff'].diff()
        dc = dc[dc > pd.Timedelta(0)].min()
        self.assertTrue(dc >= period)

    def test_cross_validation_default_value_check(self):
        m = Prophet()
        m.fit(self.__df)
        # Default value of initial should be equal to 3 * horizon
        df_cv1 = diagnostics.cross_validation(
            m, horizon='32 days', period='10 days')
        df_cv2 = diagnostics.cross_validation(
            m, horizon='32 days', period='10 days', initial='96 days')
        self.assertAlmostEqual(
            ((df_cv1['y'] - df_cv2['y']) ** 2).sum(), 0.0)
        self.assertAlmostEqual(
            ((df_cv1['yhat'] - df_cv2['yhat']) ** 2).sum(), 0.0)

    def test_performance_metrics(self):
        m = Prophet()
        m.fit(self.__df)
        df_cv = diagnostics.cross_validation(
            m, horizon='4 days', period='10 days', initial='90 days')
        # Aggregation level none
        df_none = diagnostics.performance_metrics(df_cv, rolling_window=0)
        self.assertEqual(
            set(df_none.columns),
            {'horizon', 'coverage', 'mae', 'mape', 'mse', 'rmse'},
        )
        self.assertEqual(df_none.shape[0], 14)
        # Aggregation level 0.2
        df_horizon = diagnostics.performance_metrics(df_cv, rolling_window=0.2)
        self.assertEqual(len(df_horizon['horizon'].unique()), 4)
        self.assertEqual(df_horizon.shape[0], 13)
        # Aggregation level all
        df_all = diagnostics.performance_metrics(df_cv, rolling_window=1)
        self.assertEqual(df_all.shape[0], 1)
        for metric in ['mse', 'mape', 'mae', 'coverage']:
            self.assertEqual(df_all[metric].values[0], df_none[metric].mean())
        # Custom list of metrics
        df_horizon = diagnostics.performance_metrics(
            df_cv, metrics=['coverage', 'mse'],
        )
        self.assertEqual(
            set(df_horizon.columns),
            {'coverage', 'mse', 'horizon'},
        )

    def test_copy(self):
        df = DATA_all.copy()
        df['cap'] = 200.
        df['binary_feature'] = [0] * 255 + [1] * 255
        # Use non-default values for all arguments so the copy can be verified
        holiday = pd.DataFrame(
            {'ds': pd.to_datetime(['2016-12-25']), 'holiday': ['x']})
        products = itertools.product(
            ['linear', 'logistic'],  # growth
            [None, pd.to_datetime(['2016-12-25'])],  # changepoints
            [3],  # n_changepoints
            [0.9],  # changepoint_range
            [True, False],  # yearly_seasonality
            [True, False],  # weekly_seasonality
            [True, False],  # daily_seasonality
            [None, holiday],  # holidays
            ['additive', 'multiplicative'],  # seasonality_mode
            [1.1],  # seasonality_prior_scale
            [1.1],  # holidays_prior_scale
            [0.1],  # changepoint_prior_scale
            [100],  # mcmc_samples
            [0.9],  # interval_width
            [200]  # uncertainty_samples
        )
        # Values should be copied correctly
        for product in products:
            m1 = Prophet(*product)
            m1.history = m1.setup_dataframe(
                df.copy(), initialize_scales=True)
            m1.set_auto_seasonalities()
            m2 = diagnostics.prophet_copy(m1)
            self.assertEqual(m1.growth, m2.growth)
            self.assertEqual(m1.n_changepoints, m2.n_changepoints)
            self.assertEqual(m1.changepoint_range, m2.changepoint_range)
            self.assertEqual(m1.changepoints, m2.changepoints)
            # The copy disables the built-in seasonality flags and instead
            # carries the configured seasonalities over in m2.seasonalities
            self.assertEqual(False, m2.yearly_seasonality)
            self.assertEqual(False, m2.weekly_seasonality)
            self.assertEqual(False, m2.daily_seasonality)
            self.assertEqual(
                m1.yearly_seasonality, 'yearly' in m2.seasonalities)
            self.assertEqual(
                m1.weekly_seasonality, 'weekly' in m2.seasonalities)
            self.assertEqual(
                m1.daily_seasonality, 'daily' in m2.seasonalities)
            if m1.holidays is None:
                self.assertEqual(m1.holidays, m2.holidays)
            else:
                self.assertTrue((m1.holidays == m2.holidays).values.all())
            self.assertEqual(m1.seasonality_mode, m2.seasonality_mode)
            self.assertEqual(m1.seasonality_prior_scale, m2.seasonality_prior_scale)
            self.assertEqual(m1.changepoint_prior_scale, m2.changepoint_prior_scale)
            self.assertEqual(m1.holidays_prior_scale, m2.holidays_prior_scale)
            self.assertEqual(m1.mcmc_samples, m2.mcmc_samples)
            self.assertEqual(m1.interval_width, m2.interval_width)
            self.assertEqual(m1.uncertainty_samples, m2.uncertainty_samples)
        # Check cutoff handling along with custom seasonality and extra regressors
        changepoints = pd.date_range('2012-06-15', '2012-09-15')
        cutoff = pd.Timestamp('2012-07-25')
        m1 = Prophet(changepoints=changepoints)
        m1.add_seasonality('custom', 10, 5)
        m1.add_regressor('binary_feature')
        m1.fit(df)
        m2 = diagnostics.prophet_copy(m1, cutoff=cutoff)
        # Changepoints after the cutoff should be dropped from the copy
        changepoints = changepoints[changepoints <= cutoff]
        self.assertTrue((changepoints == m2.changepoints).all())
        self.assertTrue('custom' in m2.seasonalities)
        self.assertTrue('binary_feature' in m2.extra_regressors)