Browse Source

Add the goroutine pool support

Vadim Markovtsev 7 năm trước cách đây
mục cha
commit
4d524ba6c0
3 tập tin đã thay đổi với 108 bổ sung24 xóa
  1. 1 1
      .travis.yml
  2. 106 22
      uast.go
  3. 1 1
      uast_test.go

+ 1 - 1
.travis.yml

@@ -25,7 +25,7 @@ before_install:
   - docker run -d --privileged -p 9432:9432 --name bblfsh bblfsh/server
   - git clone https://github.com/bblfsh/libuast
   - cd libuast && cmake -DCMAKE_BUILD_TYPE=Release . && make && ln -s src libuast && cd ..
-  - export CGO_CFLAGS="-I$(pwd)/libuast" && export CGO_LDFLAGS="-L$(pwd)/libuast/lib -Wl,-rpath -Wl,$(pwd)/libuast/lib"
+  - export CGO_CFLAGS="-I$(pwd)/libuast -I$(pwd)/libuast/libuast" && export CGO_LDFLAGS="-luast -L$(pwd)/libuast/lib -Wl,-rpath -Wl,$(pwd)/libuast/lib"
   
 script:
   - go test -v -cpu=1,2 -coverprofile=coverage.txt -covermode=count gopkg.in/src-d/hercules.v2

+ 106 - 22
uast.go

