12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287 |
- # Quandl for financial analysis, pandas and numpy for data manipulation
- # fbprophet for additive models, #pytrends for Google trend data
- import quandl
- import pandas as pd
- import numpy as np
- import fbprophet
- import pytrends
- from pytrends.request import TrendReq
- # matplotlib pyplot for plotting
- import matplotlib.pyplot as plt
- import matplotlib
- # Class for analyzing and (attempting) to predict future prices
- # Contains a number of visualizations and analysis methods
- class Stocker:
- # Initialization requires a ticker symbol
def __init__(self, ticker, exchange="WIKI"):
    """Download the price history for ``ticker`` and store default settings.

    The dataset ``"<exchange>/<ticker>"`` is requested through
    ``quandl.get``. The date index is moved into a ``Date`` column, the
    ``ds`` and ``y`` columns are added, and summary attributes (date range,
    extreme prices and their dates, first open, latest close) are recorded.

    If the download raises, the error is printed and the method returns
    early; in that case only ``self.symbol`` has been set.
    """
    # Tickers are kept upper-case; the symbol is also used to label plots
    symbol = ticker.upper()
    self.symbol = symbol

    # Use Personal Api Key
    # quandl.ApiConfig.api_key = 'YourKeyHere'
    try:
        history = quandl.get("%s/%s" % (exchange, symbol))
    except Exception as e:
        print("Error Retrieving Data.")
        print(e)
        return

    # Turn the index into an ordinary column called Date
    history = history.reset_index(level=0)

    # "ds" mirrors Date; fall back to raw prices if adjusted ones are absent
    history["ds"] = history["Date"]
    if "Adj. Close" not in history.columns:
        history["Adj. Close"] = history["Close"]
        history["Adj. Open"] = history["Open"]
    history["y"] = history["Adj. Close"]
    history["Daily Change"] = history["Adj. Close"] - history["Adj. Open"]

    # The instance keeps its own copy of the prepared frame
    self.stock = history.copy()

    # First and last date covered by the data
    self.min_date = min(history["Date"])
    self.max_date = max(history["Date"])

    # Extreme prices and the first date on which each occurred
    closes = self.stock["y"]
    self.max_price = np.max(closes)
    self.min_price = np.min(closes)
    dates_at_min = self.stock[closes == self.min_price]["Date"]
    self.min_price_date = dates_at_min[dates_at_min.index[0]]
    dates_at_max = self.stock[closes == self.max_price]["Date"]
    self.max_price_date = dates_at_max[dates_at_max.index[0]]

    # Opening price of the first row and closing price of the last row
    self.starting_price = float(self.stock.loc[0, "Adj. Open"])
    final_label = self.stock.index[-1]
    self.most_recent_price = float(self.stock.loc[final_label, "y"])

    # Behaviour switches read by the other methods
    self.round_dates = True
    self.training_years = 3

    # Prophet parameters
    self.changepoint_prior_scale = 0.05
    self.weekly_seasonality = False
    self.daily_seasonality = False
    self.monthly_seasonality = True
    self.yearly_seasonality = True
    self.changepoints = None

    summary = "{} Stocker Initialized. Data covers {} to {}.".format(
        self.symbol, self.min_date, self.max_date
    )
    print(summary)
- """
- Make sure start and end dates are in the range and can be
- converted to pandas datetimes. Returns dates in the correct format
- """
def handle_dates(self, start_date, end_date):
    """Validate a date range against the data and return it as datetimes.

    ``None`` for either argument selects ``self.min_date`` /
    ``self.max_date``. Both values are converted with ``pd.to_datetime``;
    if that fails the error is printed and ``None`` is returned.

    While the range is reversed or falls outside
    ``[self.min_date, self.max_date]`` the user is prompted (via
    ``input``) for replacements. The loop only ends once BOTH dates are
    acceptable.

    Returns:
        ``(start_date, end_date)`` as pandas datetimes, or ``None`` when
        the initial conversion fails.
    """
    # Default start and end date are the beginning and end of data
    if start_date is None:
        start_date = self.min_date
    if end_date is None:
        end_date = self.max_date

    try:
        # Convert to pandas datetime for indexing dataframe
        start_date = pd.to_datetime(start_date)
        end_date = pd.to_datetime(end_date)
    except Exception as e:
        print("Enter valid pandas date format.")
        print(e)
        return

    # Keep asking until every check passes. The previous condition
    # (`not valid_start & not valid_end`) stopped as soon as one of the two
    # dates was valid, letting an unchecked replacement date through.
    while True:
        if end_date < start_date:
            print("End Date must be later than start date.")
            start_date = pd.to_datetime(input("Enter a new start date: "))
            end_date = pd.to_datetime(input("Enter a new end date: "))
            continue

        in_range = True
        if end_date > self.max_date:
            print("End Date exceeds data range")
            end_date = pd.to_datetime(input("Enter a new end date: "))
            in_range = False
        if start_date < self.min_date:
            print("Start Date is before date range")
            start_date = pd.to_datetime(input("Enter a new start date: "))
            in_range = False

        if in_range:
            return start_date, end_date
