浏览代码

Start implementing burndown merges

Vadim Markovtsev 7 年之前
父节点
当前提交
7e0587ae33
共有 4 个文件被更改，包括 123 次插入和 38 次删除
  1. burndown.go（+84 −11）
  2. cmd/hercules-combine/main.go（+19 −18）
  3. pipeline.go（+19 −8）
  4. pipeline_test.go（+1 −1）

+ 84 - 11
burndown.go

@@ -68,10 +68,13 @@ type BurndownAnalysis struct {
 }
 
 type BurndownResult struct {
-	GlobalHistory   [][]int64
-	FileHistories   map[string][][]int64
-	PeopleHistories [][][]int64
-	PeopleMatrix    [][]int64
+	GlobalHistory      [][]int64
+	FileHistories      map[string][][]int64
+	PeopleHistories    [][][]int64
+	PeopleMatrix       [][]int64
+	reversedPeopleDict []string
+	sampling           int
+	granularity        int
 }
 
 const (
@@ -243,10 +246,11 @@ func (analyser *BurndownAnalysis) Finalize() interface{} {
 		}
 	}
 	return BurndownResult{
-		GlobalHistory:   analyser.globalHistory,
-		FileHistories:   analyser.fileHistories,
-		PeopleHistories: analyser.peopleHistories,
-		PeopleMatrix:    peopleMatrix,
+		GlobalHistory:      analyser.globalHistory,
+		FileHistories:      analyser.fileHistories,
+		PeopleHistories:    analyser.peopleHistories,
+		PeopleMatrix:       peopleMatrix,
+		reversedPeopleDict: analyser.reversedPeopleDict,
 	}
 }
 
@@ -259,6 +263,75 @@ func (analyser *BurndownAnalysis) Serialize(result interface{}, binary bool, wri
 	return nil
 }
 
