123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- from argparse import Namespace
- from importlib import import_module
- import re
- import sys
- from typing import Any, Dict, List, Tuple, TYPE_CHECKING
- import numpy
- import yaml
- from labours.objects import DevDay
- if TYPE_CHECKING:
- from scipy.sparse.csr import csr_matrix
class Reader(object):
    """Abstract interface shared by all analysis result readers.

    Concrete readers override :meth:`read` to load the input and the
    ``get_*`` accessors to expose the individual analysis results.  Every
    method here raises :class:`NotImplementedError`, so a subclass only needs
    to implement the accessors that its input format supports.
    """

    def read(self, file):
        """Load the results from *file*; must be called before any accessor."""
        raise NotImplementedError

    def get_run_times(self):
        """Return the run time information of the analysis.

        Declared on the base class so that every reader exposes the same
        set of accessors as the concrete readers in this module.
        """
        raise NotImplementedError

    def get_name(self):
        """Return the name of the analysed repository."""
        raise NotImplementedError

    def get_header(self):
        """Return the header information of the loaded results."""
        raise NotImplementedError

    def get_burndown_parameters(self):
        """Return the parameters of the burndown analysis."""
        raise NotImplementedError

    def get_project_burndown(self):
        """Return the burndown data of the whole project."""
        raise NotImplementedError

    def get_files_burndown(self):
        """Return the burndown data of the individual files."""
        raise NotImplementedError

    def get_people_burndown(self):
        """Return the burndown data of the individual people."""
        raise NotImplementedError

    def get_ownership_burndown(self):
        """Return the code ownership burndown data."""
        raise NotImplementedError

    def get_people_interaction(self):
        """Return the people interaction data."""
        raise NotImplementedError

    def get_files_coocc(self):
        """Return the files co-occurrence data."""
        raise NotImplementedError

    def get_people_coocc(self):
        """Return the people co-occurrence data."""
        raise NotImplementedError

    def get_shotness_coocc(self):
        """Return the shotness co-occurrence data."""
        raise NotImplementedError

    def get_shotness(self):
        """Return the shotness records."""
        raise NotImplementedError

    def get_sentiment(self):
        """Return the sentiment data."""
        raise NotImplementedError

    def get_devs(self):
        """Return the developers' activity data."""
        raise NotImplementedError