- """
- Return the dataframe trimmed to the specified range.
- """
def make_df(self, start_date, end_date, df=None):
    """Return the rows of ``df`` whose ``Date`` lies in the requested range.

    Args:
        start_date, end_date: Range limits; validated and converted by
            ``self.handle_dates``.
        df: Frame with a ``Date`` column. Defaults to a copy of
            ``self.stock``.

    With ``self.round_dates`` true, limits that are not present in the
    data are effectively rounded inward: the inclusive filter simply keeps
    the rows that exist between them. With ``self.round_dates`` false, the
    user is prompted until both limits are dates present in ``df``.

    Returns:
        The filtered dataframe (both limits inclusive).
    """
    # `if not df` raised "truth value of a DataFrame is ambiguous" whenever
    # a frame was actually passed in, so test for None explicitly.
    if df is None:
        df = self.stock.copy()

    start_date, end_date = self.handle_dates(start_date, end_date)

    if not self.round_dates:
        # Exact matching requested: both limits must be dates in the data.
        known_dates = set(df["Date"])
        while start_date not in known_dates or end_date not in known_dates:
            if start_date not in known_dates:
                print(
                    "Start Date not in data (either out of range or not a trading day.)"
                )
                # input() takes its prompt positionally, not as `prompt=`
                start_date = pd.to_datetime(input("Enter a new start date: "))
            if end_date not in known_dates:
                print(
                    "End Date not in data (either out of range or not a trading day.)"
                )
                end_date = pd.to_datetime(input("Enter a new end date: "))
            # Re-check ordering and overall range of the new values
            start_date, end_date = self.handle_dates(start_date, end_date)

    # A missing limit matches no row, so strict and inclusive comparisons
    # select the same rows; one inclusive filter covers every case.
    in_range = (df["Date"] >= start_date) & (df["Date"] <= end_date)
    return df[in_range]
- # Basic Historical Plots and Basic Statistics
def plot_stock(
    self, start_date=None, end_date=None, stats=["Adj. Close"], plot_type="basic"
):
    """Print basic statistics for each column in ``stats`` and plot them.

    Args:
        start_date, end_date: Range to show; ``None`` means the start /
            end of the available data.
        stats: Column names of ``self.stock`` to report and plot. The
            default list is never modified.
        plot_type: ``"basic"`` plots the raw values; ``"pct"`` plots the
            percentage deviation from the column average (``Daily Change``
            is plotted as ``100 * value``). Any other value prints the
            statistics but draws no lines.
    """
    self.reset_plot()

    if start_date is None:
        start_date = self.min_date
    if end_date is None:
        end_date = self.max_date

    stock_plot = self.make_df(start_date, end_date)

    colors = ["r", "b", "g", "y", "c", "m"]
    is_pct = plot_type == "pct"
    is_basic = plot_type == "basic"

    # The style must be active before the first line is drawn
    if is_pct or is_basic:
        plt.style.use("fivethirtyeight")

    for i, stat in enumerate(stats):
        values = stock_plot[stat]
        stat_min = min(values)
        stat_max = max(values)
        stat_avg = np.mean(values)

        # First date on which each extreme occurred
        date_stat_min = stock_plot[values == stat_min]["Date"]
        date_stat_min = date_stat_min[date_stat_min.index[0]]
        date_stat_max = stock_plot[values == stat_max]["Date"]
        date_stat_max = date_stat_max[date_stat_max.index[0]]

        print("Maximum {} = {:.2f} on {}.".format(stat, stat_max, date_stat_max))
        print("Minimum {} = {:.2f} on {}.".format(stat, stat_min, date_stat_min))
        print(
            "Current {} = {:.2f} on {}.\n".format(
                stat, self.stock.loc[self.stock.index[-1], stat], self.max_date
            )
        )

        # Wrap around the palette so more than six stats no longer raise
        # IndexError
        color = colors[i % len(colors)]

        if is_pct:
            if stat == "Daily Change":
                y_values = 100 * values
            else:
                y_values = 100 * (values - stat_avg) / stat_avg
            plt.plot(
                stock_plot["Date"],
                y_values,
                color=color,
                linewidth=2.4,
                alpha=0.9,
                label=stat,
            )
        elif is_basic:
            plt.plot(
                stock_plot["Date"],
                values,
                color=color,
                linewidth=3,
                label=stat,
                alpha=0.8,
            )

    # Axis decoration is the same for every stat, so apply it once
    if stats and (is_pct or is_basic):
        plt.xlabel("Date")
        plt.ylabel("Change Relative to Average (%)" if is_pct else "US $")
        plt.title("%s Stock History" % self.symbol)
        plt.legend(prop={"size": 10})
        plt.grid(color="k", alpha=0.4)

    plt.show()
- # Reset the plotting parameters to clear style formatting
- # Not sure if this should be a static method
@staticmethod
def reset_plot():
    """Reset matplotlib's rcParams to defaults, then apply a few overrides."""
    # Start from a clean slate so earlier style sheets do not leak through
    matplotlib.rcdefaults()

    # Overrides applied on top of the defaults
    matplotlib.rcParams.update(
        {
            "figure.figsize": (8, 5),
            "axes.labelsize": 10,
            "xtick.labelsize": 8,
            "ytick.labelsize": 8,
            "axes.titlesize": 14,
            "text.color": "k",
        }
    )
- # Method to linearly interpolate prices on the weekends
def resample(self, dataframe):
    """Upsample ``dataframe`` to one row per calendar day and interpolate.

    The frame is indexed by its ``ds`` column, expanded to daily frequency
    (new rows are inserted for days that were missing), and the gaps are
    then filled with ``DataFrame.interpolate``.

    Returns:
        A new dataframe with ``ds`` restored as a regular column.
    """
    # Change the index and resample at daily level. `.resample("D")` alone
    # only yields a Resampler object; `.asfreq()` materialises the daily
    # frame, leaving NaN in the rows that were inserted.
    daily = dataframe.set_index("ds").resample("D").asfreq()

    # Reset the index and interpolate nan values
    daily = daily.reset_index(level=0)
    return daily.interpolate()
