Avik Datta 3 years ago
parent
commit
ae4ac56592
2 files changed with 76 additions and 0 deletions
  1. 1 0
      airflow_var/var.json
  2. 75 0
      dags/dag18_upload_and_trigger_analysis.py

+ 1 - 0
airflow_var/var.json

@@ -45,6 +45,7 @@
   "eliot_server_hostname": "eliot.med.ic.ac.uk",
   "igf_lims_server_hostname": "igf-lims.cc.ic.ac.uk",
   "tenx_single_cell_immune_profiling_feature_types": {"gene_expression": "", "vdj-t": "", "vdj-b": "", "antibody_capture": "", "antigen_capture": "", "crisper_guide_capture": ""},
+  "analysis_dag_list": {"dag9_tenx_single_cell_immune_profiling": "", "dag12_nextflow_chipseq_pipeline": "", "dag10_nextflow_atacseq_pipeline": ""},
   "scanpy_single_sample_template": "/project/tgu/software/scanpy-notebook-image/templates/scanpy_single_sample_analysis_v0.0.6.ipynb",
   "scvelo_single_sample_template": "/project/tgu/software/scanpy-notebook-image/templates/scvelo_basic_single_sample_v0.0.1.ipynb",
   "scanpy_notebook_image": "/project/tgu/resources/pipeline_resource/singularity_images/scanpy-notebook-image/scanpy-notebook-image_v0.0.4.sif",

+ 75 - 0
dags/dag18_upload_and_trigger_analysis.py

@@ -0,0 +1,75 @@
import os
from datetime import timedelta
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
#from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2

## Default arguments applied to every task in this DAG.
## NOTE: 'catchup' and 'max_active_runs' are DAG-level settings, not task
## defaults — they were silently ignored in default_args, so they are now
## passed to the DAG constructor below instead.
args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'provide_context': True,
    'email_on_failure': False,
    'email_on_retry': False}
## Derive the dag_id from this file's name, e.g.
## "dag18_upload_and_trigger_analysis".
DAG_ID = \
    os.path.basename(__file__).\
        replace(".pyc", "").\
        replace(".py", "")
dag = \
    DAG(
        dag_id=DAG_ID,
        schedule_interval=None,
        default_args=args,
        catchup=False,
        max_active_runs=1,
        tags=['hpc'])
## Keys of this variable are the dag_ids of the analysis DAGs this DAG may
## trigger (see "analysis_dag_list" in airflow_var/var.json).
## FIX: deserialize_json=True is required — without it Variable.get returns
## the raw JSON string when the variable exists, and .keys() below would fail.
ANALYSIS_LIST = \
    Variable.get("analysis_dag_list", default_var={}, deserialize_json=True)
with dag:
    ## TASK: branch deciding which of the design-loading slots to run.
    ## python_callable is still a placeholder — TODO: attach real callable.
    find_analysis_designs = \
        BranchPythonOperator(
            task_id="find_analysis_designs",
            dag=dag,
            queue='hpc_4G',
            params={},
            python_callable=None)
    ## TASK: up to 20 parallel slots for loading analysis designs.
    load_analysis_design_tasks = list()
    for i in range(1, 21):
        t = \
            PythonOperator(
                ## FIX: was "load_analysis_design_{i}".format(i) — a named
                ## placeholder with a positional argument raises at parse time.
                task_id="load_analysis_design_{0}".format(i),
                dag=dag,
                params={},
                python_callable=None)
        load_analysis_design_tasks.append(t)
    ## TASK: branch deciding which analysis DAGs to trigger.
    find_analysis_to_trigger_dags = \
        BranchPythonOperator(
            task_id="find_analysis_to_trigger_dags",
            dag=dag,
            queue='hpc_4G',
            params={},
            python_callable=None)
    ## TASK: up to 20 trigger slots per configured analysis DAG.
    trigger_analysis_dag_tasks = list()
    for analysis_name in ANALYSIS_LIST.keys():
        for i in range(1, 21):
            t = \
                TriggerDagRunOperator(
                    ## FIX: was "trigger_{0}_{0}" — the slot index i was never
                    ## used, so all 20 operators per analysis shared one
                    ## task_id (duplicate task ids are a DAG import error).
                    task_id="trigger_{0}_{1}".format(analysis_name, i),
                    dag=dag,
                    trigger_dag_id=analysis_name,
                    queue='hpc_4G',
                    ## FIX: removed python_callable=None —
                    ## TriggerDagRunOperator has no such argument (TypeError).
                    params={})
            trigger_analysis_dag_tasks.append(t)
    ## PIPELINE
    find_analysis_designs >> load_analysis_design_tasks
    load_analysis_design_tasks >> find_analysis_to_trigger_dags
    find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks