pipeline.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package hercules
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "bufio"
  8. "gopkg.in/src-d/go-git.v4"
  9. "gopkg.in/src-d/go-git.v4/plumbing"
  10. "gopkg.in/src-d/go-git.v4/plumbing/object"
  11. "gopkg.in/src-d/hercules.v2/toposort"
  12. )
  13. type PipelineItem interface {
  14. // Name returns the name of the analysis.
  15. Name() string
  16. // Provides returns the list of keys of reusable calculated entities.
  17. // Other items may depend on them.
  18. Provides() []string
  19. // Requires returns the list of keys of needed entities which must be supplied in Consume().
  20. Requires() []string
  21. // Initialize prepares and resets the item. Consume() requires Initialize()
  22. // to be called at least once beforehand.
  23. Initialize(*git.Repository)
  24. // Consume processes the next commit.
  25. // deps contains the required entities which match Depends(). Besides, it always includes
  26. // "commit" and "index".
  27. // Returns the calculated entities which match Provides().
  28. Consume(deps map[string]interface{}) (map[string]interface{}, error)
  29. // Finalize returns the result of the analysis.
  30. Finalize() interface{}
  31. }
  32. type Pipeline struct {
  33. // OnProgress is the callback which is invoked in Analyse() to output it's
  34. // progress. The first argument is the number of processed commits and the
  35. // second is the total number of commits.
  36. OnProgress func(int, int)
  37. // repository points to the analysed Git repository struct from go-git.
  38. repository *git.Repository
  39. // items are the registered analysers in the pipeline.
  40. items []PipelineItem
  41. // plan is the resolved execution sequence.
  42. plan []PipelineItem
  43. }
  44. func NewPipeline(repository *git.Repository) *Pipeline {
  45. return &Pipeline{repository: repository, items: []PipelineItem{}, plan: []PipelineItem{}}
  46. }
  47. func (pipeline *Pipeline) AddItem(item PipelineItem) {
  48. for _, reg := range pipeline.items {
  49. if reg == item {
  50. return
  51. }
  52. }
  53. pipeline.items = append(pipeline.items, item)
  54. }
  55. func (pipeline *Pipeline) RemoveItem(item PipelineItem) {
  56. for i, reg := range pipeline.items {
  57. if reg == item {
  58. pipeline.items = append(pipeline.items[:i], pipeline.items[i+1:]...)
  59. return
  60. }
  61. }
  62. }
  63. // Commits returns the critical path in the repository's history. It starts
  64. // from HEAD and traces commits backwards till the root. When it encounters
  65. // a merge (more than one parent), it always chooses the first parent.
  66. func (pipeline *Pipeline) Commits() []*object.Commit {
  67. result := []*object.Commit{}
  68. repository := pipeline.repository
  69. head, err := repository.Head()
  70. if err != nil {
  71. panic(err)
  72. }
  73. commit, err := repository.CommitObject(head.Hash())
  74. if err != nil {
  75. panic(err)
  76. }
  77. // the first parent matches the head
  78. for ; err != io.EOF; commit, err = commit.Parents().Next() {
  79. if err != nil {
  80. panic(err)
  81. }
  82. result = append(result, commit)
  83. }
  84. // reverse the order
  85. for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
  86. result[i], result[j] = result[j], result[i]
  87. }
  88. return result
  89. }
  90. func (pipeline *Pipeline) Initialize() {
  91. graph := toposort.NewGraph()
  92. name2item := map[string]PipelineItem{}
  93. for index, item := range pipeline.items {
  94. name := fmt.Sprintf("%s_%d", item.Name(), index)
  95. graph.AddNode(name)
  96. name2item[name] = item
  97. for _, key := range item.Provides() {
  98. key += "_entity"
  99. graph.AddNode(key)
  100. graph.AddEdge(name, key)
  101. }
  102. }
  103. for index, item := range pipeline.items {
  104. name := fmt.Sprintf("%s_%d", item.Name(), index)
  105. for _, key := range item.Requires() {
  106. key += "_entity"
  107. if !graph.AddEdge(key, name) {
  108. panic(fmt.Sprintf("Unsatisfied dependency: %s -> %s", key, item.Name()))
  109. }
  110. }
  111. }
  112. strplan, ok := graph.Toposort()
  113. if !ok {
  114. panic("Failed to resolve pipeline dependencies.")
  115. }
  116. for _, key := range strplan {
  117. item, ok := name2item[key]
  118. if ok {
  119. pipeline.plan = append(pipeline.plan, item)
  120. }
  121. }
  122. if len(pipeline.plan) != len(pipeline.items) {
  123. panic("Internal pipeline dependency resolution error.")
  124. }
  125. for _, item := range pipeline.items {
  126. item.Initialize(pipeline.repository)
  127. }
  128. }
  129. // Run executes the pipeline.
  130. //
  131. // commits is a slice with the sequential commit history. It shall start from
  132. // the root (ascending order).
  133. func (pipeline *Pipeline) Run(commits []*object.Commit) (map[PipelineItem]interface{}, error) {
  134. onProgress := pipeline.OnProgress
  135. if onProgress == nil {
  136. onProgress = func(int, int) {}
  137. }
  138. for index, commit := range commits {
  139. onProgress(index, len(commits))
  140. state := map[string]interface{}{"commit": commit, "index": index}
  141. for _, item := range pipeline.plan {
  142. update, err := item.Consume(state)
  143. if err != nil {
  144. fmt.Fprintf(os.Stderr, "%s failed on commit #%d %s\n",
  145. item.Name(), index, commit.Hash.String())
  146. return nil, err
  147. }
  148. for _, key := range item.Provides() {
  149. val, ok := update[key]
  150. if !ok {
  151. panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
  152. }
  153. state[key] = val
  154. }
  155. }
  156. }
  157. onProgress(len(commits), len(commits))
  158. result := map[PipelineItem]interface{}{}
  159. for _, item := range pipeline.items {
  160. result[item] = item.Finalize()
  161. }
  162. return result, nil
  163. }
  164. func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Commit, error) {
  165. var file io.ReadCloser
  166. if path != "-" {
  167. var err error
  168. file, err = os.Open(path)
  169. if err != nil {
  170. return nil, err
  171. }
  172. defer file.Close()
  173. } else {
  174. file = os.Stdin
  175. }
  176. scanner := bufio.NewScanner(file)
  177. commits := []*object.Commit{}
  178. for scanner.Scan() {
  179. hash := plumbing.NewHash(scanner.Text())
  180. if len(hash) != 20 {
  181. return nil, errors.New("invalid commit hash " + scanner.Text())
  182. }
  183. commit, err := repository.CommitObject(hash)
  184. if err != nil {
  185. return nil, err
  186. }
  187. commits = append(commits, commit)
  188. }
  189. return commits, nil
  190. }