dag9_tenx_single_cell_immune_profiling.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
import json
import logging
import os
import subprocess
from datetime import timedelta

from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from igf_airflow.logging.upload_log_msg import send_log_to_channels, log_success, log_failure, log_sleep
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
# NOTE(review): configure_cellranger_run_func is used by the
# 'configure_cellranger_run' task but was never imported; it is assumed to
# live next to fetch_analysis_info_and_branch_func - confirm the module path.
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import configure_cellranger_run_func
  10. ## ARGS
  11. default_args = {
  12. 'owner': 'airflow',
  13. 'depends_on_past': False,
  14. 'start_date': days_ago(2),
  15. 'email_on_failure': False,
  16. 'email_on_retry': False,
  17. 'retries': 4,
  18. 'max_active_runs':10,
  19. 'catchup':False,
  20. 'retry_delay': timedelta(minutes=5),
  21. 'provide_context': True,
  22. }
  23. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types')
  24. ## DAG
  25. dag = \
  26. DAG(
  27. dag_id='dag9_tenx_single_cell_immune_profiling',
  28. schedule_interval=None,
  29. tags=['hpc','analysis','tenx','sc'],
  30. default_args=default_args,
  31. orientation='LR')
  32. with dag:
  33. ## TASK
  34. fetch_analysis_info_and_branch = \
  35. BranchPythonOperator(
  36. task_id='fetch_analysis_info',
  37. dag=dag,
  38. queue='hpc_4g',
  39. params={'no_analysis_task':'no_analysis'},
  40. python_callable=fetch_analysis_info_and_branch_func)
  41. ## TASK
  42. configure_cellranger_run = \
  43. PythonOperator(
  44. task_id='configure_cellranger_run',
  45. dag=dag,
  46. queue='hpc_4g',
  47. python_callable=configure_cellranger_run_func)
  48. for analysis_name in FEATURE_TYPE_LIST:
  49. ## TASK
  50. task_branch = \
  51. DummyOperator(
  52. task_id=analysis_name,
  53. dag=dag)
  54. run_trim_list = list()
  55. for run_id in range(0,32):
  56. ## TASK
  57. t = \
  58. DummyOperator(
  59. task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
  60. dag=dag)
  61. run_trim_list.append(t)
  62. ## TASK
  63. collect_trimmed_files = \
  64. DummyOperator(
  65. task_id='collect_trimmed_files_{0}'.format(analysis_name),
  66. dag=dag)
  67. ## PIPELINE
  68. fetch_analysis_info_and_branch >> task_branch
  69. task_branch >> run_trim_list
  70. run_trim_list >> collect_trimmed_files
  71. collect_trimmed_files >> configure_cellranger_run
  72. ## TASK
  73. no_analysis = \
  74. DummyOperator(
  75. task_id='no_analysis',
  76. dag=dag)
  77. ## PIPELINE
  78. fetch_analysis_info_and_branch >> no_analysis
  79. ## TASK
  80. run_cellranger = \
  81. DummyOperator(
  82. task_id='run_cellranger',
  83. dag=dag)
  84. ## PIPELINE
  85. configure_cellranger_run >> run_cellranger
  86. ## TASK
  87. decide_analysis_branch = \
  88. BranchPythonOperator(
  89. task_id='decide_analysis_branch',
  90. dag=dag,
  91. python_callable=lambda: ['upload_report_to_ftp',
  92. 'upload_results_to_irods',
  93. 'run_scanpy_report',
  94. 'run_picard_alignment_summary'])
  95. ## PIPELINE
  96. run_cellranger >> decide_analysis_branch
  97. ## TASK
  98. upload_report_to_ftp = \
  99. DummyOperator(
  100. task_id='upload_report_to_ftp',
  101. dag=dag)
  102. ## PIPELINE
  103. decide_analysis_branch >> upload_report_to_ftp
  104. ## TASK
  105. upload_results_to_irods = \
  106. DummyOperator(
  107. task_id='upload_results_to_irods',
  108. dag=dag)
  109. ## PIPELINE
  110. decide_analysis_branch >> upload_results_to_irods
  111. ## TASK
  112. run_scanpy_report = \
  113. DummyOperator(
  114. task_id='run_scanpy_report',
  115. dag=dag)
  116. ## PIPELINE
  117. decide_analysis_branch >> run_scanpy_report
  118. ## TASK
  119. upload_scanpy_report_to_ftp = \
  120. DummyOperator(
  121. task_id='upload_scanpy_report_to_ftp',
  122. dag=dag)
  123. ## PIPELINE
  124. run_scanpy_report >> upload_scanpy_report_to_ftp
  125. ## TASK
  126. run_picard_alignment_summary = \
  127. DummyOperator(
  128. task_id='run_picard_alignment_summary',
  129. dag=dag)
  130. ## PIPELINE
  131. decide_analysis_branch >> run_picard_alignment_summary
  132. ## TASK
  133. run_picard_qual_summary = \
  134. DummyOperator(
  135. task_id='run_picard_qual_summary',
  136. dag=dag)
  137. ## PIPELINE
  138. run_picard_alignment_summary >> run_picard_qual_summary
  139. ## TASK
  140. run_picard_rna_summary = \
  141. DummyOperator(
  142. task_id='run_picard_rna_summary',
  143. dag=dag)
  144. ## PIPELINE
  145. run_picard_qual_summary >> run_picard_rna_summary
  146. ## TASK
  147. run_picard_gc_summary = \
  148. DummyOperator(
  149. task_id='run_picard_gc_summary',
  150. dag=dag)
  151. ## PIPELINE
  152. run_picard_rna_summary >> run_picard_gc_summary
  153. ## TASK
  154. run_samtools_stats = \
  155. DummyOperator(
  156. task_id='run_samtools_stats',
  157. dag=dag)
  158. ## PIPELINE
  159. run_picard_gc_summary >> run_samtools_stats
  160. ## TASK
  161. run_samtools_idxstats = \
  162. DummyOperator(
  163. task_id='run_samtools_idxstats',
  164. dag=dag)
  165. ## PIPELINE
  166. run_samtools_stats >> run_samtools_idxstats
  167. ## TASK
  168. run_multiqc = \
  169. DummyOperator(
  170. task_id='run_multiqc',
  171. dag=dag)
  172. ## PIPELINE
  173. run_samtools_idxstats >> run_multiqc
  174. ## TASK
  175. upload_multiqc_to_ftp = \
  176. DummyOperator(
  177. task_id='upload_multiqc_to_ftp',
  178. dag=dag)
  179. ## PIPELINE
  180. run_multiqc >> upload_multiqc_to_ftp
  181. ## TASK
  182. update_analysis_and_status = \
  183. DummyOperator(
  184. task_id='update_analysis_and_status',
  185. dag=dag)
  186. ## PIPELINE
  187. upload_multiqc_to_ftp >> update_analysis_and_status
  188. upload_scanpy_report_to_ftp >> update_analysis_and_status
  189. upload_results_to_irods >> update_analysis_and_status
  190. upload_report_to_ftp >> update_analysis_and_status