123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import os
- from datetime import timedelta
- from airflow.models import DAG
- from airflow.utils.dates import days_ago
- from airflow.operators.python_operator import PythonOperator
- from airflow.operators.python_operator import BranchPythonOperator
- from airflow.operators.bash_operator import BashOperator
- from igf_airflow.utils.dag17_create_transcriptome_ref_utils import download_gtf_file_func
- from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_star_index_func
- from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_rsem_index_func
- from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_reflat_index_func
- from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_ribosomal_interval_func
- from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_cellranger_ref_func
- from igf_airflow.utils.dag17_create_transcriptome_ref_utils import add_refs_to_db_collection_func
# Default settings for this DAG; the dict is passed to the DAG constructor
# as `default_args`.
# NOTE(review): `catchup` and `max_active_runs` are DAG-level options in
# Airflow rather than operator arguments -- confirm whether they have any
# effect when supplied through this dict.
args = dict(
    owner='airflow',
    start_date=days_ago(2),
    retries=1,
    retry_delay=timedelta(minutes=5),  # wait five minutes before the retry
    provide_context=True,
    email_on_failure=False,
    email_on_retry=False,
    catchup=False,
    max_active_runs=1,
)
# The DAG id is this file's base name without its extension.
# `os.path.splitext` removes only the trailing extension (".py" or ".pyc"),
# whereas chained `str.replace` calls would also delete any ".py" that
# happens to appear inside the file name itself.
DAG_ID = os.path.splitext(os.path.basename(__file__))[0]
# Manually triggered DAG (no schedule), tagged 'hpc'.
# `catchup` and `max_active_runs` are passed to the DAG constructor
# explicitly: Airflow uses `default_args` to fill in operator arguments, so
# the same keys inside `args` do not configure the DAG itself.
dag = DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    default_args=args,
    catchup=False,
    max_active_runs=1,  # never run two runs of this DAG at once
    tags=['hpc'],
)
with dag:
    # The download task and the XCom key it is configured with ('gtf_file')
    # are referenced by every other task's params, so both values are
    # declared once here. The callables themselves live in
    # igf_airflow.utils.dag17_create_transcriptome_ref_utils.
    _gtf_task_id = 'download_gtf_file'
    _gtf_key = 'gtf_file'
    _gtf_source = {
        'gtf_xcom_task': _gtf_task_id,
        'gtf_xcom_key': _gtf_key,
    }

    ## TASK
    download_gtf_file = PythonOperator(
        task_id=_gtf_task_id,
        dag=dag,
        queue='hpc_4G',
        params={'gtf_xcom_key': _gtf_key},
        python_callable=download_gtf_file_func,
    )

    ## TASK
    create_star_index = PythonOperator(
        task_id="create_star_index",
        dag=dag,
        queue='hpc_42G16t',
        params=dict(
            _gtf_source,
            star_ref_xcom_key='star_ref',
            threads=12,
            star_options=['--sjdbOverhang', 149],
        ),
        python_callable=create_star_index_func,
    )

    ## TASK
    # NOTE(review): this task asks for 8 threads on the 'hpc_4G' queue,
    # while the other multi-threaded tasks use queues whose names carry a
    # thread suffix -- confirm this is intended.
    create_rsem_index = PythonOperator(
        task_id="create_rsem_index",
        dag=dag,
        queue='hpc_4G',
        params=dict(
            _gtf_source,
            rsem_ref_xcom_key='rsem_ref',
            threads=8,
        ),
        python_callable=create_rsem_index_func,
    )

    ## TASK
    create_reflat_index = PythonOperator(
        task_id='create_reflat_index',
        dag=dag,
        queue='hpc_4G',
        params=dict(
            _gtf_source,
            refflat_ref_xcom_key='refflat_ref',
        ),
        python_callable=create_reflat_index_func,
    )

    ## TASK
    create_ribosomal_interval = PythonOperator(
        task_id='create_ribosomal_interval',
        dag=dag,
        queue='hpc_4G',
        params=dict(
            _gtf_source,
            ribosomal_ref_xcom_key='ribosomal_ref',
            skip_gtf_rows=5,
        ),
        python_callable=create_ribosomal_interval_func,
    )

    ## TASK
    create_cellranger_ref = PythonOperator(
        task_id='create_cellranger_ref',
        dag=dag,
        queue='hpc_32G8t',
        params=dict(
            _gtf_source,
            cellranger_ref_xcom_key='cellranger_ref',
            skip_gtf_rows=5,
            threads=7,
            memory=30,
        ),
        python_callable=create_cellranger_ref_func,
    )

    ## TASK
    # Each `task_list` entry is [task id, XCom key, collection type]; the
    # first two values repeat the task ids and XCom keys configured on the
    # tasks above.
    add_refs_to_db_collection = PythonOperator(
        task_id='add_refs_to_db_collection',
        dag=dag,
        queue='hpc_4G',
        params={
            'task_list': [
                [_gtf_task_id, _gtf_key, 'GENE_GTF'],
                ['create_star_index', 'star_ref', 'TRANSCRIPTOME_STAR'],
                ['create_rsem_index', 'rsem_ref', 'TRANSCRIPTOME_RSEM'],
                ['create_reflat_index', 'refflat_ref', 'GENE_REFFLAT'],
                ['create_ribosomal_interval', 'ribosomal_ref', 'RIBOSOMAL_INTERVAL'],
                ['create_cellranger_ref', 'cellranger_ref', 'TRANSCRIPTOME_TENX'],
            ]
        },
        python_callable=add_refs_to_db_collection_func,
    )

    ## PIPELINE
    # Fan out from the download task to every reference builder, then fan
    # back in to the final collection task.
    _reference_builders = [
        create_star_index,
        create_rsem_index,
        create_reflat_index,
        create_ribosomal_interval,
        create_cellranger_ref,
    ]
    for _builder in _reference_builders:
        download_gtf_file >> _builder >> add_refs_to_db_collection
|