- # Remove weekends from a dataframe
def remove_weekends(self, dataframe):
    """Return ``dataframe`` without the rows whose ``ds`` is a weekend day.

    The index is first replaced by a fresh 0..n-1 range; the surviving
    rows keep those positions as labels, so the result's index has gaps
    where weekend rows were removed.
    """
    # Renumber rows so positions and index labels coincide
    renumbered = dataframe.reset_index(drop=True)

    # weekday() is 5 for Saturday and 6 for Sunday
    weekend_rows = [
        position
        for position, day in enumerate(renumbered["ds"])
        if day.weekday() in (5, 6)
    ]

    return renumbered.drop(weekend_rows, axis=0)
- # Calculate and plot profit from buying and holding shares for specified date range
def buy_and_hold(self, start_date=None, end_date=None, nshares=1):
    """Print and plot the profit from buying at the start and holding.

    Args:
        start_date, end_date: Holding period; ``None`` means the start /
            end of the available data.
        nshares: Number of shares held.

    The purchase price is the ``Adj. Open`` of the first row in the
    period and the sale price is the ``Adj. Close`` of the last row, as
    selected by ``self.make_df``. If the period contains no rows, a
    message is printed and nothing is plotted.
    """
    self.reset_plot()

    start_date, end_date = self.handle_dates(start_date, end_date)

    # Work on a private copy so adding a column never touches self.stock
    profits = self.make_df(start_date, end_date).copy()
    if profits.empty:
        print("No trading data between {} and {}.".format(start_date, end_date))
        return

    # Taking the prices from the selected rows (rather than looking up the
    # exact requested dates) also works when a limit is not a trading day;
    # the exact lookup produced an empty Series and float() then failed.
    start_price = float(profits["Adj. Open"].iloc[0])
    end_price = float(profits["Adj. Close"].iloc[-1])

    # Running profit for every day in the period
    profits["hold_profit"] = nshares * (profits["Adj. Close"] - start_price)

    # Total profit
    total_hold_profit = nshares * (end_price - start_price)

    print(
        "{} Total buy and hold profit from {} to {} for {} shares = ${:.2f}".format(
            self.symbol, start_date, end_date, nshares, total_hold_profit
        )
    )

    plt.style.use("dark_background")

    # The final value is written one month before the end of the period
    text_location = end_date - pd.DateOffset(months=1)

    # Plot the profits over time
    plt.plot(profits["Date"], profits["hold_profit"], "b", linewidth=3)
    plt.ylabel("Profit ($)")
    plt.xlabel("Date")
    plt.title(
        "Buy and Hold Profits for {} {} to {}".format(
            self.symbol, start_date, end_date
        )
    )

    # Display final value on graph, green for a gain and red otherwise
    plt.text(
        x=text_location,
        y=total_hold_profit + (total_hold_profit / 40),
        s="$%d" % total_hold_profit,
        color="g" if total_hold_profit > 0 else "r",
        size=14,
    )

    plt.grid(alpha=0.2)
    plt.show()
- # Create a prophet model without training
def create_model(self):
    """Build an unfitted ``fbprophet.Prophet`` model from the instance settings.

    The seasonality flags, changepoint prior scale and changepoints are
    taken from the corresponding attributes. When
    ``self.monthly_seasonality`` is truthy, a seasonality named
    ``"monthly"`` is added as well.
    """
    prophet_options = {
        "daily_seasonality": self.daily_seasonality,
        "weekly_seasonality": self.weekly_seasonality,
        "yearly_seasonality": self.yearly_seasonality,
        "changepoint_prior_scale": self.changepoint_prior_scale,
        "changepoints": self.changepoints,
    }
    prophet = fbprophet.Prophet(**prophet_options)

    if not self.monthly_seasonality:
        return prophet

    # Add monthly seasonality
    prophet.add_seasonality(name="monthly", period=30.5, fourier_order=5)
    return prophet
