123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- import pandas as pd
- import numpy as np
- import statsmodels.api as sm
- from sklearn.linear_model import LinearRegression
- from sklearn.metrics import mean_squared_error
- from scipy import stats
- import plotly.graph_objs as go
- import cufflinks
- cufflinks.go_offline()
def make_hist(df, x, category=None):
    """
    Build an interactive histogram of one column, optionally split by `category`

    :param df: dataframe holding the data
    :param x: string name of the column to histogram
    :param category: string name of the column to segment by; when given,
        one histogram trace is produced per group
    :return figure: a plotly histogram to show with iplot or plot
    """
    x_label = x.replace("_", " ").title()

    if category is None:
        traces = [go.Histogram(dict(x=df[x]))]
    else:
        traces = [
            go.Histogram(dict(x=group[x], name=name))
            for name, group in df.groupby(category)
        ]

    # The title names the segmenting column only when one was supplied
    if category:
        heading = f"{x_label} Distribution by {category.replace('_', ' ').title()}"
    else:
        heading = f"{x_label} Distribution"

    layout = go.Layout(
        yaxis=dict(title="Count"),
        xaxis=dict(title=x_label),
        title=heading,
    )
    return go.Figure(data=traces, layout=layout)
def make_cum_plot(df, y, category=None, ranges=False):
    """
    Make an interactive cumulative plot, optionally segmented by `category`

    The input dataframe is not modified: rows are ordered by
    `published_date` on a sorted copy before the cumulative sum is taken.

    :param df: dataframe of data, must have `published_date` and `title` columns
    :param y: string of column to use for plotting, or a sequence of two
        strings for a double y axis (double axis is only drawn when
        `category` is None)
    :param category: string representing column to segment by
    :param ranges: boolean for whether to add range slider and range selector
    :return figure: a plotly plot to show with iplot or plot
    """
    # A plain string is always one column, even when it is two characters
    # long (e.g. "id"); only a non-string sequence of length 2 means two axes.
    dual_axis = not isinstance(y, str) and len(y) == 2

    if category is not None:
        data = []
        for i, (name, group) in enumerate(df.groupby(category)):
            # Sort a copy so the caller's frame (and the groupby slice) is untouched
            group = group.sort_values("published_date")
            data.append(
                go.Scatter(
                    x=group["published_date"],
                    y=group[y].cumsum(),
                    mode="lines+markers",
                    text=group["title"],
                    name=name,
                    # Offset the symbol index so each group gets a distinct marker
                    marker=dict(size=10, opacity=0.8, symbol=i + 2),
                )
            )
    else:
        ordered = df.sort_values("published_date")
        if dual_axis:
            data = [
                go.Scatter(
                    x=ordered["published_date"],
                    y=ordered[y[0]].cumsum(),
                    name=y[0].title(),
                    mode="lines+markers",
                    text=ordered["title"],
                    marker=dict(
                        size=10,
                        color="blue",
                        opacity=0.6,
                        line=dict(color="black"),
                    ),
                ),
                go.Scatter(
                    x=ordered["published_date"],
                    y=ordered[y[1]].cumsum(),
                    # Second series is drawn against the right-hand axis
                    yaxis="y2",
                    name=y[1].title(),
                    mode="lines+markers",
                    text=ordered["title"],
                    marker=dict(
                        size=10,
                        color="red",
                        opacity=0.6,
                        line=dict(color="black"),
                    ),
                ),
            ]
        else:
            data = [
                go.Scatter(
                    x=ordered["published_date"],
                    y=ordered[y].cumsum(),
                    mode="lines+markers",
                    text=ordered["title"],
                    marker=dict(
                        size=12,
                        color="blue",
                        opacity=0.6,
                        line=dict(color="black"),
                    ),
                )
            ]

    if dual_axis:
        layout = go.Layout(
            xaxis=dict(title="Published Date", type="date"),
            yaxis=dict(title=y[0].replace("_", " ").title(), color="blue"),
            yaxis2=dict(
                title=y[1].replace("_", " ").title(),
                color="red",
                overlaying="y",
                side="right",
            ),
            font=dict(size=14),
            title=f"Cumulative {y[0].title()} and {y[1].title()}",
        )
    else:
        y_label = y.replace("_", " ").title()
        if category is not None:
            title = f"Cumulative {y_label} by {category.replace('_', ' ').title()}"
        else:
            title = f"Cumulative {y_label}"
        layout = go.Layout(
            xaxis=dict(title="Published Date", type="date"),
            yaxis=dict(title=y_label),
            font=dict(size=14),
            title=title,
        )

    # Add a rangeselector and rangeslider for a date xaxis
    if ranges:
        rangeselector = dict(
            buttons=[
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(count=6, label="6m", step="month", stepmode="backward"),
                dict(count=1, label="1y", step="year", stepmode="backward"),
                dict(step="all"),
            ]
        )
        rangeslider = dict(visible=True)
        layout["xaxis"]["rangeselector"] = rangeselector
        layout["xaxis"]["rangeslider"] = rangeslider
        layout["width"] = 1000
        layout["height"] = 600

    figure = go.Figure(data=data, layout=layout)
    return figure
