Преглед на файлове

uploading rns velocity output files

Avik Datta преди 3 години
родител
ревизия
308dd40dd6
променени са 1 файла, в които са добавени 27 реда и са изтрити 7 реда
  1. 27 7
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 27 - 7
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -29,7 +29,6 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import generate_cell_sorted_bam_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import generate_cell_sorted_bam_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_velocyto_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_velocyto_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scvelo_for_sc_5p_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scvelo_for_sc_5p_func
-from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_loom_file_to_rds_func
 
 
 ## ARGS
 ## ARGS
 default_args = {
 default_args = {
@@ -585,11 +584,18 @@ with dag:
               'analysis_name':'velocyto_5p',
               'analysis_name':'velocyto_5p',
               'collection_type':'VELOCYTO_LOOM',
               'collection_type':'VELOCYTO_LOOM',
               'collection_table':'sample',
               'collection_table':'sample',
-              'output_files_key':'output_db_files'}))
+              'output_files_key':'output_db_files'})
   upload_loom_file_to_irods = \
   upload_loom_file_to_irods = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_loom_file_to_irods',
       task_id='upload_loom_file_to_irods',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=irods_files_upload_for_analysis,
+      params={'xcom_pull_task':'load_loom_file_to_rds',
+              'xcom_pull_files_key':'output_db_files',
+              'collection_name_key':'sample_igf_id',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'analysis_name':'velocyto_loom'})
   load_scvelo_report_to_rds = \
   load_scvelo_report_to_rds = \
     PythonOperator(
     PythonOperator(
       task_id='load_scvelo_report_to_rds',
       task_id='load_scvelo_report_to_rds',
@@ -605,13 +611,27 @@ with dag:
               'collection_table':'sample',
               'collection_table':'sample',
               'output_files_key':'output_db_files'})
               'output_files_key':'output_db_files'})
   upload_scvelo_report_to_ftp = \
   upload_scvelo_report_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_scvelo_report_to_ftp',
       task_id='upload_scvelo_report_to_ftp',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=ftp_files_upload_for_analysis,
+      params={'xcom_pull_task':'load_scvelo_report_to_rds',
+              'xcom_pull_files_key':'output_db_files',
+              'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'collection_type':'FTP_SCVELO_HTML',
+              'collection_table':'sample',
+              'collect_remote_file':True})
   upload_scvelo_report_to_box = \
   upload_scvelo_report_to_box = \
     DummyOperator(
     DummyOperator(
       task_id='upload_scvelo_report_to_box',
       task_id='upload_scvelo_report_to_box',
-      dag=dag)
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,
+      params={'xcom_pull_task':'load_scvelo_report_to_rds',
+              'xcom_pull_files_key':'output_db_files',
+              'analysis_tag':'scvelo_single_sample_report'})
   ## PIPELINE
   ## PIPELINE
   convert_cellranger_bam_to_cram >> generate_cell_sorted_bam
   convert_cellranger_bam_to_cram >> generate_cell_sorted_bam
   generate_cell_sorted_bam >> run_velocyto
   generate_cell_sorted_bam >> run_velocyto