forks.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. package core
  2. import (
  3. "log"
  4. "reflect"
  5. "sort"
  6. "gopkg.in/src-d/go-git.v4/plumbing/object"
  7. "gopkg.in/src-d/go-git.v4/plumbing"
  8. "gopkg.in/src-d/hercules.v4/internal/toposort"
  9. )
  // OneShotMergeProcessor provides the convenience method to consume merges only once.
  // It keeps track of the merge commits which were already seen by ShouldConsumeCommit().
  type OneShotMergeProcessor struct {
  	// merges is the set of merge commit hashes which have already been consumed.
  	merges map[plumbing.Hash]bool
  }
  14. // Initialize resets OneShotMergeProcessor.
  15. func (proc *OneShotMergeProcessor) Initialize() {
  16. proc.merges = map[plumbing.Hash]bool{}
  17. }
  18. // ShouldConsumeCommit returns true on regular commits. It also returns true upon
  19. // the first occurrence of a particular merge commit.
  20. func (proc *OneShotMergeProcessor) ShouldConsumeCommit(deps map[string]interface{}) bool {
  21. commit := deps[DependencyCommit].(*object.Commit)
  22. if commit.NumParents() <= 1 {
  23. return true
  24. }
  25. if !proc.merges[commit.Hash] {
  26. proc.merges[commit.Hash] = true
  27. return true
  28. }
  29. return false
  30. }
  // NoopMerger provides an empty Merge() method suitable for PipelineItem.
  // It carries no state.
  type NoopMerger struct {
  }
  34. // Merge does nothing.
  35. func (merger *NoopMerger) Merge(branches []PipelineItem) {
  36. // no-op
  37. }
  38. // ForkSamePipelineItem clones items by referencing the same origin.
  39. func ForkSamePipelineItem(origin PipelineItem, n int) []PipelineItem {
  40. clones := make([]PipelineItem, n)
  41. for i := 0; i < n; i++ {
  42. clones[i] = origin
  43. }
  44. return clones
  45. }
  46. // ForkCopyPipelineItem clones items by copying them by value from the origin.
  47. func ForkCopyPipelineItem(origin PipelineItem, n int) []PipelineItem {
  48. originValue := reflect.Indirect(reflect.ValueOf(origin))
  49. originType := originValue.Type()
  50. clones := make([]PipelineItem, n)
  51. for i := 0; i < n; i++ {
  52. cloneValue := reflect.New(originType).Elem()
  53. cloneValue.Set(originValue)
  54. clones[i] = cloneValue.Addr().Interface().(PipelineItem)
  55. }
  56. return clones
  57. }
  // Kinds of steps in the run plan; they are stored in runAction.Action.
  const (
  	// runActionCommit corresponds to a regular commit
  	runActionCommit = iota
  	// runActionFork splits a branch into several parts
  	runActionFork
  	// runActionMerge merges several branches together
  	runActionMerge
  	// runActionDelete removes the branch as it is no longer needed
  	runActionDelete
  )
  // runAction is a single step of the plan which is built by prepareRunPlan().
  type runAction struct {
  	// Action is one of the runAction* constants.
  	Action int
  	// Commit is the commit to process for runActionCommit steps; the fork, merge
  	// and delete steps created in this file leave it nil.
  	Commit *object.Commit
  	// Items lists the indexes of the branches which take part in the step.
  	Items []int
  }
  73. func cloneItems(origin []PipelineItem, n int) [][]PipelineItem {
  74. clones := make([][]PipelineItem, n)
  75. for j := 0; j < n; j++ {
  76. clones[j] = make([]PipelineItem, len(origin))
  77. }
  78. for i, item := range origin {
  79. itemClones := item.Fork(n)
  80. for j := 0; j < n; j++ {
  81. clones[j][i] = itemClones[j]
  82. }
  83. }
  84. return clones
  85. }
  86. func mergeItems(branches [][]PipelineItem) {
  87. buffer := make([]PipelineItem, len(branches) - 1)
  88. for i, item := range branches[0] {
  89. for j := 0; j < len(branches)-1; j++ {
  90. buffer[j] = branches[j+1][i]
  91. }
  92. item.Merge(buffer)
  93. }
  94. }
  95. // getMasterBranch returns the branch with the smallest index.
  96. func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
  97. minKey := 1 << 31
  98. var minVal []PipelineItem
  99. for key, val := range branches {
  100. if key < minKey {
  101. minKey = key
  102. minVal = val
  103. }
  104. }
  105. return minVal
  106. }
  107. // prepareRunPlan schedules the actions for Pipeline.Run().
  108. func prepareRunPlan(commits []*object.Commit) []runAction {
  109. hashes, dag := buildDag(commits)
  110. leaveRootComponent(hashes, dag)
  111. numParents := bindNumParents(hashes, dag)
  112. mergedDag, mergedSeq := mergeDag(numParents, hashes, dag)
  113. orderNodes := bindOrderNodes(mergedDag)
  114. collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
  115. /*fmt.Printf("digraph Hercules {\n")
  116. for i, c := range order {
  117. commit := hashes[c]
  118. fmt.Printf(" \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
  119. for _, child := range mergedDag[commit.Hash] {
  120. fmt.Printf(" \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
  121. }
  122. }
  123. fmt.Printf("}\n")*/
  124. plan := generatePlan(orderNodes, numParents, hashes, mergedDag, dag, mergedSeq)
  125. plan = optimizePlan(plan)
  126. return plan
  127. }
  128. // buildDag generates the raw commit DAG and the commit hash map.
  129. func buildDag(commits []*object.Commit) (
  130. map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
  131. hashes := map[string]*object.Commit{}
  132. for _, commit := range commits {
  133. hashes[commit.Hash.String()] = commit
  134. }
  135. dag := map[plumbing.Hash][]*object.Commit{}
  136. for _, commit := range commits {
  137. if _, exists := dag[commit.Hash]; !exists {
  138. dag[commit.Hash] = make([]*object.Commit, 0, 1)
  139. }
  140. for _, parent := range commit.ParentHashes {
  141. if _, exists := hashes[parent.String()]; !exists {
  142. continue
  143. }
  144. children := dag[parent]
  145. if children == nil {
  146. children = make([]*object.Commit, 0, 1)
  147. }
  148. dag[parent] = append(children, commit)
  149. }
  150. }
  151. return hashes, dag
  152. }
  153. // bindNumParents returns curried "numParents" function.
  154. func bindNumParents(
  155. hashes map[string]*object.Commit,
  156. dag map[plumbing.Hash][]*object.Commit) func(c *object.Commit) int {
  157. return func(c *object.Commit) int {
  158. r := 0
  159. for _, parent := range c.ParentHashes {
  160. if p, exists := hashes[parent.String()]; exists {
  161. for _, pc := range dag[p.Hash] {
  162. if pc.Hash == c.Hash {
  163. r++
  164. break
  165. }
  166. }
  167. }
  168. }
  169. return r
  170. }
  171. }
  172. // leaveRootComponent runs connected components analysis and throws away everything
  173. // but the part which grows from the root.
  174. func leaveRootComponent(
  175. hashes map[string]*object.Commit,
  176. dag map[plumbing.Hash][]*object.Commit) {
  177. visited := map[plumbing.Hash]bool{}
  178. var sets [][]plumbing.Hash
  179. for key := range dag {
  180. if visited[key] {
  181. continue
  182. }
  183. var set []plumbing.Hash
  184. for queue := []plumbing.Hash{key}; len(queue) > 0; {
  185. head := queue[len(queue)-1]
  186. queue = queue[:len(queue)-1]
  187. if visited[head] {
  188. continue
  189. }
  190. set = append(set, head)
  191. visited[head] = true
  192. for _, c := range dag[head] {
  193. if !visited[c.Hash] {
  194. queue = append(queue, c.Hash)
  195. }
  196. }
  197. if commit, exists := hashes[head.String()]; exists {
  198. for _, p := range commit.ParentHashes {
  199. if !visited[p] {
  200. if _, exists := hashes[p.String()]; exists {
  201. queue = append(queue, p)
  202. }
  203. }
  204. }
  205. }
  206. }
  207. sets = append(sets, set)
  208. }
  209. if len(sets) > 1 {
  210. maxlen := 0
  211. maxind := -1
  212. for i, set := range sets {
  213. if len(set) > maxlen {
  214. maxlen = len(set)
  215. maxind = i
  216. }
  217. }
  218. for i, set := range sets {
  219. if i == maxind {
  220. continue
  221. }
  222. for _, h := range set {
  223. log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
  224. delete(dag, h)
  225. delete(hashes, h.String())
  226. }
  227. }
  228. }
  229. }
  230. // bindOrderNodes returns curried "orderNodes" function.
  231. func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) func(reverse bool) []string {
  232. return func(reverse bool) []string {
  233. graph := toposort.NewGraph()
  234. keys := make([]plumbing.Hash, 0, len(mergedDag))
  235. for key := range mergedDag {
  236. keys = append(keys, key)
  237. }
  238. sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
  239. for _, key := range keys {
  240. graph.AddNode(key.String())
  241. }
  242. for _, key := range keys {
  243. children := mergedDag[key]
  244. sort.Slice(children, func(i, j int) bool {
  245. return children[i].Hash.String() < children[j].Hash.String()
  246. })
  247. for _, c := range children {
  248. graph.AddEdge(key.String(), c.Hash.String())
  249. }
  250. }
  251. order, ok := graph.Toposort()
  252. if !ok {
  253. // should never happen
  254. panic("Could not topologically sort the DAG of commits")
  255. }
  256. if reverse {
  257. // one day this must appear in the standard library...
  258. for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
  259. order[i], order[j] = order[j], order[i]
  260. }
  261. }
  262. return order
  263. }
  264. }
  265. // mergeDag turns sequences of consecutive commits into single nodes.
  266. func mergeDag(
  267. numParents func(c *object.Commit) int,
  268. hashes map[string]*object.Commit,
  269. dag map[plumbing.Hash][]*object.Commit) (
  270. mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
  271. parentOf := func(c *object.Commit) plumbing.Hash {
  272. var parent plumbing.Hash
  273. for _, p := range c.ParentHashes {
  274. if _, exists := hashes[p.String()]; exists {
  275. if parent != plumbing.ZeroHash {
  276. // more than one parent
  277. return plumbing.ZeroHash
  278. }
  279. parent = p
  280. }
  281. }
  282. return parent
  283. }
  284. mergedDag = map[plumbing.Hash][]*object.Commit{}
  285. mergedSeq = map[plumbing.Hash][]*object.Commit{}
  286. visited := map[plumbing.Hash]bool{}
  287. for ch := range dag {
  288. c := hashes[ch.String()]
  289. if visited[c.Hash] {
  290. continue
  291. }
  292. for true {
  293. parent := parentOf(c)
  294. if parent == plumbing.ZeroHash || len(dag[parent]) != 1 {
  295. break
  296. }
  297. c = hashes[parent.String()]
  298. }
  299. head := c
  300. var seq []*object.Commit
  301. children := dag[c.Hash]
  302. for true {
  303. visited[c.Hash] = true
  304. seq = append(seq, c)
  305. if len(children) != 1 {
  306. break
  307. }
  308. c = children[0]
  309. children = dag[c.Hash]
  310. if numParents(c) != 1 {
  311. break
  312. }
  313. }
  314. mergedSeq[head.Hash] = seq
  315. mergedDag[head.Hash] = dag[seq[len(seq)-1].Hash]
  316. }
  317. return
  318. }
  // collapseFastForwards removes the fast forward merges.
  //
  // The nodes are visited in the order returned by orderNodes(true). A node with
  // exactly two children A and B is collapsed when the only child of B is A (or
  // vice versa): the node takes over the children of A in mergedDag, its dag entry
  // is reduced to B alone, the commit sequences of B and A are appended to the
  // sequence of the node, and the entries of A and B are removed from mergedDag
  // and mergedSeq. All three maps are modified in place.
  func collapseFastForwards(
  	orderNodes func(reverse bool) []string,
  	hashes map[string]*object.Commit,
  	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) {
  	for _, strkey := range orderNodes(true) {
  		key := hashes[strkey].Hash
  		// the node could have been removed by an earlier collapse
  		vals, exists := mergedDag[key]
  		if !exists {
  			continue
  		}
  		if len(vals) == 2 {
  			// grand1 and grand2 are captured before any modification below
  			grand1 := mergedDag[vals[0].Hash]
  			grand2 := mergedDag[vals[1].Hash]
  			// key -> vals[0] is the shortcut for key -> vals[1] -> vals[0]
  			if len(grand2) == 1 && vals[0].Hash == grand2[0].Hash {
  				mergedDag[key] = mergedDag[vals[0].Hash]
  				dag[key] = vals[1:]
  				delete(mergedDag, vals[0].Hash)
  				delete(mergedDag, vals[1].Hash)
  				// the intermediate sequence goes first, then the merge target
  				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
  				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
  				delete(mergedSeq, vals[0].Hash)
  				delete(mergedSeq, vals[1].Hash)
  			}
  			// symmetric
  			// key -> vals[1] is the shortcut for key -> vals[0] -> vals[1]
  			if len(grand1) == 1 && vals[1].Hash == grand1[0].Hash {
  				mergedDag[key] = mergedDag[vals[1].Hash]
  				dag[key] = vals[:1]
  				delete(mergedDag, vals[0].Hash)
  				delete(mergedDag, vals[1].Hash)
  				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
  				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
  				delete(mergedSeq, vals[0].Hash)
  				delete(mergedSeq, vals[1].Hash)
  			}
  		}
  	}
  }
  // generatePlan creates the list of actions from the commit DAG.
  //
  // The nodes are visited in the order returned by orderNodes(false). The first
  // node is assigned to branch 0. For each node, a commit action is appended for
  // every commit in its sequence (mergedSeq), a merge action follows the first
  // commit of the sequence if it has at least two parents, and a fork action is
  // appended at the end if the node has more than one child in mergedDag.
  func generatePlan(
  	orderNodes func(reverse bool) []string,
  	numParents func(c *object.Commit) int,
  	hashes map[string]*object.Commit,
  	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
  	var plan []runAction
  	// branches maps the commit hash to the index of the branch it is scheduled on
  	branches := map[plumbing.Hash]int{}
  	// counter is the next unused branch index; 0 is taken by the first node
  	counter := 1
  	for seqIndex, name := range orderNodes(false) {
  		commit := hashes[name]
  		if seqIndex == 0 {
  			branches[commit.Hash] = 0
  		}
  		// branch is -1 if no branch has been assigned to this commit yet
  		var branch int
  		{
  			var exists bool
  			branch, exists = branches[commit.Hash]
  			if !exists {
  				branch = -1
  			}
  		}
  		// branchExists reads the captured `branch`, which appendMergeIfNeeded may change
  		branchExists := func() bool { return branch >= 0 }
  		// appendCommit schedules commit c on the given branch
  		appendCommit := func(c *object.Commit, branch int) {
  			plan = append(plan, runAction{
  				Action: runActionCommit,
  				Commit: c,
  				Items: []int{branch},
  			})
  		}
  		// appendMergeIfNeeded schedules the merge of the parent branches if
  		// `commit` has at least two parents
  		appendMergeIfNeeded := func() {
  			if numParents(commit) < 2 {
  				return
  			}
  			// merge after the merge commit (the first in the sequence)
  			var items []int
  			// NOTE(review): 1 << 31 serves as the "not found" sentinel; the constant
  			// does not fit into int on platforms where int is 32 bits wide
  			minBranch := 1 << 31
  			for _, parent := range commit.ParentHashes {
  				if _, exists := hashes[parent.String()]; exists {
  					// yields 0 if the parent has not been assigned a branch
  					parentBranch := branches[parent]
  					// only the parents with the single child compete for the
  					// smallest branch index
  					if len(dag[parent]) == 1 && minBranch > parentBranch {
  						minBranch = parentBranch
  					}
  					items = append(items, parentBranch)
  					// the merge commit is also scheduled on every other parent branch
  					if parentBranch != branch {
  						appendCommit(commit, parentBranch)
  					}
  				}
  			}
  			if minBranch < 1 << 31 {
  				// continue on the smallest suitable parent branch
  				branch = minBranch
  				branches[commit.Hash] = minBranch
  			} else if !branchExists() {
  				panic("!branchExists()")
  			}
  			plan = append(plan, runAction{
  				Action: runActionMerge,
  				Commit: nil,
  				Items: items,
  			})
  		}
  		if subseq, exists := mergedSeq[commit.Hash]; exists {
  			for subseqIndex, offspring := range subseq {
  				if branchExists() {
  					appendCommit(offspring, branch)
  				}
  				if subseqIndex == 0 {
  					appendMergeIfNeeded()
  				}
  			}
  			// the last commit of the sequence stays on the same branch
  			branches[subseq[len(subseq)-1].Hash] = branch
  		}
  		if len(mergedDag[commit.Hash]) > 1 {
  			// the first child inherits the current branch, each other child
  			// receives a new branch index
  			branches[mergedDag[commit.Hash][0].Hash] = branch
  			children := []int{branch}
  			for i, child := range mergedDag[commit.Hash] {
  				if i > 0 {
  					branches[child.Hash] = counter
  					children = append(children, counter)
  					counter++
  				}
  			}
  			plan = append(plan, runAction{
  				Action: runActionFork,
  				Commit: nil,
  				Items: children,
  			})
  		}
  	}
  	return plan
  }
  // optimizePlan removes "dead" nodes and inserts `runActionDelete` disposal steps.
  //
  // A branch is considered dead if exactly one commit action is scheduled on it:
  // its commit is dropped and the branch is excluded from the forks and the merges.
  // Every other branch which is last used before the final step gets a
  // `runActionDelete` step right after its last use.
  //
  // NOTE(review): the horizontal alignment of the sketch below may have been lost.
  //
  // | *
  // * /
  // |\/
  // |/
  // *
  //
  func optimizePlan(plan []runAction) []runAction {
  	// lives maps branch index to the number of commits in that branch
  	lives := map[int]int{}
  	// lastMentioned maps branch index to the index inside `plan` when that branch was last used
  	lastMentioned := map[int]int{}
  	for i, p := range plan {
  		firstItem := p.Items[0]
  		switch p.Action {
  		case runActionCommit:
  			lives[firstItem]++
  			lastMentioned[firstItem] = i
  		case runActionFork:
  			// only the branch which is being forked counts as used here
  			lastMentioned[firstItem] = i
  		case runActionMerge:
  			for _, item := range p.Items {
  				lastMentioned[item] = i
  			}
  		}
  	}
  	// branches with the single commit are removed from the plan entirely, so
  	// they do not need the disposal step either
  	branchesToDelete := map[int]bool{}
  	for key, life := range lives {
  		if life == 1 {
  			branchesToDelete[key] = true
  			delete(lastMentioned, key)
  		}
  	}
  	var optimizedPlan []runAction
  	// lastMentionedArr holds the {plan index, branch index} pairs
  	lastMentionedArr := make([][2]int, 0, len(lastMentioned) + 1)
  	for key, val := range lastMentioned {
  		// the branches which are used by the very last step are not disposed
  		if val != len(plan) - 1 {
  			lastMentionedArr = append(lastMentionedArr, [2]int{val, key})
  		}
  	}
  	if len(lastMentionedArr) == 0 && len(branchesToDelete) == 0 {
  		// early return - we have nothing to optimize
  		return plan
  	}
  	sort.Slice(lastMentionedArr, func(i, j int) bool {
  		return lastMentionedArr[i][0] < lastMentionedArr[j][0]
  	})
  	// the sentinel with the branch index -1 flushes the rest of the plan
  	lastMentionedArr = append(lastMentionedArr, [2]int{len(plan)-1, -1})
  	// prevpi is the index of the last plan step which has been copied
  	prevpi := -1
  	for _, pair := range lastMentionedArr {
  		// copy the steps up to and including the last use of the branch pair[1]
  		for pi := prevpi + 1; pi <= pair[0]; pi++ {
  			p := plan[pi]
  			switch p.Action {
  			case runActionCommit:
  				if !branchesToDelete[p.Items[0]] {
  					optimizedPlan = append(optimizedPlan, p)
  				}
  			case runActionFork:
  				var newBranches []int
  				for _, b := range p.Items {
  					if !branchesToDelete[b] {
  						newBranches = append(newBranches, b)
  					}
  				}
  				// the fork into less than two branches is redundant
  				if len(newBranches) > 1 {
  					optimizedPlan = append(optimizedPlan, runAction{
  						Action: runActionFork,
  						Commit: p.Commit,
  						Items: newBranches,
  					})
  				}
  			case runActionMerge:
  				var newBranches []int
  				for _, b := range p.Items {
  					if !branchesToDelete[b] {
  						newBranches = append(newBranches, b)
  					}
  				}
  				// the merge of less than two branches is redundant
  				if len(newBranches) > 1 {
  					optimizedPlan = append(optimizedPlan, runAction{
  						Action: runActionMerge,
  						Commit: p.Commit,
  						Items: newBranches,
  					})
  				}
  			}
  		}
  		if pair[1] >= 0 {
  			prevpi = pair[0]
  			optimizedPlan = append(optimizedPlan, runAction{
  				Action: runActionDelete,
  				Commit: nil,
  				Items: []int{pair[1]},
  			})
  		}
  	}
  	// single commit can be detected as redundant
  	if len(optimizedPlan) > 0 {
  		return optimizedPlan
  	}
  	return plan
  	// TODO(vmarkovtsev): there can be also duplicate redundant merges, e.g.
  	/*
  		0 4e34f03d829fbacb71cde0e010de87ea945dc69a [3]
  		0 4e34f03d829fbacb71cde0e010de87ea945dc69a [12]
  		2 [3 12]
  		0 06716c2b39422938b77ddafa4d5c39bb9e4476da [3]
  		0 06716c2b39422938b77ddafa4d5c39bb9e4476da [12]
  		2 [3 12]
  		0 1219c7bf9e0e1a93459a052ab8b351bfc379dc19 [12]
  	*/
  }