@@ -2,22 +2,53 @@ package hercules
 
 import (
 	"context"
+	"errors"
+	"runtime"
+	"strings"
+	"sync"
 
+	"github.com/jeffail/tunny"
+	"gopkg.in/bblfsh/client-go.v0"
+	"gopkg.in/bblfsh/sdk.v0/protocol"
+	"gopkg.in/bblfsh/sdk.v0/uast"
 	"gopkg.in/src-d/go-git.v4"
 	"gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
 	"gopkg.in/src-d/go-git.v4/utils/merkletrie"
-	"gopkg.in/bblfsh/client-go.v0"
-	"github.com/bblfsh/sdk/uast"
-	"github.com/bblfsh/sdk/protocol"
-	"errors"
-	"strings"
 )
 
 type UASTExtractor struct {
-    Endpoint string
-	Context func() context.Context
-	client *bblfsh.BblfshClient
+	Endpoint string
+	Context  func() context.Context
+	PoolSize int
+
+	clients []*bblfsh.BblfshClient
+	pool   *tunny.WorkPool
+}
+
+type uastTask struct {
+	Client *bblfsh.BblfshClient
+	Lock   *sync.RWMutex
+	Dest   map[string]*uast.Node
+	Name   string
+	File   *object.File
+	Errors *[]error
+	Status chan int
+}
+
+type worker struct {
+	Client *bblfsh.BblfshClient
+	Job func(interface{}) interface{}
+}
+
+func (w worker) TunnyReady() bool {
+	return true
+}
+
+func (w worker) TunnyJob(data interface{}) interface{} {
+	task := data.(uastTask)
+	task.Client = w.Client
+	return w.Job(task)
 }
 
 func (exr *UASTExtractor) Name() string {
@@ -35,20 +66,50 @@ func (exr *UASTExtractor) Requires() []string {
 }
 
 func (exr *UASTExtractor) Initialize(repository *git.Repository) {
-	client, err := bblfsh.NewBblfshClient(exr.Endpoint)
-	if err != nil {
-		panic(err)
-	}
-	exr.client = client
 	if exr.Context == nil {
 		exr.Context = func() context.Context { return context.Background() }
 	}
+	poolSize := exr.PoolSize
+	if poolSize == 0 {
+		poolSize = runtime.NumCPU()
+	}
+	var err error
+	exr.clients = make([]*bblfsh.BblfshClient, poolSize)
+	for i := 0; i < poolSize; i++ {
+		client, err := bblfsh.NewBblfshClient(exr.Endpoint)
+		if err != nil {
+			panic(err)
+		}
+		exr.clients[i] = client
+	}
+	if exr.pool != nil {
+		exr.pool.Close()
+	}
+	workers := make([]tunny.TunnyWorker, poolSize)
+	for i := 0; i < poolSize; i++ {
+		workers[i] = worker{Client: exr.clients[i], Job: exr.extractTask}
+	}
+	exr.pool, err = tunny.CreateCustomPool(workers).Open()
+	if err != nil {
+		panic(err)
+	}
 }
 
 func (exr *UASTExtractor) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
 	cache := deps["blob_cache"].(map[plumbing.Hash]*object.Blob)
 	treeDiffs := deps["changes"].(object.Changes)
 	uasts := map[string]*uast.Node{}
+	lock := sync.RWMutex{}
+	errs := make([]error, 0)
+	status := make(chan int)
+	pending := 0
+	submit := func(change *object.Change) {
+		pending++
+		exr.pool.SendWorkAsync(uastTask{
+			Lock: &lock, Dest: uasts, Name: change.To.Name,
+			File:   &object.File{Name: change.To.Name, Blob: *cache[change.To.TreeEntry.Hash]},
+			Errors: &errs, Status: status}, nil)
+	}
 	for _, change := range treeDiffs {
 		action, err := change.Action()
 		if err != nil {
@@ -56,17 +117,22 @@ func (exr *UASTExtractor) Consume(deps map[string]interface{}) (map[string]inter
 		}
 		switch action {
 		case merkletrie.Insert:
-			uasts[change.To.Name], err = exr.extractUAST(&object.File{
-				Name: change.To.Name, Blob: *cache[change.To.TreeEntry.Hash]})
+			submit(change)
 		case merkletrie.Delete:
 			continue
 		case merkletrie.Modify:
-			uasts[change.To.Name], err = exr.extractUAST(&object.File{
-				Name: change.To.Name, Blob: *cache[change.To.TreeEntry.Hash]})
+			submit(change)
 		}
-		if err != nil {
-			return nil, err
+	}
+	for i := 0; i < pending; i++ {
+		_ = <-status
+	}
+	if len(errs) > 0 {
+		msgs := make([]string, len(errs))
+		for i, err := range errs {
+			msgs[i] = err.Error()
 		}
+		return nil, errors.New(strings.Join(msgs, "\n"))
 	}
 	return map[string]interface{}{"uasts": uasts}, nil
 }
@@ -75,8 +141,9 @@ func (exr *UASTExtractor) Finalize() interface{} {
 	return nil
 }
 
-func (exr *UASTExtractor) extractUAST(file *object.File) (*uast.Node, error) {
-	request := exr.client.NewParseRequest()
+func (exr *UASTExtractor) extractUAST(
+		client *bblfsh.BblfshClient, file *object.File) (*uast.Node, error) {
+	request := client.NewParseRequest()
 	contents, err := file.Contents()
 	if err != nil {
 		return nil, err
@@ -84,7 +151,10 @@ func (exr *UASTExtractor) extractUAST(file *object.File) (*uast.Node, error) {
 	request.Content(contents)
 	request.Filename(file.Name)
 	response, err := request.DoWithContext(exr.Context())
-    if response.Status != protocol.Ok {
+	if err != nil {
+		return nil, err
+	}
+	if response.Status != protocol.Ok {
 		return nil, errors.New(strings.Join(response.Errors, "\n"))
 	}
 	if err != nil {
@@ -92,3 +162,17 @@ func (exr *UASTExtractor) extractUAST(file *object.File) (*uast.Node, error) {
 	}
 	return response.UAST, nil
 }
+
+func (exr *UASTExtractor) extractTask(data interface{}) interface{} {
+	task := data.(uastTask)
+	defer func() { task.Status <- 0 }()
+	node, err := exr.extractUAST(task.Client, task.File)
+	task.Lock.Lock()
+	defer task.Lock.Unlock()
+	if err != nil {
+		*task.Errors = append(*task.Errors, errors.New(task.Name+": "+err.Error()))
+		return nil
+	}
+	task.Dest[task.Name] = node
+	return nil
+}

+ 1 - 1
uast_test.go

@@ -6,7 +6,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
 	"gopkg.in/src-d/go-git.v4/plumbing"
-	"github.com/bblfsh/sdk/uast"
+	"gopkg.in/bblfsh/sdk.v0/uast"
 )
 
 func fixtureUASTExtractor() *UASTExtractor {