dag17_create_transcriptome_ref.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import os
  2. from datetime import timedelta
  3. from airflow.models import DAG
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.bash_operator import BashOperator
  8. from igf_airflow.utils.dag17_create_transcriptome_ref_utils import download_gtf_file_func
  9. from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_star_index_func
  10. from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_rsem_index_func
  11. from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_reflat_index_func
  12. from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_ribosomal_interval_func
  13. from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_cellranger_ref_func
  14. from igf_airflow.utils.dag17_create_transcriptome_ref_utils import add_refs_to_db_collection_func
  15. args = {
  16. 'owner': 'airflow',
  17. 'start_date': days_ago(2),
  18. 'retries': 1,
  19. 'retry_delay': timedelta(minutes=5),
  20. 'provide_context': True,
  21. 'email_on_failure': False,
  22. 'email_on_retry': False,
  23. 'catchup': False,
  24. 'max_active_runs': 1}
  25. DAG_ID = \
  26. os.path.basename(__file__).\
  27. replace(".pyc", "").\
  28. replace(".py", "")
  29. dag = \
  30. DAG(
  31. dag_id=DAG_ID,
  32. schedule_interval=None,
  33. default_args=args,
  34. tags=['hpc'])
  35. with dag:
  36. ## TASK
  37. download_gtf_file = \
  38. PythonOperator(
  39. task_id="download_gtf_file",
  40. dag=dag,
  41. queue='hpc_4G',
  42. params={
  43. 'gtf_xcom_key': 'gtf_file'
  44. },
  45. python_callable=download_gtf_file_func)
  46. ## TASK
  47. create_star_index = \
  48. PythonOperator(
  49. task_id="create_star_index",
  50. dag=dag,
  51. queue='hpc_42G16t',
  52. params={
  53. 'gtf_xcom_task': 'download_gtf_file',
  54. 'gtf_xcom_key': 'gtf_file',
  55. 'star_ref_xcom_key': 'star_ref',
  56. 'threads': 12,
  57. 'star_options': [
  58. '--sjdbOverhang', 149]
  59. },
  60. python_callable=create_star_index_func)
  61. ## TASK
  62. create_rsem_index = \
  63. PythonOperator(
  64. task_id="create_rsem_index",
  65. dag=dag,
  66. queue='hpc_4G',
  67. params={
  68. 'gtf_xcom_task': 'download_gtf_file',
  69. 'gtf_xcom_key': 'gtf_file',
  70. 'rsem_ref_xcom_key': 'rsem_ref',
  71. 'threads': 8
  72. },
  73. python_callable=create_rsem_index_func)
  74. ## TASK
  75. create_reflat_index = \
  76. PythonOperator(
  77. task_id='create_reflat_index',
  78. dag=dag,
  79. queue='hpc_4G',
  80. params={
  81. 'gtf_xcom_task': 'download_gtf_file',
  82. 'gtf_xcom_key': 'gtf_file',
  83. 'refflat_ref_xcom_key': 'refflat_ref'
  84. },
  85. python_callable=create_reflat_index_func)
  86. ## TASK
  87. create_ribosomal_interval = \
  88. PythonOperator(
  89. task_id='create_ribosomal_interval',
  90. dag=dag,
  91. queue='hpc_4G',
  92. params={
  93. 'gtf_xcom_task': 'download_gtf_file',
  94. 'gtf_xcom_key': 'gtf_file',
  95. 'ribosomal_ref_xcom_key': 'ribosomal_ref',
  96. 'skip_gtf_rows': 5
  97. },
  98. python_callable=create_ribosomal_interval_func)
  99. ## TASK
  100. create_cellranger_ref = \
  101. PythonOperator(
  102. task_id='create_cellranger_ref',
  103. dag=dag,
  104. queue='hpc_32G8t',
  105. params={
  106. 'gtf_xcom_task': 'download_gtf_file',
  107. 'gtf_xcom_key': 'gtf_file',
  108. 'cellranger_ref_xcom_key': 'cellranger_ref',
  109. 'skip_gtf_rows': 5,
  110. 'threads': 7,
  111. 'memory': 30
  112. },
  113. python_callable=create_cellranger_ref_func)
  114. ## TASK
  115. add_refs_to_db_collection = \
  116. PythonOperator(
  117. task_id='add_refs_to_db_collection',
  118. dag=dag,
  119. queue='hpc_4G',
  120. params={
  121. 'task_list': [
  122. ['download_gtf_file', 'gtf_file', 'GENE_GTF'],
  123. ['create_star_index', 'star_ref', 'TRANSCRIPTOME_STAR'],
  124. ['create_rsem_index', 'rsem_ref', 'TRANSCRIPTOME_RSEM'],
  125. ['create_reflat_index', 'refflat_ref', 'GENE_REFFLAT'],
  126. ['create_ribosomal_interval', 'ribosomal_ref', 'RIBOSOMAL_INTERVAL'],
  127. ['create_cellranger_ref', 'cellranger_ref', 'TRANSCRIPTOME_TENX']
  128. ]
  129. },
  130. python_callable=add_refs_to_db_collection_func)
  131. ## PIPELINE
  132. download_gtf_file >> create_star_index >> add_refs_to_db_collection
  133. download_gtf_file >> create_rsem_index >> add_refs_to_db_collection
  134. download_gtf_file >> create_reflat_index >> add_refs_to_db_collection
  135. download_gtf_file >> create_ribosomal_interval >> add_refs_to_db_collection
  136. download_gtf_file >> create_cellranger_ref >> add_refs_to_db_collection