class YamlReader(Reader):
    """Reader for analysis results stored as a YAML document.

    The parsed document is kept in ``self.data`` after :meth:`read`; every
    accessor below looks its values up in that mapping.
    """

    def read(self, file: str):
        """Parse the YAML document at *file* (``"-"`` means stdin) into ``self.data``.

        Prints a diagnostic and exits the process with status 1 when the input
        contains invalid unicode or when the document is empty.
        """
        # This pattern can never match, which turns off PyYAML's check for
        # non-printable characters. Note that the override is process-wide.
        yaml.reader.Reader.NON_PRINTABLE = re.compile(r"(?!x)x")
        try:
            loader = yaml.CLoader
        except AttributeError:
            print(
                "Warning: failed to import yaml.CLoader, falling back to slow yaml.Loader"
            )
            loader = yaml.Loader
        # NOTE(security): yaml.load() with the full (C)Loader can construct
        # arbitrary Python objects - only feed it trusted input.
        try:
            if file != "-":
                with open(file) as fin:
                    data = yaml.load(fin, Loader=loader)
            else:
                data = yaml.load(sys.stdin, Loader=loader)
        except (UnicodeError, yaml.reader.ReaderError) as e:
            # UnicodeError also covers UnicodeDecodeError, which the text-mode
            # stream raises on invalid bytes before PyYAML ever sees them.
            print(
                "\nInvalid unicode in the input: %s\nPlease filter it through "
                "fix_yaml_unicode.py" % e
            )
            sys.exit(1)
        if data is None:
            print("\nNo data has been read - has Hercules crashed?")
            sys.exit(1)
        self.data = data

    def get_run_times(self):
        """Return an empty dict: this reader does not expose run times."""
        return {}

    def get_name(self):
        """Return the ``repository`` value of the ``hercules`` section."""
        return self.data["hercules"]["repository"]

    def get_header(self):
        """Return ``(begin_unix_time, end_unix_time)`` from the ``hercules`` section."""
        header = self.data["hercules"]
        return header["begin_unix_time"], header["end_unix_time"]

    def get_burndown_parameters(self):
        """Return ``(sampling, granularity, tick_size)`` from the ``Burndown`` section."""
        header = self.data["Burndown"]
        return header["sampling"], header["granularity"], header["tick_size"]

    def get_project_burndown(self):
        """Return ``(repository name, transposed project burndown matrix)``."""
        return (
            self.data["hercules"]["repository"],
            self._parse_burndown_matrix(self.data["Burndown"]["project"]).T,
        )

    def get_files_burndown(self):
        """Return a list of ``(key, transposed matrix)`` for each ``Burndown.files`` item."""
        return [
            (p[0], self._parse_burndown_matrix(p[1]).T)
            for p in self.data["Burndown"]["files"].items()
        ]

    def get_people_burndown(self):
        """Return a list of ``(key, transposed matrix)`` for each ``Burndown.people`` item."""
        return [
            (p[0], self._parse_burndown_matrix(p[1]).T)
            for p in self.data["Burndown"]["people"].items()
        ]

    def get_ownership_burndown(self):
        """Return ``(copy of people_sequence, {key: matrix})`` for ``Burndown.people``.

        Unlike :meth:`get_people_burndown`, the matrices are not transposed.
        """
        return (
            self.data["Burndown"]["people_sequence"].copy(),
            {
                p[0]: self._parse_burndown_matrix(p[1])
                for p in self.data["Burndown"]["people"].items()
            },
        )

    def get_people_interaction(self):
        """Return ``(copy of people_sequence, parsed people_interaction matrix)``."""
        return (
            self.data["Burndown"]["people_sequence"].copy(),
            self._parse_burndown_matrix(self.data["Burndown"]["people_interaction"]),
        )

    def get_files_coocc(self):
        """Return ``(index, CSR matrix)`` built from ``Couples.files_coocc``."""
        coocc = self.data["Couples"]["files_coocc"]
        return coocc["index"], self._parse_coocc_matrix(coocc["matrix"])

    def get_people_coocc(self):
        """Return ``(index, CSR matrix)`` built from ``Couples.people_coocc``."""
        coocc = self.data["Couples"]["people_coocc"]
        return coocc["index"], self._parse_coocc_matrix(coocc["matrix"])

    def get_shotness_coocc(self):
        """Return ``(index, CSR matrix)`` built from the ``Shotness`` records.

        ``index`` holds a ``"file:name"`` label per record; row ``i`` of the
        square matrix holds the ``counters`` of record ``i`` with the counter
        keys converted to integer column numbers.
        """
        shotness = self.data["Shotness"]
        index = ["%s:%s" % (i["file"], i["name"]) for i in shotness]
        indptr = numpy.zeros(len(shotness) + 1, dtype=numpy.int64)
        indices = []
        data = []
        for i, record in enumerate(shotness):
            pairs = [(int(k), v) for k, v in record["counters"].items()]
            # Emit the columns of each row in ascending order.
            pairs.sort()
            indptr[i + 1] = indptr[i] + len(pairs)
            for k, v in pairs:
                indices.append(k)
                data.append(v)
        indices = numpy.array(indices, dtype=numpy.int32)
        data = numpy.array(data, dtype=numpy.int32)
        from scipy.sparse import csr_matrix

        return index, csr_matrix((data, indices, indptr), shape=(len(shotness),) * 2)

    def get_shotness(self):
        """Return the ``Shotness`` records as munch objects with integer counter keys.

        Raises:
            KeyError: if there are no records.
        """
        from munch import munchify

        obj = munchify(self.data["Shotness"])
        # turn strings into ints
        for item in obj:
            item.counters = {int(k): v for k, v in item.counters.items()}
        if len(obj) == 0:
            raise KeyError
        return obj

    def get_sentiment(self):
        """Return the ``Sentiment`` section as a munch keyed by integer.

        Each value is read positionally: item 0 becomes the float ``Value``,
        item 1 is ``Commits`` and item 2 is split on ``"|"`` into ``Comments``.
        """
        from munch import munchify

        return munchify(
            {
                int(key): {
                    "Comments": vals[2].split("|"),
                    "Commits": vals[1],
                    "Value": float(vals[0]),
                }
                for key, vals in self.data["Sentiment"].items()
            }
        )

    def get_devs(self):
        """Return ``(people, days)`` from the ``Devs`` section.

        ``days`` maps an integer tick to ``{integer dev: DevDay}``.  All but
        the last element of each record are converted to ``int``; the last
        element is passed to ``DevDay`` unchanged.
        """
        people = self.data["Devs"]["people"]
        days = {
            int(d): {
                int(dev): DevDay(*(int(x) for x in day[:-1]), day[-1])
                for dev, day in devs.items()
            }
            for d, devs in self.data["Devs"]["ticks"].items()
        }
        return people, days

    def _parse_burndown_matrix(self, matrix):
        """Parse newline-separated rows of space-separated integers into an array."""
        return numpy.array(
            [numpy.fromstring(line, dtype=int, sep=" ") for line in matrix.split("\n")]
        )

    def _parse_coocc_matrix(self, matrix):
        """Build a square CSR matrix from a sequence of ``{column: value}`` rows."""
        from scipy.sparse import csr_matrix

        data = []
        indices = []
        indptr = [0]
        for row in matrix:
            # Columns are emitted in ascending order within each row.
            for k, v in sorted(row.items()):
                data.append(v)
                indices.append(k)
            indptr.append(indptr[-1] + len(row))
        return csr_matrix((data, indices, indptr), shape=(len(matrix),) * 2)
