浏览代码

changes to dag8

Avik Datta 4 年之前
父节点
当前提交
29b68e8933
共有 1 个文件被更改,包括 18 次插入7 次删除
  1. 18 7
      dags/dag8_test2.py

+ 18 - 7
dags/dag8_test2.py

@@ -4,6 +4,7 @@ from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
+from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 
 default_args = {
@@ -38,13 +39,23 @@ with dag:
       dag = dag,
       ssh_hook = orwell_ssh_hook,
       queue='hpc_4G',
-      command = 'python /home/igf/igf_code/t1.py'
-    )
-  copy_files_to_hpc = \
+      command = 'python /home/igf/igf_code/t1.py')
+  branch_to = \
+    BranchPythonOperator(
+      task_id='calculate_new_worker_size_and_branch',
+      dag=dag,
+      python_callable=lambda: ['copy_files_to_hpc1','copy_files_to_hpc2'],
+      queue='hpc_4G')
+  copy_files_to_hpc1 = \
     BashOperator(
-      task_id = 'copy_files_to_hpc',
+      task_id = 'copy_files_to_hpc1',
       dag = dag,
       queue='hpc_4G',
-      bash_command = 'bash /project/tgu/data2/airflow_test/github/t2.sh '
-    )
-  generate_file_list >> copy_files_to_hpc
+      bash_command = 'bash /project/tgu/data2/airflow_test/github/t1.sh ')
+  copy_files_to_hpc2 = \
+    BashOperator(
+      task_id = 'copy_files_to_hpc2',
+      dag = dag,
+      queue='hpc_4G',
+      bash_command = 'bash /project/tgu/data2/airflow_test/github/t2.sh ')
+  generate_file_list >> branch_to >> [copy_files_to_hpc1,copy_files_to_hpc2]