Kaynağa Gözat

Add CommonAnalysisResults

Vadim Markovtsev 7 yıl önce
ebeveyn
işleme
fc842301d8
7 değiştirilmiş dosya ile 85 ekleme ve 43 silme
  1. 1 0
      .travis.yml
  2. 22 20
      cmd/hercules/main.go
  3. 6 6
      doc.go
  4. 4 0
      identity_test.go
  5. 4 2
      pb/pb.proto
  6. 40 13
      pipeline.go
  7. 8 2
      pipeline_test.go

+ 1 - 0
.travis.yml

@@ -33,6 +33,7 @@ before_install:
 
 install:
   - make dependencies
+  - git clone --depth 1 https://github.com/src-d/go-git $GOPATH/src/gopkg.in/src-d/go-git.v4
   - go get -t -v -ldflags "-X gopkg.in/src-d/hercules.v3.GIT_HASH=$(git rev-parse HEAD)" ./...
   - pip3 install --user -r requirements.txt tensorflow
   - docker run -d --privileged -p 9432:9432 --name bblfshd bblfsh/bblfshd

+ 22 - 20
cmd/hercules/main.go

@@ -221,10 +221,11 @@ func main() {
 		}
 	}
 	facts["commits"] = commits
-	deployed := []hercules.PipelineItem{}
+	deployed := []hercules.LeafPipelineItem{}
 	for name, valPtr := range deployChoices {
 		if *valPtr {
-			deployed = append(deployed, pipeline.DeployItem(hercules.Registry.Summon(name)[0]))
+			item := pipeline.DeployItem(hercules.Registry.Summon(name)[0])
+			deployed = append(deployed, item.(hercules.LeafPipelineItem))
 		}
 	}
 	pipeline.Initialize(facts)
@@ -243,12 +244,10 @@ func main() {
 		}
 		fmt.Fprint(os.Stderr, "writing...\r")
 	}
