# Source file: dag17_create_transcriptome_ref.py (4.1 KB)

import os
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.bash_operator import BashOperator

from igf_airflow.utils.dag17_create_transcriptome_ref_utils import download_gtf_file_func
from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_star_index_func
from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_rsem_index_func
from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_reflat_index_func
from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_ribosomal_interval_func
from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_cellranger_ref_func

  14. args = {
  15. 'owner': 'airflow',
  16. 'start_date': days_ago(2),
  17. 'retries': 1,
  18. 'retry_delay': timedelta(minutes=5),
  19. 'provide_context': True,
  20. 'email_on_failure': False,
  21. 'email_on_retry': False,
  22. 'catchup': False,
  23. 'max_active_runs': 1}
  24. DAG_ID = \
  25. os.path.basename(__file__).\
  26. replace(".pyc", "").\
  27. replace(".py", "")
  28. dag = \
  29. DAG(
  30. dag_id=DAG_ID,
  31. schedule_interval=None,
  32. default_args=args,
  33. tags=['hpc'])
  34. with dag:
  35. ## TASK
  36. download_gtf_file = \
  37. PythonOperator(
  38. task_id="download_gtf_file",
  39. dag=dag,
  40. queue='hpc_4G',
  41. params={
  42. 'gtf_xcom_key': 'gtf_file'
  43. },
  44. python_callable=download_gtf_file_func)
  45. ## TASK
  46. create_star_index = \
  47. PythonOperator(
  48. task_id="create_star_index",
  49. dag=dag,
  50. queue='hpc_32G8t',
  51. params={
  52. 'gtf_xcom_task': 'download_gtf_file',
  53. 'gtf_xcom_key': 'gtf_file',
  54. 'star_ref_xcom_key': 'star_ref',
  55. 'threads': 8,
  56. 'star_options': [
  57. '--sjdbOverhang', 149]
  58. },
  59. python_callable=create_star_index_func)
  60. ## TASK
  61. create_rsem_index = \
  62. PythonOperator(
  63. task_id="create_rsem_index",
  64. dag=dag,
  65. queue='hpc_8G8t',
  66. params={
  67. 'gtf_xcom_task': 'download_gtf_file',
  68. 'gtf_xcom_key': 'gtf_file',
  69. 'rsem_ref_xcom_key': 'rsem_ref',
  70. 'threads': 8
  71. },
  72. python_callable=create_rsem_index_func)
  73. ## TASK
  74. create_reflat_index = \
  75. PythonOperator(
  76. task_id='create_reflat_index',
  77. dag=dag,
  78. queue='hpc_4G',
  79. params={
  80. 'gtf_xcom_task': 'download_gtf_file',
  81. 'gtf_xcom_key': 'gtf_file',
  82. 'refflat_ref_xcom_key': 'refflat_ref'
  83. },
  84. python_callable=create_reflat_index_func)
  85. ## TASK
  86. create_ribosomal_interval = \
  87. PythonOperator(
  88. task_id='create_ribosomal_interval',
  89. dag=dag,
  90. queue='hpc_4G',
  91. params={
  92. 'gtf_xcom_task': 'download_gtf_file',
  93. 'gtf_xcom_key': 'gtf_file',
  94. 'ribosomal_ref_xcom_key': 'ribosomal_ref',
  95. 'skip_gtf_rows': 5
  96. },
  97. python_callable=create_ribosomal_interval_func)
  98. ## TASK
  99. create_cellranger_ref = \
  100. PythonOperator(
  101. task_id='create_cellranger_ref',
  102. dag=dag,
  103. queue='hpc_8G8t',
  104. params={
  105. 'gtf_xcom_task': 'download_gtf_file',
  106. 'gtf_xcom_key': 'gtf_file',
  107. 'cellranger_ref_xcom_key': 'cellranger_ref',
  108. 'skip_gtf_rows': 5,
  109. 'threads': 8,
  110. 'memory': 8
  111. },
  112. python_callable=create_cellranger_ref_func)
  113. ## PIPELINE
  114. download_gtf_file >> create_star_index
  115. download_gtf_file >> create_rsem_index
  116. download_gtf_file >> create_reflat_index
  117. download_gtf_file >> create_ribosomal_interval
  118. download_gtf_file >> create_cellranger_ref