- # Graph the effects of altering the changepoint prior scale (cps)
def changepoint_prior_analysis(
    self,
    changepoint_priors=[0.001, 0.05, 0.1, 0.2],
    colors=["b", "r", "grey", "gold"],
):
    """Plot forecasts made with several changepoint prior scales.

    For each value in ``changepoint_priors`` a model is created with that
    prior scale, fitted on the last ``self.training_years`` years of
    ``self.stock`` and used to predict 180 additional days. All forecasts
    and their uncertainty bands are drawn on one figure together with the
    observations (weekend rows are removed from the forecasts first).

    Args:
        changepoint_priors: Prior scales to compare (not modified).
        colors: Plot colors; reused cyclically if there are fewer colors
            than priors.

    Raises:
        ValueError: If ``changepoint_priors`` is empty.

    ``self.changepoint_prior_scale`` is changed while the models are built
    and restored to its previous value before the method returns.
    """
    if not changepoint_priors:
        raise ValueError("changepoint_priors must contain at least one value")

    # Training and plotting with specified years of data
    cutoff = max(self.stock["Date"]) - pd.DateOffset(years=self.training_years)
    train = self.stock[self.stock["Date"] > cutoff]

    # create_model() reads the prior from the instance, so it has to be
    # set temporarily; put the caller's value back even if fitting fails.
    saved_scale = self.changepoint_prior_scale
    predictions = None
    try:
        for prior in changepoint_priors:
            self.changepoint_prior_scale = prior

            # Create and train a model with the specified cps
            model = self.create_model()
            model.fit(train)
            future = model.make_future_dataframe(periods=180, freq="D")

            # The first future frame provides the dates for all results
            if predictions is None:
                predictions = future.copy()

            future = model.predict(future)

            # Fill in prediction dataframe
            predictions["%.3f_yhat_upper" % prior] = future["yhat_upper"]
            predictions["%.3f_yhat_lower" % prior] = future["yhat_lower"]
            predictions["%.3f_yhat" % prior] = future["yhat"]
    finally:
        self.changepoint_prior_scale = saved_scale

    # Remove the weekends
    predictions = self.remove_weekends(predictions)

    # Plot set-up
    self.reset_plot()
    plt.style.use("fivethirtyeight")
    fig, ax = plt.subplots(1, 1)

    # Actual observations
    ax.plot(train["ds"], train["y"], "ko", ms=4, label="Observations")

    # Cycle through the palette so a short color list cannot cause KeyError
    color_dict = {
        prior: colors[i % len(colors)]
        for i, prior in enumerate(changepoint_priors)
    }

    # Plot each of the changepoint predictions
    for prior in changepoint_priors:
        # The prediction itself
        ax.plot(
            predictions["ds"],
            predictions["%.3f_yhat" % prior],
            linewidth=1.2,
            color=color_dict[prior],
            label="%.3f prior scale" % prior,
        )

        # The uncertainty interval
        ax.fill_between(
            predictions["ds"].dt.to_pydatetime(),
            predictions["%.3f_yhat_upper" % prior],
            predictions["%.3f_yhat_lower" % prior],
            facecolor=color_dict[prior],
            alpha=0.3,
            edgecolor="k",
            linewidth=0.6,
        )

    # Plot labels
    plt.legend(loc=2, prop={"size": 10})
    plt.xlabel("Date")
    plt.ylabel("Stock Price ($)")
    plt.title("Effect of Changepoint Prior Scale")
    plt.show()
- # Basic prophet model for specified number of days
def create_prophet_model(self, days=0, resample=False):
    """Fit a model on recent history, plot it, and return model and forecast.

    Args:
        days: Number of days to predict beyond the training data.
        resample: When true, the training data is first passed through
            ``self.resample``.

    Returns:
        ``(model, future)`` - the fitted model and the dataframe returned
        by ``model.predict``.
    """
    self.reset_plot()

    model = self.create_model()

    # Fit on the stock history for self.training_years number of years
    cutoff = self.max_date - pd.DateOffset(years=self.training_years)
    stock_history = self.stock[self.stock["Date"] > cutoff]
    if resample:
        stock_history = self.resample(stock_history)

    model.fit(stock_history)

    # Forecast over the training dates plus `days` extra days
    future = model.predict(model.make_future_dataframe(periods=days, freq="D"))

    if days > 0:
        # Print the predicted price for the final forecast date
        last = future.index[-1]
        print(
            "Predicted Price on {} = ${:.2f}".format(
                future.loc[last, "ds"],
                future.loc[last, "yhat"],
            )
        )
        title = "%s Historical and Predicted Stock Price" % self.symbol
    else:
        title = "%s Historical and Modeled Stock Price" % self.symbol

    # Set up the plot
    fig, ax = plt.subplots(1, 1)

    # Observed values
    observed_style = dict(linewidth=1.4, alpha=0.8, ms=1.8, label="Observations")
    ax.plot(stock_history["ds"], stock_history["y"], "ko-", **observed_style)

    # Modelled values
    ax.plot(
        future["ds"], future["yhat"], "forestgreen", linewidth=2.4, label="Modeled"
    )

    # Uncertainty interval drawn as a ribbon
    ax.fill_between(
        future["ds"].dt.to_pydatetime(),
        future["yhat_upper"],
        future["yhat_lower"],
        alpha=0.3,
        facecolor="g",
        edgecolor="k",
        linewidth=1.4,
        label="Confidence Interval",
    )

    # Plot formatting
    plt.legend(loc=2, prop={"size": 10})
    plt.xlabel("Date")
    plt.ylabel("Price $")
    plt.grid(linewidth=0.6, alpha=0.6)
    plt.title(title)
    plt.show()

    return model, future
