forks.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  1. package core
  2. import (
  3. "log"
  4. "reflect"
  5. "sort"
  6. "gopkg.in/src-d/go-git.v4/plumbing/object"
  7. "gopkg.in/src-d/go-git.v4/plumbing"
  8. "gopkg.in/src-d/hercules.v4/internal/toposort"
  9. )
  10. // OneShotMergeProcessor provides the convenience method to consume merges only once.
  11. type OneShotMergeProcessor struct {
  12. merges map[plumbing.Hash]bool
  13. }
  14. // Initialize resets OneShotMergeProcessor.
  15. func (proc *OneShotMergeProcessor) Initialize() {
  16. proc.merges = map[plumbing.Hash]bool{}
  17. }
  18. // ShouldConsumeCommit returns true on regular commits. It also returns true upon
  19. // the first occurrence of a particular merge commit.
  20. func (proc *OneShotMergeProcessor) ShouldConsumeCommit(deps map[string]interface{}) bool {
  21. commit := deps[DependencyCommit].(*object.Commit)
  22. if commit.NumParents() <= 1 {
  23. return true
  24. }
  25. if !proc.merges[commit.Hash] {
  26. proc.merges[commit.Hash] = true
  27. return true
  28. }
  29. return false
  30. }
  31. // IsMergeCommit indicates whether the commit is a merge or not.
  32. func IsMergeCommit(deps map[string]interface{}) bool {
  33. return deps[DependencyCommit].(*object.Commit).NumParents() > 1
  34. }
  35. // NoopMerger provides an empty Merge() method suitable for PipelineItem.
  36. type NoopMerger struct {
  37. }
  38. // Merge does nothing.
  39. func (merger *NoopMerger) Merge(branches []PipelineItem) {
  40. // no-op
  41. }
  42. // ForkSamePipelineItem clones items by referencing the same origin.
  43. func ForkSamePipelineItem(origin PipelineItem, n int) []PipelineItem {
  44. clones := make([]PipelineItem, n)
  45. for i := 0; i < n; i++ {
  46. clones[i] = origin
  47. }
  48. return clones
  49. }
  50. // ForkCopyPipelineItem clones items by copying them by value from the origin.
  51. func ForkCopyPipelineItem(origin PipelineItem, n int) []PipelineItem {
  52. originValue := reflect.Indirect(reflect.ValueOf(origin))
  53. originType := originValue.Type()
  54. clones := make([]PipelineItem, n)
  55. for i := 0; i < n; i++ {
  56. cloneValue := reflect.New(originType).Elem()
  57. cloneValue.Set(originValue)
  58. clones[i] = cloneValue.Addr().Interface().(PipelineItem)
  59. }
  60. return clones
  61. }
  62. const (
  63. // runActionCommit corresponds to a regular commit
  64. runActionCommit = 0
  65. // runActionFork splits a branch into several parts
  66. runActionFork = iota
  67. // runActionMerge merges several branches together
  68. runActionMerge = iota
  69. // runActionDelete removes the branch as it is no longer needed
  70. runActionDelete = iota
  71. )
  72. type runAction struct {
  73. Action int
  74. Commit *object.Commit
  75. Items []int
  76. }
  77. func cloneItems(origin []PipelineItem, n int) [][]PipelineItem {
  78. clones := make([][]PipelineItem, n)
  79. for j := 0; j < n; j++ {
  80. clones[j] = make([]PipelineItem, len(origin))
  81. }
  82. for i, item := range origin {
  83. itemClones := item.Fork(n)
  84. for j := 0; j < n; j++ {
  85. clones[j][i] = itemClones[j]
  86. }
  87. }
  88. return clones
  89. }
  90. func mergeItems(branches [][]PipelineItem) {
  91. buffer := make([]PipelineItem, len(branches) - 1)
  92. for i, item := range branches[0] {
  93. for j := 0; j < len(branches)-1; j++ {
  94. buffer[j] = branches[j+1][i]
  95. }
  96. item.Merge(buffer)
  97. }
  98. }
  99. // getMasterBranch returns the branch with the smallest index.
  100. func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
  101. minKey := 1 << 31
  102. var minVal []PipelineItem
  103. for key, val := range branches {
  104. if key < minKey {
  105. minKey = key
  106. minVal = val
  107. }
  108. }
  109. return minVal
  110. }
  111. // prepareRunPlan schedules the actions for Pipeline.Run().
  112. func prepareRunPlan(commits []*object.Commit) []runAction {
  113. hashes, dag := buildDag(commits)
  114. leaveRootComponent(hashes, dag)
  115. numParents := bindNumParents(hashes, dag)
  116. mergedDag, mergedSeq := mergeDag(numParents, hashes, dag)
  117. orderNodes := bindOrderNodes(mergedDag)
  118. collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
  119. /*fmt.Printf("digraph Hercules {\n")
  120. for i, c := range order {
  121. commit := hashes[c]
  122. fmt.Printf(" \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
  123. for _, child := range mergedDag[commit.Hash] {
  124. fmt.Printf(" \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
  125. }
  126. }
  127. fmt.Printf("}\n")*/
  128. plan := generatePlan(orderNodes, numParents, hashes, mergedDag, dag, mergedSeq)
  129. plan = optimizePlan(plan)
  130. return plan
  131. }
  132. // buildDag generates the raw commit DAG and the commit hash map.
  133. func buildDag(commits []*object.Commit) (
  134. map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
  135. hashes := map[string]*object.Commit{}
  136. for _, commit := range commits {
  137. hashes[commit.Hash.String()] = commit
  138. }
  139. dag := map[plumbing.Hash][]*object.Commit{}
  140. for _, commit := range commits {
  141. if _, exists := dag[commit.Hash]; !exists {
  142. dag[commit.Hash] = make([]*object.Commit, 0, 1)
  143. }
  144. for _, parent := range commit.ParentHashes {
  145. if _, exists := hashes[parent.String()]; !exists {
  146. continue
  147. }
  148. children := dag[parent]
  149. if children == nil {
  150. children = make([]*object.Commit, 0, 1)
  151. }
  152. dag[parent] = append(children, commit)
  153. }
  154. }
  155. return hashes, dag
  156. }
  157. // bindNumParents returns curried "numParents" function.
  158. func bindNumParents(
  159. hashes map[string]*object.Commit,
  160. dag map[plumbing.Hash][]*object.Commit) func(c *object.Commit) int {
  161. return func(c *object.Commit) int {
  162. r := 0
  163. for _, parent := range c.ParentHashes {
  164. if p, exists := hashes[parent.String()]; exists {
  165. for _, pc := range dag[p.Hash] {
  166. if pc.Hash == c.Hash {
  167. r++
  168. break
  169. }
  170. }
  171. }
  172. }
  173. return r
  174. }
  175. }
  176. // leaveRootComponent runs connected components analysis and throws away everything
  177. // but the part which grows from the root.
  178. func leaveRootComponent(
  179. hashes map[string]*object.Commit,
  180. dag map[plumbing.Hash][]*object.Commit) {
  181. visited := map[plumbing.Hash]bool{}
  182. var sets [][]plumbing.Hash
  183. for key := range dag {
  184. if visited[key] {
  185. continue
  186. }
  187. var set []plumbing.Hash
  188. for queue := []plumbing.Hash{key}; len(queue) > 0; {
  189. head := queue[len(queue)-1]
  190. queue = queue[:len(queue)-1]
  191. if visited[head] {
  192. continue
  193. }
  194. set = append(set, head)
  195. visited[head] = true
  196. for _, c := range dag[head] {
  197. if !visited[c.Hash] {
  198. queue = append(queue, c.Hash)
  199. }
  200. }
  201. if commit, exists := hashes[head.String()]; exists {
  202. for _, p := range commit.ParentHashes {
  203. if !visited[p] {
  204. if _, exists := hashes[p.String()]; exists {
  205. queue = append(queue, p)
  206. }
  207. }
  208. }
  209. }
  210. }
  211. sets = append(sets, set)
  212. }
  213. if len(sets) > 1 {
  214. maxlen := 0
  215. maxind := -1
  216. for i, set := range sets {
  217. if len(set) > maxlen {
  218. maxlen = len(set)
  219. maxind = i
  220. }
  221. }
  222. for i, set := range sets {
  223. if i == maxind {
  224. continue
  225. }
  226. for _, h := range set {
  227. log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
  228. delete(dag, h)
  229. delete(hashes, h.String())
  230. }
  231. }
  232. }
  233. }
  234. // bindOrderNodes returns curried "orderNodes" function.
  235. func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) func(reverse bool) []string {
  236. return func(reverse bool) []string {
  237. graph := toposort.NewGraph()
  238. keys := make([]plumbing.Hash, 0, len(mergedDag))
  239. for key := range mergedDag {
  240. keys = append(keys, key)
  241. }
  242. sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
  243. for _, key := range keys {
  244. graph.AddNode(key.String())
  245. }
  246. for _, key := range keys {
  247. children := mergedDag[key]
  248. sort.Slice(children, func(i, j int) bool {
  249. return children[i].Hash.String() < children[j].Hash.String()
  250. })
  251. for _, c := range children {
  252. graph.AddEdge(key.String(), c.Hash.String())
  253. }
  254. }
  255. order, ok := graph.Toposort()
  256. if !ok {
  257. // should never happen
  258. panic("Could not topologically sort the DAG of commits")
  259. }
  260. if reverse {
  261. // one day this must appear in the standard library...
  262. for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
  263. order[i], order[j] = order[j], order[i]
  264. }
  265. }
  266. return order
  267. }
  268. }
  269. // mergeDag turns sequences of consecutive commits into single nodes.
  270. func mergeDag(
  271. numParents func(c *object.Commit) int,
  272. hashes map[string]*object.Commit,
  273. dag map[plumbing.Hash][]*object.Commit) (
  274. mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
  275. parentOf := func(c *object.Commit) plumbing.Hash {
  276. var parent plumbing.Hash
  277. for _, p := range c.ParentHashes {
  278. if _, exists := hashes[p.String()]; exists {
  279. if parent != plumbing.ZeroHash {
  280. // more than one parent
  281. return plumbing.ZeroHash
  282. }
  283. parent = p
  284. }
  285. }
  286. return parent
  287. }
  288. mergedDag = map[plumbing.Hash][]*object.Commit{}
  289. mergedSeq = map[plumbing.Hash][]*object.Commit{}
  290. visited := map[plumbing.Hash]bool{}
  291. for ch := range dag {
  292. c := hashes[ch.String()]
  293. if visited[c.Hash] {
  294. continue
  295. }
  296. for true {
  297. parent := parentOf(c)
  298. if parent == plumbing.ZeroHash || len(dag[parent]) != 1 {
  299. break
  300. }
  301. c = hashes[parent.String()]
  302. }
  303. head := c
  304. var seq []*object.Commit
  305. children := dag[c.Hash]
  306. for true {
  307. visited[c.Hash] = true
  308. seq = append(seq, c)
  309. if len(children) != 1 {
  310. break
  311. }
  312. c = children[0]
  313. children = dag[c.Hash]
  314. if numParents(c) != 1 {
  315. break
  316. }
  317. }
  318. mergedSeq[head.Hash] = seq
  319. mergedDag[head.Hash] = dag[seq[len(seq)-1].Hash]
  320. }
  321. return
  322. }
  323. // collapseFastForwards removes the fast forward merges.
  324. func collapseFastForwards(
  325. orderNodes func(reverse bool) []string,
  326. hashes map[string]*object.Commit,
  327. mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) {
  328. for _, strkey := range orderNodes(true) {
  329. key := hashes[strkey].Hash
  330. vals, exists := mergedDag[key]
  331. if !exists {
  332. continue
  333. }
  334. if len(vals) == 2 {
  335. grand1 := mergedDag[vals[0].Hash]
  336. grand2 := mergedDag[vals[1].Hash]
  337. if len(grand2) == 1 && vals[0].Hash == grand2[0].Hash {
  338. mergedDag[key] = mergedDag[vals[0].Hash]
  339. dag[key] = vals[1:]
  340. delete(mergedDag, vals[0].Hash)
  341. delete(mergedDag, vals[1].Hash)
  342. mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
  343. mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
  344. delete(mergedSeq, vals[0].Hash)
  345. delete(mergedSeq, vals[1].Hash)
  346. }
  347. // symmetric
  348. if len(grand1) == 1 && vals[1].Hash == grand1[0].Hash {
  349. mergedDag[key] = mergedDag[vals[1].Hash]
  350. dag[key] = vals[:1]
  351. delete(mergedDag, vals[0].Hash)
  352. delete(mergedDag, vals[1].Hash)
  353. mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
  354. mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
  355. delete(mergedSeq, vals[0].Hash)
  356. delete(mergedSeq, vals[1].Hash)
  357. }
  358. }
  359. }
  360. }
  361. // generatePlan creates the list of actions from the commit DAG.
  362. func generatePlan(
  363. orderNodes func(reverse bool) []string,
  364. numParents func(c *object.Commit) int,
  365. hashes map[string]*object.Commit,
  366. mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
  367. var plan []runAction
  368. branches := map[plumbing.Hash]int{}
  369. counter := 1
  370. for seqIndex, name := range orderNodes(false) {
  371. commit := hashes[name]
  372. if seqIndex == 0 {
  373. branches[commit.Hash] = 0
  374. }
  375. var branch int
  376. {
  377. var exists bool
  378. branch, exists = branches[commit.Hash]
  379. if !exists {
  380. branch = -1
  381. }
  382. }
  383. branchExists := func() bool { return branch >= 0 }
  384. appendCommit := func(c *object.Commit, branch int) {
  385. plan = append(plan, runAction{
  386. Action: runActionCommit,
  387. Commit: c,
  388. Items: []int{branch},
  389. })
  390. }
  391. appendMergeIfNeeded := func() {
  392. if numParents(commit) < 2 {
  393. return
  394. }
  395. // merge after the merge commit (the first in the sequence)
  396. var items []int
  397. minBranch := 1 << 31
  398. for _, parent := range commit.ParentHashes {
  399. if _, exists := hashes[parent.String()]; exists {
  400. parentBranch := branches[parent]
  401. if len(dag[parent]) == 1 && minBranch > parentBranch {
  402. minBranch = parentBranch
  403. }
  404. items = append(items, parentBranch)
  405. if parentBranch != branch {
  406. appendCommit(commit, parentBranch)
  407. }
  408. }
  409. }
  410. if minBranch < 1 << 31 {
  411. branch = minBranch
  412. branches[commit.Hash] = minBranch
  413. } else if !branchExists() {
  414. panic("!branchExists()")
  415. }
  416. plan = append(plan, runAction{
  417. Action: runActionMerge,
  418. Commit: nil,
  419. Items: items,
  420. })
  421. }
  422. if subseq, exists := mergedSeq[commit.Hash]; exists {
  423. for subseqIndex, offspring := range subseq {
  424. if branchExists() {
  425. appendCommit(offspring, branch)
  426. }
  427. if subseqIndex == 0 {
  428. appendMergeIfNeeded()
  429. }
  430. }
  431. branches[subseq[len(subseq)-1].Hash] = branch
  432. }
  433. if len(mergedDag[commit.Hash]) > 1 {
  434. branches[mergedDag[commit.Hash][0].Hash] = branch
  435. children := []int{branch}
  436. for i, child := range mergedDag[commit.Hash] {
  437. if i > 0 {
  438. branches[child.Hash] = counter
  439. children = append(children, counter)
  440. counter++
  441. }
  442. }
  443. plan = append(plan, runAction{
  444. Action: runActionFork,
  445. Commit: nil,
  446. Items: children,
  447. })
  448. }
  449. }
  450. return plan
  451. }
  452. // optimizePlan removes "dead" nodes and inserts `runActionDelete` disposal steps.
  453. //
  454. // | *
  455. // * /
  456. // |\/
  457. // |/
  458. // *
  459. //
  460. func optimizePlan(plan []runAction) []runAction {
  461. // lives maps branch index to the number of commits in that branch
  462. lives := map[int]int{}
  463. // lastMentioned maps branch index to the index inside `plan` when that branch was last used
  464. lastMentioned := map[int]int{}
  465. for i, p := range plan {
  466. firstItem := p.Items[0]
  467. switch p.Action {
  468. case runActionCommit:
  469. lives[firstItem]++
  470. lastMentioned[firstItem] = i
  471. case runActionFork:
  472. lastMentioned[firstItem] = i
  473. case runActionMerge:
  474. for _, item := range p.Items {
  475. lastMentioned[item] = i
  476. }
  477. }
  478. }
  479. branchesToDelete := map[int]bool{}
  480. for key, life := range lives {
  481. if life == 1 {
  482. branchesToDelete[key] = true
  483. delete(lastMentioned, key)
  484. }
  485. }
  486. var optimizedPlan []runAction
  487. lastMentionedArr := make([][2]int, 0, len(lastMentioned) + 1)
  488. for key, val := range lastMentioned {
  489. if val != len(plan) - 1 {
  490. lastMentionedArr = append(lastMentionedArr, [2]int{val, key})
  491. }
  492. }
  493. if len(lastMentionedArr) == 0 && len(branchesToDelete) == 0 {
  494. // early return - we have nothing to optimize
  495. return plan
  496. }
  497. sort.Slice(lastMentionedArr, func(i, j int) bool {
  498. return lastMentionedArr[i][0] < lastMentionedArr[j][0]
  499. })
  500. lastMentionedArr = append(lastMentionedArr, [2]int{len(plan)-1, -1})
  501. prevpi := -1
  502. for _, pair := range lastMentionedArr {
  503. for pi := prevpi + 1; pi <= pair[0]; pi++ {
  504. p := plan[pi]
  505. switch p.Action {
  506. case runActionCommit:
  507. if !branchesToDelete[p.Items[0]] {
  508. optimizedPlan = append(optimizedPlan, p)
  509. }
  510. case runActionFork:
  511. var newBranches []int
  512. for _, b := range p.Items {
  513. if !branchesToDelete[b] {
  514. newBranches = append(newBranches, b)
  515. }
  516. }
  517. if len(newBranches) > 1 {
  518. optimizedPlan = append(optimizedPlan, runAction{
  519. Action: runActionFork,
  520. Commit: p.Commit,
  521. Items: newBranches,
  522. })
  523. }
  524. case runActionMerge:
  525. var newBranches []int
  526. for _, b := range p.Items {
  527. if !branchesToDelete[b] {
  528. newBranches = append(newBranches, b)
  529. }
  530. }
  531. if len(newBranches) > 1 {
  532. optimizedPlan = append(optimizedPlan, runAction{
  533. Action: runActionMerge,
  534. Commit: p.Commit,
  535. Items: newBranches,
  536. })
  537. }
  538. }
  539. }
  540. if pair[1] >= 0 {
  541. prevpi = pair[0]
  542. optimizedPlan = append(optimizedPlan, runAction{
  543. Action: runActionDelete,
  544. Commit: nil,
  545. Items: []int{pair[1]},
  546. })
  547. }
  548. }
  549. // single commit can be detected as redundant
  550. if len(optimizedPlan) > 0 {
  551. return optimizedPlan
  552. }
  553. return plan
  554. // TODO(vmarkovtsev): there can be also duplicate redundant merges, e.g.
  555. /*
  556. 0 4e34f03d829fbacb71cde0e010de87ea945dc69a [3]
  557. 0 4e34f03d829fbacb71cde0e010de87ea945dc69a [12]
  558. 2 [3 12]
  559. 0 06716c2b39422938b77ddafa4d5c39bb9e4476da [3]
  560. 0 06716c2b39422938b77ddafa4d5c39bb9e4476da [12]
  561. 2 [3 12]
  562. 0 1219c7bf9e0e1a93459a052ab8b351bfc379dc19 [12]
  563. */
  564. }