
dag9 for single-cell 5' immune profiling analysis

Avik Datta 4 years ago
parent
commit
70d0c9256c
1 changed file with 190 additions and 0 deletions

+ 190 - 0
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -0,0 +1,190 @@
+from datetime import timedelta
+import os, json, logging, subprocess
+from airflow.models import DAG, Variable
+from airflow.utils.dates import days_ago
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from igf_airflow.logging.upload_log_msg import send_log_to_channels, log_success, log_failure, log_sleep
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
+
+## ARGS
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': days_ago(2),
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 4,
+    'retry_delay': timedelta(minutes=5),
+    'provide_context': True,
+}
+
+FEATURE_TYPE_LIST = \
+  Variable.get(
+    'tenx_single_cell_immune_profiling_feature_types',
+    deserialize_json=True)  # expected to hold a JSON list of 10x feature types
+
+## DAG
+dag = \
+  DAG(
+    dag_id='dag9_tenx_single_cell_immune_profiling',
+    schedule_interval=None,
+    max_active_runs=10,
+    catchup=False,
+    tags=['hpc','analysis','tenx','sc'],
+    default_args=default_args,
+    orientation='LR')
+
+with dag:
+  ## TASK
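+  ## fetch analysis metadata and branch either to the per-feature-type trim tasks or to no_analysis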
+  fetch_analysis_info_and_branch = \
+    BranchPythonOperator(
+      task_id='fetch_analysis_info',
+      dag=dag,
+      python_callable=fetch_analysis_info_and_branch_func)
+  ## TASK
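+  ## placeholder join point for all feature-type branches before the cellranger run configuration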
+  configure_cellranger_run = \
+    DummyOperator(
+      task_id='configure_cellranger_run',
+      dag=dag)
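+  ## one sub-branch per 10x feature type listed in the FEATURE_TYPE_LIST variable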
+  for analysis_name in FEATURE_TYPE_LIST:
+    ## TASK
+    task_branch = \
+      DummyOperator(
+        task_id=analysis_name,
+        dag=dag)
+    run_trim_list = []
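+    ## fan out read trimming as 32 placeholder tasks (run_id 1-32) for this feature type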
+    for run_id in range(1,33):
+      ## TASK
+      t = \
+        DummyOperator(
+          task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
+          dag=dag)
+      run_trim_list.append(t)
+    ## TASK
+    collect_trimmed_files = \
+      DummyOperator(
+        task_id='collect_trimmed_files_{0}'.format(analysis_name),
+        dag=dag)
+    ## PIPELINE
+    fetch_analysis_info_and_branch >> task_branch
+    task_branch >> run_trim_list
+    run_trim_list >> collect_trimmed_files
+    collect_trimmed_files >> configure_cellranger_run
+  ## TASK
+  no_analysis = \
+    DummyOperator(
+      task_id='no_analysis',
+      dag=dag)
+  ## PIPELINE
+  fetch_analysis_info_and_branch >> no_analysis
+  ## TASK
+  run_cellranger = \
+    DummyOperator(
+      task_id='run_cellranger',
+      dag=dag)
+  ## PIPELINE
+  configure_cellranger_run >> run_cellranger
+  ## TASK
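+  ## placeholder branch decision: the lambda currently selects every downstream branch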
+  decide_analysis_branch = \
+    BranchPythonOperator(
+      task_id='decide_analysis_branch',
+      dag=dag,
+      python_callable=lambda: ['upload_report_to_ftp',
+                               'upload_results_to_irods',
+                               'run_scanpy_report',
+                               'run_picard_alignment_summary'])
+  ## PIPELINE
+  run_cellranger >> decide_analysis_branch
+  ## TASK
+  upload_report_to_ftp = \
+    DummyOperator(
+      task_id='upload_report_to_ftp',
+      dag=dag)
+  ## PIPELINE
+  decide_analysis_branch >> upload_report_to_ftp
+  ## TASK
+  upload_results_to_irods = \
+    DummyOperator(
+      task_id='upload_results_to_irods',
+      dag=dag)
+  ## PIPELINE
+  decide_analysis_branch >> upload_results_to_irods
+  ## TASK
+  run_scanpy_report = \
+    DummyOperator(
+      task_id='run_scanpy_report',
+      dag=dag)
+  ## PIPELINE
+  decide_analysis_branch >> run_scanpy_report
+  ## TASK
+  upload_scanpy_report_to_ftp = \
+    DummyOperator(
+      task_id='upload_scanpy_report_to_ftp',
+      dag=dag)
+  ## PIPELINE
+  run_scanpy_report >> upload_scanpy_report_to_ftp
+  ## TASK
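+  ## QC chain: picard alignment/quality/RNA/GC summaries, then samtools stats and idxstats, multiqc and FTP upload (placeholders)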
+  run_picard_alignment_summary = \
+    DummyOperator(
+      task_id='run_picard_alignment_summary',
+      dag=dag)
+  ## PIPELINE
+  decide_analysis_branch >> run_picard_alignment_summary
+  ## TASK
+  run_picard_qual_summary = \
+    DummyOperator(
+      task_id='run_picard_qual_summary',
+      dag=dag)
+  ## PIPELINE
+  run_picard_alignment_summary >> run_picard_qual_summary
+  ## TASK
+  run_picard_rna_summary = \
+    DummyOperator(
+      task_id='run_picard_rna_summary',
+      dag=dag)
+  ## PIPELINE
+  run_picard_qual_summary >> run_picard_rna_summary
+  ## TASK
+  run_picard_gc_summary = \
+    DummyOperator(
+      task_id='run_picard_gc_summary',
+      dag=dag)
+  ## PIPELINE
+  run_picard_rna_summary >> run_picard_gc_summary
+  ## TASK
+  run_samtools_stats = \
+    DummyOperator(
+      task_id='run_samtools_stats',
+      dag=dag)
+  ## PIPELINE
+  run_picard_gc_summary >> run_samtools_stats
+  ## TASK
+  run_samtools_idxstats = \
+    DummyOperator(
+      task_id='run_samtools_idxstats',
+      dag=dag)
+  ## PIPELINE
+  run_samtools_stats >> run_samtools_idxstats
+  ## TASK
+  run_multiqc = \
+    DummyOperator(
+      task_id='run_multiqc',
+      dag=dag)
+  ## PIPELINE
+  run_samtools_idxstats >> run_multiqc
+  ## TASK
+  upload_multiqc_to_ftp = \
+    DummyOperator(
+      task_id='upload_multiqc_to_ftp',
+      dag=dag)
+  ## PIPELINE
+  run_multiqc >> upload_multiqc_to_ftp
+  ## TASK
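+  ## placeholder: update analysis records and pipeline status once all upload branches finish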
+  update_analysis_and_status = \
+    DummyOperator(
+      task_id='update_analysis_and_status',
+      dag=dag)
+  ## PIPELINE
+  upload_multiqc_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_to_ftp >> update_analysis_and_status
+  upload_results_to_irods >> update_analysis_and_status
+  upload_report_to_ftp >> update_analysis_and_status