123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- from argparse import Namespace
- import contextlib
- from datetime import datetime, timedelta
- import io
- import json
- import sys
- from typing import List, Tuple, TYPE_CHECKING
- import warnings
- import numpy
- import tqdm
- from labours.plotting import apply_plot_style, deploy_plot, get_plot_path, import_pyplot
- from labours.utils import default_json, floor_datetime, import_pandas, parse_date
- if TYPE_CHECKING:
- from lifelines import KaplanMeierFitter
- from pandas.core.indexes.datetimes import DatetimeIndex
- def plot_burndown(
- args: Namespace,
- target: str,
- name: str,
- matrix: numpy.ndarray,
- date_range_sampling: 'DatetimeIndex',
- labels: List[int],
- granularity: int,
- sampling: int,
- resample: str,
- ) -> None:
- if args.output and args.output.endswith(".json"):
- data = locals().copy()
- del data["args"]
- data["type"] = "burndown"
- if args.mode == "project" and target == "project":
- output = args.output
- else:
- if target == "project":
- name = "project"
- output = get_plot_path(args.output, name)
- with open(output, "w") as fout:
- json.dump(data, fout, sort_keys=True, default=default_json)
- return
- matplotlib, pyplot = import_pyplot(args.backend, args.style)
- pyplot.stackplot(date_range_sampling, matrix, labels=labels)
- if args.relative:
- for i in range(matrix.shape[1]):
- matrix[:, i] /= matrix[:, i].sum()
- pyplot.ylim(0, 1)
- legend_loc = 3
- else:
- legend_loc = 2
- legend = pyplot.legend(loc=legend_loc, fontsize=args.font_size)
- pyplot.ylabel("Lines of code")
- pyplot.xlabel("Time")
- apply_plot_style(
- pyplot.gcf(), pyplot.gca(), legend, args.background, args.font_size, args.size
- )
- pyplot.xlim(
- parse_date(args.start_date, date_range_sampling[0]),
- parse_date(args.end_date, date_range_sampling[-1]),
- )
- locator = pyplot.gca().xaxis.get_major_locator()
- # set the optimal xticks locator
- if "M" not in resample:
- pyplot.gca().xaxis.set_major_locator(matplotlib.dates.YearLocator())
- locs = pyplot.gca().get_xticks().tolist()
- if len(locs) >= 16:
- pyplot.gca().xaxis.set_major_locator(matplotlib.dates.YearLocator())
- locs = pyplot.gca().get_xticks().tolist()
- if len(locs) >= 16:
- pyplot.gca().xaxis.set_major_locator(locator)
- if locs[0] < pyplot.xlim()[0]:
- del locs[0]
- endindex = -1
- if len(locs) >= 2 and pyplot.xlim()[1] - locs[-1] > (locs[-1] - locs[-2]) / 2:
- locs.append(pyplot.xlim()[1])
- endindex = len(locs) - 1
- startindex = -1
- if len(locs) >= 2 and locs[0] - pyplot.xlim()[0] > (locs[1] - locs[0]) / 2:
- locs.append(pyplot.xlim()[0])
- startindex = len(locs) - 1
- pyplot.gca().set_xticks(locs)
- # hacking time!
- labels = pyplot.gca().get_xticklabels()
- if startindex >= 0:
- labels[startindex].set_text(date_range_sampling[0].date())
- labels[startindex].set_text = lambda _: None
- labels[startindex].set_rotation(30)
- labels[startindex].set_ha("right")
- if endindex >= 0:
- labels[endindex].set_text(date_range_sampling[-1].date())
- labels[endindex].set_text = lambda _: None
- labels[endindex].set_rotation(30)
- labels[endindex].set_ha("right")
- title = "%s %d x %d (granularity %d, sampling %d)" % (
- (name,) + matrix.shape + (granularity, sampling)
- )
- output = args.output
- if output:
- if args.mode == "project" and target == "project":
- output = args.output
- else:
- if target == "project":
- name = "project"
- output = get_plot_path(args.output, name)
- deploy_plot(title, output, args.background)
- def plot_many_burndown(args: Namespace, target: str, header, parts):
- if not args.output:
- print("Warning: output not set, showing %d plots." % len(parts))
- stdout = io.StringIO()
- for name, matrix in tqdm.tqdm(parts):
- with contextlib.redirect_stdout(stdout):
- plot_burndown(
- args, target, *load_burndown(header, name, matrix, args.resample)
- )
- sys.stdout.write(stdout.getvalue())
- def fit_kaplan_meier(matrix: numpy.ndarray) -> 'KaplanMeierFitter':
- from lifelines import KaplanMeierFitter
- T = []
- W = []
- indexes = numpy.arange(matrix.shape[0], dtype=int)
- entries = numpy.zeros(matrix.shape[0], int)
- dead = set()
- for i in range(1, matrix.shape[1]):
- diff = matrix[:, i - 1] - matrix[:, i]
- entries[diff < 0] = i
- mask = diff > 0
- deaths = diff[mask]
- T.append(numpy.full(len(deaths), i) - entries[indexes[mask]])
- W.append(deaths)
- entered = entries > 0
- entered[0] = True
- dead = dead.union(set(numpy.where((matrix[:, i] == 0) & entered)[0]))
- # add the survivors as censored
- nnzind = entries != 0
- nnzind[0] = True
- nnzind[sorted(dead)] = False
- T.append(numpy.full(nnzind.sum(), matrix.shape[1]) - entries[nnzind])
- W.append(matrix[nnzind, -1])
- T = numpy.concatenate(T)
- E = numpy.ones(len(T), bool)
- E[-nnzind.sum() :] = 0
- W = numpy.concatenate(W)
- if T.size == 0:
- return None
- kmf = KaplanMeierFitter().fit(T, E, weights=W)
- return kmf
- def print_survival_function(kmf: 'KaplanMeierFitter', sampling: int) -> None:
- sf = kmf.survival_function_
- sf.index = [timedelta(days=d) for d in sf.index * sampling]
- sf.columns = ["Ratio of survived lines"]
- try:
- print(sf[len(sf) // 6 :: len(sf) // 6].append(sf.tail(1)))
- except ValueError:
- pass
- def interpolate_burndown_matrix(
- matrix: numpy.ndarray, granularity: int, sampling: int, progress: bool = False
- ) -> numpy.ndarray:
- daily = numpy.zeros(
- (matrix.shape[0] * granularity, matrix.shape[1] * sampling), dtype=numpy.float32
- )
- """
- ----------> samples, x
- |
- |
- |
- ⌄
- bands, y
- """
- for y in tqdm.tqdm(range(matrix.shape[0]), disable=(not progress)):
- for x in range(matrix.shape[1]):
- if y * granularity > (x + 1) * sampling:
- # the future is zeros
- continue
- def decay(start_index: int, start_val: float):
- if start_val == 0:
- return
- k = matrix[y][x] / start_val # <= 1
- scale = (x + 1) * sampling - start_index
- for i in range(y * granularity, (y + 1) * granularity):
- initial = daily[i][start_index - 1]
- for j in range(start_index, (x + 1) * sampling):
- daily[i][j] = initial * (
- 1 + (k - 1) * (j - start_index + 1) / scale
- )
- def grow(finish_index: int, finish_val: float):
- initial = matrix[y][x - 1] if x > 0 else 0
- start_index = x * sampling
- if start_index < y * granularity:
- start_index = y * granularity
- if finish_index == start_index:
- return
- avg = (finish_val - initial) / (finish_index - start_index)
- for j in range(x * sampling, finish_index):
- for i in range(start_index, j + 1):
- daily[i][j] = avg
- # copy [x*g..y*s)
- for j in range(x * sampling, finish_index):
- for i in range(y * granularity, x * sampling):
- daily[i][j] = daily[i][j - 1]
- if (y + 1) * granularity >= (x + 1) * sampling:
- # x*granularity <= (y+1)*sampling
- # 1. x*granularity <= y*sampling
- # y*sampling..(y+1)sampling
- #
- # x+1
- # /
- # /
- # / y+1 -|
- # / |
- # / y -|
- # /
- # / x
- #
- # 2. x*granularity > y*sampling
- # x*granularity..(y+1)sampling
- #
- # x+1
- # /
- # /
- # / y+1 -|
- # / |
- # / x -|
- # /
- # / y
- if y * granularity <= x * sampling:
- grow((x + 1) * sampling, matrix[y][x])
- elif (x + 1) * sampling > y * granularity:
- grow((x + 1) * sampling, matrix[y][x])
- avg = matrix[y][x] / ((x + 1) * sampling - y * granularity)
- for j in range(y * granularity, (x + 1) * sampling):
- for i in range(y * granularity, j + 1):
- daily[i][j] = avg
- elif (y + 1) * granularity >= x * sampling:
- # y*sampling <= (x+1)*granularity < (y+1)sampling
- # y*sampling..(x+1)*granularity
- # (x+1)*granularity..(y+1)sampling
- # x+1
- # /\
- # / \
- # / \
- # / y+1
- # /
- # y
- v1 = matrix[y][x - 1]
- v2 = matrix[y][x]
- delta = (y + 1) * granularity - x * sampling
- previous = 0
- if x > 0 and (x - 1) * sampling >= y * granularity:
- # x*g <= (y-1)*s <= y*s <= (x+1)*g <= (y+1)*s
- # |________|.......^
- if x > 1:
- previous = matrix[y][x - 2]
- scale = sampling
- else:
- # (y-1)*s < x*g <= y*s <= (x+1)*g <= (y+1)*s
- # |______|.......^
- scale = sampling if x == 0 else x * sampling - y * granularity
- peak = v1 + (v1 - previous) / scale * delta
- if v2 > peak:
- # we need to adjust the peak, it may not be less than the decayed value
- if x < matrix.shape[1] - 1:
- # y*s <= (x+1)*g <= (y+1)*s < (y+2)*s
- # ^.........|_________|
- k = (v2 - matrix[y][x + 1]) / sampling # > 0
- peak = matrix[y][x] + k * (
- (x + 1) * sampling - (y + 1) * granularity
- )
- # peak > v2 > v1
- else:
- peak = v2
- # not enough data to interpolate; this is at least not restricted
- grow((y + 1) * granularity, peak)
- decay((y + 1) * granularity, peak)
- else:
- # (x+1)*granularity < y*sampling
- # y*sampling..(y+1)sampling
- decay(x * sampling, matrix[y][x - 1])
- return daily
- def load_burndown(
- header: Tuple[int, int, int, int, float],
- name: str,
- matrix: numpy.ndarray,
- resample: str,
- report_survival: bool = True,
- interpolation_progress: bool = False,
- ) -> Tuple[str, numpy.ndarray, 'DatetimeIndex', List[int], int, int, str]:
- pandas = import_pandas()
- start, last, sampling, granularity, tick = header
- assert sampling > 0
- assert granularity > 0
- start = floor_datetime(datetime.fromtimestamp(start), tick)
- last = datetime.fromtimestamp(last)
- if report_survival:
- kmf = fit_kaplan_meier(matrix)
- if kmf is not None:
- print_survival_function(kmf, sampling)
- finish = start + timedelta(seconds=matrix.shape[1] * sampling * tick)
- if resample not in ("no", "raw"):
- print("resampling to %s, please wait..." % resample)
- # Interpolate the day x day matrix.
- # Each day brings equal weight in the granularity.
- # Sampling's interpolation is linear.
- daily = interpolate_burndown_matrix(
- matrix=matrix,
- granularity=granularity,
- sampling=sampling,
- progress=interpolation_progress,
- )
- daily[(last - start).days :] = 0
- # Resample the bands
- aliases = {"year": "A", "month": "M"}
- resample = aliases.get(resample, resample)
- periods = 0
- date_granularity_sampling = [start]
- while date_granularity_sampling[-1] < finish:
- periods += 1
- date_granularity_sampling = pandas.date_range(
- start, periods=periods, freq=resample
- )
- if date_granularity_sampling[0] > finish:
- if resample == "A":
- print("too loose resampling - by year, trying by month")
- return load_burndown(
- header, name, matrix, "month", report_survival=False
- )
- else:
- raise ValueError("Too loose resampling: %s. Try finer." % resample)
- date_range_sampling = pandas.date_range(
- date_granularity_sampling[0],
- periods=(finish - date_granularity_sampling[0]).days,
- freq="1D",
- )
- # Fill the new square matrix
- matrix = numpy.zeros(
- (len(date_granularity_sampling), len(date_range_sampling)),
- dtype=numpy.float32,
- )
- for i, gdt in enumerate(date_granularity_sampling):
- istart = (date_granularity_sampling[i - 1] - start).days if i > 0 else 0
- ifinish = (gdt - start).days
- for j, sdt in enumerate(date_range_sampling):
- if (sdt - start).days >= istart:
- break
- matrix[i, j:] = daily[istart:ifinish, (sdt - start).days :].sum(axis=0)
- # Hardcode some cases to improve labels' readability
- if resample in ("year", "A"):
- labels = [dt.year for dt in date_granularity_sampling]
- elif resample in ("month", "M"):
- labels = [dt.strftime("%Y %B") for dt in date_granularity_sampling]
- else:
- labels = [dt.date() for dt in date_granularity_sampling]
- else:
- labels = [
- "%s - %s"
- % (
- (start + timedelta(seconds=i * granularity * tick)).date(),
- (start + timedelta(seconds=(i + 1) * granularity * tick)).date(),
- )
- for i in range(matrix.shape[0])
- ]
- if len(labels) > 18:
- warnings.warn("Too many labels - consider resampling.")
- resample = "M" # fake resampling type is checked while plotting
- date_range_sampling = pandas.date_range(
- start + timedelta(seconds=sampling * tick),
- periods=matrix.shape[1],
- freq="%dD" % sampling,
- )
- return name, matrix, date_range_sampling, labels, granularity, sampling, resample
|