from datetime import timedelta import os,json,logging,subprocess from airflow.models import DAG,Variable from airflow.utils.dates import days_ago from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import fetch_analysis_info_and_branch_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import configure_cellranger_run_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_sc_read_trimmming_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_cellranger_tool from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decide_analysis_branch_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import ftp_files_upload_for_analysis from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods_files_upload_for_analysis from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import task_branch_function from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import upload_analysis_file_to_box from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import convert_bam_to_cram_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_picard_for_cellranger from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_samtools_for_cellranger from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_multiqc_for_cellranger from 
igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import index_and_copy_bam_for_parallel_analysis from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import change_pipeline_status from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import clean_up_files from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import create_and_update_qc_pages from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_metrices_to_collection from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import generate_cell_sorted_bam_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_velocyto_func from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scvelo_for_sc_5p_func ## ARGS default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email_on_failure': False, 'email_on_retry': False, 'retries': 4, 'max_active_runs': 10, 'catchup': True, 'retry_delay': timedelta(minutes=5), 'provide_context': True, } FEATURE_TYPE_LIST = \ Variable.get('tenx_single_cell_immune_profiling_feature_types', default_var={}) ## DAG dag = \ DAG( dag_id='dag9_tenx_single_cell_immune_profiling', schedule_interval=None, tags=['hpc', 'analysis', 'tenx', 'sc'], default_args=default_args, concurrency=100, max_active_runs=20, orientation='LR') with dag: ## TASK fetch_analysis_info_and_branch = \ BranchPythonOperator( task_id='fetch_analysis_info', dag=dag, queue='hpc_4G', params={ 'no_analysis_task': 'no_analysis', 'analysis_description_xcom_key': 'analysis_description', 'analysis_info_xcom_key': 'analysis_info'}, python_callable=fetch_analysis_info_and_branch_func) ## TASK configure_cellranger_run = \ PythonOperator( task_id='configure_cellranger_run', dag=dag, queue='hpc_4G', trigger_rule='none_failed_or_skipped', params={ 'xcom_pull_task_id': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 
'analysis_info_xcom_key': 'analysis_info', 'library_csv_xcom_key': 'cellranger_library_csv'}, python_callable=configure_cellranger_run_func) for analysis_name in FEATURE_TYPE_LIST.keys(): ## TASK task_branch = \ BranchPythonOperator( task_id=analysis_name, dag=dag, queue='hpc_4G', params={ 'xcom_pull_task_id': 'fetch_analysis_info', 'analysis_info_xcom_key': 'analysis_info', 'analysis_name': analysis_name, 'task_prefix': 'run_trim'}, python_callable=task_branch_function) run_trim_list = list() for run_id in range(0,10): ## TASK t = \ PythonOperator( task_id='run_trim_{0}_{1}'.format(analysis_name, run_id), dag=dag, queue='hpc_4G', params={ 'xcom_pull_task_id': 'fetch_analysis_info', 'analysis_info_xcom_key': 'analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'analysis_name': analysis_name, 'run_id': run_id, 'r1-length': 0, 'r2-length': 0, 'fastq_input_dir_tag': 'fastq_dir', 'use_ephemeral_space': True, 'fastq_output_dir_tag': 'output_path'}, python_callable=run_sc_read_trimmming_func) run_trim_list.append(t) ## TASK collect_trimmed_files = \ DummyOperator( task_id='collect_trimmed_files_{0}'.format(analysis_name), trigger_rule='none_failed_or_skipped', dag=dag) ## PIPELINE fetch_analysis_info_and_branch >> task_branch task_branch >> run_trim_list run_trim_list >> collect_trimmed_files collect_trimmed_files >> configure_cellranger_run ## TASK no_analysis = \ DummyOperator( task_id='no_analysis', dag=dag) ## PIPELINE fetch_analysis_info_and_branch >> no_analysis ## TASK run_cellranger = \ PythonOperator( task_id='run_cellranger', dag=dag, queue='hpc_64G16t24hr', params={ 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'library_csv_xcom_key': 'cellranger_library_csv', 'library_csv_xcom_pull_task': 'configure_cellranger_run', 'cellranger_xcom_key': 'cellranger_output', 'cellranger_options': ['--localcores 16', '--localmem 64']}, python_callable=run_cellranger_tool) ## PIPELINE 
configure_cellranger_run >> run_cellranger ## TASK decide_analysis_branch = \ BranchPythonOperator( task_id='decide_analysis_branch', dag=dag, queue='hpc_4G', python_callable=decide_analysis_branch_func, params={ 'load_cellranger_result_to_db_task': 'load_cellranger_result_to_db', 'run_scanpy_for_sc_5p_task': 'run_scanpy_for_sc_5p', 'run_scirpy_for_vdj_task': 'run_scirpy_for_vdj', 'run_scirpy_for_vdj_b_task': 'run_scirpy_for_vdj_b', 'run_scirpy_vdj_t_task': 'run_scirpy_for_vdj_t', 'run_seurat_for_sc_5p_task': 'run_seurat_for_sc_5p', 'convert_cellranger_bam_to_cram_task': 'convert_cellranger_bam_to_cram', 'library_csv_xcom_key': 'cellranger_library_csv', 'library_csv_xcom_pull_task': 'configure_cellranger_run'}) ## PIPELINE run_cellranger >> decide_analysis_branch ## TASK load_cellranger_result_to_db = \ PythonOperator( task_id='load_cellranger_result_to_db', dag=dag, queue='hpc_4G', python_callable=load_cellranger_result_to_db_func, params={ 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 'run_cellranger', 'collection_type': 'CELLRANGER_MULTI', 'html_collection_type': 'CELLRANGER_HTML', 'collection_table': 'sample', 'xcom_collection_name_key': 'sample_igf_id', 'genome_column': 'genome_build', 'analysis_name': 'cellranger_multi', 'output_xcom_key': 'loaded_output_files', 'html_xcom_key': 'html_report_file', 'html_report_file_name': 'web_summary.html'}) upload_cellranger_report_to_ftp = \ PythonOperator( task_id='upload_cellranger_report_to_ftp', dag=dag, queue='hpc_4G', python_callable=ftp_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_cellranger_result_to_db', 'xcom_pull_files_key': 'html_report_file', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'collection_type': 'FTP_CELLRANGER_HTML', 'collection_table': 'sample', 'collect_remote_file': True}) 
upload_cellranger_report_to_box = \ PythonOperator( task_id='upload_cellranger_report_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'load_cellranger_result_to_db', 'xcom_pull_files_key': 'html_report_file', 'analysis_tag': 'cellranger_multi'}) upload_cellranger_results_to_irods = \ PythonOperator( task_id='upload_cellranger_results_to_irods', dag=dag, queue='hpc_4G', python_callable=irods_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_cellranger_result_to_db', 'xcom_pull_files_key': 'loaded_output_files', 'collection_name_key': 'sample_igf_id', 'collection_name_task': 'load_cellranger_result_to_db', 'analysis_name': 'cellranger_multi'}) ## PIPELINE decide_analysis_branch >> load_cellranger_result_to_db load_cellranger_result_to_db >> upload_cellranger_report_to_ftp load_cellranger_result_to_db >> upload_cellranger_report_to_box load_cellranger_result_to_db >> upload_cellranger_results_to_irods ## TASK run_scanpy_for_sc_5p = \ PythonOperator( task_id='run_scanpy_for_sc_5p', dag=dag, queue='hpc_4G', python_callable=run_singlecell_notebook_wrapper_func, params={ 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 'run_cellranger', 'scanpy_timeout': 1200, 'allow_errors': False, 'kernel_name': 'python3', 'count_dir': 'count', 'analysis_name': 'scanpy', 'output_notebook_key': 'scanpy_notebook', 'output_cellbrowser_key': 'cellbrowser_dirs', 'output_scanpy_h5ad_key': 'scanpy_h5ad', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description'}) load_cellranger_gex_matrics_to_db = \ PythonOperator( task_id='load_cellranger_gex_matrics_to_db', dag=dag, queue='hpc_4G', python_callable=load_cellranger_metrices_to_collection, params={ 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 'run_cellranger', 'collection_type': 'CELLRANGER_MULTI', 'collection_name_task': 'load_cellranger_result_to_db', 
'collection_name_key': 'sample_igf_id', 'metrics_summary_file': 'metrics_summary.csv', 'attribute_prefix': 'CELLRANGER_COUNT'}) load_scanpy_report_for_sc_5p_to_db = \ PythonOperator( task_id='load_scanpy_report_for_sc_5p_to_db', dag=dag, queue='hpc_4G', python_callable=load_analysis_files_func, params={ 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'file_name_task': 'run_scanpy_for_sc_5p', 'file_name_key': 'scanpy_notebook', 'analysis_name': 'scanpy_5p', 'collection_type': 'SCANPY_HTML', 'collection_table': 'sample', 'output_files_key': 'output_db_files'}) upload_scanpy_report_for_sc_5p_to_ftp = \ PythonOperator( task_id='upload_scanpy_report_for_sc_5p_to_ftp', dag=dag, queue='hpc_4G', python_callable=ftp_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db', 'xcom_pull_files_key': 'output_db_files', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'collection_type': 'FTP_SCANPY_HTML', 'collection_table': 'sample', 'collect_remote_file': True}) upload_scanpy_report_for_sc_5p_to_box = \ PythonOperator( task_id='upload_scanpy_report_for_sc_5p_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db', 'xcom_pull_files_key': 'output_db_files', 'analysis_tag': 'scanpy_single_sample_report'}) upload_cellbrowser_for_sc_5p_to_ftp = \ PythonOperator( task_id='upload_cellbrowser_for_sc_5p_to_ftp', dag=dag, queue='hpc_4G', python_callable=ftp_files_upload_for_analysis, params={ 'xcom_pull_task': 'run_scanpy_for_sc_5p', 'xcom_pull_files_key': 'cellbrowser_dirs', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'collection_type': 'FTP_CELLBROWSER', 'collection_table': 'sample', 'collect_remote_file': True}) ## PIPELINE decide_analysis_branch >> run_scanpy_for_sc_5p run_scanpy_for_sc_5p >> 
load_scanpy_report_for_sc_5p_to_db run_scanpy_for_sc_5p >> load_cellranger_gex_matrics_to_db load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_ftp load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_box run_scanpy_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp ## TASK run_scirpy_for_vdj_b = \ PythonOperator( task_id='run_scirpy_for_vdj_b', dag=dag, queue='hpc_4G', python_callable=run_singlecell_notebook_wrapper_func, params={ 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 'run_cellranger', 'scanpy_timeout': 1200, 'allow_errors': False, 'kernel_name': 'python3', 'analysis_name': 'scirpy', 'vdj_dir': 'vdj_b', 'count_dir': 'count', 'output_notebook_key': 'scirpy_notebook', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description'}) load_cellranger_vdjB_matrics_to_db = \ PythonOperator( task_id='load_cellranger_vdjB_matrics_to_db', dag=dag, queue='hpc_4G', python_callable=load_cellranger_metrices_to_collection, params={ 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 'run_cellranger', 'collection_type': 'CELLRANGER_MULTI', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'metrics_summary_file': 'metrics_summary.csv', 'attribute_prefix': 'CELLRANGER_VDJB'}) load_scirpy_report_for_vdj_b_to_db = \ PythonOperator( task_id='load_scirpy_report_for_vdj_b_to_db', dag=dag, queue='hpc_4G', python_callable=load_analysis_files_func, params={ 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'file_name_task': 'run_scirpy_for_vdj_b', 'file_name_key': 'scirpy_notebook', 'analysis_name': 'scirpy_vdj_b', 'collection_type': 'SCIRPY_VDJ_B_HTML', 'collection_table': 'sample', 'output_files_key': 'output_db_files'}) upload_scirpy_report_for_vdj_b_to_ftp = \ PythonOperator( task_id='upload_scirpy_report_for_vdj_b_to_ftp', dag=dag, queue='hpc_4G', 
python_callable=ftp_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db', 'xcom_pull_files_key': 'output_db_files', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'collection_type': 'FTP_SCIRPY_VDJ_B_HTML', 'collection_table': 'sample', 'collect_remote_file': True}) upload_scirpy_report_for_vdj_b_to_box = \ PythonOperator( task_id='upload_scanpy_report_for_vdj_b_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db', 'xcom_pull_files_key': 'output_db_files', 'analysis_tag': 'scirpy_vdj_b_single_sample_report'}) ## PIPELINE decide_analysis_branch >> run_scirpy_for_vdj_b run_scirpy_for_vdj_b >> load_scirpy_report_for_vdj_b_to_db run_scirpy_for_vdj_b >> load_cellranger_vdjB_matrics_to_db load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_ftp load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_box ## TASK run_scirpy_for_vdj_t = \ PythonOperator( task_id='run_scirpy_for_vdj_t', dag=dag, queue='hpc_4G', python_callable=run_singlecell_notebook_wrapper_func, params={ 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 'run_cellranger', 'scanpy_timeout': 1200, 'allow_errors': False, 'kernel_name': 'python3', 'analysis_name': 'scirpy', 'vdj_dir': 'vdj_t', 'count_dir': 'count', 'output_notebook_key': 'scirpy_notebook', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description'}) load_cellranger_vdjT_matrics_to_db = \ PythonOperator( task_id='load_cellranger_vdjT_matrics_to_db', dag=dag, queue='hpc_4G', python_callable=load_cellranger_metrices_to_collection, params={ 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 'run_cellranger', 'collection_type': 'CELLRANGER_MULTI', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 
'sample_igf_id', 'metrics_summary_file': 'metrics_summary.csv', 'attribute_prefix': 'CELLRANGER_VDJT'}) load_scirpy_report_for_vdj_t_to_db = \ PythonOperator( task_id='load_scirpy_report_for_vdj_t_to_db', dag=dag, queue='hpc_4G', python_callable=load_analysis_files_func, params={ 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'file_name_task': 'run_scirpy_for_vdj_t', 'file_name_key': 'scirpy_notebook', 'analysis_name': 'scirpy_vdj_t', 'collection_type': 'SCIRPY_VDJ_T_HTML', 'collection_table': 'sample', 'output_files_key': 'output_db_files'}) upload_scirpy_report_for_vdj_t_to_ftp = \ PythonOperator( task_id='upload_scirpy_report_for_vdj_t_to_ftp', dag=dag, queue='hpc_4G', python_callable=ftp_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db', 'xcom_pull_files_key': 'output_db_files', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'collection_type': 'FTP_SCIRPY_VDJ_T_HTML', 'collection_table': 'sample', 'collect_remote_file': True}) upload_scirpy_report_for_vdj_t_to_box = \ PythonOperator( task_id='upload_scirpy_report_for_vdj_t_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db', 'xcom_pull_files_key': 'output_db_files', 'analysis_tag': 'scirpy_vdj_t_single_sample_report'}) ## PIPELINE decide_analysis_branch >> run_scirpy_for_vdj_t run_scirpy_for_vdj_t >> load_scirpy_report_for_vdj_t_to_db run_scirpy_for_vdj_t >> load_cellranger_vdjT_matrics_to_db load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_ftp load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_box ## TASK run_seurat_for_sc_5p = \ PythonOperator( task_id='run_seurat_for_sc_5p', dag=dag, queue='hpc_4G', python_callable=run_singlecell_notebook_wrapper_func, params={ 'cellranger_xcom_key': 'cellranger_output', 'cellranger_xcom_pull_task': 
'run_cellranger', 'scanpy_timeout': 1200, 'allow_errors': False, 'kernel_name': 'ir', 'analysis_name': 'seurat', 'vdj_dir': 'vdj', 'count_dir': 'count', 'output_notebook_key': 'seurat_notebook', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description'}) load_seurat_report_for_sc_5p_db = \ PythonOperator( task_id='load_seurat_report_for_sc_5p_db', dag=dag, queue='hpc_4G', python_callable=load_analysis_files_func, params={ 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'file_name_task': 'run_seurat_for_sc_5p', 'file_name_key': 'seurat_notebook', 'analysis_name': 'seurat_5p', 'collection_type': 'SEURAT_HTML', 'collection_table': 'sample', 'output_files_key': 'output_db_files'}) upload_seurat_report_for_sc_5p_ftp = \ PythonOperator( task_id='upload_seurat_report_for_sc_5p_ftp', dag=dag, queue='hpc_4G', python_callable=ftp_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_seurat_report_for_sc_5p_db', 'xcom_pull_files_key': 'output_db_files', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'collection_type': 'FTP_SEURAT_HTML', 'collection_table': 'sample', 'collect_remote_file': True}) upload_seurat_report_for_sc_5p_to_box = \ PythonOperator( task_id='upload_seurat_report_for_sc_5p_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'load_seurat_report_for_sc_5p_db', 'xcom_pull_files_key': 'output_db_files', 'analysis_tag': 'seurat_single_sample_report'}) ## PIPELINE decide_analysis_branch >> run_seurat_for_sc_5p run_seurat_for_sc_5p >> load_seurat_report_for_sc_5p_db load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_ftp load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_to_box ## TASK convert_cellranger_bam_to_cram = \ PythonOperator( task_id='convert_cellranger_bam_to_cram', dag=dag, queue='hpc_4G4t', 
python_callable=convert_bam_to_cram_func, params={ 'xcom_pull_files_key': 'cellranger_output', 'xcom_pull_task': 'run_cellranger', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'use_ephemeral_space': True, 'threads': 4, 'cellranger_bam_path': 'count/sample_alignments.bam', 'analysis_name': 'cellranger', 'collection_type': 'ANALYSIS_CRAM', 'collection_table': 'sample', 'cram_files_xcom_key': 'cram_files'}) ## PIPELINE decide_analysis_branch >> convert_cellranger_bam_to_cram ## TASK generate_cell_sorted_bam = \ PythonOperator( task_id='generate_cell_sorted_bam', dag=dag, queue='hpc_16G8t', python_callable=generate_cell_sorted_bam_func, params={ 'xcom_pull_task': 'run_cellranger', 'xcom_pull_files_key': 'cellranger_output', 'cellranger_bam_path': 'count/sample_alignments.bam', 'cellsorted_bam_path': 'count/cellsorted_sample_alignments.bam', 'samtools_mem': '2G', 'threads': 7}) run_velocyto = \ PythonOperator( task_id='run_velocyto', queue='hpc_16G_long', python_callable=run_velocyto_func, params={ 'xcom_pull_task': 'run_cellranger', 'xcom_pull_files_key': 'cellranger_output', 'cell_sorted_bam_name': 'count/cellsorted_sample_alignments.bam', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description' }) run_scvelo_for_sc_5p = \ PythonOperator( task_id='run_scvelo_for_sc_5p', dag=dag, queue='hpc_32G16t', python_callable=run_scvelo_for_sc_5p_func, params={ 'xcom_pull_task': 'run_cellranger', 'xcom_pull_files_key': 'cellranger_output', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'loom_file_key': 'loom_output', 'loom_file_task': 'run_velocyto', 'scanpy_h5ad_task':'run_scanpy_for_sc_5p', 'scanpy_h5ad_key': 'scanpy_h5ad', 'timeout': 2400, 'allow_errors': False, 'cpu_threads': 14, 'output_notebook_key': 'scvelo_notebook'}) load_loom_file_to_rds = \ PythonOperator( 
task_id='load_loom_file_to_rds', dag=dag, queue='hpc_4G', python_callable=load_analysis_files_func, params={ 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'file_name_task': 'run_velocyto', 'file_name_key': 'loom_output', 'analysis_name': 'velocyto_5p', 'collection_type': 'VELOCYTO_LOOM', 'collection_table': 'sample', 'output_files_key': 'output_db_files'}) upload_loom_file_to_irods = \ PythonOperator( task_id='upload_loom_file_to_irods', dag=dag, queue='hpc_4G', python_callable=irods_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_loom_file_to_rds', 'xcom_pull_files_key': 'output_db_files', 'collection_name_key': 'sample_igf_id', 'collection_name_task': 'load_cellranger_result_to_db', 'analysis_name': 'velocyto_loom'}) load_scvelo_report_to_rds = \ PythonOperator( task_id='load_scvelo_report_to_rds', dag=dag, queue='hpc_4G', python_callable=load_analysis_files_func, params={ 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'file_name_task': 'run_scvelo_for_sc_5p', 'file_name_key': 'scvelo_notebook', 'analysis_name': 'scvelo_5p', 'collection_type': 'SCVELO_HTML', 'collection_table': 'sample', 'output_files_key': 'output_db_files'}) upload_scvelo_report_to_ftp = \ PythonOperator( task_id='upload_scvelo_report_to_ftp', dag=dag, queue='hpc_4G', python_callable=ftp_files_upload_for_analysis, params={ 'xcom_pull_task': 'load_scvelo_report_to_rds', 'xcom_pull_files_key': 'output_db_files', 'collection_name_task': 'load_cellranger_result_to_db', 'collection_name_key': 'sample_igf_id', 'collection_type': 'FTP_SCVELO_HTML', 'collection_table': 'sample', 'collect_remote_file': True}) upload_scvelo_report_to_box = \ PythonOperator( task_id='upload_scvelo_report_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'load_scvelo_report_to_rds', 'xcom_pull_files_key': 'output_db_files', 'analysis_tag': 
'scvelo_single_sample_report'}) ## PIPELINE convert_cellranger_bam_to_cram >> generate_cell_sorted_bam generate_cell_sorted_bam >> run_velocyto run_velocyto >> run_scvelo_for_sc_5p run_scanpy_for_sc_5p >> run_scvelo_for_sc_5p run_velocyto >> load_loom_file_to_rds load_loom_file_to_rds >> upload_loom_file_to_irods run_scvelo_for_sc_5p >> load_scvelo_report_to_rds load_scvelo_report_to_rds >> upload_scvelo_report_to_ftp load_scvelo_report_to_rds >> upload_scvelo_report_to_box ## TASK copy_bam_for_parallel_runs = \ BranchPythonOperator( task_id='copy_bam_for_parallel_runs', dag=dag, queue='hpc_4G', python_callable=index_and_copy_bam_for_parallel_analysis, params={ 'xcom_pull_files_key': 'cellranger_output', 'xcom_pull_task': 'run_cellranger', 'cellranger_bam_path': 'count/sample_alignments.bam', 'list_of_tasks': [ 'run_picard_alignment_summary', 'run_picard_qual_summary', 'run_picard_rna_summary', 'run_picard_gc_summary', 'run_picard_base_dist_summary', 'run_samtools_stats']}) ## TASK upload_cram_to_irods = \ PythonOperator( task_id='upload_cram_to_irods', dag=dag, queue='hpc_4G', python_callable=irods_files_upload_for_analysis, params={ 'xcom_pull_task': 'convert_cellranger_bam_to_cram', 'xcom_pull_files_key': 'cram_files', 'collection_name_key': 'sample_igf_id', 'collection_name_task': 'load_cellranger_result_to_db', 'analysis_name': 'cellranger_multi'}) ## PIPELINE #convert_cellranger_bam_to_cram >> copy_bam_for_parallel_runs # we need to load metrics to cram generate_cell_sorted_bam >> copy_bam_for_parallel_runs convert_cellranger_bam_to_cram >> upload_cram_to_irods ## TASK run_picard_alignment_summary = \ PythonOperator( task_id='run_picard_alignment_summary', dag=dag, queue='hpc_4G', python_callable=run_picard_for_cellranger, params={ 'xcom_pull_files_key': 'run_picard_alignment_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 
'use_ephemeral_space': True, 'load_metrics_to_cram': True, 'java_param': '-Xmx4g', 'picard_command': 'CollectAlignmentSummaryMetrics', 'picard_option': {}, 'analysis_files_xcom_key': 'picard_alignment_summary', 'bam_files_xcom_key': None}) ## PIPELINE copy_bam_for_parallel_runs >> run_picard_alignment_summary ## TASK cleanup_picard_alignment_summary_input = \ PythonOperator( task_id='cleanup_picard_alignment_summary_input', dag=dag, queue='hpc_4G', python_callable=clean_up_files, params={ 'xcom_pull_files_key': 'run_picard_alignment_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs'}) ## PIPELINE run_picard_alignment_summary >> cleanup_picard_alignment_summary_input ## TASK upload_picard_alignment_summary_to_box = \ PythonOperator( task_id='upload_picard_alignment_summary_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'run_picard_alignment_summary', 'xcom_pull_files_key': 'picard_alignment_summary', 'analysis_tag': 'Picard-CollectAlignmentSummaryMetrics'}) ## PIPELINE run_picard_alignment_summary >> upload_picard_alignment_summary_to_box ## TASK run_picard_qual_summary = \ PythonOperator( task_id='run_picard_qual_summary', dag=dag, queue='hpc_4G', python_callable=run_picard_for_cellranger, params={ 'xcom_pull_files_key': 'run_picard_qual_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'use_ephemeral_space': True, 'load_metrics_to_cram': True, 'java_param': '-Xmx4g', 'picard_command': 'QualityScoreDistribution', 'picard_option': {}, 'analysis_files_xcom_key': 'picard_qual_summary', 'bam_files_xcom_key': None}) ## PIPELINE copy_bam_for_parallel_runs >> run_picard_qual_summary ## TASK cleanup_picard_qual_summary_input = \ PythonOperator( task_id='cleanup_picard_qual_summary_input', dag=dag, queue='hpc_4G', python_callable=clean_up_files, params={ 'xcom_pull_files_key': 
'run_picard_qual_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs'}) ## PIPELINE run_picard_qual_summary >> cleanup_picard_qual_summary_input ## TASK upload_picard_qual_summary_to_box = \ PythonOperator( task_id='upload_picard_qual_summary_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'run_picard_qual_summary', 'xcom_pull_files_key': 'picard_qual_summary', 'analysis_tag': 'Picard-QualityScoreDistribution'}) ## PIPELINE run_picard_qual_summary >> upload_picard_qual_summary_to_box ## TASK run_picard_rna_summary = \ PythonOperator( task_id='run_picard_rna_summary', dag=dag, queue='hpc_8G', python_callable=run_picard_for_cellranger, params={ 'xcom_pull_files_key': 'run_picard_rna_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'use_ephemeral_space': True, 'load_metrics_to_cram': True, 'java_param': '-Xmx7g', 'picard_command': 'CollectRnaSeqMetrics', 'picard_option': {}, 'analysis_files_xcom_key': 'picard_rna_summary', 'bam_files_xcom_key': None}) ## PIPELINE copy_bam_for_parallel_runs >> run_picard_rna_summary ## TASK cleanup_picard_rna_summary_input = \ PythonOperator( task_id='cleanup_picard_rna_summary_input', dag=dag, queue='hpc_4G', python_callable=clean_up_files, params={ 'xcom_pull_files_key': 'run_picard_rna_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs'}) ## PIPELINE run_picard_rna_summary >> cleanup_picard_rna_summary_input ## TASK upload_picard_rna_summary_to_box = \ PythonOperator( task_id='upload_picard_rna_summary_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'run_picard_rna_summary', 'xcom_pull_files_key': 'picard_rna_summary', 'analysis_tag': 'Picard-CollectRnaSeqMetrics'}) ## PIPELINE run_picard_rna_summary >> upload_picard_rna_summary_to_box ## TASK run_picard_gc_summary = \ PythonOperator( 
task_id='run_picard_gc_summary', dag=dag, queue='hpc_4G', python_callable=run_picard_for_cellranger, params={ 'xcom_pull_files_key': 'run_picard_gc_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'use_ephemeral_space': True, 'load_metrics_to_cram': True, 'java_param': '-Xmx4g', 'picard_command': 'CollectGcBiasMetrics', 'picard_option': {}, 'analysis_files_xcom_key': 'picard_gc_summary', 'bam_files_xcom_key': None}) ## PIPELINE copy_bam_for_parallel_runs >> run_picard_gc_summary ## TASK cleanup_picard_gc_summary_input = \ PythonOperator( task_id='cleanup_picard_gc_summary_input', dag=dag, queue='hpc_4G', python_callable=clean_up_files, params={ 'xcom_pull_files_key': 'run_picard_gc_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs'}) ## PIPELINE run_picard_gc_summary >> cleanup_picard_gc_summary_input ## TASK upload_picard_gc_summary_to_box = \ PythonOperator( task_id='upload_picard_gc_summary_to_box', dag=dag, queue='hpc_4G', python_callable=upload_analysis_file_to_box, params={ 'xcom_pull_task': 'run_picard_gc_summary', 'xcom_pull_files_key': 'picard_gc_summary', 'analysis_tag': 'Picard-CollectGcBiasMetrics'}) ## PIPELINE run_picard_gc_summary >> upload_picard_gc_summary_to_box ## TASK run_picard_base_dist_summary = \ PythonOperator( task_id='run_picard_base_dist_summary', dag=dag, queue='hpc_4G', python_callable=run_picard_for_cellranger, params={ 'xcom_pull_files_key': 'run_picard_base_dist_summary', 'xcom_pull_task': 'copy_bam_for_parallel_runs', 'analysis_description_xcom_pull_task': 'fetch_analysis_info', 'analysis_description_xcom_key': 'analysis_description', 'use_ephemeral_space': True, 'load_metrics_to_cram': True, 'java_param': '-Xmx4g', 'picard_command': 'CollectBaseDistributionByCycle', 'picard_option': {}, 'analysis_files_xcom_key': 'picard_base_summary', 'bam_files_xcom_key': None}) ## PIPELINE 
copy_bam_for_parallel_runs >> run_picard_base_dist_summary
## TASK
## Clean up the input file(s) that run_picard_base_dist_summary pulled from
## copy_bam_for_parallel_runs (XCom key 'run_picard_base_dist_summary');
## exact clean-up semantics live in clean_up_files
cleanup_picard_base_dist_summary_input = \
  PythonOperator(
    task_id='cleanup_picard_base_dist_summary_input',
    dag=dag,
    queue='hpc_4G',
    python_callable=clean_up_files,
    params={
      'xcom_pull_files_key': 'run_picard_base_dist_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs'})
## PIPELINE
run_picard_base_dist_summary >> cleanup_picard_base_dist_summary_input
## TASK
## Upload the Picard CollectBaseDistributionByCycle output (XCom key
## 'picard_base_summary' of run_picard_base_dist_summary) to Box
upload_picard_base_dist_summary_to_box = \
  PythonOperator(
    task_id='upload_picard_base_dist_summary_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'run_picard_base_dist_summary',
      'xcom_pull_files_key': 'picard_base_summary',
      'analysis_tag': 'Picard-CollectBaseDistributionByCycle'})