- # Evaluate prediction model for one year
def evaluate_prediction(self, start_date=None, end_date=None, nshares=None):
    """Back-test the model on a held-out date range.

    The model is trained on the ``self.training_years`` years that precede
    ``start_date`` and its predictions are compared with the actual prices
    between ``start_date`` and ``end_date``.

    Args:
        start_date: First test date. Defaults to one year before the end
            of the data.
        end_date: Last test date. Defaults to the end of the data.
        nshares: When falsy, accuracy statistics are printed and the
            forecast is plotted against the observations. Otherwise a
            strategy that holds ``nshares`` shares on the days the model
            predicts an increase is compared with buying and holding.

    If no test rows overlap the forecast, a message is printed and the
    method returns without plotting.
    """
    # Default start date is one year before end of data
    # Default end date is end date of data
    if start_date is None:
        start_date = self.max_date - pd.DateOffset(years=1)
    if end_date is None:
        end_date = self.max_date

    start_date, end_date = self.handle_dates(start_date, end_date)

    dates = self.stock["Date"]

    # Training data starts self.training_years years before start date and
    # goes up to (but excludes) the start date
    train_begin = start_date - pd.DateOffset(years=self.training_years)
    train = self.stock[(dates < start_date) & (dates > train_begin)]

    # Testing data is specified in the range
    test = self.stock[(dates >= start_date) & (dates <= end_date)]

    # Create and train the model
    model = self.create_model()
    model.fit(train)

    # Forecast at least 365 days (the former fixed horizon), extended when
    # necessary so that test ranges longer than a year are fully covered
    # instead of being cut off by the inner merge below.
    horizon = max(365, (end_date - train["Date"].max()).days)
    future = model.make_future_dataframe(periods=horizon, freq="D")
    future = model.predict(future)

    # Merge predictions with the known values
    test = pd.merge(test, future, on="ds", how="inner")
    train = pd.merge(train, future, on="ds", how="inner")

    if test.empty:
        print("No test data available between {} and {}.".format(start_date, end_date))
        return

    # Day-over-day changes of the prediction and of the actual price
    test["pred_diff"] = test["yhat"].diff()
    test["real_diff"] = test["y"].diff()

    # Correct is when we predicted the correct direction (the first row has
    # no previous day and is left as NaN)
    test["correct"] = (
        np.sign(test["pred_diff"][1:]) == np.sign(test["real_diff"][1:])
    ) * 1

    # Accuracy when we predict increase and decrease
    increase_accuracy = 100 * np.mean(test[test["pred_diff"] > 0]["correct"])
    decrease_accuracy = 100 * np.mean(test[test["pred_diff"] < 0]["correct"])

    # Mean absolute errors
    test_mean_error = np.mean(abs(test["y"] - test["yhat"]))
    train_mean_error = np.mean(abs(train["y"] - train["yhat"]))

    # Share of days on which the actual value lay strictly inside the
    # predicted interval
    test["in_range"] = (test["y"] < test["yhat_upper"]) & (
        test["y"] > test["yhat_lower"]
    )
    in_range_accuracy = 100 * np.mean(test["in_range"])

    if not nshares:
        # Date range of predictions
        print("\nPrediction Range: {} to {}.".format(start_date, end_date))

        # Final prediction vs actual value
        print(
            "\nPredicted price on {} = ${:.2f}.".format(
                max(future["ds"]), future.loc[future.index[-1], "yhat"]
            )
        )
        print(
            "Actual price on {} = ${:.2f}.\n".format(
                max(test["ds"]), test.loc[test.index[-1], "y"]
            )
        )

        print(
            "Average Absolute Error on Training Data = ${:.2f}.".format(
                train_mean_error
            )
        )
        print(
            "Average Absolute Error on Testing Data = ${:.2f}.\n".format(
                test_mean_error
            )
        )

        # Direction accuracy
        print(
            "When the model predicted an increase, the price increased {:.2f}% of the time.".format(
                increase_accuracy
            )
        )
        print(
            "When the model predicted a decrease, the price decreased {:.2f}% of the time.\n".format(
                decrease_accuracy
            )
        )

        print(
            "The actual value was within the {:d}% confidence interval {:.2f}% of the time.".format(
                int(100 * model.interval_width), in_range_accuracy
            )
        )

        # Reset the plot
        self.reset_plot()

        # Set up the plot
        fig, ax = plt.subplots(1, 1)

        # Plot the actual values (training period, then test period)
        for observed in (train, test):
            ax.plot(
                observed["ds"],
                observed["y"],
                "ko-",
                linewidth=1.4,
                alpha=0.8,
                ms=1.8,
                label="Observations",
            )

        # Plot the predicted values
        ax.plot(
            future["ds"], future["yhat"], "navy", linewidth=2.4, label="Predicted"
        )

        # Plot the uncertainty interval as ribbon
        ax.fill_between(
            future["ds"].dt.to_pydatetime(),
            future["yhat_upper"],
            future["yhat_lower"],
            alpha=0.6,
            facecolor="gold",
            edgecolor="k",
            linewidth=1.4,
            label="Confidence Interval",
        )

        # Put a vertical line at the start of predictions
        plt.vlines(
            x=min(test["ds"]),
            ymin=min(future["yhat_lower"]),
            ymax=max(future["yhat_upper"]),
            colors="r",
            linestyles="dashed",
            label="Prediction Start",
        )

        # Plot formatting
        plt.legend(loc=2, prop={"size": 8})
        plt.xlabel("Date")
        plt.ylabel("Price $")
        plt.grid(linewidth=0.6, alpha=0.6)
        plt.title(
            "{} Model Evaluation from {} to {}.".format(
                self.symbol, start_date, end_date
            )
        )
        plt.show()
        return

    # A number of shares was specified: play the game.
    # Only playing the stocks when we predict the stock will increase.
    # The profit for such a day is the actual price change times the number
    # of shares - a gain when the prediction was right, a loss (negative
    # change) when it was wrong - so both outcomes use the same formula.
    played = test.loc[test["pred_diff"] > 0, ["ds", "real_diff"]].copy()
    played["pred_profit"] = nshares * played["real_diff"]
    total_prediction_profit = played["pred_profit"].sum()

    # Put the profit into the test dataframe
    test = pd.merge(test, played[["ds", "pred_profit"]], on="ds", how="left")
    test.loc[0, "pred_profit"] = 0

    # Profit for either method at all dates; days without a trade carry the
    # previous running total forward
    test["pred_profit"] = test["pred_profit"].cumsum().ffill()
    test["hold_profit"] = nshares * (test["y"] - float(test.loc[0, "y"]))

    # Display information
    print(
        "You played the stock market in {} from {} to {} with {} shares.\n".format(
            self.symbol, start_date, end_date, nshares
        )
    )
    print(
        "When the model predicted an increase, the price increased {:.2f}% of the time.".format(
            increase_accuracy
        )
    )
    print(
        "When the model predicted a decrease, the price decreased {:.2f}% of the time.\n".format(
            decrease_accuracy
        )
    )

    # Display some friendly information about the perils of playing the stock market
    print(
        "The total profit using the Prophet model = ${:.2f}.".format(
            total_prediction_profit
        )
    )
    print(
        "The Buy and Hold strategy profit = ${:.2f}.".format(
            float(test.loc[test.index[-1], "hold_profit"])
        )
    )
    print("\nThanks for playing the stock market!\n")

    # Plot the predicted and actual profits over time
    self.reset_plot()

    # Final values of both strategies, used for the text annotations
    last_row = test.index[-1]
    final_profit = test.loc[last_row, "pred_profit"]
    final_smart = test.loc[last_row, "hold_profit"]

    # Annotations are placed one month before the last test date
    text_location = test.loc[last_row, "ds"] - pd.DateOffset(months=1)

    plt.style.use("dark_background")

    # Buy and hold profits
    plt.plot(
        test["ds"],
        test["hold_profit"],
        "b",
        linewidth=1.8,
        label="Buy and Hold Strategy",
    )

    # Prediction strategy profits
    plt.plot(
        test["ds"],
        test["pred_profit"],
        color="g" if final_profit > 0 else "r",
        linewidth=1.8,
        label="Prediction Strategy",
    )

    # Display final values on graph
    for final_value in (final_profit, final_smart):
        plt.text(
            x=text_location,
            y=final_value + (final_value / 40),
            s="$%d" % final_value,
            color="g" if final_value > 0 else "r",
            size=18,
        )

    # Plot formatting
    plt.ylabel("Profit (US $)")
    plt.xlabel("Date")
    plt.title("Predicted versus Buy and Hold Profits")
    plt.legend(loc=2, prop={"size": 10})
    plt.grid(alpha=0.2)
    plt.show()
