dag9_tenx_single_cell_immune_profiling.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
import json
import logging
import os
import subprocess
from datetime import timedelta

from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from igf_airflow.logging.upload_log_msg import send_log_to_channels, log_success, log_failure, log_sleep
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import configure_cellranger_run_func
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
  10. ## ARGS
  11. default_args = {
  12. 'owner': 'airflow',
  13. 'depends_on_past': False,
  14. 'start_date': days_ago(2),
  15. 'email_on_failure': False,
  16. 'email_on_retry': False,
  17. 'retries': 4,
  18. 'max_active_runs':10,
  19. 'catchup':False,
  20. 'retry_delay': timedelta(minutes=5),
  21. 'provide_context': True,
  22. }
  23. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types')
  24. ## DAG
  25. dag = \
  26. DAG(
  27. dag_id='dag9_tenx_single_cell_immune_profiling',
  28. schedule_interval=None,
  29. tags=['hpc','analysis','tenx','sc'],
  30. default_args=default_args,
  31. orientation='LR')
  32. with dag:
  33. ## TASK
  34. fetch_analysis_info_and_branch = \
  35. BranchPythonOperator(
  36. task_id='fetch_analysis_info',
  37. dag=dag,
  38. queue='hpc_4g',
  39. params={'no_analysis_task':'no_analysis',
  40. 'analysis_description_xcom_key':'analysis_description',
  41. 'analysis_info_xcom_key':'analysis_info'},
  42. python_callable=fetch_analysis_info_and_branch_func)
  43. ## TASK
  44. configure_cellranger_run = \
  45. PythonOperator(
  46. task_id='configure_cellranger_run',
  47. dag=dag,
  48. queue='hpc_4g',
  49. params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
  50. 'analysis_description_xcom_key':'analysis_description',
  51. 'analysis_info_xcom_key':'analysis_info',
  52. 'library_csv_xcom_key':'cellranger_library_csv'},
  53. python_callable=configure_cellranger_run_func)
  54. for analysis_name in FEATURE_TYPE_LIST:
  55. ## TASK
  56. task_branch = \
  57. DummyOperator(
  58. task_id=analysis_name,
  59. dag=dag)
  60. run_trim_list = list()
  61. for run_id in range(0,32):
  62. ## TASK
  63. t = \
  64. DummyOperator(
  65. task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
  66. dag=dag)
  67. run_trim_list.append(t)
  68. ## TASK
  69. collect_trimmed_files = \
  70. DummyOperator(
  71. task_id='collect_trimmed_files_{0}'.format(analysis_name),
  72. dag=dag)
  73. ## PIPELINE
  74. fetch_analysis_info_and_branch >> task_branch
  75. task_branch >> run_trim_list
  76. run_trim_list >> collect_trimmed_files
  77. collect_trimmed_files >> configure_cellranger_run
  78. ## TASK
  79. no_analysis = \
  80. DummyOperator(
  81. task_id='no_analysis',
  82. dag=dag)
  83. ## PIPELINE
  84. fetch_analysis_info_and_branch >> no_analysis
  85. ## TASK
  86. run_cellranger = \
  87. DummyOperator(
  88. task_id='run_cellranger',
  89. dag=dag)
  90. ## PIPELINE
  91. configure_cellranger_run >> run_cellranger
  92. ## TASK
  93. decide_analysis_branch = \
  94. BranchPythonOperator(
  95. task_id='decide_analysis_branch',
  96. dag=dag,
  97. python_callable=lambda: ['upload_report_to_ftp',
  98. 'upload_results_to_irods',
  99. 'run_scanpy_report',
  100. 'run_picard_alignment_summary'])
  101. ## PIPELINE
  102. run_cellranger >> decide_analysis_branch
  103. ## TASK
  104. upload_report_to_ftp = \
  105. DummyOperator(
  106. task_id='upload_report_to_ftp',
  107. dag=dag)
  108. ## PIPELINE
  109. decide_analysis_branch >> upload_report_to_ftp
  110. ## TASK
  111. upload_results_to_irods = \
  112. DummyOperator(
  113. task_id='upload_results_to_irods',
  114. dag=dag)
  115. ## PIPELINE
  116. decide_analysis_branch >> upload_results_to_irods
  117. ## TASK
  118. run_scanpy_report = \
  119. DummyOperator(
  120. task_id='run_scanpy_report',
  121. dag=dag)
  122. ## PIPELINE
  123. decide_analysis_branch >> run_scanpy_report
  124. ## TASK
  125. upload_scanpy_report_to_ftp = \
  126. DummyOperator(
  127. task_id='upload_scanpy_report_to_ftp',
  128. dag=dag)
  129. ## PIPELINE
  130. run_scanpy_report >> upload_scanpy_report_to_ftp
  131. ## TASK
  132. run_picard_alignment_summary = \
  133. DummyOperator(
  134. task_id='run_picard_alignment_summary',
  135. dag=dag)
  136. ## PIPELINE
  137. decide_analysis_branch >> run_picard_alignment_summary
  138. ## TASK
  139. run_picard_qual_summary = \
  140. DummyOperator(
  141. task_id='run_picard_qual_summary',
  142. dag=dag)
  143. ## PIPELINE
  144. run_picard_alignment_summary >> run_picard_qual_summary
  145. ## TASK
  146. run_picard_rna_summary = \
  147. DummyOperator(
  148. task_id='run_picard_rna_summary',
  149. dag=dag)
  150. ## PIPELINE
  151. run_picard_qual_summary >> run_picard_rna_summary
  152. ## TASK
  153. run_picard_gc_summary = \
  154. DummyOperator(
  155. task_id='run_picard_gc_summary',
  156. dag=dag)
  157. ## PIPELINE
  158. run_picard_rna_summary >> run_picard_gc_summary
  159. ## TASK
  160. run_samtools_stats = \
  161. DummyOperator(
  162. task_id='run_samtools_stats',
  163. dag=dag)
  164. ## PIPELINE
  165. run_picard_gc_summary >> run_samtools_stats
  166. ## TASK
  167. run_samtools_idxstats = \
  168. DummyOperator(
  169. task_id='run_samtools_idxstats',
  170. dag=dag)
  171. ## PIPELINE
  172. run_samtools_stats >> run_samtools_idxstats
  173. ## TASK
  174. run_multiqc = \
  175. DummyOperator(
  176. task_id='run_multiqc',
  177. dag=dag)
  178. ## PIPELINE
  179. run_samtools_idxstats >> run_multiqc
  180. ## TASK
  181. upload_multiqc_to_ftp = \
  182. DummyOperator(
  183. task_id='upload_multiqc_to_ftp',
  184. dag=dag)
  185. ## PIPELINE
  186. run_multiqc >> upload_multiqc_to_ftp
  187. ## TASK
  188. update_analysis_and_status = \
  189. DummyOperator(
  190. task_id='update_analysis_and_status',
  191. dag=dag)
  192. ## PIPELINE
  193. upload_multiqc_to_ftp >> update_analysis_and_status
  194. upload_scanpy_report_to_ftp >> update_analysis_and_status
  195. upload_results_to_irods >> update_analysis_and_status
  196. upload_report_to_ftp >> update_analysis_and_status