## PIPELINE
run_picard_base_dist_summary >> upload_picard_base_dist_summary_to_box
## TASK
## samtools stats on the BAM copy published by copy_bam_for_parallel_runs
## under XCom key 'run_samtools_stats'; uses the 4-thread queue to match
## 'threads': 4
run_samtools_stats = \
  PythonOperator(
    task_id='run_samtools_stats',
    dag=dag,
    queue='hpc_4G4t',
    python_callable=run_samtools_for_cellranger,
    params={
      'xcom_pull_files_key': 'run_samtools_stats',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'samtools_command': 'stats',
      'threads': 4,
      'analysis_files_xcom_key': 'samtools_stats'})
## PIPELINE
copy_bam_for_parallel_runs >> run_samtools_stats
## TASK
upload_samtools_stats_to_box = \
  PythonOperator(
    task_id='upload_samtools_stats_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'run_samtools_stats',
      'xcom_pull_files_key': 'samtools_stats',
      'analysis_tag': 'Samtools-stats'})
## PIPELINE
run_samtools_stats >> upload_samtools_stats_to_box
## TASK
## idxstats deliberately reuses the same BAM copy as samtools stats (same
## 'xcom_pull_files_key': 'run_samtools_stats'). That is why it is chained
## after run_samtools_stats and the shared input is only cleaned up once
## idxstats has finished.
run_samtools_idxstats = \
  PythonOperator(
    task_id='run_samtools_idxstats',
    dag=dag,
    queue='hpc_4G4t',
    python_callable=run_samtools_for_cellranger,
    params={
      'xcom_pull_files_key': 'run_samtools_stats',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'samtools_command': 'idxstats',
      'threads': 4,
      'analysis_files_xcom_key': 'samtools_idxstats'})
