dag9_tenx_single_cell_immune_profiling.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
  9. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
  10. ## ARGS
  11. default_args = {
  12. 'owner': 'airflow',
  13. 'depends_on_past': False,
  14. 'start_date': days_ago(2),
  15. 'email_on_failure': False,
  16. 'email_on_retry': False,
  17. 'retries': 4,
  18. 'max_active_runs':10,
  19. 'catchup':False,
  20. 'retry_delay': timedelta(minutes=5),
  21. 'provide_context': True,
  22. }
  23. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types')
  24. ## DAG
  25. dag = \
  26. DAG(
  27. dag_id='dag9_tenx_single_cell_immune_profiling',
  28. schedule_interval=None,
  29. tags=['hpc','analysis','tenx','sc'],
  30. default_args=default_args,
  31. orientation='LR')
  32. with dag:
  33. ## TASK
  34. fetch_analysis_info_and_branch = \
  35. BranchPythonOperator(
  36. task_id='fetch_analysis_info',
  37. dag=dag,
  38. python_callable=fetch_analysis_info_and_branch_func)
  39. ## TASK
  40. configure_cellranger_run = \
  41. DummyOperator(
  42. task_id='configure_cellranger_run',
  43. dag=dag)
  # One sub-graph per feature type:
  #   branch marker -> 32 trim placeholders -> collector -> configure_cellranger_run
  for analysis_name in FEATURE_TYPE_LIST:
    ## TASK
    # The branch marker's task id is the feature type name itself.
    task_branch = DummyOperator(task_id=analysis_name, dag=dag)
    ## TASK
    # Fixed pool of 32 trim placeholders (run ids 0..31) for this feature type.
    run_trim_list = [
      DummyOperator(
        task_id='run_trim_{0}_{1}'.format(analysis_name, run_id),
        dag=dag)
      for run_id in range(0, 32)]
    ## TASK
    collect_trimmed_files = DummyOperator(
      task_id='collect_trimmed_files_{0}'.format(analysis_name),
      dag=dag)
    ## PIPELINE
    fetch_analysis_info_and_branch >> task_branch >> run_trim_list
    run_trim_list >> collect_trimmed_files >> configure_cellranger_run
  ## TASK
  # Alternative target of the first branching task.
  no_analysis = DummyOperator(task_id='no_analysis', dag=dag)
  ## TASK
  # Placeholder that follows the cellranger configuration step.
  run_cellranger = DummyOperator(task_id='run_cellranger', dag=dag)
  ## PIPELINE
  fetch_analysis_info_and_branch.set_downstream(no_analysis)
  configure_cellranger_run.set_downstream(run_cellranger)
  ## TASK
  # Branching task that selects all four post-cellranger branches by task id.
  # The callable accepts **context because default_args sets
  # 'provide_context': True, so the task context is passed in as keyword
  # arguments; a zero-argument lambda would raise TypeError when called
  # that way.
  decide_analysis_branch = \
    BranchPythonOperator(
      task_id='decide_analysis_branch',
      dag=dag,
      python_callable=lambda **context: [
        'upload_report_to_ftp',
        'upload_results_to_irods',
        'run_scanpy_report',
        'run_picard_alignment_summary'])
  ## PIPELINE
  run_cellranger >> decide_analysis_branch
  ## TASK
  # Placeholders for three of the branches listed by decide_analysis_branch,
  # plus the upload step that follows the scanpy report.
  upload_report_to_ftp = DummyOperator(
    task_id='upload_report_to_ftp', dag=dag)
  upload_results_to_irods = DummyOperator(
    task_id='upload_results_to_irods', dag=dag)
  run_scanpy_report = DummyOperator(
    task_id='run_scanpy_report', dag=dag)
  upload_scanpy_report_to_ftp = DummyOperator(
    task_id='upload_scanpy_report_to_ftp', dag=dag)
  ## PIPELINE
  decide_analysis_branch >> [
    upload_report_to_ftp,
    upload_results_to_irods,
    run_scanpy_report]
  run_scanpy_report >> upload_scanpy_report_to_ftp
  ## TASK
  # Placeholders for the linear QC chain, declared in execution order.
  run_picard_alignment_summary = DummyOperator(
    task_id='run_picard_alignment_summary', dag=dag)
  run_picard_qual_summary = DummyOperator(
    task_id='run_picard_qual_summary', dag=dag)
  run_picard_rna_summary = DummyOperator(
    task_id='run_picard_rna_summary', dag=dag)
  run_picard_gc_summary = DummyOperator(
    task_id='run_picard_gc_summary', dag=dag)
  run_samtools_stats = DummyOperator(
    task_id='run_samtools_stats', dag=dag)
  run_samtools_idxstats = DummyOperator(
    task_id='run_samtools_idxstats', dag=dag)
  run_multiqc = DummyOperator(
    task_id='run_multiqc', dag=dag)
  upload_multiqc_to_ftp = DummyOperator(
    task_id='upload_multiqc_to_ftp', dag=dag)
  ## PIPELINE
  # Each task depends only on the one directly before it.
  (decide_analysis_branch
    >> run_picard_alignment_summary
    >> run_picard_qual_summary
    >> run_picard_rna_summary
    >> run_picard_gc_summary
    >> run_samtools_stats
    >> run_samtools_idxstats
    >> run_multiqc
    >> upload_multiqc_to_ftp)
  ## TASK
  # Final placeholder; it is downstream of the last task of all four branches.
  update_analysis_and_status = DummyOperator(
    task_id='update_analysis_and_status', dag=dag)
  ## PIPELINE
  update_analysis_and_status.set_upstream([
    upload_multiqc_to_ftp,
    upload_scanpy_report_to_ftp,
    upload_results_to_irods,
    upload_report_to_ftp])