class ProtobufReader(Reader):
    """Reader for analysis results serialized as an ``AnalysisResults`` protobuf.

    After :meth:`read`, ``self.data`` holds the top-level message and
    ``self.contents`` maps each analysis key to its decoded message.
    """

    def read(self, file: str) -> None:
        """Parse the binary input at *file* (``"-"`` means stdin).

        Raises:
            ImportError: if the generated ``labours.pb_pb2`` module is missing.
            ValueError: if the input is empty.
        """
        try:
            from labours.pb_pb2 import AnalysisResults
        except ImportError as e:
            print(
                "\n\n>>> You need to generate python/hercules/pb/pb_pb2.py - run \"make\"\n",
                file=sys.stderr,
            )
            raise e from None
        self.data = AnalysisResults()
        if file == "-":
            payload = sys.stdin.buffer.read()
        else:
            with open(file, "rb") as fin:
                payload = fin.read()
        if not payload:
            raise ValueError("empty input")
        self.data.ParseFromString(payload)
        self.contents = {}
        for key, raw in self.data.contents.items():
            if key not in PB_MESSAGES:
                # Unknown analyses are skipped with a warning instead of failing.
                sys.stderr.write(
                    "Warning: there is no registered PB decoder for %s\n" % key
                )
                continue
            module_name, class_name = PB_MESSAGES[key].rsplit(".", 1)
            message_type = getattr(import_module(module_name), class_name)
            message = message_type()
            # Registered before parsing, so the key exists even if parsing fails.
            self.contents[key] = message
            message.ParseFromString(raw)

    def get_run_times(self):
        """Return a plain dict copy of the header's ``run_time_per_item``."""
        return dict(self.data.header.run_time_per_item.items())

    def get_name(self) -> str:
        """Return the repository name stored in the header."""
        return self.data.header.repository

    def get_header(self) -> Tuple[int, int]:
        """Return ``(begin_unix_time, end_unix_time)`` from the header."""
        return self.data.header.begin_unix_time, self.data.header.end_unix_time

    def get_burndown_parameters(self) -> Tuple[int, int, float]:
        """Return ``(sampling, granularity, tick_size / 10**9)`` of the burndown."""
        message = self.contents["Burndown"]
        scaled_tick = message.tick_size / 10 ** 9
        return message.sampling, message.granularity, scaled_tick

    def get_project_burndown(self) -> Tuple[str, numpy.ndarray]:
        """Return ``(name, transposed matrix)`` of the project burndown."""
        project = self.contents["Burndown"].project
        return self._parse_burndown_matrix(project)

    def get_files_burndown(self):
        """Return a list of ``(name, transposed matrix)``, one per file."""
        return list(map(self._parse_burndown_matrix, self.contents["Burndown"].files))

    def get_people_burndown(self) -> List[Any]:
        """Return a list of ``(name, transposed matrix)``, one per person."""
        return list(map(self._parse_burndown_matrix, self.contents["Burndown"].people))

    def get_ownership_burndown(self) -> Tuple[List[Any], Dict[Any, Any]]:
        """Return ``(names, {name: matrix})`` derived from the people burndown.

        The matrices from :meth:`get_people_burndown` are transposed again.
        """
        names = []
        by_name = {}
        for entry in self.get_people_burndown():
            names.append(entry[0])
            by_name[entry[0]] = entry[1].T
        return names, by_name

    def get_people_interaction(self):
        """Return ``(people names, dense people interaction matrix)``."""
        message = self.contents["Burndown"]
        names = [person.name for person in message.people]
        dense = self._parse_sparse_matrix(message.people_interaction).toarray()
        return names, dense

    def get_files_coocc(self) -> Tuple[List[str], 'csr_matrix']:
        """Return ``(index, CSR matrix)`` of the file couples."""
        couples = self.contents["Couples"].file_couples
        return list(couples.index), self._parse_sparse_matrix(couples.matrix)

    def get_people_coocc(self) -> Tuple[List[str], 'csr_matrix']:
        """Return ``(index, CSR matrix)`` of the people couples."""
        couples = self.contents["Couples"].people_couples
        return list(couples.index), self._parse_sparse_matrix(couples.matrix)

    def get_shotness_coocc(self):
        """Return ``(index, CSR matrix)`` built from the shotness records.

        ``index`` holds a ``"file:name"`` label per record; row ``i`` of the
        square matrix holds the ``counters`` of record ``i``.
        """
        from scipy.sparse import csr_matrix

        records = self.get_shotness()
        labels = ["%s:%s" % (rec.file, rec.name) for rec in records]
        size = len(records)
        row_starts = numpy.zeros(size + 1, dtype=numpy.int32)
        columns = []
        values = []
        for row, rec in enumerate(records):
            # Emit the columns of each row in ascending order.
            ordered = sorted(rec.counters.items())
            row_starts[row + 1] = row_starts[row] + len(ordered)
            for column, value in ordered:
                columns.append(column)
                values.append(value)
        columns = numpy.array(columns, dtype=numpy.int32)
        values = numpy.array(values, dtype=numpy.int32)
        return labels, csr_matrix((values, columns, row_starts), shape=(size, size))

    def get_shotness(self):
        """Return the shotness records.

        Raises:
            KeyError: if there are no records.
        """
        records = self.contents["Shotness"].records
        if not len(records):
            raise KeyError
        return records

    def get_sentiment(self):
        """Return the ``SentimentByDay`` container.

        Raises:
            KeyError: if the container is empty.
        """
        by_day = self.contents["Sentiment"].SentimentByDay
        if not len(by_day):
            raise KeyError
        return by_day

    def get_devs(self) -> Tuple[List[str], Dict[int, Dict[int, DevDay]]]:
        """Return ``(people, days)`` where ``days`` maps tick -> dev -> ``DevDay``."""
        people = list(self.contents["Devs"].dev_index)
        days = {}
        for tick, day in self.contents["Devs"].ticks.items():
            per_dev = {}
            for dev, stats in day.devs.items():
                languages = {
                    lang: [counts.added, counts.removed, counts.changed]
                    for lang, counts in stats.languages.items()
                }
                per_dev[dev] = DevDay(
                    stats.commits,
                    stats.stats.added,
                    stats.stats.removed,
                    stats.stats.changed,
                    languages,
                )
            days[tick] = per_dev
        return people, days

    def _parse_burndown_matrix(self, matrix):
        """Return ``(matrix.name, transposed dense integer array)``.

        Cells that the message does not list stay zero.
        """
        shape = (matrix.number_of_rows, matrix.number_of_columns)
        grid = numpy.zeros(shape, dtype=int)
        for row_index, row in enumerate(matrix.rows):
            for col_index, cell in enumerate(row.columns):
                grid[row_index, col_index] = cell
        return matrix.name, grid.T

    def _parse_sparse_matrix(self, matrix):
        """Convert a sparse matrix message (data/indices/indptr) to a CSR matrix."""
        from scipy.sparse import csr_matrix

        triplet = (list(matrix.data), list(matrix.indices), list(matrix.indptr))
        dimensions = (matrix.number_of_rows, matrix.number_of_columns)
        return csr_matrix(triplet, shape=dimensions)
