forks.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. package core
  2. import (
  3. "log"
  4. "reflect"
  5. "sort"
  6. "gopkg.in/src-d/go-git.v4/plumbing"
  7. "gopkg.in/src-d/go-git.v4/plumbing/object"
  8. "gopkg.in/src-d/hercules.v6/internal/toposort"
  9. )
  10. // OneShotMergeProcessor provides the convenience method to consume merges only once.
  11. type OneShotMergeProcessor struct {
  12. merges map[plumbing.Hash]bool
  13. }
  14. // Initialize resets OneShotMergeProcessor.
  15. func (proc *OneShotMergeProcessor) Initialize() {
  16. proc.merges = map[plumbing.Hash]bool{}
  17. }
  18. // ShouldConsumeCommit returns true on regular commits. It also returns true upon
  19. // the first occurrence of a particular merge commit.
  20. func (proc *OneShotMergeProcessor) ShouldConsumeCommit(deps map[string]interface{}) bool {
  21. commit := deps[DependencyCommit].(*object.Commit)
  22. if commit.NumParents() <= 1 {
  23. return true
  24. }
  25. if !proc.merges[commit.Hash] {
  26. proc.merges[commit.Hash] = true
  27. return true
  28. }
  29. return false
  30. }
  31. // NoopMerger provides an empty Merge() method suitable for PipelineItem.
  32. type NoopMerger struct {
  33. }
  34. // Merge does nothing.
  35. func (merger *NoopMerger) Merge(branches []PipelineItem) {
  36. // no-op
  37. }
  38. // ForkSamePipelineItem clones items by referencing the same origin.
  39. func ForkSamePipelineItem(origin PipelineItem, n int) []PipelineItem {
  40. clones := make([]PipelineItem, n)
  41. for i := 0; i < n; i++ {
  42. clones[i] = origin
  43. }
  44. return clones
  45. }
  46. // ForkCopyPipelineItem clones items by copying them by value from the origin.
  47. func ForkCopyPipelineItem(origin PipelineItem, n int) []PipelineItem {
  48. originValue := reflect.Indirect(reflect.ValueOf(origin))
  49. originType := originValue.Type()
  50. clones := make([]PipelineItem, n)
  51. for i := 0; i < n; i++ {
  52. cloneValue := reflect.New(originType).Elem()
  53. cloneValue.Set(originValue)
  54. clones[i] = cloneValue.Addr().Interface().(PipelineItem)
  55. }
  56. return clones
  57. }
  // Kinds of steps in the run plan built by prepareRunPlan.
const (
	// runActionCommit corresponds to a regular commit
	runActionCommit = iota
	// runActionFork splits a branch into several parts
	runActionFork
	// runActionMerge merges several branches together
	runActionMerge
	// runActionEmerge starts a root branch
	runActionEmerge
	// runActionDelete removes the branch as it is no longer needed
	runActionDelete
)

