123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- from argparse import Namespace
- from collections import defaultdict
- from datetime import datetime, timedelta
- import sys
- from typing import Dict, List, Set, Tuple
- import numpy
- import tqdm
- from labours.objects import DevDay
- from labours.plotting import apply_plot_style, deploy_plot, get_plot_path, import_pyplot
- from labours.utils import _format_number
def show_devs(
    args: Namespace,
    name: str,
    start_date: int,
    end_date: int,
    people: List[str],
    days: Dict[int, Dict[int, DevDay]],
    max_people: int = 50,
) -> None:
    """Plot one smoothed commit-activity strip per developer.

    Developers are ordered with ``order_commits`` and colored by the cluster
    labels from ``hdbscan_cluster_routed_series``; label ``-1`` (outlier) is
    drawn in gray. Each strip shows the developer name on the left and three
    aggregated numbers on the right under the header "cmts delta changed".

    :param args: Parsed command line options. Reads ``backend``, ``style``, \
                 ``font_size``, ``background``, ``output`` and ``mode``.
    :param name: Repository name, used in the plot title when no output file is set.
    :param start_date: UNIX timestamp of the first day; truncated to midnight.
    :param end_date: UNIX timestamp of the last day; truncated to midnight.
    :param people: Developer names, indexed by the developer keys used in `days`.
    :param days: Day index -> developer index -> per-day stats. The day index is \
                 used as an offset from `start_date`.
    :param max_people: When there are more developers than this, only the top \
                       `max_people` by commit count are plotted.
    """
    # NOTE(review): scipy.signal.slepian is deprecated in recent SciPy releases -
    # confirm the pinned SciPy version still provides it.
    from scipy.signal import convolve, slepian

    if len(people) > max_people:
        print("Picking top %s developers by commit count" % max_people)
        # pick top N developers by commit count
        commits = defaultdict(int)
        for devs in days.values():
            for dev, stats in devs.items():
                commits[dev] += stats.Commits
        commits = sorted(((v, k) for k, v in commits.items()), reverse=True)
        chosen_people = {people[k] for _, k in commits[:max_people]}
    else:
        chosen_people = set(people)
    dists, devseries, devstats, route = order_commits(chosen_people, days, people)
    # node (row index in dists / position in devseries) -> vertical plot position
    route_map = {v: i for i, v in enumerate(route)}
    # determine clusters
    clusters = hdbscan_cluster_routed_series(dists, route)
    # translate route nodes into developer indices
    keys = list(devseries.keys())
    route = [keys[node] for node in route]
    print("Plotting")
    # smooth time series
    start_date = datetime.fromtimestamp(start_date)
    start_date = datetime(start_date.year, start_date.month, start_date.day)
    end_date = datetime.fromtimestamp(end_date)
    end_date = datetime(end_date.year, end_date.month, end_date.day)
    size = (end_date - start_date).days + 1
    plot_x = [start_date + timedelta(days=i) for i in range(size)]
    resolution = 64
    # A span shorter than `resolution` days would give a zero-length window and
    # break the convolution; a window of length 1 leaves the series unsmoothed.
    window = slepian(max(1, size // resolution), 0.5)
    final = numpy.zeros((len(devseries), size), dtype=numpy.float32)
    for i, s in enumerate(devseries.values()):
        arr = numpy.array(s).transpose()
        full_history = numpy.zeros(size, dtype=numpy.float32)
        # drop the days which fall outside of the plotted range
        mask = arr[0] < size
        full_history[arr[0][mask]] = arr[1][mask]
        final[route_map[i]] = convolve(full_history, window, "same")

    matplotlib, pyplot = import_pyplot(args.backend, args.style)
    pyplot.rcParams["figure.figsize"] = (32, 16)
    pyplot.rcParams["font.size"] = args.font_size
    prop_cycle = pyplot.rcParams["axes.prop_cycle"]
    colors = prop_cycle.by_key()["color"]
    # squeeze=False always yields a 2-D array of axes, so a single developer
    # does not degrade `axes` into a bare Axes object.
    _, axes = pyplot.subplots(final.shape[0], 1, squeeze=False)
    axes = axes[:, 0]
    backgrounds = (
        ("#C4FFDB", "#FFD0CD") if args.background == "white" else ("#05401C", "#40110E")
    )
    text_color = "black" if args.background == "white" else "white"
    max_cluster = numpy.max(clusters)
    for ax, series, cluster, dev_i in zip(axes, final, clusters, route):
        if cluster >= 0:
            color = colors[cluster % len(colors)]
            # gray is reserved for the outliers, pick another color from the cycle
            shift = 1
            while color == "#777777":
                color = colors[(max_cluster + shift) % len(colors)]
                shift += 1
        else:
            # outlier
            color = "#777777"
        ax.fill_between(plot_x, series, color=color)
        ax.set_axis_off()
        author = people[dev_i]
        ax.text(
            0.03,
            0.5,
            author[:36] + (author[36:] and "..."),
            horizontalalignment="right",
            verticalalignment="center",
            transform=ax.transAxes,
            fontsize=args.font_size,
            color=text_color,
        )
        ds = devstats[dev_i]
        # NOTE(review): the positional fields are presumably
        # (Commits, Added, Removed, Changed) - confirm against DevDay.
        stats = "%5d %8s %8s" % (
            ds[0],
            _format_number(ds[1] - ds[2]),
            _format_number(ds[3]),
        )
        ax.text(
            0.97,
            0.5,
            stats,
            horizontalalignment="left",
            verticalalignment="center",
            transform=ax.transAxes,
            fontsize=args.font_size,
            family="monospace",
            # first background when ds[1] > ds[2], second otherwise
            backgroundcolor=backgrounds[ds[1] <= ds[2]],
            color=text_color,
        )
    axes[0].text(
        0.97,
        1.75,
        " cmts    delta  changed",
        horizontalalignment="left",
        verticalalignment="center",
        transform=axes[0].transAxes,
        fontsize=args.font_size,
        family="monospace",
        color=text_color,
    )
    # only the bottom strip carries the shared date axis
    axes[-1].set_axis_on()
    target_num_labels = 12
    num_months = (
        (end_date.year - start_date.year) * 12 + end_date.month - start_date.month
    )
    interval = int(numpy.ceil(num_months / target_num_labels))
    if interval >= 8:
        # Switch to yearly ticks; `interval` is measured in years from here on,
        # so it goes to YearLocator as is.
        interval = int(numpy.ceil(num_months / (12 * target_num_labels)))
        axes[-1].xaxis.set_major_locator(
            matplotlib.dates.YearLocator(base=max(1, interval))
        )
        axes[-1].xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%Y"))
    else:
        axes[-1].xaxis.set_major_locator(
            matplotlib.dates.MonthLocator(interval=interval)
        )
        axes[-1].xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%Y-%m"))
    for tick in axes[-1].xaxis.get_major_ticks():
        # `Tick.label` was a deprecated alias of `label1`
        tick.label1.set_fontsize(args.font_size)
    axes[-1].spines["left"].set_visible(False)
    axes[-1].spines["right"].set_visible(False)
    axes[-1].spines["top"].set_visible(False)
    axes[-1].get_yaxis().set_visible(False)
    # fully transparent face
    axes[-1].set_facecolor((1.0,) * 3 + (0.0,))

    title = ("%s commits" % name) if not args.output else ""
    if args.mode == "all" and args.output:
        output = get_plot_path(args.output, "time_series")
    else:
        output = args.output
    deploy_plot(title, output, args.background)
def order_commits(
    chosen_people: Set[str], days: Dict[int, Dict[int, DevDay]], people: List[str]
) -> Tuple[numpy.ndarray, defaultdict, defaultdict, List[int]]:
    """Build per-developer commit series and order them by mutual similarity.

    The pairwise distance between two developers is the dynamic time warping
    (fastdtw, L1 norm) distance between their sum-normalized daily commit
    series. The resulting matrix is passed to ``seriate`` to obtain the order.

    :param chosen_people: Names of the developers to include.
    :param days: Day index -> developer index -> per-day stats.
    :param people: Developer names, indexed by the developer keys used in `days`.
    :return: Tuple of four items: \
             the square distance matrix (float32); \
             developer index -> list of ``(day, commits)`` sorted by day; \
             developer index -> stats accumulated with ``DevDay.add``; \
             whatever ``seriate(dists)`` returns (callers treat it as a sequence \
             of row indices of the distance matrix).
    """
    from seriate import seriate

    try:
        from fastdtw import fastdtw
    except ImportError as e:
        print(
            "Cannot import fastdtw: %s\nInstall it from https://github.com/slaypni/fastdtw"
            % e
        )
        sys.exit(1)
    # FIXME(vmarkovtsev): remove once https://github.com/slaypni/fastdtw/pull/28 is merged&released
    try:
        sys.modules[
            "fastdtw.fastdtw"
        ].__norm = lambda p: lambda a, b: numpy.linalg.norm(
            numpy.atleast_1d(a) - numpy.atleast_1d(b), p
        )
    except KeyError:
        # the native extension does not have this bug
        pass

    devseries = defaultdict(list)
    devstats = defaultdict(lambda: DevDay(0, 0, 0, 0, {}))
    for day, devs in sorted(days.items()):
        for dev, stats in devs.items():
            if people[dev] in chosen_people:
                devseries[dev].append((day, stats.Commits))
                devstats[dev] = devstats[dev].add(stats)

    print("Calculating the distance matrix")
    # normalize each time series so that its commit counts sum up to 1
    series = list(devseries.values())
    for i, s in enumerate(series):
        arr = numpy.array(s).transpose().astype(numpy.float32)
        total = arr[1].sum()
        if total > 0:
            # a series without any commits stays all-zero instead of becoming NaN
            arr[1] /= total
        series[i] = arr.transpose()

    # calculate the distance matrix using dynamic time warping
    dists = numpy.full((len(series),) * 2, -100500, dtype=numpy.float32)
    # each unordered pair of series is visited exactly once
    num_pairs = len(series) * (len(series) - 1) // 2
    with tqdm.tqdm(total=num_pairs) as pb:
        for x, serx in enumerate(series):
            dists[x, x] = 0
            for y, sery in enumerate(series[x + 1 :], start=x + 1):
                # align both series on the common day range
                min_day = int(min(serx[0][0], sery[0][0]))
                max_day = int(max(serx[-1][0], sery[-1][0]))
                arrx = numpy.zeros(max_day - min_day + 1, dtype=numpy.float32)
                arry = numpy.zeros_like(arrx)
                arrx[serx[:, 0].astype(int) - min_day] = serx[:, 1]
                arry[sery[:, 0].astype(int) - min_day] = sery[:, 1]
                # L1 norm
                dist, _ = fastdtw(arrx, arry, radius=5, dist=1)
                dists[x, y] = dists[y, x] = dist
                pb.update()

    print("Ordering the series")
    route = seriate(dists)
    return dists, devseries, devstats, route
def hdbscan_cluster_routed_series(
    dists: numpy.ndarray, route: List[int]
) -> numpy.ndarray:
    """Cluster the routed series by their cumulative distance along the route.

    The distances between consecutive route nodes are accumulated into 1-D
    positions, which are clustered with ``HDBSCAN(min_cluster_size=2)``.

    :param dists: Square distance matrix, indexed by the route nodes.
    :param route: Order of the nodes (row indices of `dists`).
    :return: One cluster label per route position; -1 marks an outlier. Routes \
             with fewer than two nodes are labeled as outliers without running \
             HDBSCAN.
    """
    if len(route) < 2:
        # Nothing to cluster: density-based clustering needs at least two
        # samples, so report the lone series (if any) as an outlier.
        return numpy.full(len(route), -1, dtype=numpy.intp)
    try:
        from hdbscan import HDBSCAN
    except ImportError as e:
        print("Cannot import hdbscan: %s" % e)
        sys.exit(1)

    # position of every node on a line, walking the route from its first node
    hops = [dists[src, dst] for src, dst in zip(route, route[1:])]
    opt_dist_chain = numpy.cumsum(numpy.array([0] + hops))
    clusters = HDBSCAN(min_cluster_size=2).fit_predict(opt_dist_chain[:, numpy.newaxis])
    return clusters
def show_devs_efforts(
    args: Namespace,
    name: str,
    start_date: int,
    end_date: int,
    people: List[str],
    days: Dict[int, Dict[int, DevDay]],
    max_people: int,
) -> None:
    """Plot the developers' efforts (added + removed + changed lines) through time.

    The cumulative efforts are stacked above zero and the daily efforts are
    mirrored below it, scaled to the same height. Developers beyond the
    `max_people` most active are merged into the "others" series.

    :param args: Parsed command line options. Reads ``backend``, ``style``, \
                 ``font_size``, ``background``, ``size``, ``output`` and ``mode``.
    :param name: Repository name; not used by this plot.
    :param start_date: UNIX timestamp of the first day; truncated to midnight.
    :param end_date: UNIX timestamp of the last day; truncated to midnight.
    :param people: Developer names, indexed by the developer keys used in `days`.
    :param days: Day index -> developer index -> per-day stats. The day index is \
                 used as an offset from `start_date`.
    :param max_people: Maximum number of developers plotted individually.
    """
    # NOTE(review): scipy.signal.slepian is deprecated in recent SciPy releases -
    # confirm the pinned SciPy version still provides it.
    from scipy.signal import convolve, slepian

    start_date = datetime.fromtimestamp(start_date)
    start_date = datetime(start_date.year, start_date.month, start_date.day)
    end_date = datetime.fromtimestamp(end_date)
    end_date = datetime(end_date.year, end_date.month, end_date.day)

    efforts_by_dev = defaultdict(int)
    for devs in days.values():
        for dev, stats in devs.items():
            efforts_by_dev[dev] += stats.Added + stats.Removed + stats.Changed
    if len(efforts_by_dev) > max_people:
        chosen = {
            dev
            for _, dev in sorted(
                ((effort, dev) for dev, effort in efforts_by_dev.items()),
                reverse=True,
            )[:max_people]
        }
        print("Warning: truncated people to the most active %d" % max_people)
    else:
        chosen = set(efforts_by_dev)
    chosen_efforts = sorted(((efforts_by_dev[k], k) for k in chosen), reverse=True)
    # developer index -> row in `efforts`, the most active developer first
    chosen_order = {k: i for i, (_, k) in enumerate(chosen_efforts)}

    # the extra last row accumulates everybody who was not chosen ("others")
    efforts = numpy.zeros(
        (len(chosen) + 1, (end_date - start_date).days + 1), dtype=numpy.float32
    )
    for day, devs in days.items():
        if day < efforts.shape[1]:
            for dev, stats in devs.items():
                row = chosen_order.get(dev, len(chosen_order))
                efforts[row][day] += stats.Added + stats.Removed + stats.Changed
    efforts_cum = numpy.cumsum(efforts, axis=1)
    window = slepian(10, 0.5)
    window /= window.sum()
    for e in (efforts, efforts_cum):
        for i in range(e.shape[0]):
            # keep the tail unsmoothed: restore the last 2 * len(window) values
            ending = e[i][-len(window) * 2 :].copy()
            e[i] = convolve(e[i], window, "same")
            e[i][-len(ending) :] = ending

    matplotlib, pyplot = import_pyplot(args.backend, args.style)
    plot_x = [start_date + timedelta(days=i) for i in range(efforts.shape[1])]
    labels = [people[k] for _, k in chosen_efforts] + ["others"]
    # shorten the long names so that the legend stays compact
    labels = [
        label[:37] + "..." if len(label) > 40 else label for label in labels
    ]
    polys = pyplot.stackplot(plot_x, efforts_cum, labels=labels)
    if len(polys) == max_people + 1:
        polys[-1].set_hatch("/")
    # mirror the daily efforts below zero, scaled to the cumulative height
    peak = efforts.max()
    if peak > 0:
        mirrored = -efforts * efforts_cum.max() / peak
    else:
        # no efforts at all: avoid dividing by zero, there is nothing to scale
        mirrored = -efforts
    polys = pyplot.stackplot(plot_x, mirrored)
    if len(polys) == max_people + 1:
        polys[-1].set_hatch("/")
    # Show the ticks only for the cumulative (upper) part. Axis.iter_ticks()
    # does not exist in recent Matplotlib, so read the major locations directly.
    yaxis = pyplot.gca().yaxis
    yticks = [loc for loc in yaxis.get_majorticklocs() if loc >= 0]
    yaxis.set_ticks(yticks)
    legend = pyplot.legend(loc=2, ncol=2, fontsize=args.font_size)
    apply_plot_style(
        pyplot.gcf(),
        pyplot.gca(),
        legend,
        args.background,
        args.font_size,
        args.size or "16,10",
    )
    if args.mode == "all" and args.output:
        output = get_plot_path(args.output, "efforts")
    else:
        output = args.output
    deploy_plot("Efforts through time (changed lines of code)", output, args.background)
|