# Maps an input format name (also used as a file extension) to its reader class.
READERS = dict(yaml=YamlReader, yml=YamlReader, pb=ProtobufReader)
# Maps an analysis key to the dotted path of the protobuf message class
# that decodes its payload.
PB_MESSAGES = {
    analysis: "labours.pb_pb2.%sAnalysisResults" % analysis
    for analysis in ("Burndown", "Couples", "Shotness", "Devs")
}
def _detect_input_format(path: str) -> str:
    """Guess the input format name for *path* (``"-"`` means stdin)."""
    # Function-scope import, like the other lazy imports in this module.
    import os.path

    if path == "-":
        # Nothing to inspect on stdin, so YAML is assumed.
        return "yaml"
    # Only the file name may contribute an extension: a dot in a parent
    # directory ("../data/result", "run.v1/result") must not be mistaken for one.
    _, dot, extension = os.path.basename(path).rpartition(".")
    extension = extension.lower()
    if dot and extension in READERS:
        return extension
    # No usable extension: text that decodes cleanly is taken as YAML,
    # anything else as protobuf.
    try:
        with open(path) as f:
            f.read(1 << 16)
    except UnicodeDecodeError:
        return "pb"
    return "yaml"


def read_input(args: Namespace) -> Reader:
    """Create the reader matching ``args.input_format`` and load ``args.input``.

    When ``args.input_format`` is ``"auto"`` it is replaced in place by the
    detected format: a registered file extension wins, otherwise the
    beginning of the file is sniffed; stdin (``"-"``) is assumed to be YAML.
    Progress is reported on stdout.

    Args:
        args: parsed command line with the ``input`` and ``input_format``
            attributes.

    Returns:
        The reader instance after its ``read()`` has been called.

    Raises:
        KeyError: if an explicitly requested format is not in ``READERS``.
    """
    sys.stdout.write("Reading the input... ")
    sys.stdout.flush()
    if args.input_format == "auto":
        args.input_format = _detect_input_format(args.input)
    reader = READERS[args.input_format]()
    reader.read(args.input)
    print("done")
    return reader
|