Avik Datta пре 4 година
родитељ
комит
6902c7b7a7
1 измењених фајлова са 70 додато и 12 уклоњено
  1. 70 12
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 70 - 12
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -161,6 +161,7 @@ with dag:
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'load_cellranger_result_to_db',
               'xcom_pull_files_key':'html_report_file',
+              'collection_name_task':'load_cellranger_result_to_db',
               'collection_name_key':'sample_igf_id',
               'collection_type':'FTP_CELLRANGER_MULTI',
               'collection_table':'sample',
@@ -213,9 +214,17 @@ with dag:
               'collection_table':'sample',
               'output_files_key':'output_db_files'})
   upload_scanpy_report_for_sc_5p_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_ftp',
-      dag=dag)
+      dag=dag,
+      python_callable=ftp_files_upload_for_analysis,
+      params={'xcom_pull_task':'load_scanpy_report_for_sc_5p_to_db',
+              'xcom_pull_files_key':'output_db_files',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'collection_type':'FTP_SCANPY_HTML',
+              'collection_table':'sample',
+              'collect_remote_file':True})
   upload_scanpy_report_for_sc_5p_to_box = \
     DummyOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_box',
@@ -223,7 +232,15 @@ with dag:
   upload_cellbrowser_for_sc_5p_to_ftp = \
     DummyOperator(
       task_id='upload_cellbrowser_for_sc_5p_to_ftp',
-      dag=dag)
+      dag=dag,
+      python_callable=ftp_files_upload_for_analysis,
+      params={'xcom_pull_task':'run_scanpy_for_sc_5p',
+              'xcom_pull_files_key':'cellbrowser_dirs',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'collection_type':'FTP_CELLBROWSER',
+              'collection_table':'sample',
+              'collect_remote_file':True})
   ## PIPELINE
   decide_analysis_branch >> run_scanpy_for_sc_5p
   run_scanpy_for_sc_5p >> load_scanpy_report_for_sc_5p_to_db
@@ -263,9 +280,17 @@ with dag:
               'collection_table':'sample',
               'output_files_key':'output_db_files'})
   upload_scanpy_report_for_vdj_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scanpy_report_for_vdj_to_ftp',
-      dag=dag)
+      dag=dag,
+      python_callable=ftp_files_upload_for_analysis,
+      params={'xcom_pull_task':'load_scanpy_report_for_vdj_to_db',
+              'xcom_pull_files_key':'output_db_files',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'collection_type':'FTP_SCIRPY_VDJ_HTML',
+              'collection_table':'sample',
+              'collect_remote_file':True})
   upload_scanpy_report_for_vdj_to_box = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_to_box',
@@ -308,9 +333,17 @@ with dag:
               'collection_table':'sample',
               'output_files_key':'output_db_files'})
   upload_scanpy_report_for_vdj_b_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_ftp',
-      dag=dag)
+      dag=dag,
+      python_callable=ftp_files_upload_for_analysis,
+      params={'xcom_pull_task':'load_scanpy_report_for_vdj_b_to_db',
+              'xcom_pull_files_key':'output_db_files',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'collection_type':'FTP_SCIRPY_VDJ_B_HTML',
+              'collection_table':'sample',
+              'collect_remote_file':True})
   upload_scanpy_report_for_vdj_b_to_box = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_box',
@@ -353,9 +386,17 @@ with dag:
               'collection_table':'sample',
               'output_files_key':'output_db_files'})
   upload_scanpy_report_for_vdj_t_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scanpy_report_for_vdj_t_to_ftp',
-      dag=dag)
+      dag=dag,
+      python_callable=ftp_files_upload_for_analysis,
+      params={'xcom_pull_task':'load_scanpy_report_for_vdj_t_to_db',
+              'xcom_pull_files_key':'output_db_files',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'collection_type':'FTP_SCIRPY_VDJ_T_HTML',
+              'collection_table':'sample',
+              'collect_remote_file':True})
   upload_scanpy_report_for_vdj_t_to_box = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_t_to_box',
@@ -385,13 +426,30 @@ with dag:
               'template_ipynb_path':Variable.get('seurat_single_sample_template'),
               'singularity_image_path':Variable.get('seurat_notebook_image')})
   load_seurat_report_for_sc_5p_db = \
-    DummyOperator(
+    PythonOperator(
       task_id='load_seurat_report_for_sc_5p_db',
-      dag=dag)
+      dag=dag,
+      python_callable=load_analysis_files_func,
+      params={'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'file_name_task':'run_seurat_for_sc_5p',
+              'file_name_key':'seurat_notebook',
+              'analysis_name':'seurat_5p',
+              'collection_type':'SEURAT_HTML',
+              'collection_table':'sample',
+              'output_files_key':'output_db_files'})
   upload_seurat_report_for_sc_5p_ftp = \
     DummyOperator(
       task_id='upload_seurat_report_for_sc_5p_ftp',
-      dag=dag)
+      dag=dag,
+      python_callable=ftp_files_upload_for_analysis,
+      params={'xcom_pull_task':'load_seurat_report_for_sc_5p_db',
+              'xcom_pull_files_key':'output_db_files',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'collection_type':'FTP_SEURAT_HTML',
+              'collection_table':'sample',
+              'collect_remote_file':True})
   upload_seurat_report_for_sc_5p_to_box = \
     DummyOperator(
       task_id='upload_seurat_report_for_sc_5p_to_box',