def make_scatter_plot(
    df,
    x,
    y,
    fits=None,
    xlog=False,
    ylog=False,
    category=None,
    scale=None,
    sizeref=2,
    annotations=None,
    ranges=False,
    title_override=None,
):
    """
    Make an interactive scatterplot, optionally segmented by `category`

    Exactly one of three trace layouts is drawn: one trace per group when
    `category` is given, otherwise a single size/colour-scaled trace when
    `scale` is given, otherwise a plain trace sorted by `x`. Fit lines are
    only added in the plain case. The input dataframe is not modified.

    :param df: dataframe of data, must have a `title` column (used as the point text)
    :param x: string of column to use for xaxis
    :param y: string of column to use for yaxis
    :param fits: list of strings naming columns of fitted values to overlay
    :param xlog: boolean for making a log xaxis
    :param ylog: boolean for making a log yaxis
    :param category: string representing categorical column to segment by, this must be a categorical
    :param scale: string representing numerical column to size and color markers by, this must be numerical data
    :param sizeref: float or integer for setting the size of markers according to the scale, only used if scale is set
    :param annotations: text to display on the plot (dictionary)
    :param ranges: boolean for whether to add a range slider and selector
    :param title_override: String to override the title
    :return figure: a plotly plot to show with iplot or plot
    """
    x_label = x.replace("_", " ").title()
    y_label = y.replace("_", " ").title()

    if category is not None:
        title = f"{y_label} vs {x_label} by {category.replace('_', ' ').title()}"
        data = [
            go.Scatter(
                x=group[x],
                y=group[y],
                mode="markers",
                text=group["title"],
                name=name,
                # Offset the symbol index so each group gets a distinct marker
                marker=dict(size=8, symbol=i + 2),
            )
            for i, (name, group) in enumerate(df.groupby(category))
        ]
    elif scale is not None:
        title = f"{y_label} vs {x_label} Scaled by {scale.title()}"
        data = [
            go.Scatter(
                x=df[x],
                y=df[y],
                mode="markers",
                text=df["title"],
                marker=dict(
                    size=df[scale],
                    line=dict(color="black", width=0.5),
                    sizemode="area",
                    sizeref=sizeref,
                    opacity=0.8,
                    colorscale="Viridis",
                    color=df[scale],
                    showscale=True,
                    sizemin=2,
                ),
            )
        ]
    else:
        # Sort a copy rather than in place so the caller's row order is kept;
        # the sorted order keeps the fit lines drawn left-to-right.
        ordered = df.sort_values(x)
        title = f"{y_label} vs {x_label}"
        data = [
            go.Scatter(
                x=ordered[x],
                y=ordered[y],
                mode="markers",
                text=ordered["title"],
                marker=dict(
                    size=12, color="blue", opacity=0.8, line=dict(color="black")
                ),
                name="observations",
            )
        ]
        if fits is not None:
            for fit in fits:
                data.append(
                    go.Scatter(
                        x=ordered[x],
                        y=ordered[fit],
                        text=ordered["title"],
                        mode="lines+markers",
                        marker=dict(size=8, opacity=0.6),
                        line=dict(dash="dash"),
                        name=fit,
                    )
                )
            title += " with Fit"

    layout = go.Layout(
        annotations=annotations,
        xaxis=dict(
            title=x_label + (" (log scale)" if xlog else ""),
            type="log" if xlog else None,
        ),
        yaxis=dict(
            title=y_label + (" (log scale)" if ylog else ""),
            type="log" if ylog else None,
        ),
        font=dict(size=14),
        title=title if title_override is None else title_override,
    )

    # Add a rangeselector and rangeslider for a date xaxis
    if ranges:
        rangeselector = dict(
            buttons=[
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(count=6, label="6m", step="month", stepmode="backward"),
                dict(count=1, label="1y", step="year", stepmode="backward"),
                dict(step="all"),
            ]
        )
        rangeslider = dict(visible=True)
        layout["xaxis"]["rangeselector"] = rangeselector
        layout["xaxis"]["rangeslider"] = rangeslider
        layout["width"] = 1000
        layout["height"] = 600

    figure = go.Figure(data=data, layout=layout)
    return figure
