123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package hercules
- import (
- "fmt"
- "io"
- "os"
- "bufio"
- "gopkg.in/src-d/go-git.v4"
- "gopkg.in/src-d/go-git.v4/plumbing"
- "gopkg.in/src-d/go-git.v4/plumbing/object"
- "gopkg.in/src-d/hercules.v1/toposort"
- )
// PipelineItem is a single analysis step which can be registered in a Pipeline.
// Items exchange data through named entities: the keys listed in Provides()
// are produced by Consume() and the keys listed in Requires() are expected in
// its input.
type PipelineItem interface {
	// Name returns the name of the analysis.
	Name() string
	// Provides returns the list of keys of reusable calculated entities.
	// Other items may depend on them.
	Provides() []string
	// Requires returns the list of keys of needed entities which must be supplied in Consume().
	Requires() []string
	// Initialize prepares and resets the item. Consume() requires Initialize()
	// to be called at least once beforehand.
	Initialize(*git.Repository)
	// Consume processes the next commit.
	// deps contains the required entities which match Requires(). Besides, it always includes
	// "commit" and "index".
	// Returns the calculated entities which match Provides().
	Consume(deps map[string]interface{}) (map[string]interface{}, error)
	// Finalize returns the result of the analysis.
	Finalize() interface{}
}
// Pipeline keeps the registered PipelineItem-s together with the execution
// order resolved from their Provides()/Requires() declarations, and runs them
// over a sequence of commits.
type Pipeline struct {
	// OnProgress is the callback which is invoked in Run() to output its
	// progress. The first argument is the number of processed commits and the
	// second is the total number of commits. It may be nil.
	OnProgress func(int, int)
	// repository points to the analysed Git repository struct from go-git.
	repository *git.Repository
	// items are the registered analysers in the pipeline.
	items []PipelineItem
	// plan is the resolved execution sequence.
	plan []PipelineItem
}
- func NewPipeline(repository *git.Repository) *Pipeline {
- return &Pipeline{repository: repository, items: []PipelineItem{}, plan: []PipelineItem{}}
- }
- func (pipeline *Pipeline) AddItem(item PipelineItem) {
- for _, reg := range pipeline.items {
- if reg == item {
- return
- }
- }
- pipeline.items = append(pipeline.items, item)
- }
- func (pipeline *Pipeline) RemoveItem(item PipelineItem) {
- for i, reg := range pipeline.items {
- if reg == item {
- pipeline.items = append(pipeline.items[:i], pipeline.items[i+1:]...)
- return
- }
- }
- }
- // Commits returns the critical path in the repository's history. It starts
- // from HEAD and traces commits backwards till the root. When it encounters
- // a merge (more than one parent), it always chooses the first parent.
- func (pipeline *Pipeline) Commits() []*object.Commit {
- result := []*object.Commit{}
- repository := pipeline.repository
- head, err := repository.Head()
- if err != nil {
- panic(err)
- }
- commit, err := repository.CommitObject(head.Hash())
- if err != nil {
- panic(err)
- }
- result = append(result, commit)
- for ; err != io.EOF; commit, err = commit.Parents().Next() {
- if err != nil {
- panic(err)
- }
- result = append(result, commit)
- }
- // reverse the order
- for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
- result[i], result[j] = result[j], result[i]
- }
- return result
- }
- func (pipeline *Pipeline) Initialize() {
- graph := toposort.NewGraph()
- name2item := map[string]PipelineItem{}
- for index, item := range pipeline.items {
- name := fmt.Sprintf("%s_%d", item.Name(), index)
- graph.AddNode(name)
- name2item[name] = item
- for _, key := range item.Provides() {
- key += "_entity"
- graph.AddNode(key)
- graph.AddEdge(name, key)
- }
- }
- for index, item := range pipeline.items {
- name := fmt.Sprintf("%s_%d", item.Name(), index)
- for _, key := range item.Requires() {
- key += "_entity"
- if !graph.AddEdge(key, name) {
- panic(fmt.Sprintf("Unsatisfied dependency: %s -> %s", key, item.Name()))
- }
- }
- }
- strplan, ok := graph.Toposort()
- if !ok {
- panic("Failed to resolve pipeline dependencies.")
- }
- for _, key := range strplan {
- item, ok := name2item[key]
- if ok {
- pipeline.plan = append(pipeline.plan, item)
- }
- }
- if len(pipeline.plan) != len(pipeline.items) {
- panic("Internal pipeline dependency resolution error.")
- }
- for _, item := range pipeline.items {
- item.Initialize(pipeline.repository)
- }
- }
- // Run executes the pipeline.
- //
- // commits is a slice with the sequential commit history. It shall start from
- // the root (ascending order).
- func (pipeline *Pipeline) Run(commits []*object.Commit) (map[PipelineItem]interface{}, error) {
- onProgress := pipeline.OnProgress
- if onProgress == nil {
- onProgress = func(int, int) {}
- }
- for index, commit := range commits {
- onProgress(index, len(commits))
- state := map[string]interface{}{"commit": commit, "index": index}
- for _, item := range pipeline.plan {
- update, err := item.Consume(state)
- if err != nil {
- fmt.Fprintf(os.Stderr, "%s failed on commit #%d %s\n",
- item.Name(), index, commit.Hash.String())
- return nil, err
- }
- for _, key := range item.Provides() {
- val, ok := update[key]
- if !ok {
- panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
- }
- state[key] = val
- }
- }
- }
- result := map[PipelineItem]interface{}{}
- for _, item := range pipeline.items {
- result[item] = item.Finalize()
- }
- return result, nil
- }
- func LoadCommitsFromFile(path string, repository *git.Repository) []*object.Commit {
- var file io.Reader
- if path != "-" {
- file, err := os.Open(path)
- if err != nil {
- panic(err)
- }
- defer file.Close()
- } else {
- file = os.Stdin
- }
- scanner := bufio.NewScanner(file)
- commits := []*object.Commit{}
- for scanner.Scan() {
- hash := plumbing.NewHash(scanner.Text())
- if len(hash) != 20 {
- panic("invalid commit hash " + scanner.Text())
- }
- commit, err := repository.CommitObject(hash)
- if err != nil {
- panic(err)
- }
- commits = append(commits, commit)
- }
- return commits
- }
|