|
@@ -0,0 +1,56 @@
|
|
|
|
+import os
|
|
|
|
+from datetime import timedelta
|
|
|
|
+from airflow.models import DAG
|
|
|
|
+from airflow.utils.dates import days_ago
|
|
|
|
+from airflow.operators.python_operator import PythonOperator
|
|
|
|
+from airflow.operators.python_operator import BranchPythonOperator
|
|
|
|
+from airflow.operators.bash_operator import BashOperator
|
|
|
|
+from igf_airflow.utils.dag17_create_transcriptome_ref_utils import download_gtf_file_func
|
|
|
|
+from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_star_index_func
|
|
|
|
+
|
|
|
|
+args = {
|
|
|
|
+ 'owner': 'airflow',
|
|
|
|
+ 'start_date': days_ago(2),
|
|
|
|
+ 'retries': 1,
|
|
|
|
+ 'retry_delay': timedelta(minutes=5),
|
|
|
|
+ 'provide_context': True,
|
|
|
|
+ 'email_on_failure': False,
|
|
|
|
+ 'email_on_retry': False,
|
|
|
|
+ 'catchup': False,
|
|
|
|
+ 'max_active_runs': 1}
|
|
|
|
+DAG_ID = \
|
|
|
|
+ os.path.basename(__file__).\
|
|
|
|
+ replace(".pyc", "").\
|
|
|
|
+ replace(".py", "")
|
|
|
|
+dag = \
|
|
|
|
+ DAG(
|
|
|
|
+ dag_id=DAG_ID,
|
|
|
|
+ schedule_interval=None,
|
|
|
|
+ default_args=args,
|
|
|
|
+ tags=['hpc'])
|
|
|
|
+with dag:
|
|
|
|
+ ## TASK
|
|
|
|
+ download_gtf_file = \
|
|
|
|
+ PythonOperator(
|
|
|
|
+ task_id="download_gtf_file",
|
|
|
|
+ dag=dag,
|
|
|
|
+ queue='hpc_4G',
|
|
|
|
+ params={
|
|
|
|
+ 'gtf_xcom_key': 'gtf_file'
|
|
|
|
+ },
|
|
|
|
+ python_callable=download_gtf_file_func)
|
|
|
|
+ ## TASK
|
|
|
|
+ create_star_index = \
|
|
|
|
+ PythonOperator(
|
|
|
|
+ task_id="create_star_index",
|
|
|
|
+ dag=dag,
|
|
|
|
+ queue='hpc_32G8t',
|
|
|
|
+ params={
|
|
|
|
+ 'gtf_xcom_task': 'download_gtf_file',
|
|
|
|
+ 'gtf_xcom_key': 'gtf_file',
|
|
|
|
+ 'star_ref_xcom_key': 'star_ref',
|
|
|
|
+ 'threads': 8,
|
|
|
|
+ 'star_options': [
|
|
|
|
+ '--sjdbOverhang', 149]
|
|
|
|
+ },
|
|
|
|
+ python_callable=create_star_index_func)
|