def make_linear_regression(df, x, y, intercept_0):
    """
    Create a linear regression, either with the intercept set to 0 or
    the intercept allowed to be fitted

    Note that `df` is modified: a `predicted` and an `index` column are
    added when `x` is a list, and a `fit_values` column otherwise.

    :param df: dataframe with data, must have a `title` column for plotting
    :param x: string or list of strings for the name of the column(s) with x data
    :param y: string for the name of the column with y data
    :param intercept_0: boolean indicating whether to set the intercept to 0;
        only honoured when `x` is a single column name, a list `x` always
        fits an intercept
    :return figure: a plotly plot of the observations with the fitted values
    :return summary: for a list `x`, a dataframe of r2, rmse, intercept and
        slopes; for a string `x` with `intercept_0`, the statsmodels summary;
        otherwise a dataframe of the linregress parameters
    """
    if isinstance(x, list):
        # Multiple regression: plotted against the row index because
        # there is no single x column to use for the axis.
        lin_model = LinearRegression()
        lin_model.fit(df[x], df[y])
        slopes, intercept = lin_model.coef_, lin_model.intercept_
        df["predicted"] = lin_model.predict(df[x])
        r2 = lin_model.score(df[x], df[y])
        rmse = np.sqrt(mean_squared_error(y_true=df[y], y_pred=df["predicted"]))

        equation = f'{y.replace("_", " ")} ='
        names = ["r2", "rmse", "intercept"]
        values = [r2, rmse, intercept]
        for i, (p, s) in enumerate(zip(x, slopes)):
            # Break the annotation onto a new line before every third term
            if (i + 1) % 3 == 0:
                equation += f'<br>{s:.2f} * {p.replace("_", " ")} +'
            else:
                equation += f' {s:.2f} * {p.replace("_", " ")} +'
            names.append(p)
            values.append(s)
        equation += f" {intercept:.2f}"

        annotations = [
            dict(
                x=0.4 * df.index.max(),
                y=0.9 * df[y].max(),
                showarrow=False,
                text=equation,
                font=dict(size=10),
            )
        ]
        df["index"] = list(df.index)
        figure = make_scatter_plot(
            df, x="index", y=y, fits=["predicted"], annotations=annotations
        )
        summary = pd.DataFrame({"name": names, "value": values})
    else:
        if intercept_0:
            # OLS without an added constant column forces the line through the origin
            lin_reg = sm.OLS(df[y], df[x]).fit()
            df["fit_values"] = lin_reg.fittedvalues
            summary = lin_reg.summary()
            # params holds a single coefficient; take it by position instead of
            # calling float() on the whole Series, which pandas has deprecated.
            slope = float(np.asarray(lin_reg.params)[0])
            equation = f"${y.replace('_', ' ')} = {slope:.2f} * {x.replace('_', ' ')}$"
        else:
            lin_reg = stats.linregress(df[x], df[y])
            intercept, slope = lin_reg.intercept, lin_reg.slope
            params = ["pvalue", "rvalue", "slope", "intercept"]
            values = [getattr(lin_reg, p) for p in params]
            summary = pd.DataFrame({"param": params, "value": values})
            df["fit_values"] = df[x] * slope + intercept
            equation = f"${y.replace('_', ' ')} = {slope:.2f} * {x.replace('_', ' ')} + {intercept:.2f}$"

        annotations = [
            dict(
                x=0.75 * df[x].max(),
                y=0.9 * df[y].max(),
                showarrow=False,
                text=equation,
                font=dict(size=32),
            )
        ]
        figure = make_scatter_plot(
            df, x=x, y=y, fits=["fit_values"], annotations=annotations
        )
    return figure, summary