def retrieve_google_trends(self, search, date_range):
    """Fetch Google news search interest and related queries for a term.

    Args:
        search: The search term.
        date_range: Sequence whose first element is the timeframe string
            passed to ``build_payload``.

    Returns:
        ``(trends, related_queries)`` as returned by
        ``interest_over_time()`` and ``related_queries()``. On any failure
        the error is printed and ``(None, None)`` is returned, so callers
        can always unpack two values.
    """
    kw_list = [search]

    try:
        # Set up the trend fetching object. It is created inside the try
        # block so that a failure here is reported like any other retrieval
        # error. The local name differs from the imported `pytrends` module
        # to avoid shadowing it.
        trend_request = TrendReq(hl="en-US", tz=360)

        # Create the search object
        trend_request.build_payload(
            kw_list, cat=0, timeframe=date_range[0], geo="", gprop="news"
        )

        # Retrieve the interest over time
        trends = trend_request.interest_over_time()
        related_queries = trend_request.related_queries()
    except Exception as e:
        print("\nGoogle Search Trend retrieval failed.")
        print(e)
        # A bare `return` handed None to a caller that unpacks two values
        return None, None

    return trends, related_queries
def changepoint_date_analysis(self, search=None):
    """Show the largest changepoints, optionally next to search interest.

    A model is fitted on the last ``self.training_years`` years of data.
    The ten changepoints with the largest absolute ``delta`` are kept and
    split by the sign of ``delta``.

    Args:
        search: When falsy, the top changepoints are printed and drawn
            over the price and the fitted values. Otherwise Google search
            interest for ``search`` is retrieved, related queries are
            printed, and the normalized price and search frequency are
            plotted together with the changepoints.
    """
    self.reset_plot()

    model = self.create_model()

    # Use past self.training_years years of data
    cutoff = self.max_date - pd.DateOffset(years=self.training_years)
    train = self.stock[self.stock["Date"] > cutoff]
    model.fit(train)

    # Predictions of the training data (no future periods)
    future = model.make_future_dataframe(periods=0, freq="D")
    future = model.predict(future)

    train = pd.merge(train, future[["ds", "yhat"]], on="ds", how="inner")
    train = train.reset_index(drop=True)

    # Rows of the training data that fall on a changepoint; copied so new
    # columns can be added without touching `train`
    change_indices = [
        train[train["ds"] == changepoint].index[0]
        for changepoint in model.changepoints
    ]
    c_data = train.loc[change_indices, :].copy()

    c_data["delta"] = model.params["delta"][0]
    c_data["abs_delta"] = abs(c_data["delta"])

    # Keep the ten changepoints with the largest absolute change
    c_data = c_data.sort_values(by="abs_delta", ascending=False)
    c_data = c_data[:10]

    # Separate into negative and positive changepoints
    cpos_data = c_data[c_data["delta"] > 0]
    cneg_data = c_data[c_data["delta"] < 0]

    def draw_changepoints(ymin, ymax):
        # Negative deltas in red and positive deltas in green. The frames
        # were previously passed the other way round, so each set of lines
        # carried the opposite label.
        plt.vlines(
            cneg_data["ds"].dt.to_pydatetime(),
            ymin=ymin,
            ymax=ymax,
            linestyles="dashed",
            color="r",
            linewidth=1.2,
            label="Negative Changepoints",
        )
        plt.vlines(
            cpos_data["ds"].dt.to_pydatetime(),
            ymin=ymin,
            ymax=ymax,
            linestyles="dashed",
            color="darkgreen",
            linewidth=1.2,
            label="Positive Changepoints",
        )

    if not search:
        print("\nChangepoints sorted by slope rate of change (2nd derivative):\n")
        print(c_data.loc[:, ["Date", "Adj. Close", "delta"]][:5])

        # Line plot showing actual values, estimated values, and changepoints
        self.reset_plot()

        plt.plot(train["ds"], train["y"], "ko", ms=4, label="Stock Price")
        plt.plot(
            future["ds"],
            future["yhat"],
            color="navy",
            linewidth=2.0,
            label="Modeled",
        )

        # Changepoints as vertical lines spanning the observed price range
        draw_changepoints(min(train["y"]), max(train["y"]))

        plt.legend(prop={"size": 10})
        plt.xlabel("Date")
        plt.ylabel("Price ($)")
        plt.title("Stock Price with Changepoints")
        plt.show()
        return

    # Search for search term in google news
    # Show related queries, rising related queries
    # Graph changepoints, search frequency, stock price

    # The timeframe is built from calendar dates only ("YYYY-MM-DD
    # YYYY-MM-DD"); str() of a timestamp would also include the time of day.
    date_range = [
        "%s %s" % (min(train["Date"]).date(), max(train["Date"]).date())
    ]

    # Get the Google Trends for specified terms; tolerate a bare None
    # result as well as a pair of Nones
    result = self.retrieve_google_trends(search, date_range)
    trends, related_queries = result if result is not None else (None, None)

    if (trends is None) or (related_queries is None):
        print("No search trends found for %s" % search)
        return

    print("\n Top Related Queries: \n")
    print(related_queries[search]["top"].head())

    print("\n Rising Related Queries: \n")
    print(related_queries[search]["rising"].head())

    # Upsample the data for joining with training data. `.asfreq()` turns
    # the Resampler into a daily dataframe with NaN in the inserted rows.
    trends = trends.resample("D").asfreq()
    trends = trends.reset_index(level=0)
    trends = trends.rename(columns={"date": "ds", search: "freq"})

    # Interpolate the frequency
    trends["freq"] = trends["freq"].interpolate()

    # Merge with the training data
    train = pd.merge(train, trends, on="ds", how="inner")

    # Normalize values
    train["y_norm"] = train["y"] / max(train["y"])
    train["freq_norm"] = train["freq"] / max(train["freq"])

    self.reset_plot()

    # Plot the normalized stock price and normalize search frequency
    plt.plot(train["ds"], train["y_norm"], "k-", label="Stock Price")
    plt.plot(
        train["ds"],
        train["freq_norm"],
        color="goldenrod",
        label="Search Frequency",
    )

    # Changepoints as vertical lines across the normalized axis
    draw_changepoints(0, 1)

    # Plot formatting
    plt.legend(prop={"size": 10})
    plt.xlabel("Date")
    plt.ylabel("Normalized Values")
    plt.title(
        "%s Stock Price and Search Frequency for %s" % (self.symbol, search)
    )
    plt.show()
