from argparse import Namespace
import contextlib
from datetime import datetime, timedelta
import io
import json
import sys
from typing import List, Optional, Tuple, TYPE_CHECKING
import warnings

import numpy
import tqdm

from labours.plotting import apply_plot_style, deploy_plot, get_plot_path, import_pyplot
from labours.utils import default_json, floor_datetime, import_pandas, parse_date

if TYPE_CHECKING:
    from lifelines import KaplanMeierFitter
    from pandas.core.indexes.datetimes import DatetimeIndex


def plot_burndown(
    args: Namespace,
    target: str,
    name: str,
    matrix: numpy.ndarray,
    date_range_sampling: 'DatetimeIndex',
    labels: List[int],
    granularity: int,
    sampling: int,
    resample: str,
) -> None:
    if args.output and args.output.endswith(".json"):
        data = locals().copy()
        del data["args"]
        data["type"] = "burndown"
        if args.mode == "project" and target == "project":
            output = args.output
        else:
            if target == "project":
                name = "project"
            output = get_plot_path(args.output, name)
        with open(output, "w") as fout:
            json.dump(data, fout, sort_keys=True, default=default_json)
        return

    matplotlib, pyplot = import_pyplot(args.backend, args.style)

    if args.relative:
        # normalize before plotting: stackplot() snapshots the data when it is
        # called, so scaling the matrix afterwards would leave absolute values
        # on a y-axis clamped to [0, 1]
        for i in range(matrix.shape[1]):
            matrix[:, i] /= matrix[:, i].sum()
        pyplot.ylim(0, 1)
        legend_loc = 3
    else:
        legend_loc = 2
    pyplot.stackplot(date_range_sampling, matrix, labels=labels)
    legend = pyplot.legend(loc=legend_loc, fontsize=args.font_size)
    pyplot.ylabel("Lines of code")
    pyplot.xlabel("Time")
    apply_plot_style(
        pyplot.gcf(), pyplot.gca(), legend, args.background, args.font_size, args.size
    )
    pyplot.xlim(
        parse_date(args.start_date, date_range_sampling[0]),
        parse_date(args.end_date, date_range_sampling[-1]),
    )
    locator = pyplot.gca().xaxis.get_major_locator()
    # set the optimal xticks locator
    if "M" not in resample:
        pyplot.gca().xaxis.set_major_locator(matplotlib.dates.YearLocator())
    locs = pyplot.gca().get_xticks().tolist()
    if len(locs) >= 16:
        pyplot.gca().xaxis.set_major_locator(matplotlib.dates.YearLocator())
        locs = pyplot.gca().get_xticks().tolist()
        if len(locs) >= 16:
            pyplot.gca().xaxis.set_major_locator(locator)
    if locs[0] < pyplot.xlim()[0]:
        del locs[0]
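    # The block below is a labeling hack: when the outermost auto-placed tick
    # sits more than half a tick interval away from the plot edge, an extra
    # tick is appended at the exact axis limit and relabeled with the real
    # first/last sampled date. Replacing set_text() with a no-op afterwards
    # keeps the formatter from overwriting that label on draw.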
    endindex = -1
    if len(locs) >= 2 and pyplot.xlim()[1] - locs[-1] > (locs[-1] - locs[-2]) / 2:
        locs.append(pyplot.xlim()[1])
        endindex = len(locs) - 1
    startindex = -1
    if len(locs) >= 2 and locs[0] - pyplot.xlim()[0] > (locs[1] - locs[0]) / 2:
        locs.append(pyplot.xlim()[0])
        startindex = len(locs) - 1
    pyplot.gca().set_xticks(locs)
    # hacking time!
    labels = pyplot.gca().get_xticklabels()
    if startindex >= 0:
        labels[startindex].set_text(date_range_sampling[0].date())
        labels[startindex].set_text = lambda _: None
        labels[startindex].set_rotation(30)
        labels[startindex].set_ha("right")
    if endindex >= 0:
        labels[endindex].set_text(date_range_sampling[-1].date())
        labels[endindex].set_text = lambda _: None
        labels[endindex].set_rotation(30)
        labels[endindex].set_ha("right")
    title = "%s %d x %d (granularity %d, sampling %d)" % (
        (name,) + matrix.shape + (granularity, sampling)
    )
    output = args.output
    if output:
        if args.mode == "project" and target == "project":
            output = args.output
        else:
            if target == "project":
                name = "project"
            output = get_plot_path(args.output, name)
    deploy_plot(title, output, args.background)


def plot_many_burndown(args: Namespace, target: str, header, parts):
    if not args.output:
        print("Warning: output not set, showing %d plots." % len(parts))
    stdout = io.StringIO()
    for name, matrix in tqdm.tqdm(parts):
        with contextlib.redirect_stdout(stdout):
            plot_burndown(
                args, target, *load_burndown(header, name, matrix, args.resample)
            )
    sys.stdout.write(stdout.getvalue())


def fit_kaplan_meier(matrix: numpy.ndarray) -> Optional['KaplanMeierFitter']:
    from lifelines import KaplanMeierFitter

    T = []
    W = []
    indexes = numpy.arange(matrix.shape[0], dtype=int)
    entries = numpy.zeros(matrix.shape[0], int)
    dead = set()
    for i in range(1, matrix.shape[1]):
        diff = matrix[:, i - 1] - matrix[:, i]
        entries[diff < 0] = i
        mask = diff > 0
        deaths = diff[mask]
        T.append(numpy.full(len(deaths), i) - entries[indexes[mask]])
        W.append(deaths)
        entered = entries > 0
        entered[0] = True
        dead = dead.union(set(numpy.where((matrix[:, i] == 0) & entered)[0]))
    # add the survivors as censored
    nnzind = entries != 0
    nnzind[0] = True
    nnzind[sorted(dead)] = False
    T.append(numpy.full(nnzind.sum(), matrix.shape[1]) - entries[nnzind])
    W.append(matrix[nnzind, -1])
    T = numpy.concatenate(T)
    E = numpy.ones(len(T), bool)
    if nnzind.sum() > 0:
        # guard the slice: "E[-0:]" would mark *every* observation as censored
        E[-nnzind.sum() :] = 0
    W = numpy.concatenate(W)
    if T.size == 0:
        return None
    kmf = KaplanMeierFitter().fit(T, E, weights=W)
    return kmf


def print_survival_function(kmf: 'KaplanMeierFitter', sampling: int) -> None:
    sf = kmf.survival_function_
    sf.index = [timedelta(days=d) for d in sf.index * sampling]
    sf.columns = ["Ratio of survived lines"]
    try:
        # DataFrame.append() was removed in pandas 2.0; concat() is equivalent
        pandas = import_pandas()
        print(pandas.concat([sf[len(sf) // 6 :: len(sf) // 6], sf.tail(1)]))
    except ValueError:
        pass
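# A minimal usage sketch of the survival fit above, on a toy burndown matrix
# (the numbers are made up and lifelines must be installed). Rows are age
# bands, columns are samples, values are lines of code still alive:
#
#     >>> toy = numpy.array([[10., 8., 8., 7.],
#     ...                    [ 0., 5., 5., 5.]], dtype=numpy.float32)
#     >>> kmf = fit_kaplan_meier(toy)
#     >>> kmf is not None
#     True
#     >>> print_survival_function(kmf, sampling=30)  # doctest: +SKIP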
def interpolate_burndown_matrix(
    matrix: numpy.ndarray, granularity: int, sampling: int, progress: bool = False
) -> numpy.ndarray:
    daily = numpy.zeros(
        (matrix.shape[0] * granularity, matrix.shape[1] * sampling), dtype=numpy.float32
    )
    """
    ----------> samples, x
    |
    |
    |
    ⌄
    bands, y
    """
    for y in tqdm.tqdm(range(matrix.shape[0]), disable=(not progress)):
        for x in range(matrix.shape[1]):
            if y * granularity > (x + 1) * sampling:
                # the future is zeros
                continue
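            # Both helpers below close over `daily`, `matrix`, `x` and `y`.
            # decay() spreads a band's observed shrinkage evenly over the days
            # of the current sample: each daily row is scaled linearly from its
            # value at start_index - 1 toward that value times
            # matrix[y][x] / start_val. grow() ramps lines added during the
            # current sample up to finish_val at a constant daily rate.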
            def decay(start_index: int, start_val: float):
                if start_val == 0:
                    return
                k = matrix[y][x] / start_val  # <= 1
                scale = (x + 1) * sampling - start_index
                for i in range(y * granularity, (y + 1) * granularity):
                    initial = daily[i][start_index - 1]
                    for j in range(start_index, (x + 1) * sampling):
                        daily[i][j] = initial * (
                            1 + (k - 1) * (j - start_index + 1) / scale
                        )

            def grow(finish_index: int, finish_val: float):
                initial = matrix[y][x - 1] if x > 0 else 0
                start_index = x * sampling
                if start_index < y * granularity:
                    start_index = y * granularity
                if finish_index == start_index:
                    return
                avg = (finish_val - initial) / (finish_index - start_index)
                for j in range(x * sampling, finish_index):
                    for i in range(start_index, j + 1):
                        daily[i][j] = avg
                # copy [x*g..y*s)
                for j in range(x * sampling, finish_index):
                    for i in range(y * granularity, x * sampling):
                        daily[i][j] = daily[i][j - 1]

            if (y + 1) * granularity >= (x + 1) * sampling:
                # x*granularity <= (y+1)*sampling
                # 1. x*granularity <= y*sampling
                #    y*sampling..(y+1)sampling
                #
                #       x+1
                #        /
                #       /
                #      / y+1  -|
                #     /        |
                #    / y      -|
                #   /
                #  / x
                #
                # 2. x*granularity > y*sampling
                #    x*granularity..(y+1)sampling
                #
                #       x+1
                #        /
                #       /
                #      / y+1  -|
                #     /        |
                #    / x      -|
                #   /
                #  / y
                if y * granularity <= x * sampling:
                    grow((x + 1) * sampling, matrix[y][x])
                elif (x + 1) * sampling > y * granularity:
                    grow((x + 1) * sampling, matrix[y][x])
                    avg = matrix[y][x] / ((x + 1) * sampling - y * granularity)
                    for j in range(y * granularity, (x + 1) * sampling):
                        for i in range(y * granularity, j + 1):
                            daily[i][j] = avg
            elif (y + 1) * granularity >= x * sampling:
                # y*sampling <= (x+1)*granularity < (y+1)sampling
                # y*sampling..(x+1)*granularity
                # (x+1)*granularity..(y+1)sampling
                #        x+1
                #         /\
                #        /  \
                #       /    \
                #      /    y+1
                #     /
                #    y
                v1 = matrix[y][x - 1]
                v2 = matrix[y][x]
                delta = (y + 1) * granularity - x * sampling
                previous = 0
                if x > 0 and (x - 1) * sampling >= y * granularity:
                    # x*g <= (y-1)*s <= y*s <= (x+1)*g <= (y+1)*s
                    #           |________|.......^
                    if x > 1:
                        previous = matrix[y][x - 2]
                    scale = sampling
                else:
                    # (y-1)*s < x*g <= y*s <= (x+1)*g <= (y+1)*s
                    #            |______|.......^
                    scale = sampling if x == 0 else x * sampling - y * granularity
                peak = v1 + (v1 - previous) / scale * delta
                if v2 > peak:
                    # we need to adjust the peak, it may not be less than the decayed value
                    if x < matrix.shape[1] - 1:
                        # y*s <= (x+1)*g <= (y+1)*s < (y+2)*s
                        #           ^.........|_________|
                        k = (v2 - matrix[y][x + 1]) / sampling  # > 0
                        peak = matrix[y][x] + k * (
                            (x + 1) * sampling - (y + 1) * granularity
                        )
                        # peak > v2 > v1
                    else:
                        peak = v2
                        # not enough data to interpolate; this is at least not restricted
                grow((y + 1) * granularity, peak)
                decay((y + 1) * granularity, peak)
            else:
                # (x+1)*granularity < y*sampling
                # y*sampling..(y+1)sampling
                decay(x * sampling, matrix[y][x - 1])
    return daily
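# Shape sketch for the interpolation (illustrative, made-up numbers): a
# bands x samples matrix expands into a (bands * granularity) x
# (samples * sampling) matrix of daily values.
#
#     >>> m = numpy.array([[4., 2.]], dtype=numpy.float32)
#     >>> interpolate_burndown_matrix(m, granularity=2, sampling=2).shape
#     (2, 4)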
def load_burndown(
    header: Tuple[int, int, int, int, float],
    name: str,
    matrix: numpy.ndarray,
    resample: str,
    report_survival: bool = True,
    interpolation_progress: bool = False,
) -> Tuple[str, numpy.ndarray, 'DatetimeIndex', List[int], int, int, str]:
    pandas = import_pandas()

    start, last, sampling, granularity, tick = header
    assert sampling > 0
    assert granularity > 0
    start = floor_datetime(datetime.fromtimestamp(start), tick)
    last = datetime.fromtimestamp(last)
    if report_survival:
        kmf = fit_kaplan_meier(matrix)
        if kmf is not None:
            print_survival_function(kmf, sampling)
    finish = start + timedelta(seconds=matrix.shape[1] * sampling * tick)
    if resample not in ("no", "raw"):
        print("resampling to %s, please wait..." % resample)
        # Interpolate the day x day matrix.
        # Each day brings equal weight in the granularity.
        # Sampling's interpolation is linear.
        daily = interpolate_burndown_matrix(
            matrix=matrix,
            granularity=granularity,
            sampling=sampling,
            progress=interpolation_progress,
        )
        daily[(last - start).days :] = 0
        # Resample the bands
        aliases = {"year": "A", "month": "M"}
        resample = aliases.get(resample, resample)
        periods = 0
        date_granularity_sampling = [start]
        while date_granularity_sampling[-1] < finish:
            periods += 1
            date_granularity_sampling = pandas.date_range(
                start, periods=periods, freq=resample
            )
        if date_granularity_sampling[0] > finish:
            if resample == "A":
                print("too loose resampling - by year, trying by month")
                return load_burndown(
                    header, name, matrix, "month", report_survival=False
                )
            else:
                raise ValueError("Too loose resampling: %s. Try finer." % resample)
        date_range_sampling = pandas.date_range(
            date_granularity_sampling[0],
            periods=(finish - date_granularity_sampling[0]).days,
            freq="1D",
        )
        # Fill the new square matrix
        matrix = numpy.zeros(
            (len(date_granularity_sampling), len(date_range_sampling)),
            dtype=numpy.float32,
        )
        for i, gdt in enumerate(date_granularity_sampling):
            istart = (date_granularity_sampling[i - 1] - start).days if i > 0 else 0
            ifinish = (gdt - start).days
            for j, sdt in enumerate(date_range_sampling):
                if (sdt - start).days >= istart:
                    break
            matrix[i, j:] = daily[istart:ifinish, (sdt - start).days :].sum(axis=0)
        # Hardcode some cases to improve labels' readability
        if resample in ("year", "A"):
            labels = [dt.year for dt in date_granularity_sampling]
        elif resample in ("month", "M"):
            labels = [dt.strftime("%Y %B") for dt in date_granularity_sampling]
        else:
            labels = [dt.date() for dt in date_granularity_sampling]
    else:
        labels = [
            "%s - %s"
            % (
                (start + timedelta(seconds=i * granularity * tick)).date(),
                (start + timedelta(seconds=(i + 1) * granularity * tick)).date(),
            )
            for i in range(matrix.shape[0])
        ]
        if len(labels) > 18:
            warnings.warn("Too many labels - consider resampling.")
        resample = "M"  # fake resampling type is checked while plotting
        date_range_sampling = pandas.date_range(
            start + timedelta(seconds=sampling * tick),
            periods=matrix.shape[1],
            freq="%dD" % sampling,
        )
    return name, matrix, date_range_sampling, labels, granularity, sampling, resample
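# End-to-end sketch (hypothetical values; a real header and matrix come from a
# hercules burndown analysis, and `args` from the CLI parser). The header is
# (start, last, sampling, granularity, tick):
#
#     >>> header = (1546300800, 1577836800, 30, 30, 86400.0)
#     >>> plot_args = load_burndown(header, "project", matrix,  # doctest: +SKIP
#     ...                           resample="year")
#     >>> plot_burndown(args, "project", *plot_args)  # doctest: +SKIP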