Merge pull request #31 from vmarkovtsev/master

[WIP] Implement burndown merges
Vadim Markovtsev, 7 years ago
Commit 690906f553
7 files changed, 962 insertions(+), 60 deletions(-)
  1. burndown.go                    +436 -23
  2. burndown_test.go               +439 -2
  3. cmd/hercules-combine/main.go   +19 -18
  4. labours.py                     +6 -7
  5. pb/utils.go                    +3 -0
  6. pipeline.go                    +32 -9
  7. pipeline_test.go               +27 -1

+ 436 - 23
burndown.go

@@ -6,6 +6,7 @@ import (
 	"io"
 	"os"
 	"sort"
+	"sync"
 	"unicode/utf8"
 
 	"github.com/gogo/protobuf/proto"
@@ -68,18 +69,22 @@ type BurndownAnalysis struct {
 }
 
 type BurndownResult struct {
-	GlobalHistory   [][]int64
-	FileHistories   map[string][][]int64
-	PeopleHistories [][][]int64
-	PeopleMatrix    [][]int64
+	GlobalHistory      [][]int64
+	FileHistories      map[string][][]int64
+	PeopleHistories    [][][]int64
+	PeopleMatrix       [][]int64
+	reversedPeopleDict []string
+	sampling           int
+	granularity        int
 }
 
 const (
-	ConfigBurndownGranularity = "Burndown.Granularity"
-	ConfigBurndownSampling    = "Burndown.Sampling"
-	ConfigBurndownTrackFiles  = "Burndown.TrackFiles"
-	ConfigBurndownTrackPeople = "Burndown.TrackPeople"
-	ConfigBurndownDebug       = "Burndown.Debug"
+	ConfigBurndownGranularity  = "Burndown.Granularity"
+	ConfigBurndownSampling     = "Burndown.Sampling"
+	ConfigBurndownTrackFiles   = "Burndown.TrackFiles"
+	ConfigBurndownTrackPeople  = "Burndown.TrackPeople"
+	ConfigBurndownDebug        = "Burndown.Debug"
+	DefaultBurndownGranularity = 30
 )
 
 func (analyser *BurndownAnalysis) Name() string {
@@ -101,12 +106,12 @@ func (analyser *BurndownAnalysis) ListConfigurationOptions() []ConfigurationOpti
 		Description: "How many days there are in a single band.",
 		Flag:        "granularity",
 		Type:        IntConfigurationOption,
-		Default:     30}, {
+		Default:     DefaultBurndownGranularity}, {
 		Name:        ConfigBurndownSampling,
 		Description: "How frequently to record the state in days.",
 		Flag:        "sampling",
 		Type:        IntConfigurationOption,
-		Default:     30}, {
+		Default:     DefaultBurndownGranularity}, {
 		Name:        ConfigBurndownTrackFiles,
 		Description: "Record detailed statistics per each file.",
 		Flag:        "burndown-files",
@@ -155,12 +160,19 @@ func (analyser *BurndownAnalysis) Flag() string {
 
 func (analyser *BurndownAnalysis) Initialize(repository *git.Repository) {
 	if analyser.Granularity <= 0 {
-		fmt.Fprintln(os.Stderr, "Warning: adjusted the granularity to 30 days")
-		analyser.Granularity = 30
+		fmt.Fprintf(os.Stderr, "Warning: adjusted the granularity to %d days\n",
+			DefaultBurndownGranularity)
+		analyser.Granularity = DefaultBurndownGranularity
 	}
 	if analyser.Sampling <= 0 {
-		fmt.Fprintln(os.Stderr, "Warning: adjusted the sampling to 30 days")
-		analyser.Sampling = 30
+		fmt.Fprintf(os.Stderr, "Warning: adjusted the sampling to %d days\n",
+			DefaultBurndownGranularity)
+		analyser.Sampling = DefaultBurndownGranularity
+	}
+	if analyser.Sampling > analyser.Granularity {
+		fmt.Fprintf(os.Stderr, "Warning: granularity may not be less than sampling, adjusted to %d\n",
+			analyser.Granularity)
+		analyser.Sampling = analyser.Granularity
 	}
 	analyser.repository = repository
 	analyser.globalStatus = map[int]int64{}
@@ -243,10 +255,11 @@ func (analyser *BurndownAnalysis) Finalize() interface{} {
 		}
 	}
 	return BurndownResult{
-		GlobalHistory:   analyser.globalHistory,
-		FileHistories:   analyser.fileHistories,
-		PeopleHistories: analyser.peopleHistories,
-		PeopleMatrix:    peopleMatrix,
+		GlobalHistory:      analyser.globalHistory,
+		FileHistories:      analyser.fileHistories,
+		PeopleHistories:    analyser.peopleHistories,
+		PeopleMatrix:       peopleMatrix,
+		reversedPeopleDict: analyser.reversedPeopleDict,
 	}
 }
 
@@ -259,6 +272,402 @@ func (analyser *BurndownAnalysis) Serialize(result interface{}, binary bool, wri
 	return nil
 }
 
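+// Deserialize converts the Protocol Buffers blob produced by Serialize back into a BurndownResult.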
+func (analyser *BurndownAnalysis) Deserialize(pbmessage []byte) (interface{}, error) {
+	msg := pb.BurndownAnalysisResults{}
+	err := proto.Unmarshal(pbmessage, &msg)
+	if err != nil {
+		return nil, err
+	}
+	result := BurndownResult{}
+	convertCSR := func(mat *pb.BurndownSparseMatrix) [][]int64 {
+		res := make([][]int64, mat.NumberOfRows)
+		for i := 0; i < int(mat.NumberOfRows); i++ {
+			res[i] = make([]int64, mat.NumberOfColumns)
+			for j := 0; j < len(mat.Rows[i].Columns); j++ {
+				res[i][j] = int64(mat.Rows[i].Columns[j])
+			}
+		}
+		return res
+	}
+	result.GlobalHistory = convertCSR(msg.Project)
+	result.FileHistories = map[string][][]int64{}
+	for _, mat := range msg.Files {
+		result.FileHistories[mat.Name] = convertCSR(mat)
+	}
+	result.reversedPeopleDict = make([]string, len(msg.People))
+	result.PeopleHistories = make([][][]int64, len(msg.People))
+	for i, mat := range msg.People {
+		result.PeopleHistories[i] = convertCSR(mat)
+		result.reversedPeopleDict[i] = mat.Name
+	}
+	if msg.PeopleInteraction != nil {
+		result.PeopleMatrix = make([][]int64, msg.PeopleInteraction.NumberOfRows)
+	}
+	for i := 0; i < len(result.PeopleMatrix); i++ {
+		result.PeopleMatrix[i] = make([]int64, msg.PeopleInteraction.NumberOfColumns)
+		for j := int(msg.PeopleInteraction.Indptr[i]); j < int(msg.PeopleInteraction.Indptr[i+1]); j++ {
+			result.PeopleMatrix[i][msg.PeopleInteraction.Indices[j]] = msg.PeopleInteraction.Data[j]
+		}
+	}
+	result.sampling = int(msg.Sampling)
+	result.granularity = int(msg.Granularity)
+	return result, nil
+}
+
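+// MergeResults combines two separately produced BurndownResult-s: the histories are
+// interpolated to daily resolution, summed and re-sampled, and the two people
+// dictionaries are unified.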
+func (analyser *BurndownAnalysis) MergeResults(
+	r1, r2 interface{}, c1, c2 *CommonAnalysisResult) interface{} {
+	bar1 := r1.(BurndownResult)
+	bar2 := r2.(BurndownResult)
+	merged := BurndownResult{}
+	if bar1.sampling < bar2.sampling {
+		merged.sampling = bar1.sampling
+	} else {
+		merged.sampling = bar2.sampling
+	}
+	if bar1.granularity < bar2.granularity {
+		merged.granularity = bar1.granularity
+	} else {
+		merged.granularity = bar2.granularity
+	}
+	people := map[string][3]int{}
+	for i, pid := range bar1.reversedPeopleDict {
+		ptrs := people[pid]
+		ptrs[0] = len(people)
+		ptrs[1] = i
+		ptrs[2] = -1
+		people[pid] = ptrs
+	}
+	for i, pid := range bar2.reversedPeopleDict {
+		ptrs, exists := people[pid]
+		if !exists {
+			ptrs[0] = len(people)
+			ptrs[1] = -1
+		}
+		ptrs[2] = i
+		people[pid] = ptrs
+	}
+	merged.reversedPeopleDict = make([]string, len(people))
+	for name, ptrs := range people {
+		merged.reversedPeopleDict[ptrs[0]] = name
+	}
+	var wg sync.WaitGroup
+	if len(bar1.GlobalHistory) > 0 || len(bar2.GlobalHistory) > 0 {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			merged.GlobalHistory = mergeMatrices(
+				bar1.GlobalHistory, bar2.GlobalHistory,
+				bar1.granularity, bar1.sampling,
+				bar2.granularity, bar2.sampling,
+				c1, c2)
+		}()
+	}
+	if len(bar1.FileHistories) > 0 || len(bar2.FileHistories) > 0 {
+		merged.FileHistories = map[string][][]int64{}
+		historyMutex := sync.Mutex{}
+		for key, fh1 := range bar1.FileHistories {
+			if fh2, exists := bar2.FileHistories[key]; exists {
+				wg.Add(1)
+				go func(fh1, fh2 [][]int64, key string) {
+					defer wg.Done()
+					historyMutex.Lock()
+					defer historyMutex.Unlock()
+					merged.FileHistories[key] = mergeMatrices(
+						fh1, fh2, bar1.granularity, bar1.sampling, bar2.granularity, bar2.sampling, c1, c2)
+				}(fh1, fh2, key)
+			} else {
+				historyMutex.Lock()
+				merged.FileHistories[key] = fh1
+				historyMutex.Unlock()
+			}
+		}
+		for key, fh2 := range bar2.FileHistories {
+			if _, exists := bar1.FileHistories[key]; !exists {
+				historyMutex.Lock()
+				merged.FileHistories[key] = fh2
+				historyMutex.Unlock()
+			}
+		}
+	}
+	if len(merged.reversedPeopleDict) > 0 {
+		merged.PeopleHistories = make([][][]int64, len(merged.reversedPeopleDict))
+		for i, key := range merged.reversedPeopleDict {
+			ptrs := people[key]
+			if ptrs[1] < 0 {
+				if len(bar2.PeopleHistories) > 0 {
+					merged.PeopleHistories[i] = bar2.PeopleHistories[ptrs[2]]
+				}
+			} else if ptrs[2] < 0 {
+				if len(bar1.PeopleHistories) > 0 {
+					merged.PeopleHistories[i] = bar1.PeopleHistories[ptrs[1]]
+				}
+			} else {
+				wg.Add(1)
+				go func(i int) {
+					defer wg.Done()
+					var m1, m2 [][]int64
+					if len(bar1.PeopleHistories) > 0 {
+						m1 = bar1.PeopleHistories[ptrs[1]]
+					}
+					if len(bar2.PeopleHistories) > 0 {
+						m2 = bar2.PeopleHistories[ptrs[2]]
+					}
+					merged.PeopleHistories[i] = mergeMatrices(
+						m1, m2,
+						bar1.granularity, bar1.sampling,
+						bar2.granularity, bar2.sampling,
+						c1, c2,
+					)
+				}(i)
+			}
+		}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if len(bar2.PeopleMatrix) == 0 {
+				merged.PeopleMatrix = bar1.PeopleMatrix
+				// extend the matrix in both directions
+				for i := 0; i < len(merged.PeopleMatrix); i++ {
+					for j := len(bar1.reversedPeopleDict); j < len(merged.reversedPeopleDict); j++ {
+						merged.PeopleMatrix[i] = append(merged.PeopleMatrix[i], 0)
+					}
+				}
+				for i := len(bar1.reversedPeopleDict); i < len(merged.reversedPeopleDict); i++ {
+					merged.PeopleMatrix = append(
+						merged.PeopleMatrix, make([]int64, len(merged.reversedPeopleDict)+2))
+				}
+			} else {
+				merged.PeopleMatrix = make([][]int64, len(merged.reversedPeopleDict))
+				for i := range merged.PeopleMatrix {
+					merged.PeopleMatrix[i] = make([]int64, len(merged.reversedPeopleDict)+2)
+				}
+				for i, key := range bar1.reversedPeopleDict {
+					mi := people[key][0] // index in merged.reversedPeopleDict
+					copy(merged.PeopleMatrix[mi][:2], bar1.PeopleMatrix[i][:2])
+					for j, val := range bar1.PeopleMatrix[i][2:] {
+						merged.PeopleMatrix[mi][2+people[bar1.reversedPeopleDict[j]][0]] = val
+					}
+				}
+				for i, key := range bar2.reversedPeopleDict {
+					mi := people[key][0] // index in merged.reversedPeopleDict
+					merged.PeopleMatrix[mi][0] += bar2.PeopleMatrix[i][0]
+					merged.PeopleMatrix[mi][1] += bar2.PeopleMatrix[i][1]
+					for j, val := range bar2.PeopleMatrix[i][2:] {
+						merged.PeopleMatrix[mi][2+people[bar2.reversedPeopleDict[j]][0]] += val
+					}
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	return merged
+}
+
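+// mergeMatrices takes two [number of samples][number of bands] burndown matrices,
+// interpolates both onto the shared daily timeline and re-samples the sum using the
+// smaller (finer) sampling and granularity of the two.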
+func mergeMatrices(m1, m2 [][]int64, granularity1, sampling1, granularity2, sampling2 int,
+	c1, c2 *CommonAnalysisResult) [][]int64 {
+	commonMerged := *c1
+	commonMerged.Merge(c2)
+
+	var granularity, sampling int
+	if sampling1 < sampling2 {
+		sampling = sampling1
+	} else {
+		sampling = sampling2
+	}
+	if granularity1 < granularity2 {
+		granularity = granularity1
+	} else {
+		granularity = granularity2
+	}
+
+	size := int((commonMerged.EndTime - commonMerged.BeginTime) / (3600 * 24))
+	daily := make([][]float32, size+granularity)
+	for i := range daily {
+		daily[i] = make([]float32, size+sampling)
+	}
+	if len(m1) > 0 {
+		addBurndownMatrix(m1, granularity1, sampling1, daily,
+			int(c1.BeginTime-commonMerged.BeginTime)/(3600*24))
+	}
+	if len(m2) > 0 {
+		addBurndownMatrix(m2, granularity2, sampling2, daily,
+			int(c2.BeginTime-commonMerged.BeginTime)/(3600*24))
+	}
+
+	// convert daily to [][]int64
+	result := make([][]int64, (size+sampling-1)/sampling)
+	for i := range result {
+		result[i] = make([]int64, (size+granularity-1)/granularity)
+		sampledIndex := i * sampling
+		if i == len(result)-1 {
+			sampledIndex = size - 1
+		}
+		for j := 0; j < len(result[i]); j++ {
+			accum := float32(0)
+			for k := j * granularity; k < (j+1)*granularity && k < size; k++ {
+				accum += daily[sampledIndex][k]
+			}
+			result[i][j] = int64(accum)
+		}
+	}
+	return result
+}
+
+// addBurndownMatrix explodes `matrix` so that it becomes daily sampled and has daily
+// bands, shifts it by `offset` days and adds it to the accumulator. `daily` is square
+// and the caller guarantees that it is large enough to fit `matrix`:
+// Rows: *at least* len(matrix) * sampling + offset
+// Columns: *at least* len(matrix[...]) * granularity + offset
+// `matrix` may be sparse: the trailing all-zero columns can be truncated.
+func addBurndownMatrix(matrix [][]int64, granularity, sampling int, daily [][]float32, offset int) {
+	// Determine the maximum number of bands; the actual one may be larger but we do not care
+	maxCols := 0
+	for _, row := range matrix {
+		if maxCols < len(row) {
+			maxCols = len(row)
+		}
+	}
+	neededRows := len(matrix)*sampling + offset
+	if len(daily) < neededRows {
+		panic(fmt.Sprintf("merge bug: too few daily rows: required %d, have %d",
+			neededRows, len(daily)))
+	}
+	if len(daily[0]) < maxCols {
+		panic(fmt.Sprintf("merge bug: too few daily cols: required %d, have %d",
+			maxCols, len(daily[0])))
+	}
+	for x := 0; x < maxCols; x++ {
+		for y := 0; y < len(matrix); y++ {
+			if x*granularity > (y+1)*sampling {
+				// the future is zeros
+				continue
+			}
+			decay := func(startIndex int, startVal float32) {
+				k := float32(matrix[y][x]) / startVal // <= 1
+				scale := float32((y+1)*sampling - startIndex)
+				for i := x * granularity; i < (x+1)*granularity; i++ {
+					initial := daily[startIndex-1+offset][i+offset]
+					for j := startIndex; j < (y+1)*sampling; j++ {
+						daily[j+offset][i+offset] = initial * (1 + (k-1)*float32(j-startIndex+1)/scale)
+					}
+				}
+			}
+			raise := func(finishIndex int, finishVal float32) {
+				var initial float32
+				if y > 0 {
+					initial = float32(matrix[y-1][x])
+				}
+				startIndex := y * sampling
+				if startIndex < x*granularity {
+					startIndex = x * granularity
+				}
+				avg := (finishVal - initial) / float32(finishIndex-startIndex)
+				for j := y * sampling; j < finishIndex; j++ {
+					for i := startIndex; i <= j; i++ {
+						daily[j+offset][i+offset] = avg
+					}
+				}
+				// copy [x*g..y*s)
+				for j := y * sampling; j < finishIndex; j++ {
+					for i := x * granularity; i < y*sampling; i++ {
+						daily[j+offset][i+offset] = daily[j-1+offset][i+offset]
+					}
+				}
+			}
+			if (x+1)*granularity >= (y+1)*sampling {
+				// x*granularity <= (y+1)*sampling
+				// 1. x*granularity <= y*sampling
+				//    y*sampling..(y+1)*sampling
+				//
+				//       x+1
+				//        /
+				//       /
+				//      / y+1  -|
+				//     /        |
+				//    / y      -|
+				//   /
+				//  / x
+				//
+				// 2. x*granularity > y*sampling
+				//    x*granularity..(y+1)*sampling
+				//
+				//       x+1
+				//        /
+				//       /
+				//      / y+1  -|
+				//     /        |
+				//    / x      -|
+				//   /
+				//  / y
+				if x*granularity <= y*sampling {
+					raise((y+1)*sampling, float32(matrix[y][x]))
+				} else {
+					raise((y+1)*sampling, float32(matrix[y][x]))
+					avg := float32(matrix[y][x]) / float32((y+1)*sampling-x*granularity)
+					for j := x * granularity; j < (y+1)*sampling; j++ {
+						for i := x * granularity; i <= j; i++ {
+							daily[j+offset][i+offset] = avg
+						}
+					}
+				}
+			} else if (x+1)*granularity >= y*sampling {
+				// y*sampling <= (x+1)*granularity < (y+1)sampling
+				// y*sampling..(x+1)*granularity
+				// (x+1)*granularity..(y+1)sampling
+				//        x+1
+				//         /\
+				//        /  \
+				//       /    \
+				//      /    y+1
+				//     /
+				//    y
+				v1 := float32(matrix[y-1][x])
+				v2 := float32(matrix[y][x])
+				var peak float32
+				delta := float32((x+1)*granularity - y*sampling)
+				var scale float32
+				var previous float32
+				if y > 0 && (y-1)*sampling >= x*granularity {
+					// x*g <= (y-1)*s <= y*s <= (x+1)*g <= (y+1)*s
+					//           |________|.......^
+					if y > 1 {
+						previous = float32(matrix[y-2][x])
+					}
+					scale = float32(sampling)
+				} else {
+					// (y-1)*s < x*g <= y*s <= (x+1)*g <= (y+1)*s
+					//            |______|.......^
+					if y == 0 {
+						scale = float32(sampling)
+					} else {
+						scale = float32(y*sampling - x*granularity)
+					}
+				}
+				peak = v1 + (v1-previous)/scale*delta
+				if v2 > peak {
+					// we need to adjust the peak, it may not be less than the decayed value
+					if y < len(matrix)-1 {
+						// y*s <= (x+1)*g <= (y+1)*s < (y+2)*s
+						//           ^.........|_________|
+						k := (v2 - float32(matrix[y+1][x])) / float32(sampling) // > 0
+						peak = float32(matrix[y][x]) + k*float32((y+1)*sampling-(x+1)*granularity)
+						// peak > v2 > v1
+					} else {
+						peak = v2
+						// not enough data to interpolate; this is at least not restricted
+					}
+				}
+				raise((x+1)*granularity, peak)
+				decay((x+1)*granularity, peak)
+			} else {
+				// (x+1)*granularity < y*sampling
+				// y*sampling..(y+1)*sampling
+				decay(y*sampling, float32(matrix[y-1][x]))
+			}
+		}
+	}
+}
+
 func (analyser *BurndownAnalysis) serializeText(result *BurndownResult, writer io.Writer) {
 	fmt.Fprintln(writer, "  granularity:", analyser.Granularity)
 	fmt.Fprintln(writer, "  sampling:", analyser.Sampling)
@@ -274,11 +683,11 @@ func (analyser *BurndownAnalysis) serializeText(result *BurndownResult, writer i
 	if len(result.PeopleHistories) > 0 {
 		fmt.Fprintln(writer, "  people_sequence:")
 		for key := range result.PeopleHistories {
-			fmt.Fprintln(writer, "    - "+yaml.SafeString(analyser.reversedPeopleDict[key]))
+			fmt.Fprintln(writer, "    - "+yaml.SafeString(result.reversedPeopleDict[key]))
 		}
 		fmt.Fprintln(writer, "  people:")
 		for key, val := range result.PeopleHistories {
-			yaml.PrintMatrix(writer, val, 4, analyser.reversedPeopleDict[key], true)
+			yaml.PrintMatrix(writer, val, 4, result.reversedPeopleDict[key], true)
 		}
 		fmt.Fprintln(writer, "  people_interaction: |-")
 		yaml.PrintMatrix(writer, result.PeopleMatrix, 4, "", false)
@@ -289,7 +698,9 @@ func (analyser *BurndownAnalysis) serializeBinary(result *BurndownResult, writer
 	message := pb.BurndownAnalysisResults{
 		Granularity: int32(analyser.Granularity),
 		Sampling:    int32(analyser.Sampling),
-		Project:     pb.ToBurndownSparseMatrix(result.GlobalHistory, "project"),
+	}
+	if len(result.GlobalHistory) > 0 {
+		message.Project = pb.ToBurndownSparseMatrix(result.GlobalHistory, "project")
 	}
 	if len(result.FileHistories) > 0 {
 		message.Files = make([]*pb.BurndownSparseMatrix, len(result.FileHistories))
@@ -306,7 +717,9 @@ func (analyser *BurndownAnalysis) serializeBinary(result *BurndownResult, writer
 		message.People = make(
 			[]*pb.BurndownSparseMatrix, len(result.PeopleHistories))
 		for key, val := range result.PeopleHistories {
-			message.People[key] = pb.ToBurndownSparseMatrix(val, analyser.reversedPeopleDict[key])
+			if len(val) > 0 {
+				message.People[key] = pb.ToBurndownSparseMatrix(val, result.reversedPeopleDict[key])
+			}
 		}
 		message.PeopleInteraction = pb.DenseToCompressedSparseRowMatrix(result.PeopleMatrix)
 	}
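
A note on the merge arithmetic: mergeMatrices always converges on the finer of the two
time scales, i.e. the smaller sampling and the smaller granularity, and sizes the daily
accumulator from the union of the two time ranges. A minimal, self-contained sketch of
that arithmetic (names are illustrative, not from the diff; the numbers mirror
TestBurndownMergeGlobalHistory below):

	package main

	import "fmt"

	func main() {
		sampling1, granularity1 := 15, 20 // first run
		sampling2, granularity2 := 14, 19 // second run

		// The merged result uses the smaller sampling and the smaller granularity.
		sampling, granularity := sampling1, granularity1
		if sampling2 < sampling {
			sampling = sampling2
		}
		if granularity2 < granularity {
			granularity = granularity2
		}

		// The union of 1989 Jan 12..Mar 1 and 1989 Jan 18..Mar 15 is 62 days.
		begin, end := int64(600566400), int64(605923200)
		size := int((end - begin) / (3600 * 24))

		// The merged history therefore has (size+sampling-1)/sampling == 5 samples.
		fmt.Println(sampling, granularity, size, (size+sampling-1)/sampling) // 14 19 62 5
	}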

+ 439 - 2
burndown_test.go

@@ -3,6 +3,7 @@ package hercules
 import (
 	"bytes"
 	"io"
+	"io/ioutil"
 	"testing"
 
 	"github.com/gogo/protobuf/proto"
@@ -73,6 +74,25 @@ func TestBurndownRegistration(t *testing.T) {
 	assert.Equal(t, tp.Elem().Name(), "BurndownAnalysis")
 }
 
+func TestBurndownInitialize(t *testing.T) {
+	burndown := BurndownAnalysis{}
+	burndown.Sampling = -10
+	burndown.Granularity = DefaultBurndownGranularity
+	burndown.Initialize(testRepository)
+	assert.Equal(t, burndown.Sampling, DefaultBurndownGranularity)
+	assert.Equal(t, burndown.Granularity, DefaultBurndownGranularity)
+	burndown.Sampling = 0
+	burndown.Granularity = DefaultBurndownGranularity - 1
+	burndown.Initialize(testRepository)
+	assert.Equal(t, burndown.Sampling, DefaultBurndownGranularity-1)
+	assert.Equal(t, burndown.Granularity, DefaultBurndownGranularity-1)
+	burndown.Sampling = DefaultBurndownGranularity - 1
+	burndown.Granularity = -10
+	burndown.Initialize(testRepository)
+	assert.Equal(t, burndown.Sampling, DefaultBurndownGranularity-1)
+	assert.Equal(t, burndown.Granularity, DefaultBurndownGranularity)
+}
+
 func TestBurndownConsumeFinalize(t *testing.T) {
 	burndown := BurndownAnalysis{
 		Granularity:  30,
@@ -435,11 +455,11 @@ func TestBurndownAnalysisSerialize(t *testing.T) {
 	fd = fixtureFileDiff()
 	result, _ = fd.Consume(deps)
 	deps["file_diff"] = result["file_diff"]
+	people := [...]string{"one@srcd", "two@srcd"}
+	burndown.reversedPeopleDict = people[:]
 	burndown.Consume(deps)
 	out := burndown.Finalize().(BurndownResult)
 
-	people := [...]string{"one@srcd", "two@srcd"}
-	burndown.reversedPeopleDict = people[:]
 	buffer := &bytes.Buffer{}
 	burndown.Serialize(out, false, buffer)
 	assert.Equal(t, buffer.String(), `  granularity: 30
@@ -525,3 +545,420 @@ func TestCheckClose(t *testing.T) {
 	closer := panickingCloser{}
 	assert.Panics(t, func() { checkClose(closer) })
 }
+
+func TestBurndownAddMatrix(t *testing.T) {
+	size := 5*3 + 1
+	daily := make([][]float32, size)
+	for i := range daily {
+		daily[i] = make([]float32, size)
+	}
+	added := make([][]int64, 5)
+	for i := range added {
+		added[i] = make([]int64, 3)
+		switch i {
+		case 0:
+			added[i][0] = 10
+		case 1:
+			added[i][0] = 18
+			added[i][1] = 2
+		case 2:
+			added[i][0] = 12
+			added[i][1] = 14
+		case 3:
+			added[i][0] = 10
+			added[i][1] = 12
+			added[i][2] = 6
+		case 4:
+			added[i][0] = 8
+			added[i][1] = 9
+			added[i][2] = 13
+		}
+	}
+	assert.Panics(t, func() {
+		daily2 := make([][]float32, 16)
+		for i := range daily2 {
+			daily2[i] = make([]float32, 15)
+		}
+		addBurndownMatrix(added, 5, 3, daily2, 1)
+	})
+	assert.Panics(t, func() {
+		daily2 := make([][]float32, 15)
+		for i := range daily2 {
+			daily2[i] = make([]float32, 16)
+		}
+		addBurndownMatrix(added, 5, 3, daily2, 1)
+	})
+	// yaml.PrintMatrix(os.Stdout, added, 0, "test", true)
+	/*
+			"test": |-
+		  10  0  0
+		  18  2  0
+		  12 14  0
+		  10 12  6
+		   8  9 13
+	*/
+	addBurndownMatrix(added, 5, 3, daily, 1)
+	for i := range daily[0] {
+		assert.Equal(t, daily[0][i], float32(0))
+	}
+	for i := range daily {
+		assert.Equal(t, daily[i][0], float32(0))
+	}
+	/*for _, row := range daily {
+		fmt.Println(row)
+	}*/
+	// check pinned points
+	for y := 0; y < 5; y++ {
+		for x := 0; x < 3; x++ {
+			var sum float32
+			for i := x * 5; i < (x+1)*5; i++ {
+				sum += daily[(y+1)*3][i+1]
+			}
+			assert.InDelta(t, sum, added[y][x], 0.00001)
+		}
+	}
+	// check overall trend: 0 -> const -> peak -> decay
+	for x := 0; x < 15; x++ {
+		for y := 0; y < x; y++ {
+			assert.Zero(t, daily[y+1][x+1])
+		}
+		var prev float32
+		for y := x; y < ((x+3)/5)*5; y++ {
+			if prev == 0 {
+				prev = daily[y+1][x+1]
+			}
+			assert.Equal(t, daily[y+1][x+1], prev)
+		}
+		for y := ((x + 3) / 5) * 5; y < 15; y++ {
+			if prev == 0 {
+				prev = daily[y+1][x+1]
+			}
+			assert.True(t, daily[y+1][x+1] <= prev)
+			prev = daily[y+1][x+1]
+		}
+	}
+}
+
+func TestBurndownAddMatrixCrazy(t *testing.T) {
+	size := 5 * 3
+	daily := make([][]float32, size)
+	for i := range daily {
+		daily[i] = make([]float32, size)
+	}
+	added := make([][]int64, 5)
+	for i := range added {
+		added[i] = make([]int64, 3)
+		switch i {
+		case 0:
+			added[i][0] = 10
+		case 1:
+			added[i][0] = 9
+			added[i][1] = 2
+		case 2:
+			added[i][0] = 8
+			added[i][1] = 16
+		case 3:
+			added[i][0] = 7
+			added[i][1] = 12
+			added[i][2] = 6
+		case 4:
+			added[i][0] = 6
+			added[i][1] = 9
+			added[i][2] = 13
+		}
+	}
+	// yaml.PrintMatrix(os.Stdout, added, 0, "test", true)
+	/*
+			"test": |-
+		  10  0  0
+		  9  2  0
+		  8 16  0
+		  7 12  6
+		  6  9 13
+	*/
+	addBurndownMatrix(added, 5, 3, daily, 0)
+	/*for _, row := range daily {
+		fmt.Println(row)
+	}*/
+	// check pinned points
+	for y := 0; y < 5; y++ {
+		for x := 0; x < 3; x++ {
+			var sum float32
+			for i := x * 5; i < (x+1)*5; i++ {
+				sum += daily[(y+1)*3-1][i]
+			}
+			assert.InDelta(t, sum, added[y][x], 0.00001)
+		}
+	}
+	// check overall trend: 0 -> const -> peak -> decay
+	for x := 0; x < 15; x++ {
+		for y := 0; y < x; y++ {
+			assert.Zero(t, daily[y][x])
+		}
+		var prev float32
+		for y := x; y < ((x+3)/5)*5; y++ {
+			if prev == 0 {
+				prev = daily[y][x]
+			}
+			assert.Equal(t, daily[y][x], prev)
+		}
+		for y := ((x + 3) / 5) * 5; y < 15; y++ {
+			if prev == 0 {
+				prev = daily[y][x]
+			}
+			assert.True(t, daily[y][x] <= prev)
+			prev = daily[y][x]
+		}
+	}
+}
+
+func TestBurndownMergeGlobalHistory(t *testing.T) {
+	people1 := [...]string{"one", "two"}
+	res1 := BurndownResult{
+		GlobalHistory:      [][]int64{},
+		FileHistories:      map[string][][]int64{},
+		PeopleHistories:    [][][]int64{},
+		PeopleMatrix:       [][]int64{},
+		reversedPeopleDict: people1[:],
+		sampling:           15,
+		granularity:        20,
+	}
+	c1 := CommonAnalysisResult{
+		BeginTime:     600566400, // 1989 Jan 12
+		EndTime:       604713600, // 1989 March 1
+		CommitsNumber: 10,
+		RunTime:       100000,
+	}
+	// 48 days
+	res1.GlobalHistory = make([][]int64, 48/15+1 /* 4 samples */)
+	for i := range res1.GlobalHistory {
+		res1.GlobalHistory[i] = make([]int64, 48/20+1 /* 3 bands */)
+		switch i {
+		case 0:
+			res1.GlobalHistory[i][0] = 1000
+		case 1:
+			res1.GlobalHistory[i][0] = 1100
+			res1.GlobalHistory[i][1] = 400
+		case 2:
+			res1.GlobalHistory[i][0] = 900
+			res1.GlobalHistory[i][1] = 750
+			res1.GlobalHistory[i][2] = 100
+		case 3:
+			res1.GlobalHistory[i][0] = 850
+			res1.GlobalHistory[i][1] = 700
+			res1.GlobalHistory[i][2] = 150
+		}
+	}
+	res1.FileHistories["file1"] = res1.GlobalHistory
+	res1.FileHistories["file2"] = res1.GlobalHistory
+	res1.PeopleHistories = append(res1.PeopleHistories, res1.GlobalHistory)
+	res1.PeopleHistories = append(res1.PeopleHistories, res1.GlobalHistory)
+	res1.PeopleMatrix = append(res1.PeopleMatrix, make([]int64, 4))
+	res1.PeopleMatrix = append(res1.PeopleMatrix, make([]int64, 4))
+	res1.PeopleMatrix[0][0] = 10
+	res1.PeopleMatrix[0][1] = 20
+	res1.PeopleMatrix[0][2] = 30
+	res1.PeopleMatrix[0][3] = 40
+	res1.PeopleMatrix[1][0] = 50
+	res1.PeopleMatrix[1][1] = 60
+	res1.PeopleMatrix[1][2] = 70
+	res1.PeopleMatrix[1][3] = 80
+	people2 := [...]string{"two", "three"}
+	res2 := BurndownResult{
+		GlobalHistory:      [][]int64{},
+		FileHistories:      map[string][][]int64{},
+		PeopleHistories:    [][][]int64{},
+		PeopleMatrix:       [][]int64{},
+		reversedPeopleDict: people2[:],
+		sampling:           14,
+		granularity:        19,
+	}
+	c2 := CommonAnalysisResult{
+		BeginTime:     601084800, // 1989 Jan 18
+		EndTime:       605923200, // 1989 March 15
+		CommitsNumber: 10,
+		RunTime:       100000,
+	}
+	// 56 days
+	res2.GlobalHistory = make([][]int64, 56/14 /* 4 samples */)
+	for i := range res2.GlobalHistory {
+		res2.GlobalHistory[i] = make([]int64, 56/19+1 /* 3 bands */)
+		switch i {
+		case 0:
+			res2.GlobalHistory[i][0] = 900
+		case 1:
+			res2.GlobalHistory[i][0] = 1100
+			res2.GlobalHistory[i][1] = 400
+		case 2:
+			res2.GlobalHistory[i][0] = 900
+			res2.GlobalHistory[i][1] = 750
+			res2.GlobalHistory[i][2] = 100
+		case 3:
+			res2.GlobalHistory[i][0] = 800
+			res2.GlobalHistory[i][1] = 600
+			res2.GlobalHistory[i][2] = 600
+		}
+	}
+	res2.FileHistories["file2"] = res2.GlobalHistory
+	res2.FileHistories["file3"] = res2.GlobalHistory
+	res2.PeopleHistories = append(res2.PeopleHistories, res2.GlobalHistory)
+	res2.PeopleHistories = append(res2.PeopleHistories, res2.GlobalHistory)
+	res2.PeopleMatrix = append(res2.PeopleMatrix, make([]int64, 4))
+	res2.PeopleMatrix = append(res2.PeopleMatrix, make([]int64, 4))
+	res2.PeopleMatrix[0][0] = 100
+	res2.PeopleMatrix[0][1] = 200
+	res2.PeopleMatrix[0][2] = 300
+	res2.PeopleMatrix[0][3] = 400
+	res2.PeopleMatrix[1][0] = 500
+	res2.PeopleMatrix[1][1] = 600
+	res2.PeopleMatrix[1][2] = 700
+	res2.PeopleMatrix[1][3] = 800
+	burndown := BurndownAnalysis{}
+	merged := burndown.MergeResults(res1, res2, &c1, &c2).(BurndownResult)
+	assert.Equal(t, merged.granularity, 19)
+	assert.Equal(t, merged.sampling, 14)
+	assert.Len(t, merged.GlobalHistory, 5)
+	for _, row := range merged.GlobalHistory {
+		assert.Len(t, row, 4)
+	}
+	assert.Equal(t, merged.FileHistories["file1"], res1.GlobalHistory)
+	assert.Equal(t, merged.FileHistories["file2"], merged.GlobalHistory)
+	assert.Equal(t, merged.FileHistories["file3"], res2.GlobalHistory)
+	assert.Len(t, merged.reversedPeopleDict, 3)
+	assert.Equal(t, merged.PeopleHistories[0], res1.GlobalHistory)
+	assert.Equal(t, merged.PeopleHistories[1], merged.GlobalHistory)
+	assert.Equal(t, merged.PeopleHistories[2], res2.GlobalHistory)
+	assert.Len(t, merged.PeopleMatrix, 3)
+	for _, row := range merged.PeopleMatrix {
+		assert.Len(t, row, 5)
+	}
+	assert.Equal(t, merged.PeopleMatrix[0][0], int64(10))
+	assert.Equal(t, merged.PeopleMatrix[0][1], int64(20))
+	assert.Equal(t, merged.PeopleMatrix[0][2], int64(30))
+	assert.Equal(t, merged.PeopleMatrix[0][3], int64(40))
+	assert.Equal(t, merged.PeopleMatrix[0][4], int64(0))
+
+	assert.Equal(t, merged.PeopleMatrix[1][0], int64(150))
+	assert.Equal(t, merged.PeopleMatrix[1][1], int64(260))
+	assert.Equal(t, merged.PeopleMatrix[1][2], int64(70))
+	assert.Equal(t, merged.PeopleMatrix[1][3], int64(380))
+	assert.Equal(t, merged.PeopleMatrix[1][4], int64(400))
+
+	assert.Equal(t, merged.PeopleMatrix[2][0], int64(500))
+	assert.Equal(t, merged.PeopleMatrix[2][1], int64(600))
+	assert.Equal(t, merged.PeopleMatrix[2][2], int64(0))
+	assert.Equal(t, merged.PeopleMatrix[2][3], int64(700))
+	assert.Equal(t, merged.PeopleMatrix[2][4], int64(800))
+	burndown.serializeBinary(&merged, ioutil.Discard)
+}
+
+func TestBurndownMergeNils(t *testing.T) {
+	res1 := BurndownResult{
+		GlobalHistory:      [][]int64{},
+		FileHistories:      map[string][][]int64{},
+		PeopleHistories:    [][][]int64{},
+		PeopleMatrix:       [][]int64{},
+		reversedPeopleDict: []string{},
+		sampling:           15,
+		granularity:        20,
+	}
+	c1 := CommonAnalysisResult{
+		BeginTime:     600566400, // 1989 Jan 12
+		EndTime:       604713600, // 1989 March 1
+		CommitsNumber: 10,
+		RunTime:       100000,
+	}
+	res2 := BurndownResult{
+		GlobalHistory:      nil,
+		FileHistories:      nil,
+		PeopleHistories:    nil,
+		PeopleMatrix:       nil,
+		reversedPeopleDict: nil,
+		sampling:           14,
+		granularity:        19,
+	}
+	c2 := CommonAnalysisResult{
+		BeginTime:     601084800, // 1989 Jan 18
+		EndTime:       605923200, // 1989 March 15
+		CommitsNumber: 10,
+		RunTime:       100000,
+	}
+	burndown := BurndownAnalysis{}
+	merged := burndown.MergeResults(res1, res2, &c1, &c2).(BurndownResult)
+	assert.Equal(t, merged.granularity, 19)
+	assert.Equal(t, merged.sampling, 14)
+	assert.Nil(t, merged.GlobalHistory)
+	assert.Nil(t, merged.FileHistories)
+	assert.Nil(t, merged.PeopleHistories)
+	assert.Nil(t, merged.PeopleMatrix)
+	burndown.serializeBinary(&merged, ioutil.Discard)
+
+	res2.GlobalHistory = make([][]int64, 56/14 /* 4 samples */)
+	for i := range res2.GlobalHistory {
+		res2.GlobalHistory[i] = make([]int64, 56/19+1 /* 3 bands */)
+		switch i {
+		case 0:
+			res2.GlobalHistory[i][0] = 900
+		case 1:
+			res2.GlobalHistory[i][0] = 1100
+			res2.GlobalHistory[i][1] = 400
+		case 2:
+			res2.GlobalHistory[i][0] = 900
+			res2.GlobalHistory[i][1] = 750
+			res2.GlobalHistory[i][2] = 100
+		case 3:
+			res2.GlobalHistory[i][0] = 800
+			res2.GlobalHistory[i][1] = 600
+			res2.GlobalHistory[i][2] = 600
+		}
+	}
+	people1 := [...]string{"one", "two"}
+	res1.reversedPeopleDict = people1[:]
+	res1.PeopleMatrix = append(res1.PeopleMatrix, make([]int64, 4))
+	res1.PeopleMatrix = append(res1.PeopleMatrix, make([]int64, 4))
+	res1.PeopleMatrix[0][0] = 10
+	res1.PeopleMatrix[0][1] = 20
+	res1.PeopleMatrix[0][2] = 30
+	res1.PeopleMatrix[0][3] = 40
+	res1.PeopleMatrix[1][0] = 50
+	res1.PeopleMatrix[1][1] = 60
+	res1.PeopleMatrix[1][2] = 70
+	res1.PeopleMatrix[1][3] = 80
+	people2 := [...]string{"two", "three"}
+	res2.reversedPeopleDict = people2[:]
+	merged = burndown.MergeResults(res1, res2, &c1, &c2).(BurndownResult)
+	mgh := [5][4]int64{
+		{0, 0, 0, 0},
+		{578, 0, 0, 0},
+		{798, 546, 0, 0},
+		{664, 884, 222, 0},
+		{547, 663, 610, 178},
+	}
+	mgh2 := [...][]int64{
+		mgh[0][:], mgh[1][:], mgh[2][:], mgh[3][:], mgh[4][:],
+	}
+	mgh3 := mgh2[:]
+	assert.Equal(t, mgh3, merged.GlobalHistory)
+	assert.Len(t, merged.PeopleMatrix, 3)
+	for _, row := range merged.PeopleMatrix {
+		assert.Len(t, row, 5)
+	}
+	assert.Equal(t, merged.PeopleMatrix[0][0], int64(10))
+	assert.Equal(t, merged.PeopleMatrix[0][1], int64(20))
+	assert.Equal(t, merged.PeopleMatrix[0][2], int64(30))
+	assert.Equal(t, merged.PeopleMatrix[0][3], int64(40))
+	assert.Equal(t, merged.PeopleMatrix[0][4], int64(0))
+
+	assert.Equal(t, merged.PeopleMatrix[1][0], int64(50))
+	assert.Equal(t, merged.PeopleMatrix[1][1], int64(60))
+	assert.Equal(t, merged.PeopleMatrix[1][2], int64(70))
+	assert.Equal(t, merged.PeopleMatrix[1][3], int64(80))
+	assert.Equal(t, merged.PeopleMatrix[1][4], int64(0))
+
+	assert.Equal(t, merged.PeopleMatrix[2][0], int64(0))
+	assert.Equal(t, merged.PeopleMatrix[2][1], int64(0))
+	assert.Equal(t, merged.PeopleMatrix[2][2], int64(0))
+	assert.Equal(t, merged.PeopleMatrix[2][3], int64(0))
+	assert.Equal(t, merged.PeopleMatrix[2][4], int64(0))
+	burndown.serializeBinary(&merged, ioutil.Discard)
+}
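
These tests pin the key invariant of addBurndownMatrix: after exploding a matrix to
daily resolution, the per-band sums at the sampled rows still equal the original
values, and each column goes 0 -> constant -> peak -> decay. To replay just this group
locally, something like `go test -run 'TestBurndown(AddMatrix|Merge)' .` should work
(the exact invocation is an assumption, not part of this diff).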

+ 19 - 18
cmd/hercules-combine/main.go

@@ -47,10 +47,10 @@ func main() {
 		return
 	}
 	mergedMessage := pb.AnalysisResults{
-		Header:   &pb.Metadata{
-			Version:       2,
-			Hash:          hercules.GIT_HASH,
-			Repository:    strings.Join(repos, " & "),
+		Header: &pb.Metadata{
+			Version:    2,
+			Hash:       hercules.GIT_HASH,
+			Repository: strings.Join(repos, " & "),
 		},
 		Contents: map[string][]byte{},
 	}
@@ -69,17 +69,17 @@ func main() {
 }
 
 func loadMessage(fileName string, repos *[]string) (
-		map[string]interface{}, *hercules.CommonAnalysisResult, []string) {
+	map[string]interface{}, *hercules.CommonAnalysisResult, []string) {
 	errs := []string{}
 	buffer, err := ioutil.ReadFile(fileName)
 	if err != nil {
-		errs = append(errs, "Cannot read " + fileName + ": " + err.Error())
+		errs = append(errs, "Cannot read "+fileName+": "+err.Error())
 		return nil, nil, errs
 	}
 	message := pb.AnalysisResults{}
 	err = proto.Unmarshal(buffer, &message)
 	if err != nil {
-		errs = append(errs, "Cannot parse " + fileName + ": " + err.Error())
+		errs = append(errs, "Cannot parse "+fileName+": "+err.Error())
 		return nil, nil, errs
 	}
 	*repos = append(*repos, message.Header.Repository)
@@ -87,17 +87,17 @@ func loadMessage(fileName string, repos *[]string) (
 	for key, val := range message.Contents {
 		summoned := hercules.Registry.Summon(key)
 		if len(summoned) == 0 {
-			errs = append(errs, fileName + ": item not found: " + key)
+			errs = append(errs, fileName+": item not found: "+key)
 			continue
 		}
 		mpi, ok := summoned[0].(hercules.MergeablePipelineItem)
 		if !ok {
-			errs = append(errs, fileName + ": " + key + ": MergeablePipelineItem is not implemented")
+			errs = append(errs, fileName+": "+key+": MergeablePipelineItem is not implemented")
 			continue
 		}
 		msg, err := mpi.Deserialize(val)
 		if err != nil {
-			errs = append(errs, fileName + ": deserialization failed: " + key + ": " + err.Error())
+			errs = append(errs, fileName+": deserialization failed: "+key+": "+err.Error())
 			continue
 		}
 		results[key] = msg
@@ -114,23 +114,23 @@ func printErrors(allErrors map[string][]string) {
 		}
 	}
 	if !needToPrintErrors {
-	 return
+		return
 	}
 	fmt.Fprintln(os.Stderr, "Errors:")
 	for key, errs := range allErrors {
 		if len(errs) > 0 {
-			fmt.Fprintln(os.Stderr, "  " + key)
+			fmt.Fprintln(os.Stderr, "  "+key)
 			for _, err := range errs {
-				fmt.Fprintln(os.Stderr, "    " + err)
+				fmt.Fprintln(os.Stderr, "    "+err)
 			}
 		}
 	}
 }
 
 func mergeResults(mergedResults map[string]interface{},
-		mergedCommons *hercules.CommonAnalysisResult,
-		anotherResults map[string]interface{},
-		anotherCommons *hercules.CommonAnalysisResult) {
+	mergedCommons *hercules.CommonAnalysisResult,
+	anotherResults map[string]interface{},
+	anotherCommons *hercules.CommonAnalysisResult) {
 	for key, val := range anotherResults {
 		mergedResult, exists := mergedResults[key]
 		if !exists {
@@ -138,11 +138,12 @@ func mergeResults(mergedResults map[string]interface{},
 			continue
 		}
 		item := hercules.Registry.Summon(key)[0].(hercules.MergeablePipelineItem)
-		mergedResult, *mergedCommons = item.MergeResults(
-			mergedResult, val, mergedCommons, anotherCommons)
+		mergedResult = item.MergeResults(mergedResult, val, mergedCommons, anotherCommons)
 		mergedResults[key] = mergedResult
 	}
 	if mergedCommons.CommitsNumber == 0 {
 		*mergedCommons = *anotherCommons
+	} else {
+		mergedCommons.Merge(anotherCommons)
 	}
 }
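
Note that mergeResults now folds the commons in place: the first non-empty
CommonAnalysisResult seeds the accumulator (CommitsNumber == 0 marks it as still
empty), and every later one is folded in via Merge instead of being returned from
MergeResults. Invocation is presumably along the lines of

	hercules-combine first.pb second.pb > combined.pb

though the argument handling is outside this diff.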

+ 6 - 7
labours.py

@@ -295,17 +295,16 @@ def load_burndown(header, name, matrix, resample):
         epsrange = numpy.arange(0, 1, 1.0 / sampling)
         for y in range(matrix.shape[0]):
             for x in range(matrix.shape[1]):
-                previous = matrix[y, x - 1] if x > 0 else 0
-                value = ((previous + (matrix[y, x] - previous) * epsrange)
-                         / granularity)[numpy.newaxis, :]
                 if (y + 1) * granularity <= x * sampling:
+                    previous = matrix[y, x - 1] if x > 0 else 0
+                    value = ((previous + (matrix[y, x] - previous) * epsrange)
+                             / granularity)[numpy.newaxis, :]
                     daily_matrix[y * granularity:(y + 1) * granularity,
-                    x * sampling:(x + 1) * sampling] = value
+                                 x * sampling:(x + 1) * sampling] = value
                 elif y * granularity <= (x + 1) * sampling:
                     for suby in range(y * granularity, (y + 1) * granularity):
                         for subx in range(suby, (x + 1) * sampling):
-                            daily_matrix[suby, subx] = matrix[
-                                                           y, x] / granularity
+                            daily_matrix[suby, subx] = matrix[y, x] / granularity
         daily_matrix[(last - start).days:] = 0
         # Resample the bands
         aliases = {
@@ -337,7 +336,7 @@ def load_burndown(header, name, matrix, resample):
                     break
             matrix[i, j:] = \
                 daily_matrix[istart:ifinish, (sdt - start).days:].sum(axis=0)
-        # Hardcode some cases to improve labels" readability
+        # Hardcode some cases to improve labels' readability
         if resample in ("year", "A"):
             labels = [dt.year for dt in date_granularity_sampling]
         elif resample in ("month", "M"):

+ 3 - 0
pb/utils.go

@@ -3,6 +3,9 @@ package pb
 import "sort"
 
 func ToBurndownSparseMatrix(matrix [][]int64, name string) *BurndownSparseMatrix {
+	if len(matrix) == 0 {
+		panic("matrix may not be nil or empty")
+	}
 	r := BurndownSparseMatrix{
 		Name:            name,
 		NumberOfRows:    int32(len(matrix)),
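
With this guard in place, callers must check for emptiness before converting, which is
exactly what serializeBinary in burndown.go now does:

	if len(result.GlobalHistory) > 0 {
		message.Project = pb.ToBurndownSparseMatrix(result.GlobalHistory, "project")
	}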

+ 32 - 9
pipeline.go

@@ -18,8 +18,8 @@ import (
 	"gopkg.in/src-d/go-git.v4"
 	"gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
-	"gopkg.in/src-d/hercules.v3/toposort"
 	"gopkg.in/src-d/hercules.v3/pb"
+	"gopkg.in/src-d/hercules.v3/toposort"
 )
 
 type ConfigurationOptionType int
@@ -101,7 +101,7 @@ type MergeablePipelineItem interface {
 	// Deserialize loads the result from Protocol Buffers blob.
 	Deserialize(pbmessage []byte) (interface{}, error)
 	// MergeResults joins two results together. Common-s are specified as the global state.
-	MergeResults(r1, r2 interface{}, c1, c2 *CommonAnalysisResult) (interface{}, CommonAnalysisResult)
+	MergeResults(r1, r2 interface{}, c1, c2 *CommonAnalysisResult) interface{}
 }
 
 // CommonAnalysisResult holds the information which is always extracted at Pipeline.Run().
@@ -116,20 +116,43 @@ type CommonAnalysisResult struct {
 	RunTime time.Duration
 }
 
-func (car *CommonAnalysisResult) FillMetadata(meta *pb.Metadata) {
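+// BeginTimeAsTime converts the UNIX timestamp of the analysis beginning to time.Time.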
+func (car *CommonAnalysisResult) BeginTimeAsTime() time.Time {
+	return time.Unix(car.BeginTime, 0)
+}
+
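+// EndTimeAsTime converts the UNIX timestamp of the analysis ending to time.Time.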
+func (car *CommonAnalysisResult) EndTimeAsTime() time.Time {
+	return time.Unix(car.EndTime, 0)
+}
+
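+// Merge combines the CommonAnalysisResult with another one: the time range is
+// extended to cover both, and the commit count and run time are summed.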
+func (car *CommonAnalysisResult) Merge(other *CommonAnalysisResult) {
+	if car.EndTime == 0 || other.BeginTime == 0 {
+		panic("Merging with an uninitialized CommonAnalysisResult")
+	}
+	if other.BeginTime < car.BeginTime {
+		car.BeginTime = other.BeginTime
+	}
+	if other.EndTime > car.EndTime {
+		car.EndTime = other.EndTime
+	}
+	car.CommitsNumber += other.CommitsNumber
+	car.RunTime += other.RunTime
+}
+
+func (car *CommonAnalysisResult) FillMetadata(meta *pb.Metadata) *pb.Metadata {
 	meta.BeginUnixTime = car.BeginTime
 	meta.EndUnixTime = car.EndTime
 	meta.Commits = int32(car.CommitsNumber)
 	meta.RunTime = car.RunTime.Nanoseconds() / 1e6
+	return meta
 }
 
 func MetadataToCommonAnalysisResult(meta *pb.Metadata) *CommonAnalysisResult {
-  return &CommonAnalysisResult{
-	  BeginTime:     meta.BeginUnixTime,
-	  EndTime:       meta.EndUnixTime,
-	  CommitsNumber: int(meta.Commits),
-	  RunTime:       time.Duration(meta.RunTime * 1e6),
-  }
+	return &CommonAnalysisResult{
+		BeginTime:     meta.BeginUnixTime,
+		EndTime:       meta.EndUnixTime,
+		CommitsNumber: int(meta.Commits),
+		RunTime:       time.Duration(meta.RunTime * 1e6),
+	}
 }
 
 // PipelineItemRegistry contains all the known PipelineItem-s.
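
The Merge semantics are easy to check by hand: the interval widens to the union while
the counters add up. A self-contained sketch (the type is renamed for brevity; the
values come from TestCommonAnalysisResultMerge below):

	package main

	import (
		"fmt"
		"time"
	)

	type commons struct {
		begin, end int64
		commits    int
		runTime    time.Duration
	}

	// merge mirrors CommonAnalysisResult.Merge: widen the interval, sum the rest.
	func (c *commons) merge(o *commons) {
		if o.begin < c.begin {
			c.begin = o.begin
		}
		if o.end > c.end {
			c.end = o.end
		}
		c.commits += o.commits
		c.runTime += o.runTime
	}

	func main() {
		a := commons{1513620635, 1513720635, 1, 100}
		b := commons{1513620535, 1513730635, 2, 200}
		a.merge(&b)
		fmt.Println(a.begin, a.end, a.commits, a.runTime) // 1513620535 1513730635 3 300ns
	}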

+ 27 - 1
pipeline_test.go

@@ -15,6 +15,7 @@ import (
 	"gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
 	"gopkg.in/src-d/go-git.v4/storage/memory"
+	"gopkg.in/src-d/hercules.v3/pb"
 )
 
 type testPipelineItem struct {
@@ -221,7 +222,7 @@ func TestPipelineRun(t *testing.T) {
 	assert.Equal(t, common.BeginTime, int64(1481719092))
 	assert.Equal(t, common.EndTime, int64(1481719092))
 	assert.Equal(t, common.CommitsNumber, 1)
-	assert.True(t, common.RunTime.Nanoseconds() / 1e6 < 100)
+	assert.True(t, common.RunTime.Nanoseconds()/1e6 < 100)
 	assert.True(t, item.DepsConsumed)
 	assert.True(t, item.CommitMatches)
 	assert.True(t, item.IndexMatches)
@@ -405,6 +406,31 @@ func TestPipelineSerializeNoUast(t *testing.T) {
 }`, dot)
 }
 
+func TestCommonAnalysisResultMerge(t *testing.T) {
+	c1 := CommonAnalysisResult{
+		BeginTime: 1513620635, EndTime: 1513720635, CommitsNumber: 1, RunTime: 100}
+	assert.Equal(t, c1.BeginTimeAsTime().Unix(), int64(1513620635))
+	assert.Equal(t, c1.EndTimeAsTime().Unix(), int64(1513720635))
+	c2 := CommonAnalysisResult{
+		BeginTime: 1513620535, EndTime: 1513730635, CommitsNumber: 2, RunTime: 200}
+	c1.Merge(&c2)
+	assert.Equal(t, c1.BeginTime, int64(1513620535))
+	assert.Equal(t, c1.EndTime, int64(1513730635))
+	assert.Equal(t, c1.CommitsNumber, 3)
+	assert.Equal(t, c1.RunTime.Nanoseconds(), int64(300))
+}
+
+func TestCommonAnalysisResultMetadata(t *testing.T) {
+	c1 := &CommonAnalysisResult{
+		BeginTime: 1513620635, EndTime: 1513720635, CommitsNumber: 1, RunTime: 100 * 1e6}
+	meta := &pb.Metadata{}
+	c1 = MetadataToCommonAnalysisResult(c1.FillMetadata(meta))
+	assert.Equal(t, c1.BeginTimeAsTime().Unix(), int64(1513620635))
+	assert.Equal(t, c1.EndTimeAsTime().Unix(), int64(1513720635))
+	assert.Equal(t, c1.CommitsNumber, 1)
+	assert.Equal(t, c1.RunTime.Nanoseconds(), int64(100*1e6))
+}
+
 func init() {
 	cwd, err := os.Getwd()
 	if err == nil {