## PIPELINE
run_samtools_stats >> run_samtools_idxstats
## TASK
## Clean up the BAM copy shared by samtools stats and idxstats
cleanup_samtools_stats_input = \
  PythonOperator(
    task_id='cleanup_samtools_stats_input',
    dag=dag,
    queue='hpc_4G',
    python_callable=clean_up_files,
    params={
      'xcom_pull_files_key': 'run_samtools_stats',
      'xcom_pull_task': 'copy_bam_for_parallel_runs'})
## PIPELINE
run_samtools_idxstats >> cleanup_samtools_stats_input
## TASK
upload_samtools_idxstats_to_box = \
  PythonOperator(
    task_id='upload_samtools_idxstats_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'run_samtools_idxstats',
      'xcom_pull_files_key': 'samtools_idxstats',
      'analysis_tag': 'Samtools-idxstats'})
## PIPELINE
run_samtools_idxstats >> upload_samtools_idxstats_to_box
## TASK
## Build a single MultiQC report from the cellranger, Picard and samtools
## outputs. In 'list_of_analysis_xcoms_and_tasks', each key is an upstream
## task id. Each value is the XCom key that task publishes its output files
## under, e.g. 'picard_base_summary' matches the 'analysis_files_xcom_key'
## of run_picard_base_dist_summary.
run_multiqc = \
  PythonOperator(
    task_id='run_multiqc',
    dag=dag,
    queue='hpc_4G',
    trigger_rule='none_failed_or_skipped',
    python_callable=run_multiqc_for_cellranger,
    params={
      'list_of_analysis_xcoms_and_tasks': {
        'run_cellranger': 'cellranger_output',
        'run_picard_alignment_summary': 'picard_alignment_summary',
        'run_picard_qual_summary': 'picard_qual_summary',
        'run_picard_rna_summary': 'picard_rna_summary',
        'run_picard_gc_summary': 'picard_gc_summary',
        'run_picard_base_dist_summary': 'picard_base_summary',
        'run_samtools_stats': 'samtools_stats',
        'run_samtools_idxstats': 'samtools_idxstats'},
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'multiqc_html_file_xcom_key': 'multiqc_html',
      'multiqc_data_file_xcom_key': 'multiqc_data',
      # Tool names must be spelled as the tool/MultiQC module ids
      # ('picard', not the previous typo 'picad').
      # NOTE(review): presumably rendered into MultiQC's module ordering by
      # run_multiqc_for_cellranger; confirm there.
      'tool_order_list': ['picard', 'samtools']})
