فهرست منبع

converted dummy operators to python operators with null function

Avik Datta 4 سال پیش
والد
کامیت
0fb5301c99
1فایلهای تغییر یافته به همراه82 افزوده شده و 47 حذف شده
  1. 82 47
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 82 - 47
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -181,15 +181,16 @@ with dag:
               'collection_table':'sample',
               'collect_remote_file':True})
   upload_cellranger_report_to_box = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_cellranger_report_to_box',
       dag=dag,
       queue='hpc_4G',
+      python_callable=None,
       params={'xcom_pull_task':'load_cellranger_result_to_db',
               'xcom_pull_files_key':'html_report_file'})
   upload_cellranger_results_to_irods = \
     PythonOperator(
-      task_id='upload_results_to_irods',
+      task_id='upload_cellranger_results_to_irods',
       dag=dag,
       queue='hpc_4G',
       python_callable=irods_files_upload_for_analysis,
@@ -246,11 +247,13 @@ with dag:
               'collection_table':'sample',
               'collect_remote_file':True})
   upload_scanpy_report_for_sc_5p_to_box = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_box',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   upload_cellbrowser_for_sc_5p_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_cellbrowser_for_sc_5p_to_ftp',
       dag=dag,
       queue='hpc_4G',
@@ -316,9 +319,11 @@ with dag:
               'collection_table':'sample',
               'collect_remote_file':True})
   upload_scirpy_report_for_vdj_to_box = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scirpy_report_for_vdj_to_box',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   decide_analysis_branch >> run_scirpy_for_vdj
   run_scirpy_for_vdj >> load_scirpy_report_for_vdj_to_db
@@ -326,13 +331,13 @@ with dag:
   load_scirpy_report_for_vdj_to_db >> upload_scirpy_report_for_vdj_to_box
   ## TASK
   run_scirpy_for_vdj_b = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_scirpy_for_vdj_b',
       dag=dag,
       queue='hpc_4G',
       python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
-              ' cellranger_xcom_pull_task':'run_cellranger',
+              'cellranger_xcom_pull_task':'run_cellranger',
               'scanpy_timeout':1200,
               'allow_errors':False,
               'kernel_name':'python3',
@@ -372,9 +377,11 @@ with dag:
               'collection_table':'sample',
               'collect_remote_file':True})
   upload_scirpy_report_for_vdj_b_to_box = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_box',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   decide_analysis_branch >> run_scirpy_for_vdj_b
   run_scirpy_for_vdj_b >> load_scirpy_report_for_vdj_b_to_db
@@ -382,7 +389,7 @@ with dag:
   load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_box
   ## TASK
   run_scirpy_for_vdj_t = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_scirpy_for_vdj_t',
       dag=dag,
       queue='hpc_4G',
@@ -428,9 +435,11 @@ with dag:
               'collection_table':'sample',
               'collect_remote_file':True})
   upload_scirpy_report_for_vdj_t_to_box = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scirpy_report_for_vdj_t_to_box',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   decide_analysis_branch >> run_scirpy_for_vdj_t
   run_scirpy_for_vdj_t >> load_scirpy_report_for_vdj_t_to_db
@@ -471,7 +480,7 @@ with dag:
               'collection_table':'sample',
               'output_files_key':'output_db_files'})
   upload_seurat_report_for_sc_5p_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_seurat_report_for_sc_5p_ftp',
       dag=dag,
       queue='hpc_4G',
@@ -484,9 +493,11 @@ with dag:
               'collection_table':'sample',
               'collect_remote_file':True})
   upload_seurat_report_for_sc_5p_to_box = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_seurat_report_for_sc_5p_to_box',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   decide_analysis_branch >> run_seurat_for_sc_5p
   run_seurat_for_sc_5p >> load_seurat_report_for_sc_5p_db
@@ -494,103 +505,127 @@ with dag:
   load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_to_box
   ## TASK
   convert_bam_to_cram = \
-    DummyOperator(
+    PythonOperator(
       task_id='convert_bam_to_cram',
       dag=dag,
+      queue='hpc_4G',
+      python_callable=None,
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger'})
   upload_cram_to_irods = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_cram_to_irods',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   decide_analysis_branch >> convert_bam_to_cram
   convert_bam_to_cram >> upload_cram_to_irods
   ## TASK
   run_picard_alignment_summary = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_picard_alignment_summary',
       dag=dag,
+      queue='hpc_4G',
+      python_callable=None,
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger'})
   ## PIPELINE
   decide_analysis_branch >> run_picard_alignment_summary
   ## TASK
   run_picard_qual_summary = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_picard_qual_summary',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_picard_alignment_summary >> run_picard_qual_summary
   ## TASK
   run_picard_rna_summary = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_picard_rna_summary',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_picard_qual_summary >> run_picard_rna_summary
   ## TASK
   run_picard_gc_summary = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_picard_gc_summary',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_picard_rna_summary >> run_picard_gc_summary
   ## TASK
   run_samtools_stats = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_samtools_stats',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_picard_gc_summary >> run_samtools_stats
   ## TASK
   run_samtools_idxstats = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_samtools_idxstats',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_samtools_stats >> run_samtools_idxstats
   ## TASK
   run_multiqc = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_multiqc',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_samtools_idxstats >> run_multiqc
   ## TASK
   upload_multiqc_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_multiqc_to_ftp',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_multiqc >> upload_multiqc_to_ftp
   ## TASK
   upload_multiqc_to_box = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_multiqc_to_box',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=None)
   ## PIPELINE
   run_multiqc >> upload_multiqc_to_box
   ## TASK
   update_analysis_and_status = \
-    DummyOperator(
+    PythonOperator(
       task_id='update_analysis_and_status',
       dag=dag,
+      queue='hpc_4G',
+      python_callable=None,
       trigger_rule='none_failed_or_skipped')
   ## PIPELINE
   upload_multiqc_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_sc_5p_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_sc_5p_to_box >> update_analysis_and_status
   upload_cellbrowser_for_sc_5p_to_ftp >> update_analysis_and_status
-  upload_scanpy_report_for_vdj_to_ftp >> update_analysis_and_status
-  upload_scanpy_report_for_vdj_to_box >> update_analysis_and_status
-  upload_scanpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
-  upload_scanpy_report_for_vdj_b_to_box >> update_analysis_and_status
-  upload_scanpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
-  upload_scanpy_report_for_vdj_t_to_box >> update_analysis_and_status
+  upload_scirpy_report_for_vdj_to_ftp >> update_analysis_and_status
+  upload_scirpy_report_for_vdj_to_box >> update_analysis_and_status
+  upload_scirpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
+  upload_scirpy_report_for_vdj_b_to_box >> update_analysis_and_status
+  upload_scirpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
+  upload_scirpy_report_for_vdj_t_to_box >> update_analysis_and_status
   upload_seurat_report_for_sc_5p_ftp >> update_analysis_and_status
   upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
-  upload_results_to_irods >> update_analysis_and_status
-  upload_report_to_ftp >> update_analysis_and_status
-  upload_report_to_box >> update_analysis_and_status
+  upload_cellranger_results_to_irods >> update_analysis_and_status
+  upload_cellranger_report_to_ftp >> update_analysis_and_status
+  upload_cellranger_report_to_box >> update_analysis_and_status
   upload_cram_to_irods >> update_analysis_and_status