Forráskód Böngészése

updated dag and added hpc script for worker submission

Avik Datta 4 éve
szülő
commit
6adb786a2b
2 módosított fájl, 18 hozzáadás és 3 törlés
  1. 10 3
      dags/dag1_calculate_hpc_worker.py
  2. 8 0
      scripts/hpc/airflow_worker.sh

+ 10 - 3
dags/dag1_calculate_hpc_worker.py

@@ -89,11 +89,18 @@ with dag:
   for q,data in hpc_queue_list.items():
     pbs_resource = data.get('pbs_resource')
     airflow_queue = data.get('airflow_queue')
-    t = BashOperator(
+    t = SSHOperator(
       task_id=q,
+      ssh_hook=hpc_hook,
       dag=dag,
       queue='igf-lims',
-      bash_command='echo $pbs_resource $airflow_queue',
-      envs={'pbs_resource':pbs_resource,'airflow_queue':airflow_queue}
+      command="""
+      {% if ti.xcom_pull(key=params.job_name,task_ids="calculate_new_worker_size_and_branch" ) > 1 %}
+        echo "qsub -V -N {{ params.job_name }} -J 1-{{ ti.xcom_pull(key=params.job_name,task_ids="calculate_new_worker_size_and_branch" ) }}  {{ params.pbs_resource }} -- /script/airflow_worker.sh {{  params.airflow_queue }} {{ params.job_name }}" 
+      {% else %}
+        echo "qsub -V -N {{ params.job_name }} {{ params.pbs_resource }} -- /script/airflow_worker.sh {{  params.airflow_queue }} {{ params.job_name }}"
+      {% endif %}
+      """,
+      params={'pbs_resource':pbs_resource,'airflow_queue':airflow_queue,'job_name':q}
     )
     calculate_new_worker_size_and_branch >> t

+ 8 - 0
scripts/hpc/airflow_worker.sh

@@ -0,0 +1,8 @@
+#!/bin/bash
+
+job_queue=${1:?'Missing job queue'}
+job_name=${1:?'Missing job name'}
+
+source /path/hpc_env.sh
+
+airflow worker --pid $TMPDIR -cn hpc-${PBS_JOBID}-${job_name} -q ${job_queue}