pipeline.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package hercules
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "bufio"
  7. "gopkg.in/src-d/go-git.v4"
  8. "gopkg.in/src-d/go-git.v4/plumbing"
  9. "gopkg.in/src-d/go-git.v4/plumbing/object"
  10. "gopkg.in/src-d/hercules.v1/toposort"
  11. )
  12. type PipelineItem interface {
  13. // Name returns the name of the analysis.
  14. Name() string
  15. // Provides returns the list of keys of reusable calculated entities.
  16. // Other items may depend on them.
  17. Provides() []string
  18. // Requires returns the list of keys of needed entities which must be supplied in Consume().
  19. Requires() []string
  20. // Initialize prepares and resets the item. Consume() requires Initialize()
  21. // to be called at least once beforehand.
  22. Initialize(*git.Repository)
  23. // Consume processes the next commit.
  24. // deps contains the required entities which match Depends(). Besides, it always includes
  25. // "commit" and "index".
  26. // Returns the calculated entities which match Provides().
  27. Consume(deps map[string]interface{}) (map[string]interface{}, error)
  28. // Finalize returns the result of the analysis.
  29. Finalize() interface{}
  30. }
  31. type Pipeline struct {
  32. // OnProgress is the callback which is invoked in Analyse() to output it's
  33. // progress. The first argument is the number of processed commits and the
  34. // second is the total number of commits.
  35. OnProgress func(int, int)
  36. // repository points to the analysed Git repository struct from go-git.
  37. repository *git.Repository
  38. // items are the registered analysers in the pipeline.
  39. items []PipelineItem
  40. // plan is the resolved execution sequence.
  41. plan []PipelineItem
  42. }
  43. func NewPipeline(repository *git.Repository) *Pipeline {
  44. return &Pipeline{repository: repository, items: []PipelineItem{}, plan: []PipelineItem{}}
  45. }
  46. func (pipeline *Pipeline) AddItem(item PipelineItem) {
  47. for _, reg := range pipeline.items {
  48. if reg == item {
  49. return
  50. }
  51. }
  52. pipeline.items = append(pipeline.items, item)
  53. }
  54. func (pipeline *Pipeline) RemoveItem(item PipelineItem) {
  55. for i, reg := range pipeline.items {
  56. if reg == item {
  57. pipeline.items = append(pipeline.items[:i], pipeline.items[i+1:]...)
  58. return
  59. }
  60. }
  61. }
  62. // Commits returns the critical path in the repository's history. It starts
  63. // from HEAD and traces commits backwards till the root. When it encounters
  64. // a merge (more than one parent), it always chooses the first parent.
  65. func (pipeline *Pipeline) Commits() []*object.Commit {
  66. result := []*object.Commit{}
  67. repository := pipeline.repository
  68. head, err := repository.Head()
  69. if err != nil {
  70. panic(err)
  71. }
  72. commit, err := repository.CommitObject(head.Hash())
  73. if err != nil {
  74. panic(err)
  75. }
  76. result = append(result, commit)
  77. for ; err != io.EOF; commit, err = commit.Parents().Next() {
  78. if err != nil {
  79. panic(err)
  80. }
  81. result = append(result, commit)
  82. }
  83. // reverse the order
  84. for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
  85. result[i], result[j] = result[j], result[i]
  86. }
  87. return result
  88. }
  89. func (pipeline *Pipeline) Initialize() {
  90. graph := toposort.NewGraph()
  91. name2item := map[string]PipelineItem{}
  92. for index, item := range pipeline.items {
  93. name := fmt.Sprintf("%s_%d", item.Name(), index)
  94. graph.AddNode(name)
  95. name2item[name] = item
  96. for _, key := range item.Provides() {
  97. key += "_entity"
  98. graph.AddNode(key)
  99. graph.AddEdge(name, key)
  100. }
  101. }
  102. for index, item := range pipeline.items {
  103. name := fmt.Sprintf("%s_%d", item.Name(), index)
  104. for _, key := range item.Requires() {
  105. key += "_entity"
  106. if !graph.AddEdge(key, name) {
  107. panic(fmt.Sprintf("Unsatisfied dependency: %s -> %s", key, item.Name()))
  108. }
  109. }
  110. }
  111. strplan, ok := graph.Toposort()
  112. if !ok {
  113. panic("Failed to resolve pipeline dependencies.")
  114. }
  115. for _, key := range strplan {
  116. item, ok := name2item[key]
  117. if ok {
  118. pipeline.plan = append(pipeline.plan, item)
  119. }
  120. }
  121. if len(pipeline.plan) != len(pipeline.items) {
  122. panic("Internal pipeline dependency resolution error.")
  123. }
  124. for _, item := range pipeline.items {
  125. item.Initialize(pipeline.repository)
  126. }
  127. }
  128. // Run executes the pipeline.
  129. //
  130. // commits is a slice with the sequential commit history. It shall start from
  131. // the root (ascending order).
  132. func (pipeline *Pipeline) Run(commits []*object.Commit) (map[PipelineItem]interface{}, error) {
  133. onProgress := pipeline.OnProgress
  134. if onProgress == nil {
  135. onProgress = func(int, int) {}
  136. }
  137. for index, commit := range commits {
  138. onProgress(index, len(commits))
  139. state := map[string]interface{}{"commit": commit, "index": index}
  140. for _, item := range pipeline.plan {
  141. update, err := item.Consume(state)
  142. if err != nil {
  143. fmt.Fprintf(os.Stderr, "%s failed on commit #%d %s\n",
  144. item.Name(), index, commit.Hash.String())
  145. return nil, err
  146. }
  147. for _, key := range item.Provides() {
  148. val, ok := update[key]
  149. if !ok {
  150. panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
  151. }
  152. state[key] = val
  153. }
  154. }
  155. }
  156. result := map[PipelineItem]interface{}{}
  157. for _, item := range pipeline.items {
  158. result[item] = item.Finalize()
  159. }
  160. return result, nil
  161. }
  162. func LoadCommitsFromFile(path string, repository *git.Repository) []*object.Commit {
  163. var file io.Reader
  164. if path != "-" {
  165. file, err := os.Open(path)
  166. if err != nil {
  167. panic(err)
  168. }
  169. defer file.Close()
  170. } else {
  171. file = os.Stdin
  172. }
  173. scanner := bufio.NewScanner(file)
  174. commits := []*object.Commit{}
  175. for scanner.Scan() {
  176. hash := plumbing.NewHash(scanner.Text())
  177. if len(hash) != 20 {
  178. panic("invalid commit hash " + scanner.Text())
  179. }
  180. commit, err := repository.CommitObject(hash)
  181. if err != nil {
  182. panic(err)
  183. }
  184. commits = append(commits, commit)
  185. }
  186. return commits
  187. }