import pandas as pd import numpy as np import statsmodels.api as sm from sklearn.linear_model import LinearRegression from sklearn.metrics import mean_squared_error from scipy import stats import plotly.graph_objs as go import cufflinks cufflinks.go_offline() def make_hist(df, x, category=None): """ Make an interactive histogram, optionally segmented by `category` :param df: dataframe of data :param x: string of column to use for plotting :param category: string representing column to segment by :return figure: a plotly histogram to show with iplot or plot """ if category is not None: data = [] for name, group in df.groupby(category): data.append(go.Histogram(dict(x=group[x], name=name))) else: data = [go.Histogram(dict(x=df[x]))] layout = go.Layout( yaxis=dict(title="Count"), xaxis=dict(title=x.replace('_', ' ').title()), title=f"{x.replace('_', ' ').title()} Distribution by {category.replace('_', ' ').title()}" if category else f"{x.replace('_', ' ').title()} Distribution", ) figure = go.Figure(data=data, layout=layout) return figure def make_cum_plot(df, y, category=None, ranges=False): """ Make an interactive cumulative plot, optionally segmented by `category` :param df: dataframe of data, must have a `published_date` column :param y: string of column to use for plotting or list of two strings for double y axis :param category: string representing column to segment by :param ranges: boolean for whether to add range slider and range selector :return figure: a plotly plot to show with iplot or plot """ if category is not None: data = [] for i, (name, group) in enumerate(df.groupby(category)): group.sort_values("published_date", inplace=True) data.append( go.Scatter( x=group["published_date"], y=group[y].cumsum(), mode="lines+markers", text=group["title"], name=name, marker=dict(size=10, opacity=0.8, symbol=i + 2), ) ) else: df.sort_values("published_date", inplace=True) if len(y) == 2: data = [ go.Scatter( x=df["published_date"], y=df[y[0]].cumsum(), name=y[0].title(), mode="lines+markers", text=df["title"], marker=dict(size=10, color='blue', opacity=0.6, line=dict(color='black'), )), go.Scatter( x=df["published_date"], y=df[y[1]].cumsum(), yaxis='y2', name=y[1].title(), mode="lines+markers", text=df["title"], marker=dict(size=10, color='red', opacity=0.6, line=dict(color='black'), )), ] else: data = [ go.Scatter( x=df["published_date"], y=df[y].cumsum(), mode="lines+markers", text=df["title"], marker=dict(size=12, color='blue', opacity=0.6, line=dict(color='black'), ), ) ] if len(y) == 2: layout = go.Layout( xaxis=dict(title="Published Date", type="date"), yaxis=dict(title=y[0].replace('_', ' ').title(), color='blue'), yaxis2=dict(title=y[1].replace('_', ' ').title(), color='red', overlaying='y', side='right'), font=dict(size=14), title=f"Cumulative {y[0].title()} and {y[1].title()}", ) else: layout = go.Layout( xaxis=dict(title="Published Date", type="date"), yaxis=dict(title=y.replace('_', ' ').title()), font=dict(size=14), title=f"Cumulative {y.replace('_', ' ').title()} by {category.replace('_', ' ').title()}" if category is not None else f"Cumulative {y.replace('_', ' ').title()}", ) # Add a rangeselector and rangeslider for a data xaxis if ranges: rangeselector = dict( buttons=list( [ dict(count=1, label="1m", step="month", stepmode="backward"), dict(count=6, label="6m", step="month", stepmode="backward"), dict(count=1, label="1y", step="year", stepmode="backward"), dict(step="all"), ] ) ) rangeslider = dict(visible=True) layout["xaxis"]["rangeselector"] = rangeselector layout["xaxis"]["rangeslider"] = rangeslider layout['width'] = 1000 layout['height'] = 600 figure = go.Figure(data=data, layout=layout) return figure def make_scatter_plot(df, x, y, fits=None, xlog=False, ylog=False, category=None, scale=None, sizeref=2, annotations=None, ranges=False, title_override=None): """ Make an interactive scatterplot, optionally segmented by `category` :param df: dataframe of data :param x: string of column to use for xaxis :param y: string of column to use for yaxis :param fits: list of strings of fits :param xlog: boolean for making a log xaxis :param ylog boolean for making a log yaxis :param category: string representing categorical column to segment by, this must be a categorical :param scale: string representing numerical column to size and color markers by, this must be numerical data :param sizeref: float or integer for setting the size of markers according to the scale, only used if scale is set :param annotations: text to display on the plot (dictionary) :param ranges: boolean for whether to add a range slider and selector :param title_override: String to override the title :return figure: a plotly plot to show with iplot or plot """ if category is not None: title = f"{y.replace('_', ' ').title()} vs {x.replace('_', ' ').title()} by {category.replace('_', ' ').title()}" data = [] for i, (name, group) in enumerate(df.groupby(category)): data.append(go.Scatter(x=group[x], y=group[y], mode='markers', text=group['title'], name=name, marker=dict(size=8, symbol=i + 2))) else: if scale is not None: title = f"{y.replace('_', ' ').title()} vs {x.replace('_', ' ').title()} Scaled by {scale.title()}" data = [go.Scatter(x=df[x], y=df[y], mode='markers', text=df['title'], marker=dict(size=df[scale], line=dict(color='black', width=0.5), sizemode='area', sizeref=sizeref, opacity=0.8, colorscale='Viridis', color=df[scale], showscale=True, sizemin=2))] else: df.sort_values(x, inplace=True) title = f"{y.replace('_', ' ').title()} vs {x.replace('_', ' ').title()}" data = [go.Scatter(x=df[x], y=df[y], mode='markers', text=df['title'], marker=dict( size=12, color='blue', opacity=0.8, line=dict(color='black')), name='observations')] if fits is not None: for fit in fits: data.append(go.Scatter(x=df[x], y=df[fit], text=df['title'], mode='lines+markers', marker=dict (size=8, opacity=0.6), line=dict(dash='dash'), name=fit)) title += ' with Fit' layout = go.Layout(annotations=annotations, xaxis=dict(title=x.replace('_', ' ').title() + (' (log scale)' if xlog else ''), type='log' if xlog else None), yaxis=dict(title=y.replace('_', ' ').title() + (' (log scale)' if ylog else ''), type='log' if ylog else None), font=dict(size=14), title=title if title_override is None else title_override, ) # Add a rangeselector and rangeslider for a data xaxis if ranges: rangeselector = dict( buttons=list( [ dict(count=1, label="1m", step="month", stepmode="backward"), dict(count=6, label="6m", step="month", stepmode="backward"), dict(count=1, label="1y", step="year", stepmode="backward"), dict(step="all"), ] ) ) rangeslider = dict(visible=True) layout["xaxis"]["rangeselector"] = rangeselector layout["xaxis"]["rangeslider"] = rangeslider layout['width'] = 1000 layout['height'] = 600 figure = go.Figure(data=data, layout=layout) return figure def make_linear_regression(df, x, y, intercept_0): """ Create a linear regression, either with the intercept set to 0 or the intercept allowed to be fitted :param df: dataframe with data :param x: string or list of stringsfor the name of the column with x data :param y: string for the name of the column with y data :param intercept_0: boolean indicating whether to set the intercept to 0 """ if isinstance(x, list): lin_model = LinearRegression() lin_model.fit(df[x], df[y]) slopes, intercept, = lin_model.coef_, lin_model.intercept_ df['predicted'] = lin_model.predict(df[x]) r2 = lin_model.score(df[x], df[y]) rmse = np.sqrt(mean_squared_error( y_true=df[y], y_pred=df['predicted'])) equation = f'{y.replace("_", " ")} =' names = ['r2', 'rmse', 'intercept'] values = [r2, rmse, intercept] for i, (p, s) in enumerate(zip(x, slopes)): if (i + 1) % 3 == 0: equation += f'
{s:.2f} * {p.replace("_", " ")} +' else: equation += f' {s:.2f} * {p.replace("_", " ")} +' names.append(p) values.append(s) equation += f' {intercept:.2f}' annotations = [dict(x=0.4 * df.index.max(), y=0.9 * df[y].max(), showarrow=False, text=equation, font=dict(size=10))] df['index'] = list(df.index) figure = make_scatter_plot(df, x='index', y=y, fits=[ 'predicted'], annotations=annotations) summary = pd.DataFrame({'name': names, 'value': values}) else: if intercept_0: lin_reg = sm.OLS(df[y], df[x]).fit() df['fit_values'] = lin_reg.fittedvalues summary = lin_reg.summary() slope = float(lin_reg.params) equation = f"${y.replace('_', ' ')} = {slope:.2f} * {x.replace('_', ' ')}$" else: lin_reg = stats.linregress(df[x], df[y]) intercept, slope = lin_reg.intercept, lin_reg.slope params = ['pvalue', 'rvalue', 'slope', 'intercept'] values = [] for p in params: values.append(getattr(lin_reg, p)) summary = pd.DataFrame({'param': params, 'value': values}) df['fit_values'] = df[x] * slope + intercept equation = f"${y.replace('_', ' ')} = {slope:.2f} * {x.replace('_', ' ')} + {intercept:.2f}$" annotations = [dict(x=0.75 * df[x].max(), y=0.9 * df[y].max(), showarrow=False, text=equation, font=dict(size=32))] figure = make_scatter_plot( df, x=x, y=y, fits=['fit_values'], annotations=annotations) return figure, summary def make_poly_fits(df, x, y, degree=6): """ Generate fits and make interactive plot with fits :param df: dataframe with data :param x: string representing x data column :param y: string representing y data column :param degree: integer degree of fits to go up to :return fit_stats: dataframe with information about fits :return figure: interactive plotly figure that can be shown with iplot or plot """ # Don't want to alter original data frame df = df.copy() fit_list = [] rmse = [] fit_params = [] # Make each fit for i in range(1, degree + 1): fit_name = f'fit degree = {i}' fit_list.append(fit_name) z, res, *rest = np.polyfit(df[x], df[y], i, full=True) fit_params.append(z) df.loc[:, fit_name] = np.poly1d(z)(df[x]) rmse.append(np.sqrt(res[0])) fit_stats = pd.DataFrame( {'fit': fit_list, 'rmse': rmse, 'params': fit_params}) figure = make_scatter_plot(df, x=x, y=y, fits=fit_list) return figure, fit_stats def make_extrapolation(df, y, years, degree=4): """ Extrapolate `y` into the future `years` with `degree` polynomial fit :param df: dataframe of data :param y: string of column to extrapolate :param years: number of years to extrapolate into the future :param degree: integer degree of polynomial fit :return figure: plotly figure for display using iplot or plot :return future_df: extrapolated numbers into the future """ df = df.copy() x = 'days_since_start' df['days_since_start'] = ( (df['published_date'] - df['published_date'].min()). dt.total_seconds() / (3600 * 24)).astype(int) cumy = f'cum_{y}' df[cumy] = df.sort_values(x)[y].cumsum() figure, summary = make_poly_fits(df, x, cumy, degree=degree) min_date = df['published_date'].min() max_date = df['published_date'].max() date_range = pd.date_range(start=min_date, end=max_date + pd.Timedelta(days=int(years * 365))) future_df = pd.DataFrame({'date': date_range}) future_df[x] = ( (future_df['date'] - future_df['date'].min()). dt.total_seconds() / (3600 * 24)).astype(int) newcumy = f'cumulative_{y}' future_df = future_df.merge(df[[x, cumy]], on=x, how='left').\ rename(columns={cumy: newcumy}) z = np.poly1d(summary.iloc[-1]['params']) pred_name = f'predicted_{y}' future_df[pred_name] = z(future_df[x]) future_df['title'] = '' last_date = future_df.loc[future_df['date'].idxmax()] prediction_text = ( f"On {last_date['date'].date()} the {y} will be {float(last_date[pred_name]):,.0f}.") annotations = [dict(x=future_df['date'].quantile(0.4), y=0.8 * future_df[pred_name].max(), text=prediction_text, showarrow=False, font=dict(size=16))] title_override = f'{y.replace("_", " ").title()} with Extrapolation {years} Years into the Future' figure = make_scatter_plot(future_df, 'date', newcumy, fits=[ pred_name], annotations=annotations, ranges=True, title_override=title_override) return figure, future_df