## PIPELINE
[
  run_picard_alignment_summary,
  run_picard_qual_summary,
  run_picard_rna_summary,
  run_picard_gc_summary,
  run_picard_base_dist_summary,
  run_samtools_idxstats,
] >> run_multiqc
## The QC tools above must finish before MultiQC runs. run_samtools_stats is
## reached transitively through run_samtools_idxstats.

## TASK
## Register the MultiQC HTML pushed by run_multiqc (XCom key 'multiqc_html')
## as a MULTIQC_HTML collection on the 'sample' table. The collection name
## comes from the 'sample_igf_id' XCom of load_cellranger_result_to_db. The
## loaded files are re-published under XCom key 'output_db_files'.
load_multiqc_html = PythonOperator(
  task_id='load_multiqc_html',
  dag=dag,
  queue='hpc_4G',
  python_callable=load_analysis_files_func,
  params=dict(
    collection_name_task='load_cellranger_result_to_db',
    collection_name_key='sample_igf_id',
    file_name_task='run_multiqc',
    file_name_key='multiqc_html',
    analysis_name='multiqc',
    collection_type='MULTIQC_HTML',
    collection_table='sample',
    output_files_key='output_db_files'))
run_multiqc >> load_multiqc_html

## TASK
## FTP upload of the loaded report, recorded as an FTP_MULTIQC_HTML collection
upload_multiqc_to_ftp = PythonOperator(
  task_id='upload_multiqc_to_ftp',
  dag=dag,
  queue='hpc_4G',
  python_callable=ftp_files_upload_for_analysis,
  params=dict(
    xcom_pull_task='load_multiqc_html',
    xcom_pull_files_key='output_db_files',
    collection_name_task='load_cellranger_result_to_db',
    collection_name_key='sample_igf_id',
    collection_type='FTP_MULTIQC_HTML',
    collection_table='sample',
    collect_remote_file=True))

