# dag9_tenx_single_cell_immune_profiling.py (7.5 KB)


import json
import logging
import os
import subprocess
from datetime import timedelta

from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from igf_airflow.logging.upload_log_msg import send_log_to_channels, log_success, log_failure, log_sleep
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import configure_cellranger_run_func
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_sc_read_trimmming_func
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_cellranger_tool
  13. ## ARGS
  14. default_args = {
  15. 'owner': 'airflow',
  16. 'depends_on_past': False,
  17. 'start_date': days_ago(2),
  18. 'email_on_failure': False,
  19. 'email_on_retry': False,
  20. 'retries': 4,
  21. 'max_active_runs':10,
  22. 'catchup':False,
  23. 'retry_delay': timedelta(minutes=5),
  24. 'provide_context': True,
  25. }
  26. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types')
  27. ## DAG
  28. dag = \
  29. DAG(
  30. dag_id='dag9_tenx_single_cell_immune_profiling',
  31. schedule_interval=None,
  32. tags=['hpc','analysis','tenx','sc'],
  33. default_args=default_args,
  34. orientation='LR')
  35. with dag:
  36. ## TASK
  37. fetch_analysis_info_and_branch = \
  38. BranchPythonOperator(
  39. task_id='fetch_analysis_info',
  40. dag=dag,
  41. queue='hpc_4g',
  42. params={'no_analysis_task':'no_analysis',
  43. 'analysis_description_xcom_key':'analysis_description',
  44. 'analysis_info_xcom_key':'analysis_info'},
  45. python_callable=fetch_analysis_info_and_branch_func)
  46. ## TASK
  47. configure_cellranger_run = \
  48. PythonOperator(
  49. task_id='configure_cellranger_run',
  50. dag=dag,
  51. queue='hpc_4g',
  52. params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
  53. 'analysis_description_xcom_key':'analysis_description',
  54. 'analysis_info_xcom_key':'analysis_info',
  55. 'library_csv_xcom_key':'cellranger_library_csv'},
  56. python_callable=configure_cellranger_run_func)
  57. for analysis_name in FEATURE_TYPE_LIST:
  58. ## TASK
  59. task_branch = \
  60. DummyOperator(
  61. task_id=analysis_name,
  62. dag=dag)
  63. run_trim_list = list()
  64. for run_id in range(0,32):
  65. ## TASK
  66. t = \
  67. PythonOperator(
  68. task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
  69. dag=dag,
  70. params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
  71. 'analysis_info_xcom_key':'analysis_info',
  72. 'analysis_name':analysis_name,
  73. 'run_id':run_id,
  74. 'r1_length':26,
  75. 'r2_length':0,
  76. 'fastq_input_dir_tag':'fastq_dir',
  77. 'fastq_output_dir_tag':'output_path'},
  78. python_callable=run_sc_read_trimmming_func)
  79. run_trim_list.append(t)
  80. ## TASK
  81. collect_trimmed_files = \
  82. DummyOperator(
  83. task_id='collect_trimmed_files_{0}'.format(analysis_name),
  84. dag=dag)
  85. ## PIPELINE
  86. fetch_analysis_info_and_branch >> task_branch
  87. task_branch >> run_trim_list
  88. run_trim_list >> collect_trimmed_files
  89. collect_trimmed_files >> configure_cellranger_run
  90. ## TASK
  91. no_analysis = \
  92. DummyOperator(
  93. task_id='no_analysis',
  94. dag=dag)
  95. ## PIPELINE
  96. fetch_analysis_info_and_branch >> no_analysis
  97. ## TASK
  98. run_cellranger = \
  99. PythonOperator(
  100. task_id='run_cellranger',
  101. dag=dag,
  102. params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
  103. 'analysis_description_xcom_key':'analysis_description',
  104. 'library_csv_xcom_key':'cellranger_library_csv',
  105. 'library_csv_xcom_pull_task':'configure_cellranger_run',
  106. 'cellranger_xcom_key':'cellranger_output'},
  107. python_callable=run_cellranger_tool)
  108. ## PIPELINE
  109. configure_cellranger_run >> run_cellranger
  110. ## TASK
  111. decide_analysis_branch = \
  112. BranchPythonOperator(
  113. task_id='decide_analysis_branch',
  114. dag=dag,
  115. python_callable=lambda: ['upload_report_to_ftp',
  116. 'upload_report_to_box',
  117. 'upload_results_to_irods',
  118. 'run_scanpy_report',
  119. 'run_picard_alignment_summary'])
  120. ## PIPELINE
  121. run_cellranger >> decide_analysis_branch
  122. ## TASK
  123. upload_report_to_ftp = \
  124. DummyOperator(
  125. task_id='upload_report_to_ftp',
  126. dag=dag)
  127. ## PIPELINE
  128. decide_analysis_branch >> upload_report_to_ftp
  129. ## TASK
  130. upload_report_to_box = \
  131. DummyOperator(
  132. task_id='upload_report_to_box',
  133. dag=dag)
  134. ## PIPELINE
  135. decide_analysis_branch >> upload_report_to_box
  136. ## TASK
  137. upload_results_to_irods = \
  138. DummyOperator(
  139. task_id='upload_results_to_irods',
  140. dag=dag)
  141. ## PIPELINE
  142. decide_analysis_branch >> upload_results_to_irods
  143. ## TASK
  144. run_scanpy_report = \
  145. DummyOperator(
  146. task_id='run_scanpy_report',
  147. dag=dag)
  148. ## PIPELINE
  149. decide_analysis_branch >> run_scanpy_report
  150. ## TASK
  151. upload_scanpy_report_to_ftp = \
  152. DummyOperator(
  153. task_id='upload_scanpy_report_to_ftp',
  154. dag=dag)
  155. ## PIPELINE
  156. run_scanpy_report >> upload_scanpy_report_to_ftp
  157. ## TASK
  158. upload_scanpy_report_to_box = \
  159. DummyOperator(
  160. task_id='upload_scanpy_report_to_box',
  161. dag=dag)
  162. ## PIPELINE
  163. run_scanpy_report >> upload_scanpy_report_to_box
  164. ## TASK
  165. run_picard_alignment_summary = \
  166. DummyOperator(
  167. task_id='run_picard_alignment_summary',
  168. dag=dag)
  169. ## PIPELINE
  170. decide_analysis_branch >> run_picard_alignment_summary
  171. ## TASK
  172. run_picard_qual_summary = \
  173. DummyOperator(
  174. task_id='run_picard_qual_summary',
  175. dag=dag)
  176. ## PIPELINE
  177. run_picard_alignment_summary >> run_picard_qual_summary
  178. ## TASK
  179. run_picard_rna_summary = \
  180. DummyOperator(
  181. task_id='run_picard_rna_summary',
  182. dag=dag)
  183. ## PIPELINE
  184. run_picard_qual_summary >> run_picard_rna_summary
  185. ## TASK
  186. run_picard_gc_summary = \
  187. DummyOperator(
  188. task_id='run_picard_gc_summary',
  189. dag=dag)
  190. ## PIPELINE
  191. run_picard_rna_summary >> run_picard_gc_summary
  192. ## TASK
  193. run_samtools_stats = \
  194. DummyOperator(
  195. task_id='run_samtools_stats',
  196. dag=dag)
  197. ## PIPELINE
  198. run_picard_gc_summary >> run_samtools_stats
  199. ## TASK
  200. run_samtools_idxstats = \
  201. DummyOperator(
  202. task_id='run_samtools_idxstats',
  203. dag=dag)
  204. ## PIPELINE
  205. run_samtools_stats >> run_samtools_idxstats
  206. ## TASK
  207. run_multiqc = \
  208. DummyOperator(
  209. task_id='run_multiqc',
  210. dag=dag)
  211. ## PIPELINE
  212. run_samtools_idxstats >> run_multiqc
  213. ## TASK
  214. upload_multiqc_to_ftp = \
  215. DummyOperator(
  216. task_id='upload_multiqc_to_ftp',
  217. dag=dag)
  218. ## PIPELINE
  219. run_multiqc >> upload_multiqc_to_ftp
  220. ## TASK
  221. upload_multiqc_to_box = \
  222. DummyOperator(
  223. task_id='upload_multiqc_to_box',
  224. dag=dag)
  225. ## PIPELINE
  226. run_multiqc >> upload_multiqc_to_box
  227. ## TASK
  228. update_analysis_and_status = \
  229. DummyOperator(
  230. task_id='update_analysis_and_status',
  231. dag=dag)
  232. ## PIPELINE
  233. upload_multiqc_to_ftp >> update_analysis_and_status
  234. upload_scanpy_report_to_ftp >> update_analysis_and_status
  235. upload_scanpy_report_to_box >> update_analysis_and_status
  236. upload_results_to_irods >> update_analysis_and_status
  237. upload_report_to_ftp >> update_analysis_and_status
  238. upload_report_to_box >> update_analysis_and_status