-	begin := commits[0].Author.When.Unix()
-	end := commits[len(commits)-1].Author.When.Unix()
 	if !protobuf {
-		printResults(uri, begin, end, len(commits), deployed, results)
+		printResults(uri, deployed, results)
 	} else {
-		protobufResults(uri, begin, end, len(commits), deployed, results)
+		protobufResults(uri, deployed, results)
 	}
 	if !disableStatus {
 		fmt.Fprint(os.Stderr, "\033[K")
@@ -256,37 +255,41 @@ func main() {
 }
 
 func printResults(
-	uri string, begin, end int64, commitsCount int, deployed []hercules.PipelineItem,
-	results map[hercules.PipelineItem]interface{}) {
+	uri string, deployed []hercules.LeafPipelineItem,
+	results map[hercules.LeafPipelineItem]interface{}) {
+	commonResult := results[nil].(hercules.CommonAnalysisResult)
+
 	fmt.Println("hercules:")
 	fmt.Println("  version: 3")
 	fmt.Println("  hash:", hercules.GIT_HASH)
 	fmt.Println("  repository:", uri)
-	fmt.Println("  begin_unix_time:", begin)
-	fmt.Println("  end_unix_time:", end)
-	fmt.Println("  commits:", commitsCount)
+	fmt.Println("  begin_unix_time:", commonResult.BeginTime)
+	fmt.Println("  end_unix_time:", commonResult.EndTime)
+	fmt.Println("  commits:", commonResult.CommitsNumber)
+	fmt.Println("  run_time:", commonResult.RunTime.Nanoseconds()/1e6)
 
 	for _, item := range deployed {
 		result := results[item]
 		fmt.Printf("%s:\n", item.Name())
-		err := interface{}(item).(hercules.LeafPipelineItem).Serialize(result, false, os.Stdout)
-		if err != nil {
+		if err := item.Serialize(result, false, os.Stdout); err != nil {
 			panic(err)
 		}
 	}
 }
 
 func protobufResults(
-	uri string, begin, end int64, commitsCount int, deployed []hercules.PipelineItem,
-	results map[hercules.PipelineItem]interface{}) {
+	uri string, deployed []hercules.LeafPipelineItem,
+	results map[hercules.LeafPipelineItem]interface{}) {
+	commonResult := results[nil].(hercules.CommonAnalysisResult)
 
 	header := pb.Metadata{
 		Version:       1,
 		Hash:          hercules.GIT_HASH,
 		Repository:    uri,
-		BeginUnixTime: begin,
-		EndUnixTime:   end,
-		Commits:       int32(commitsCount),
+		BeginUnixTime: commonResult.BeginTime,
+		EndUnixTime:   commonResult.EndTime,
+		Commits:       int32(commonResult.CommitsNumber),
+		RunTime:       commonResult.RunTime.Nanoseconds() / 1e6,
 	}
 
 	message := pb.AnalysisResults{
@@ -297,8 +300,7 @@ func protobufResults(
 	for _, item := range deployed {
 		result := results[item]
 		buffer := &bytes.Buffer{}
-		err := interface{}(item).(hercules.LeafPipelineItem).Serialize(result, true, buffer)
-		if err != nil {
+		if err := item.Serialize(result, true, buffer); err != nil {
 			panic(err)
 		}
 		message.Contents[item.Name()] = buffer.Bytes()

+ 6 - 6
doc.go

@@ -15,21 +15,21 @@ The typical API usage is to initialize the Pipeline class:
 
   import "gopkg.in/src-d/go-git.v4"
 
-	var repository *git.Repository
-	// ...initialize repository...
-	pipeline := hercules.NewPipeline(repository)
+  var repository *git.Repository
+  // ...initialize repository...
+  pipeline := hercules.NewPipeline(repository)
 
 Then add the required analysis:
 
   ba := pipeline.DeployItem(&hercules.BurndownAnalysis{
     Granularity:  30,
-		Sampling:     30,
-  })
+    Sampling:     30,
+  }).(hercules.LeafPipelineItem)
 
 This call will add all the needed intermediate pipeline items. Then link and execute the analysis tree:
 
   pipeline.Initialize(nil)
-	result, err := pipeline.Run(pipeline.Commits())
+  result, err := pipeline.Run(pipeline.Commits())
 
 Finally extract the result:
 

+ 4 - 0
identity_test.go

@@ -306,6 +306,10 @@ func (strr fakeEncodedObjectStorer) NewEncodedObject() plumbing.EncodedObject {
 	return nil
 }
 
+func (strr fakeEncodedObjectStorer) HasEncodedObject(plumbing.Hash) error {
+	return nil
+}
+
 func (strr fakeEncodedObjectStorer) SetEncodedObject(plumbing.EncodedObject) (plumbing.Hash, error) {
 	return plumbing.NewHash("0000000000000000000000000000000000000000"), nil
 }

+ 4 - 2
pb/pb.proto

@@ -7,12 +7,14 @@ message Metadata {
     string hash = 2;
     // repository's name
     string repository = 3;
-    // timestamp of the first analysed commit
+    // UNIX timestamp of the first analysed commit
     int64 begin_unix_time = 4;
-    // timestamp of the last analysed commit
+    // UNIX timestamp of the last analysed commit
     int64 end_unix_time = 5;
     // number of processed commits
     int32 commits = 6;
+    // duration of the analysis in milliseconds
+    int64 run_time = 7;
 }
 
 message BurndownSparseMatrixRow {

+ 40 - 13
pipeline.go

@@ -12,6 +12,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"time"
 	"unsafe"
 
 	"gopkg.in/src-d/go-git.v4"
@@ -89,10 +90,31 @@ type LeafPipelineItem interface {
 	Flag() string
 	// Finalize returns the result of the analysis.
 	Finalize() interface{}
-	// Serialize encodes the object returned by Finalize() to Text or Protocol Buffers.
+	// Serialize encodes the object returned by Finalize() to YAML or Protocol Buffers.
 	Serialize(result interface{}, binary bool, writer io.Writer) error
 }
 
+// MergeablePipelineItem extends LeafPipelineItem with the methods to load and combine analysis results.
+type MergeablePipelineItem interface {
+	LeafPipelineItem
+	// Deserialize loads the result from a Protocol Buffers blob.
+	Deserialize(pbmessage []byte) (interface{}, error)
+	// MergeResults joins the results r1 and r2 together; c1 and c2 presumably describe the runs which produced them (TODO confirm).
+	MergeResults(r1, r2 interface{}, c1, c2 CommonAnalysisResult) interface{}
+}
+
+// CommonAnalysisResult holds the information which Pipeline.Run() always records under the nil key of its result.
+type CommonAnalysisResult struct {
+	// UNIX timestamp of the author time of the first commit in the analysed sequence.
+	BeginTime int64
+	// UNIX timestamp of the author time of the last commit in the analysed sequence.
+	EndTime int64
+	// The number of commits in the analysed sequence.
+	CommitsNumber int
+	// The time elapsed between the start and the end of Pipeline.Run().
+	RunTime time.Duration
+}
+
 // PipelineItemRegistry contains all the known PipelineItem-s.
 type PipelineItemRegistry struct {
 	provided   map[string][]reflect.Type
@@ -104,7 +126,7 @@ type PipelineItemRegistry struct {
 func (registry *PipelineItemRegistry) Register(example PipelineItem) {
 	t := reflect.TypeOf(example)
 	registry.registered[example.Name()] = t
-	if fpi, ok := interface{}(example).(LeafPipelineItem); ok {
+	if fpi, ok := example.(LeafPipelineItem); ok {
 		registry.flags[fpi.Flag()] = t
 	}
 	for _, dep := range example.Provides() {
@@ -224,11 +246,6 @@ var Registry = &PipelineItemRegistry{
 	flags:      map[string]reflect.Type{},
 }
 
-type wrappedPipelineItem struct {
-	Item     PipelineItem
-	Children []wrappedPipelineItem
-}
-
 type Pipeline struct {
 	// OnProgress is the callback which is invoked in Analyse() to output its
 	// progress. The first argument is the number of processed commits and the
@@ -300,7 +317,7 @@ func (pipeline *Pipeline) DeployItem(item PipelineItem) PipelineItem {
 				if _, exists := added[sibling.Name()]; !exists {
 					disabled := false
 					// If this item supports features, check them against the activated in pipeline.features
-					if fpi, matches := interface{}(sibling).(FeaturedPipelineItem); matches {
+					if fpi, matches := sibling.(FeaturedPipelineItem); matches {
 						for _, feature := range fpi.Features() {
 							if !pipeline.features[feature] {
 								disabled = true
@@ -525,11 +542,15 @@ func (pipeline *Pipeline) Initialize(facts map[string]interface{}) {
 	}
 }
 
-// Run executes the pipeline.
+// Run method executes the pipeline.
 //
 // commits is a slice with the sequential commit history. It shall start from
 // the root (ascending order).
-func (pipeline *Pipeline) Run(commits []*object.Commit) (map[PipelineItem]interface{}, error) {
+//
+// Returns the mapping from each LeafPipelineItem to the corresponding analysis result.
+// The map always has an additional entry under the nil key which holds the CommonAnalysisResult.
+func (pipeline *Pipeline) Run(commits []*object.Commit) (map[LeafPipelineItem]interface{}, error) {
+	startRunTime := time.Now()
 	onProgress := pipeline.OnProgress
 	if onProgress == nil {
 		onProgress = func(int, int) {}
@@ -555,12 +576,18 @@ func (pipeline *Pipeline) Run(commits []*object.Commit) (map[PipelineItem]interf
 		}
 	}
 	onProgress(len(commits), len(commits))
-	result := map[PipelineItem]interface{}{}
+	result := map[LeafPipelineItem]interface{}{}
 	for _, item := range pipeline.items {
-		if fpi, ok := interface{}(item).(LeafPipelineItem); ok {
-			result[item] = fpi.Finalize()
+		if casted, ok := item.(LeafPipelineItem); ok {
+			result[casted] = casted.Finalize()
 		}
 	}
+	result[nil] = CommonAnalysisResult{
+		BeginTime:     commits[0].Author.When.Unix(),
+		EndTime:       commits[len(commits)-1].Author.When.Unix(),
+		CommitsNumber: len(commits),
+		RunTime:       time.Since(startRunTime),
+	}
 	return result, nil
 }
 

+ 8 - 2
pipeline_test.go

@@ -215,14 +215,20 @@ func TestPipelineRun(t *testing.T) {
 		"af9ddc0db70f09f3f27b4b98e415592a7485171c"))
 	result, err := pipeline.Run(commits)
 	assert.Nil(t, err)
+	assert.Equal(t, 2, len(result))
 	assert.Equal(t, item, result[item].(*testPipelineItem))
+	common := result[nil].(CommonAnalysisResult)
+	assert.Equal(t, common.BeginTime, int64(1481719092))
+	assert.Equal(t, common.EndTime, int64(1481719092))
+	assert.Equal(t, common.CommitsNumber, 1)
+	assert.True(t, common.RunTime.Nanoseconds() / 1e6 < 100)
 	assert.True(t, item.DepsConsumed)
 	assert.True(t, item.CommitMatches)
 	assert.True(t, item.IndexMatches)
 	pipeline.RemoveItem(item)
 	result, err = pipeline.Run(commits)
 	assert.Nil(t, err)
-	assert.Equal(t, 0, len(result))
+	assert.Equal(t, 1, len(result))
 }
 
 func TestPipelineOnProgress(t *testing.T) {
@@ -244,7 +250,7 @@ func TestPipelineOnProgress(t *testing.T) {
 		"af9ddc0db70f09f3f27b4b98e415592a7485171c"))
 	result, err := pipeline.Run(commits)
 	assert.Nil(t, err)
-	assert.Equal(t, 0, len(result))
+	assert.Equal(t, 1, len(result))
 	assert.True(t, progressOk1)
 	assert.True(t, progressOk2)
 }