## TASK
## Box upload of the same loaded report
upload_multiqc_to_box = PythonOperator(
  task_id='upload_multiqc_to_box',
  dag=dag,
  queue='hpc_4G',
  python_callable=upload_analysis_file_to_box,
  params=dict(
    xcom_pull_task='load_multiqc_html',
    xcom_pull_files_key='output_db_files',
    analysis_tag='multiqc_report'))
load_multiqc_html >> [upload_multiqc_to_ftp, upload_multiqc_to_box]

## TASK
## Final status update: new_status 'FINISHED', no_change_status 'SEEDED'.
## The 'none_failed_or_skipped' trigger rule lets it run when some upstream
## branches were skipped, as long as none failed.
update_analysis_and_status = PythonOperator(
  task_id='update_analysis_and_status',
  dag=dag,
  queue='hpc_4G',
  python_callable=change_pipeline_status,
  trigger_rule='none_failed_or_skipped',
  params=dict(
    new_status='FINISHED',
    no_change_status='SEEDED'))
## The report uploads that gate the status update.
## The ambiguous VDJ scirpy report is no longer produced; only the VDJ-B and
## VDJ-T scirpy uploads are wired in.
## NOTE(review): upload_multiqc_to_box is the only report-level Box upload
## that does not gate this task. Confirm whether that is intended.
[
  upload_multiqc_to_ftp,
  upload_scanpy_report_for_sc_5p_to_ftp,
  upload_scanpy_report_for_sc_5p_to_box,
  upload_cellbrowser_for_sc_5p_to_ftp,
  upload_scirpy_report_for_vdj_b_to_ftp,
  upload_scirpy_report_for_vdj_b_to_box,
  upload_scirpy_report_for_vdj_t_to_ftp,
  upload_scirpy_report_for_vdj_t_to_box,
  upload_seurat_report_for_sc_5p_ftp,
  upload_seurat_report_for_sc_5p_to_box,
  upload_cellranger_results_to_irods,
  upload_cellranger_report_to_ftp,
  upload_cellranger_report_to_box,
  upload_cram_to_irods,
  upload_scvelo_report_to_box,
  upload_scvelo_report_to_ftp,
] >> update_analysis_and_status

## TASK
## Rebuild the QC pages from the listed FTP report collection types
update_qc_pages = PythonOperator(
  task_id='update_qc_pages',
  dag=dag,
  queue='hpc_4G',
  python_callable=create_and_update_qc_pages,
  params=dict(
    use_ephemeral_space=True,
    collection_type_list=[
      'FTP_MULTIQC_HTML',
      'FTP_SEURAT_HTML',
      'FTP_SCIRPY_VDJ_T_HTML',
      'FTP_SCIRPY_VDJ_B_HTML',
      'FTP_SCIRPY_VDJ_HTML',
      'FTP_CELLBROWSER',
      'FTP_SCANPY_HTML',
      'FTP_SCVELO_HTML',
      'FTP_CELLRANGER_HTML']))
update_analysis_and_status >> update_qc_pages