+func (analyser *BurndownAnalysis) Deserialize(pbmessage []byte) (interface{}, error) {
+	msg := pb.BurndownAnalysisResults{}
+	err := proto.Unmarshal(pbmessage, &msg)
+	if err != nil {
+		return nil, err
+	}
+	result := BurndownResult{}
+	convertCSR := func(mat *pb.BurndownSparseMatrix) [][]int64 {
+		res := make([][]int64, mat.NumberOfRows)
+		for i := 0; i < int(mat.NumberOfRows); i++ {
+			res[i] = make([]int64, mat.NumberOfColumns)
+			for j := 0; j < int(mat.NumberOfColumns); j++ {
+				res[i][j] = int64(mat.Rows[i].Columns[j])
+			}
+		}
+		return res
+	}
+	result.GlobalHistory = convertCSR(msg.Project)
+	result.FileHistories = map[string][][]int64{}
+	for _, mat := range msg.Files {
+		result.FileHistories[mat.Name] = convertCSR(mat)
+	}
+	result.reversedPeopleDict = make([]string, len(msg.People))
+	result.PeopleHistories = make([][][]int64, len(msg.People))
+	for i, mat := range msg.People {
+		result.PeopleHistories[i] = convertCSR(mat)
+		result.reversedPeopleDict[i] = mat.Name
+	}
+	result.PeopleMatrix = make([][]int64, msg.PeopleInteraction.NumberOfRows)
+	for i := 0; i < len(result.PeopleMatrix); i++ {
+		result.PeopleMatrix[i] = make([]int64, msg.PeopleInteraction.NumberOfColumns)
+		for j := int(msg.PeopleInteraction.Indptr[i]); j < int(msg.PeopleInteraction.Indptr[i+1]); j++ {
+			result.PeopleMatrix[i][msg.PeopleInteraction.Indices[j]] = msg.PeopleInteraction.Data[j]
+		}
+	}
+	result.sampling = int(msg.Sampling)
+	result.granularity = int(msg.Granularity)
+	return result, nil
+}
+
+func (analyser *BurndownAnalysis) MergeResults(
+	r1, r2 interface{}, c1, c2 *CommonAnalysisResult) interface{} {
+	bar1 := r1.(BurndownResult)
+	bar2 := r2.(BurndownResult)
+  merged := BurndownResult{}
+	if bar1.sampling < bar2.sampling {
+		merged.sampling = bar1.sampling
+	} else {
+		merged.sampling = bar2.sampling
+	}
+	if bar1.granularity < bar2.granularity {
+		merged.granularity = bar1.granularity
+	} else {
+		merged.granularity = bar2.granularity
+	}
+	people := make([]string, len(bar1.reversedPeopleDict))
+	copy(people, bar1.reversedPeopleDict)
+	merged.reversedPeopleDict = append(people, bar2.reversedPeopleDict...)
+	// interpolate to daily and sum
+	_ = bar1
+	_ = bar2
+	panic("not implemented")
+	// return merged
+}
+
+func interpolateMatrix(matrix [][]int64, granularity, sampling int, daily [][]int64, offset int) {
+
+}
+
 func (analyser *BurndownAnalysis) serializeText(result *BurndownResult, writer io.Writer) {
 	fmt.Fprintln(writer, "  granularity:", analyser.Granularity)
 	fmt.Fprintln(writer, "  sampling:", analyser.Sampling)
@@ -274,11 +347,11 @@ func (analyser *BurndownAnalysis) serializeText(result *BurndownResult, writer i
 	if len(result.PeopleHistories) > 0 {
 		fmt.Fprintln(writer, "  people_sequence:")
 		for key := range result.PeopleHistories {
-			fmt.Fprintln(writer, "    - "+yaml.SafeString(analyser.reversedPeopleDict[key]))
+			fmt.Fprintln(writer, "    - "+yaml.SafeString(result.reversedPeopleDict[key]))
 		}
 		fmt.Fprintln(writer, "  people:")
 		for key, val := range result.PeopleHistories {
-			yaml.PrintMatrix(writer, val, 4, analyser.reversedPeopleDict[key], true)
+			yaml.PrintMatrix(writer, val, 4, result.reversedPeopleDict[key], true)
 		}
 		fmt.Fprintln(writer, "  people_interaction: |-")
 		yaml.PrintMatrix(writer, result.PeopleMatrix, 4, "", false)
@@ -306,7 +379,7 @@ func (analyser *BurndownAnalysis) serializeBinary(result *BurndownResult, writer
 		message.People = make(
 			[]*pb.BurndownSparseMatrix, len(result.PeopleHistories))
 		for key, val := range result.PeopleHistories {
-			message.People[key] = pb.ToBurndownSparseMatrix(val, analyser.reversedPeopleDict[key])
+			message.People[key] = pb.ToBurndownSparseMatrix(val, result.reversedPeopleDict[key])
 		}
 		message.PeopleInteraction = pb.DenseToCompressedSparseRowMatrix(result.PeopleMatrix)
 	}

+ 19 - 18
cmd/hercules-combine/main.go

@@ -47,10 +47,10 @@ func main() {
 		return
 	}
 	mergedMessage := pb.AnalysisResults{
-		Header:   &pb.Metadata{
-			Version:       2,
-			Hash:          hercules.GIT_HASH,
-			Repository:    strings.Join(repos, " & "),
+		Header: &pb.Metadata{
+			Version:    2,
+			Hash:       hercules.GIT_HASH,
+			Repository: strings.Join(repos, " & "),
 		},
 		Contents: map[string][]byte{},
 	}
@@ -69,17 +69,17 @@ func main() {
 }
 
 func loadMessage(fileName string, repos *[]string) (
-		map[string]interface{}, *hercules.CommonAnalysisResult, []string) {
+	map[string]interface{}, *hercules.CommonAnalysisResult, []string) {
 	errs := []string{}
 	buffer, err := ioutil.ReadFile(fileName)
 	if err != nil {
-		errs = append(errs, "Cannot read " + fileName + ": " + err.Error())
+		errs = append(errs, "Cannot read "+fileName+": "+err.Error())
 		return nil, nil, errs
 	}
 	message := pb.AnalysisResults{}
 	err = proto.Unmarshal(buffer, &message)
 	if err != nil {
-		errs = append(errs, "Cannot parse " + fileName + ": " + err.Error())
+		errs = append(errs, "Cannot parse "+fileName+": "+err.Error())
 		return nil, nil, errs
 	}
 	*repos = append(*repos, message.Header.Repository)
@@ -87,17 +87,17 @@ func loadMessage(fileName string, repos *[]string) (
 	for key, val := range message.Contents {
 		summoned := hercules.Registry.Summon(key)
 		if len(summoned) == 0 {
-			errs = append(errs, fileName + ": item not found: " + key)
+			errs = append(errs, fileName+": item not found: "+key)
 			continue
 		}
 		mpi, ok := summoned[0].(hercules.MergeablePipelineItem)
 		if !ok {
-			errs = append(errs, fileName + ": " + key + ": MergeablePipelineItem is not implemented")
+			errs = append(errs, fileName+": "+key+": MergeablePipelineItem is not implemented")
 			continue
 		}
 		msg, err := mpi.Deserialize(val)
 		if err != nil {
-			errs = append(errs, fileName + ": deserialization failed: " + key + ": " + err.Error())
+			errs = append(errs, fileName+": deserialization failed: "+key+": "+err.Error())
 			continue
 		}
 		results[key] = msg
@@ -114,23 +114,23 @@ func printErrors(allErrors map[string][]string) {
 		}
 	}
 	if !needToPrintErrors {
-	 return
+		return
 	}
 	fmt.Fprintln(os.Stderr, "Errors:")
 	for key, errs := range allErrors {
 		if len(errs) > 0 {
-			fmt.Fprintln(os.Stderr, "  " + key)
+			fmt.Fprintln(os.Stderr, "  "+key)
 			for _, err := range errs {
-				fmt.Fprintln(os.Stderr, "    " + err)
+				fmt.Fprintln(os.Stderr, "    "+err)
 			}
 		}
 	}
 }
 
 func mergeResults(mergedResults map[string]interface{},
-		mergedCommons *hercules.CommonAnalysisResult,
-		anotherResults map[string]interface{},
-		anotherCommons *hercules.CommonAnalysisResult) {
+	mergedCommons *hercules.CommonAnalysisResult,
+	anotherResults map[string]interface{},
+	anotherCommons *hercules.CommonAnalysisResult) {
 	for key, val := range anotherResults {
 		mergedResult, exists := mergedResults[key]
 		if !exists {
@@ -138,11 +138,12 @@ func mergeResults(mergedResults map[string]interface{},
 			continue
 		}
 		item := hercules.Registry.Summon(key)[0].(hercules.MergeablePipelineItem)
-		mergedResult, *mergedCommons = item.MergeResults(
-			mergedResult, val, mergedCommons, anotherCommons)
+		mergedResult = item.MergeResults(mergedResult, val, mergedCommons, anotherCommons)
 		mergedResults[key] = mergedResult
 	}
 	if mergedCommons.CommitsNumber == 0 {
 		*mergedCommons = *anotherCommons
+	} else {
+		mergedCommons.Merge(anotherCommons)
 	}
 }

+ 19 - 8
pipeline.go

@@ -18,8 +18,8 @@ import (
 	"gopkg.in/src-d/go-git.v4"
 	"gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
-	"gopkg.in/src-d/hercules.v3/toposort"
 	"gopkg.in/src-d/hercules.v3/pb"
+	"gopkg.in/src-d/hercules.v3/toposort"
 )
 
 type ConfigurationOptionType int
@@ -101,7 +101,7 @@ type MergeablePipelineItem interface {
 	// Deserialize loads the result from Protocol Buffers blob.
 	Deserialize(pbmessage []byte) (interface{}, error)
 	// MergeResults joins two results together. Common-s are specified as the global state.
-	MergeResults(r1, r2 interface{}, c1, c2 *CommonAnalysisResult) (interface{}, CommonAnalysisResult)
+	MergeResults(r1, r2 interface{}, c1, c2 *CommonAnalysisResult) interface{}
 }
 
 // CommonAnalysisResult holds the information which is always extracted at Pipeline.Run().
@@ -123,13 +123,24 @@ func (car *CommonAnalysisResult) FillMetadata(meta *pb.Metadata) {
 	meta.RunTime = car.RunTime.Nanoseconds() / 1e6
 }
 
+func (car *CommonAnalysisResult) Merge(other *CommonAnalysisResult) {
+	if other.BeginTime < car.BeginTime {
+		car.BeginTime = other.BeginTime
+	}
+	if other.EndTime > car.EndTime {
+		car.EndTime = other.EndTime
+	}
+	car.CommitsNumber += other.CommitsNumber
+	car.RunTime += other.RunTime
+}
+
 func MetadataToCommonAnalysisResult(meta *pb.Metadata) *CommonAnalysisResult {
-  return &CommonAnalysisResult{
-	  BeginTime:     meta.BeginUnixTime,
-	  EndTime:       meta.EndUnixTime,
-	  CommitsNumber: int(meta.Commits),
-	  RunTime:       time.Duration(meta.RunTime * 1e6),
-  }
+	return &CommonAnalysisResult{
+		BeginTime:     meta.BeginUnixTime,
+		EndTime:       meta.EndUnixTime,
+		CommitsNumber: int(meta.Commits),
+		RunTime:       time.Duration(meta.RunTime * 1e6),
+	}
 }
 
 // PipelineItemRegistry contains all the known PipelineItem-s.

+ 1 - 1
pipeline_test.go

@@ -221,7 +221,7 @@ func TestPipelineRun(t *testing.T) {
 	assert.Equal(t, common.BeginTime, int64(1481719092))
 	assert.Equal(t, common.EndTime, int64(1481719092))
 	assert.Equal(t, common.CommitsNumber, 1)
-	assert.True(t, common.RunTime.Nanoseconds() / 1e6 < 100)
+	assert.True(t, common.RunTime.Nanoseconds()/1e6 < 100)
 	assert.True(t, item.DepsConsumed)
 	assert.True(t, item.CommitMatches)
 	assert.True(t, item.IndexMatches)