def make_poly_fits(df, x, y, degree=6):
    """
    Generate polynomial fits of degree 1 through `degree` and make an
    interactive plot of the data with every fit overlaid

    :param df: dataframe with data, must have a `title` column for plotting
    :param x: string representing x data column
    :param y: string representing y data column
    :param degree: integer degree of fits to go up to
    :return figure: interactive plotly figure that can be shown with iplot or plot
    :return fit_stats: dataframe with one row per fit holding its name (`fit`),
        root mean squared error (`rmse`) and polynomial coefficients (`params`)
    """
    # Don't want to alter original data frame
    df = df.copy()
    fit_list = []
    rmse = []
    fit_params = []

    # Make each fit
    for i in range(1, degree + 1):
        fit_name = f"fit degree = {i}"
        fit_list.append(fit_name)

        # Only the coefficients are taken from polyfit. Its residual output is
        # the *sum* of squared errors and is empty for rank-deficient or
        # exactly-determined fits, so the error is computed from predictions.
        z, *_ = np.polyfit(df[x], df[y], i, full=True)
        fit_params.append(z)

        fitted = np.poly1d(z)(df[x])
        df.loc[:, fit_name] = fitted
        rmse.append(np.sqrt(np.mean((df[y] - fitted) ** 2)))

    fit_stats = pd.DataFrame({"fit": fit_list, "rmse": rmse, "params": fit_params})
    figure = make_scatter_plot(df, x=x, y=y, fits=fit_list)
    return figure, fit_stats
def make_extrapolation(df, y, years, degree=4):
    """
    Project the running total of `y` forward in time with a polynomial fit

    The cumulative sum of `y` is fitted against the number of days since the
    earliest `published_date`; the fit of degree `degree` is then evaluated
    on a daily date range extending `years` past the latest date.

    :param df: dataframe of data, must have a `published_date` column
    :param y: string of column to extrapolate
    :param years: number of years to extrapolate into the future
    :param degree: integer degree of polynomial fit
    :return figure: plotly figure for display using iplot or plot
    :return future_df: extrapolated numbers into the future
    """
    seconds_per_day = 3600 * 24
    x = "days_since_start"
    cumy = f"cum_{y}"
    newcumy = f"cumulative_{y}"
    pred_name = f"predicted_{y}"

    # Work on a copy so the helper columns never reach the caller's frame
    df = df.copy()
    elapsed = df["published_date"] - df["published_date"].min()
    df["days_since_start"] = (elapsed.dt.total_seconds() / seconds_per_day).astype(int)
    df[cumy] = df.sort_values(x)[y].cumsum()

    # Only the fit statistics are needed here; the figure is rebuilt below
    _, summary = make_poly_fits(df, x, cumy, degree=degree)

    min_date = df["published_date"].min()
    max_date = df["published_date"].max()
    horizon = max_date + pd.Timedelta(days=int(years * 365))
    future_df = pd.DataFrame({"date": pd.date_range(start=min_date, end=horizon)})
    future_elapsed = future_df["date"] - future_df["date"].min()
    future_df[x] = (future_elapsed.dt.total_seconds() / seconds_per_day).astype(int)

    # Bring the observed cumulative values onto the daily grid
    observed = df[[x, cumy]]
    future_df = future_df.merge(observed, on=x, how="left")
    future_df = future_df.rename(columns={cumy: newcumy})

    # The last row of the summary is the highest-degree fit
    z = np.poly1d(summary.iloc[-1]["params"])
    future_df[pred_name] = z(future_df[x])
    future_df["title"] = ""

    last_date = future_df.loc[future_df["date"].idxmax()]
    prediction_text = f"On {last_date['date'].date()} the {y} will be {float(last_date[pred_name]):,.0f}."
    annotations = [
        dict(
            x=future_df["date"].quantile(0.4),
            y=0.8 * future_df[pred_name].max(),
            text=prediction_text,
            showarrow=False,
            font=dict(size=16),
        )
    ]

    title_override = f'{y.replace("_", " ").title()} with Extrapolation {years} Years into the Future'
    figure = make_scatter_plot(
        future_df,
        "date",
        newcumy,
        fits=[pred_name],
        annotations=annotations,
        ranges=True,
        title_override=title_override,
    )
    return figure, future_df
|