- # Predict the future price for a given range of days
def predict_future(self, days=30):
    """Forecast ``days`` days ahead and print/plot the predicted direction.

    A model is fitted on the last ``self.training_years`` years of data.
    Forecast rows from the last date in the data onwards are kept, weekend
    rows are removed, and each remaining day is classified as a predicted
    increase or decrease relative to the previous kept row. Both groups
    are printed and plotted with error bars showing the predicted
    interval.

    Args:
        days: Number of days to forecast.
    """
    # Use past self.training_years years for training
    last_known = max(self.stock["Date"])
    train = self.stock[
        self.stock["Date"] > (last_known - pd.DateOffset(years=self.training_years))
    ]

    model = self.create_model()
    model.fit(train)

    # Future dataframe with specified number of days to predict
    future = model.make_future_dataframe(periods=days, freq="D")
    future = model.predict(future)

    # Only concerned with future dates
    future = future[future["ds"] >= last_known]

    # Remove the weekends
    future = self.remove_weekends(future)

    # Change from the previous row; the first row has none and is dropped
    future["diff"] = future["yhat"].diff()
    future = future.dropna()

    # 1 for a predicted increase, 0 otherwise
    future["direction"] = (future["diff"] > 0) * 1

    # Rename the columns for presentation
    future = future.rename(
        columns={
            "ds": "Date",
            "yhat": "estimate",
            "diff": "change",
            "yhat_upper": "upper",
            "yhat_lower": "lower",
        }
    )

    future_increase = future[future["direction"] == 1]
    future_decrease = future[future["direction"] == 0]

    # Print out the dates
    shown = ["Date", "estimate", "change", "upper", "lower"]
    print("\nPredicted Increase: \n")
    print(future_increase[shown])

    print("\nPredicted Decrease: \n")
    print(future_decrease[shown])

    self.reset_plot()

    # Set up plot
    plt.style.use("fivethirtyeight")
    matplotlib.rcParams["axes.labelsize"] = 10
    matplotlib.rcParams["xtick.labelsize"] = 8
    matplotlib.rcParams["ytick.labelsize"] = 8
    matplotlib.rcParams["axes.titlesize"] = 12

    # Plot the predictions and indicate if increase or decrease
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))

    # Plot the estimates
    ax.plot(
        future_increase["Date"],
        future_increase["estimate"],
        "g^",
        ms=12,
        label="Pred. Increase",
    )
    ax.plot(
        future_decrease["Date"],
        future_decrease["estimate"],
        "rv",
        ms=12,
        label="Pred. Decrease",
    )

    # Error bars from `lower` to `upper`. errorbar() draws estimate -/+
    # yerr, so the distances below and above the estimate are passed
    # separately; passing (upper - lower) made the bars twice as wide as
    # the interval.
    ax.errorbar(
        future["Date"].dt.to_pydatetime(),
        future["estimate"],
        yerr=[
            future["estimate"] - future["lower"],
            future["upper"] - future["estimate"],
        ],
        capthick=1.4,
        color="k",
        linewidth=2,
        ecolor="darkblue",
        capsize=4,
        elinewidth=1,
        label="Pred with Range",
    )

    # Plot formatting (rotation is given as a number, not the string "45")
    plt.legend(loc=2, prop={"size": 10})
    plt.xticks(rotation=45)
    plt.ylabel("Predicted Stock Price (US $)")
    plt.xlabel("Date")
    plt.title("Predictions for %s" % self.symbol)
    plt.show()