// rootBranchIndex is the minimum branch index in the plan
const rootBranchIndex = 1
  72. type runAction struct {
  73. Action int
  74. Commit *object.Commit
  75. Items []int
  76. }
  77. type orderer = func(reverse, direction bool) []string
  78. func cloneItems(origin []PipelineItem, n int) [][]PipelineItem {
  79. clones := make([][]PipelineItem, n)
  80. for j := 0; j < n; j++ {
  81. clones[j] = make([]PipelineItem, len(origin))
  82. }
  83. for i, item := range origin {
  84. itemClones := item.Fork(n)
  85. for j := 0; j < n; j++ {
  86. clones[j][i] = itemClones[j]
  87. }
  88. }
  89. return clones
  90. }
  91. func mergeItems(branches [][]PipelineItem) {
  92. buffer := make([]PipelineItem, len(branches)-1)
  93. for i, item := range branches[0] {
  94. for j := 0; j < len(branches)-1; j++ {
  95. buffer[j] = branches[j+1][i]
  96. }
  97. item.Merge(buffer)
  98. }
  99. }
  100. // getMasterBranch returns the branch with the smallest index.
  101. func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
  102. minKey := 1 << 31
  103. var minVal []PipelineItem
  104. for key, val := range branches {
  105. if key < minKey {
  106. minKey = key
  107. minVal = val
  108. }
  109. }
  110. return minVal
  111. }
  112. // prepareRunPlan schedules the actions for Pipeline.Run().
  113. func prepareRunPlan(commits []*object.Commit) []runAction {
  114. hashes, dag := buildDag(commits)
  115. leaveRootComponent(hashes, dag)
  116. mergedDag, mergedSeq := mergeDag(hashes, dag)
  117. orderNodes := bindOrderNodes(mergedDag)
  118. collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
  119. /*fmt.Printf("digraph Hercules {\n")
  120. for i, c := range orderNodes(false, false) {
  121. commit := hashes[c]
  122. fmt.Printf(" \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
  123. for _, child := range mergedDag[commit.Hash] {
  124. fmt.Printf(" \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
  125. }
  126. }
  127. fmt.Printf("}\n")*/
  128. plan := generatePlan(orderNodes, hashes, mergedDag, dag, mergedSeq)
  129. plan = collectGarbage(plan)
  130. /*for _, p := range plan {
  131. firstItem := p.Items[0]
  132. switch p.Action {
  133. case runActionCommit:
  134. fmt.Fprintln(os.Stderr, "C", firstItem, p.Commit.Hash.String())
  135. case runActionFork:
  136. fmt.Fprintln(os.Stderr, "F", p.Items)
  137. case runActionMerge:
  138. fmt.Fprintln(os.Stderr, "M", p.Items)
  139. }
  140. }*/
  141. return plan
  142. }
  143. // buildDag generates the raw commit DAG and the commit hash map.
  144. func buildDag(commits []*object.Commit) (
  145. map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
  146. hashes := map[string]*object.Commit{}
  147. for _, commit := range commits {
  148. hashes[commit.Hash.String()] = commit
  149. }
  150. dag := map[plumbing.Hash][]*object.Commit{}
  151. for _, commit := range commits {
  152. if _, exists := dag[commit.Hash]; !exists {
  153. dag[commit.Hash] = make([]*object.Commit, 0, 1)
  154. }
  155. for _, parent := range commit.ParentHashes {
  156. if _, exists := hashes[parent.String()]; !exists {
  157. continue
  158. }
  159. children := dag[parent]
  160. if children == nil {
  161. children = make([]*object.Commit, 0, 1)
  162. }
  163. dag[parent] = append(children, commit)
  164. }
  165. }
  166. return hashes, dag
  167. }
  168. // leaveRootComponent runs connected components analysis and throws away everything
  169. // but the part which grows from the root.
  170. func leaveRootComponent(
  171. hashes map[string]*object.Commit,
  172. dag map[plumbing.Hash][]*object.Commit) {
  173. visited := map[plumbing.Hash]bool{}
  174. var sets [][]plumbing.Hash
  175. for key := range dag {
  176. if visited[key] {
  177. continue
  178. }
  179. var set []plumbing.Hash
  180. for queue := []plumbing.Hash{key}; len(queue) > 0; {
  181. head := queue[len(queue)-1]
  182. queue = queue[:len(queue)-1]
  183. if visited[head] {
  184. continue
  185. }
  186. set = append(set, head)
  187. visited[head] = true
  188. for _, c := range dag[head] {
  189. if !visited[c.Hash] {
  190. queue = append(queue, c.Hash)
  191. }
  192. }
  193. if commit, exists := hashes[head.String()]; exists {
  194. for _, p := range commit.ParentHashes {
  195. if !visited[p] {
  196. if _, exists := hashes[p.String()]; exists {
  197. queue = append(queue, p)
  198. }
  199. }
  200. }
  201. }
  202. }
  203. sets = append(sets, set)
  204. }
  205. if len(sets) > 1 {
  206. maxlen := 0
  207. maxind := -1
  208. for i, set := range sets {
  209. if len(set) > maxlen {
  210. maxlen = len(set)
  211. maxind = i
  212. }
  213. }
  214. for i, set := range sets {
  215. if i == maxind {
  216. continue
  217. }
  218. for _, h := range set {
  219. log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
  220. delete(dag, h)
  221. delete(hashes, h.String())
  222. }
  223. }
  224. }
  225. }
  226. // bindOrderNodes returns curried "orderNodes" function.
  227. func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) orderer {
  228. return func(reverse, direction bool) []string {
  229. graph := toposort.NewGraph()
  230. keys := make([]plumbing.Hash, 0, len(mergedDag))
  231. for key := range mergedDag {
  232. keys = append(keys, key)
  233. }
  234. sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
  235. for _, key := range keys {
  236. graph.AddNode(key.String())
  237. }
  238. for _, key := range keys {
  239. children := mergedDag[key]
  240. sort.Slice(children, func(i, j int) bool {
  241. return children[i].Hash.String() < children[j].Hash.String()
  242. })
  243. for _, c := range children {
  244. if !direction {
  245. graph.AddEdge(key.String(), c.Hash.String())
  246. } else {
  247. graph.AddEdge(c.Hash.String(), key.String())
  248. }
  249. }
  250. }
  251. order, ok := graph.Toposort()
  252. if !ok {
  253. // should never happen
  254. panic("Could not topologically sort the DAG of commits")
  255. }
  256. if reverse != direction {
  257. // one day this must appear in the standard library...
  258. for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
  259. order[i], order[j] = order[j], order[i]
  260. }
  261. }
  262. return order
  263. }
  264. }
  265. // inverts `dag`
  266. func buildParents(dag map[plumbing.Hash][]*object.Commit) map[plumbing.Hash]map[plumbing.Hash]bool {
  267. parents := map[plumbing.Hash]map[plumbing.Hash]bool{}
  268. for key, vals := range dag {
  269. for _, val := range vals {
  270. myps := parents[val.Hash]
  271. if myps == nil {
  272. myps = map[plumbing.Hash]bool{}
  273. parents[val.Hash] = myps
  274. }
  275. myps[key] = true
  276. }
  277. }
  278. return parents
  279. }
  280. // mergeDag turns sequences of consecutive commits into single nodes.
  281. func mergeDag(
  282. hashes map[string]*object.Commit,
  283. dag map[plumbing.Hash][]*object.Commit) (
  284. mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
  285. parents := buildParents(dag)
  286. mergedDag = map[plumbing.Hash][]*object.Commit{}
  287. mergedSeq = map[plumbing.Hash][]*object.Commit{}
  288. visited := map[plumbing.Hash]bool{}
  289. for head := range dag {
  290. if visited[head] {
  291. continue
  292. }
  293. c := head
  294. for true {
  295. nextParents := parents[c]
  296. var next plumbing.Hash
  297. for p := range nextParents {
  298. next = p
  299. break
  300. }
  301. if len(nextParents) != 1 || len(dag[next]) != 1 {
  302. break
  303. }
  304. c = next
  305. }
  306. head = c
  307. var seq []*object.Commit
  308. for true {
  309. visited[c] = true
  310. seq = append(seq, hashes[c.String()])
  311. if len(dag[c]) != 1 {
  312. break
  313. }
  314. c = dag[c][0].Hash
  315. if len(parents[c]) != 1 {
  316. break
  317. }
  318. }
  319. mergedSeq[head] = seq
  320. mergedDag[head] = dag[seq[len(seq)-1].Hash]
  321. }
  322. return
  323. }
  324. // collapseFastForwards removes the fast forward merges.
  325. func collapseFastForwards(
  326. orderNodes orderer, hashes map[string]*object.Commit,
  327. mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) {
  328. parents := buildParents(mergedDag)
  329. processed := map[plumbing.Hash]bool{}
  330. for _, strkey := range orderNodes(false, true) {
  331. key := hashes[strkey].Hash
  332. processed[key] = true
  333. repeat:
  334. vals, exists := mergedDag[key]
  335. if !exists {
  336. continue
  337. }
  338. if len(vals) < 2 {
  339. continue
  340. }
  341. toRemove := map[plumbing.Hash]bool{}
  342. sort.Slice(vals, func(i, j int) bool { return vals[i].Hash.String() < vals[j].Hash.String() })
  343. for _, child := range vals {
  344. var queue []plumbing.Hash
  345. visited := map[plumbing.Hash]bool{child.Hash: true}
  346. childParents := parents[child.Hash]
  347. childNumOtherParents := 0
  348. for parent := range childParents {
  349. if parent != key {
  350. visited[parent] = true
  351. childNumOtherParents++
  352. queue = append(queue, parent)
  353. }
  354. }
  355. var immediateParent plumbing.Hash
  356. if childNumOtherParents == 1 {
  357. immediateParent = queue[0]
  358. }
  359. for len(queue) > 0 {
  360. head := queue[len(queue)-1]
  361. queue = queue[:len(queue)-1]
  362. if processed[head] {
  363. if head == key {
  364. toRemove[child.Hash] = true
  365. if childNumOtherParents == 1 && len(mergedDag[immediateParent]) == 1 {
  366. mergedSeq[immediateParent] = append(
  367. mergedSeq[immediateParent], mergedSeq[child.Hash]...)
  368. delete(mergedSeq, child.Hash)
  369. mergedDag[immediateParent] = mergedDag[child.Hash]
  370. delete(mergedDag, child.Hash)
  371. parents[child.Hash] = parents[immediateParent]
  372. for _, vals := range parents {
  373. for v := range vals {
  374. if v == child.Hash {
  375. delete(vals, v)
  376. vals[immediateParent] = true
  377. break
  378. }
  379. }
  380. }
  381. }
  382. break
  383. }
  384. } else {
  385. for parent := range parents[head] {
  386. if !visited[parent] {
  387. visited[head] = true
  388. queue = append(queue, parent)
  389. }
  390. }
  391. }
  392. }
  393. }
  394. if len(toRemove) == 0 {
  395. continue
  396. }
  397. // update dag
  398. var newVals []*object.Commit
  399. node := mergedSeq[key][len(mergedSeq[key])-1].Hash
  400. for _, child := range dag[node] {
  401. if !toRemove[child.Hash] {
  402. newVals = append(newVals, child)
  403. }
  404. }
  405. dag[node] = newVals
  406. // update mergedDag
  407. newVals = []*object.Commit{}
  408. for _, child := range vals {
  409. if !toRemove[child.Hash] {
  410. newVals = append(newVals, child)
  411. }
  412. }
  413. merged := false
  414. if len(newVals) == 1 {
  415. onlyChild := newVals[0].Hash
  416. if len(parents[onlyChild]) == 1 {
  417. merged = true
  418. mergedSeq[key] = append(mergedSeq[key], mergedSeq[onlyChild]...)
  419. delete(mergedSeq, onlyChild)
  420. mergedDag[key] = mergedDag[onlyChild]
  421. delete(mergedDag, onlyChild)
  422. parents[onlyChild] = parents[key]
  423. for _, vals := range parents {
  424. for v := range vals {
  425. if v == onlyChild {
  426. delete(vals, v)
  427. vals[key] = true
  428. break
  429. }
  430. }
  431. }
  432. }
  433. }
  434. // update parents
  435. for rm := range toRemove {
  436. delete(parents[rm], key)
  437. }
  438. if !merged {
  439. mergedDag[key] = newVals
  440. } else {
  441. goto repeat
  442. }
  443. }
  444. }
  445. // generatePlan creates the list of actions from the commit DAG.
  446. func generatePlan(
  447. orderNodes orderer, hashes map[string]*object.Commit,
  448. mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
  449. parents := buildParents(dag)
  450. var plan []runAction
  451. branches := map[plumbing.Hash]int{}
  452. branchers := map[plumbing.Hash]map[plumbing.Hash]int{}
  453. counter := rootBranchIndex
  454. for _, name := range orderNodes(false, true) {
  455. commit := hashes[name]
  456. if len(parents[commit.Hash]) == 0 {
  457. branches[commit.Hash] = counter
  458. plan = append(plan, runAction{
  459. Action: runActionEmerge,
  460. Commit: commit,
  461. Items: []int{counter},
  462. })
  463. counter++
  464. }
  465. var branch int
  466. {
  467. var exists bool
  468. branch, exists = branches[commit.Hash]
  469. if !exists {
  470. branch = -1
  471. }
  472. }
  473. branchExists := func() bool { return branch >= rootBranchIndex }
  474. appendCommit := func(c *object.Commit, branch int) {
  475. if branch == 0 {
  476. log.Panicf("setting a zero branch for %s", c.Hash.String())
  477. }
  478. plan = append(plan, runAction{
  479. Action: runActionCommit,
  480. Commit: c,
  481. Items: []int{branch},
  482. })
  483. }
  484. appendMergeIfNeeded := func() bool {
  485. if len(parents[commit.Hash]) < 2 {
  486. return false
  487. }
  488. // merge after the merge commit (the first in the sequence)
  489. var items []int
  490. minBranch := 1 << 31
  491. for parent := range parents[commit.Hash] {
  492. parentBranch := -1
  493. if parents, exists := branchers[commit.Hash]; exists {
  494. if inheritedBranch, exists := parents[parent]; exists {
  495. parentBranch = inheritedBranch
  496. }
  497. }
  498. if parentBranch == -1 {
  499. parentBranch = branches[parent]
  500. if parentBranch < rootBranchIndex {
  501. log.Panicf("parent %s > %s does not have a branch assigned",
  502. parent.String(), commit.Hash.String())
  503. }
  504. }
  505. if len(dag[parent]) == 1 && minBranch > parentBranch {
  506. minBranch = parentBranch
  507. }
  508. items = append(items, parentBranch)
  509. if parentBranch != branch {
  510. appendCommit(commit, parentBranch)
  511. }
  512. }
  513. // there should be no duplicates in items
  514. if minBranch < 1<<31 {
  515. branch = minBranch
  516. branches[commit.Hash] = minBranch
  517. } else if !branchExists() {
  518. log.Panicf("failed to assign the branch to merge %s", commit.Hash.String())
  519. }
  520. plan = append(plan, runAction{
  521. Action: runActionMerge,
  522. Commit: nil,
  523. Items: items,
  524. })
  525. return true
  526. }
  527. var head plumbing.Hash
  528. if subseq, exists := mergedSeq[commit.Hash]; exists {
  529. for subseqIndex, offspring := range subseq {
  530. if branchExists() {
  531. appendCommit(offspring, branch)
  532. }
  533. if subseqIndex == 0 {
  534. if !appendMergeIfNeeded() && !branchExists() {
  535. log.Panicf("head of the sequence does not have an assigned branch: %s",
  536. commit.Hash.String())
  537. }
  538. }
  539. }
  540. head = subseq[len(subseq)-1].Hash
  541. branches[head] = branch
  542. } else {
  543. head = commit.Hash
  544. }
  545. if len(mergedDag[commit.Hash]) > 1 {
  546. children := []int{branch}
  547. for i, child := range mergedDag[commit.Hash] {
  548. if i == 0 {
  549. branches[child.Hash] = branch
  550. continue
  551. }
  552. if _, exists := branches[child.Hash]; !exists {
  553. branches[child.Hash] = counter
  554. }
  555. parents := branchers[child.Hash]
  556. if parents == nil {
  557. parents = map[plumbing.Hash]int{}
  558. branchers[child.Hash] = parents
  559. }
  560. parents[head] = counter
  561. children = append(children, counter)
  562. counter++
  563. }
  564. plan = append(plan, runAction{
  565. Action: runActionFork,
  566. Commit: hashes[head.String()],
  567. Items: children,
  568. })
  569. }
  570. }
  571. return plan
  572. }
  573. // collectGarbage inserts `runActionDelete` disposal steps.
  574. func collectGarbage(plan []runAction) []runAction {
  575. // lastMentioned maps branch index to the index inside `plan` when that branch was last used
  576. lastMentioned := map[int]int{}
  577. for i, p := range plan {
  578. firstItem := p.Items[0]
  579. switch p.Action {
  580. case runActionCommit:
  581. lastMentioned[firstItem] = i
  582. if firstItem < rootBranchIndex {
  583. log.Panicf("commit %s does not have an assigned branch",
  584. p.Commit.Hash.String())
  585. }
  586. case runActionFork:
  587. lastMentioned[firstItem] = i
  588. case runActionMerge:
  589. for _, item := range p.Items {
  590. lastMentioned[item] = i
  591. }
  592. case runActionEmerge:
  593. lastMentioned[firstItem] = i
  594. }
  595. }
  596. var garbageCollectedPlan []runAction
  597. lastMentionedArr := make([][2]int, 0, len(lastMentioned)+1)
  598. for key, val := range lastMentioned {
  599. if val != len(plan)-1 {
  600. lastMentionedArr = append(lastMentionedArr, [2]int{val, key})
  601. }
  602. }
  603. if len(lastMentionedArr) == 0 {
  604. // early return - we have nothing to collect
  605. return plan
  606. }
  607. sort.Slice(lastMentionedArr, func(i, j int) bool {
  608. return lastMentionedArr[i][0] < lastMentionedArr[j][0]
  609. })
  610. lastMentionedArr = append(lastMentionedArr, [2]int{len(plan) - 1, -1})
  611. prevpi := -1
  612. for _, pair := range lastMentionedArr {
  613. for pi := prevpi + 1; pi <= pair[0]; pi++ {
  614. garbageCollectedPlan = append(garbageCollectedPlan, plan[pi])
  615. }
  616. if pair[1] >= 0 {
  617. prevpi = pair[0]
  618. garbageCollectedPlan = append(garbageCollectedPlan, runAction{
  619. Action: runActionDelete,
  620. Commit: nil,
  621. Items: []int{pair[1]},
  622. })
  623. }
  624. }
  625. return garbageCollectedPlan
  626. }