def changepoint_prior_validation(
    self, start_date=None, end_date=None, changepoint_priors=(0.001, 0.05, 0.1, 0.2)
):
    """Compare train/test error and uncertainty across changepoint priors.

    For each value in ``changepoint_priors`` a model is created with
    ``self.create_model()`` (after setting ``self.changepoint_prior_scale``
    to that value), fitted on the ``self.training_years`` years preceding
    ``start_date`` and evaluated on the rows of ``self.stock`` between
    ``start_date`` and ``end_date`` inclusive. The mean absolute error and
    the mean width of the forecast interval are tabulated, printed and
    plotted for both the training and the testing range.

    Args:
        start_date: Start of the validation range. Defaults to two years
            before ``self.max_date``.
        end_date: End of the validation range. Defaults to one year
            before ``self.max_date``.
        changepoint_priors: Iterable of changepoint prior scales to try.

    Returns:
        None. Results are printed and shown in two matplotlib figures.

    Raises:
        ValueError: If no rows of ``self.stock`` fall inside the
            validation range.
    """
    # Default start date is two years before end of data
    # Default end date is one year before end of data
    if start_date is None:
        start_date = self.max_date - pd.DateOffset(years=2)
    if end_date is None:
        end_date = self.max_date - pd.DateOffset(years=1)

    # Convert to pandas datetime for indexing dataframe
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    start_date, end_date = self.handle_dates(start_date, end_date)

    # Select self.training_years number of years
    train = self.stock[
        (
            self.stock["Date"]
            > (start_date - pd.DateOffset(years=self.training_years))
        )
        & (self.stock["Date"] < start_date)
    ]

    # Testing data is specified by range
    test = self.stock[
        (self.stock["Date"] >= start_date) & (self.stock["Date"] <= end_date)
    ]
    if test.empty:
        raise ValueError(
            "No data between {} and {} to validate on.".format(start_date, end_date)
        )

    test_start = test["Date"].min()
    test_end = test["Date"].max()
    eval_days = (test_end - test_start).days

    # Materialise once so any iterable (not only sequences) is accepted
    priors = list(changepoint_priors)

    # Initialise with a float so the float metrics written below do not
    # hit pandas' incompatible-dtype deprecation on integer columns.
    results = pd.DataFrame(
        0.0,
        index=list(range(len(priors))),
        columns=["cps", "train_err", "train_range", "test_err", "test_range"],
    )

    print("\nValidation Range {} to {}.\n".format(test_start, test_end))

    # Remember the configured prior so the sweep does not leave the
    # instance set to whichever value happened to be tried last.
    had_prior = hasattr(self, "changepoint_prior_scale")
    saved_prior = getattr(self, "changepoint_prior_scale", None)

    forecast_columns = ["ds", "yhat", "yhat_upper", "yhat_lower"]

    try:
        # Iterate through all the changepoints and make models
        for i, prior in enumerate(priors):
            results.loc[i, "cps"] = prior

            # Select the changepoint
            self.changepoint_prior_scale = prior

            # Create and train a model with the specified cps
            model = self.create_model()
            model.fit(train)
            future = model.make_future_dataframe(periods=eval_days, freq="D")
            future = model.predict(future)
            forecast = future[forecast_columns]

            # Training results and metrics
            train_results = pd.merge(train, forecast, on="ds", how="inner")
            results.loc[i, "train_err"] = np.mean(
                abs(train_results["y"] - train_results["yhat"])
            )
            results.loc[i, "train_range"] = np.mean(
                abs(train_results["yhat_upper"] - train_results["yhat_lower"])
            )

            # Testing results and metrics
            test_results = pd.merge(test, forecast, on="ds", how="inner")
            results.loc[i, "test_err"] = np.mean(
                abs(test_results["y"] - test_results["yhat"])
            )
            results.loc[i, "test_range"] = np.mean(
                abs(test_results["yhat_upper"] - test_results["yhat_lower"])
            )
    finally:
        if had_prior:
            self.changepoint_prior_scale = saved_prior

    print(results)

    # Plot of training and testing average errors
    self.reset_plot()

    plt.plot(results["cps"], results["train_err"], "bo-", ms=8, label="Train Error")
    plt.plot(results["cps"], results["test_err"], "r*-", ms=8, label="Test Error")
    plt.xlabel("Changepoint Prior Scale")
    plt.ylabel("Avg. Absolute Error ($)")
    plt.title("Training and Testing Curves as Function of CPS")
    plt.grid(color="k", alpha=0.3)
    plt.xticks(results["cps"], results["cps"])
    plt.legend(prop={"size": 10})
    plt.show()

    # Plot of training and testing average uncertainty
    self.reset_plot()

    plt.plot(
        results["cps"], results["train_range"], "bo-", ms=8, label="Train Range"
    )
    plt.plot(results["cps"], results["test_range"], "r*-", ms=8, label="Test Range")
    plt.xlabel("Changepoint Prior Scale")
    plt.ylabel("Avg. Uncertainty ($)")
    plt.title("Uncertainty in Estimate as Function of CPS")
    plt.grid(color="k", alpha=0.3)
    plt.xticks(results["cps"], results["cps"])
    plt.legend(prop={"size": 10})
    plt.show()
|