"""Airflow DAG definition: dag16_test_illumina_demultiplexing.

Builds a manually triggered DAG (``schedule_interval=None``) with three
stages:

1. ``get_samplesheet_and_decide_flow`` -- a ``BranchPythonOperator``.
2. ``run_demult_for_lane_1`` .. ``run_demult_for_lane_8`` -- one
   ``PythonOperator`` per lane id.
3. ``prepare_merged_report`` -- a ``PythonOperator`` downstream of every
   lane task.

The task callables are imported from
``igf_airflow.utils.dag16_test_illumina_demultiplexing_utils``; what they do
with the ``params`` passed here is defined there, not in this file.
"""
import os
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator

from igf_airflow.utils.dag16_test_illumina_demultiplexing_utils import get_samplesheet_and_decide_flow_func
from igf_airflow.utils.dag16_test_illumina_demultiplexing_utils import run_demultiplexing_func
from igf_airflow.utils.dag16_test_illumina_demultiplexing_utils import prepare_merged_report_func

# Values that several tasks must agree on. They are defined once so that the
# branch task's 'demult_task_prefix', the lane task ids and the XCom lookups
# cannot drift apart.
HPC_QUEUE = 'hpc_4G'
BRANCH_TASK_ID = 'get_samplesheet_and_decide_flow'
DEMULT_TASK_PREFIX = 'run_demult_for_lane'
RUNINFO_XML_FILE_NAME = 'RunInfo.xml'
SAMPLESHEET_XCOM_KEY = 'formatted_samplesheets'
OUTPUT_PATH_XCOM_KEY = 'temp_output_path'
LANE_IDS = range(1, 9)  # lane ids 1..8, one demultiplexing task each

# Defaults applied to every operator in the DAG. Only operator-level
# arguments belong here: DAG-level settings placed in default_args are
# ignored by Airflow, so 'catchup' and 'max_active_runs' are passed to the
# DAG constructor below instead.
args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'provide_context': True,
    'email_on_failure': False,
    'email_on_retry': False,
}

# The DAG id is this file's name without its extension (.py or .pyc).
DAG_ID = os.path.splitext(os.path.basename(__file__))[0]

dag = DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    default_args=args,
    catchup=False,
    max_active_runs=1,
    tags=['hpc'],
)

# Operators created inside the context manager are attached to `dag`
# automatically, so no explicit dag= argument is needed.
with dag:
    ## TASK
    get_samplesheet_and_decide_flow = BranchPythonOperator(
        task_id=BRANCH_TASK_ID,
        queue=HPC_QUEUE,
        params={
            'demult_task_prefix': DEMULT_TASK_PREFIX,
            'runParameters_xml_file_name': 'runParameters.xml',
            'runinfo_xml_file_name': RUNINFO_XML_FILE_NAME,
            'output_path_xcom_key': OUTPUT_PATH_XCOM_KEY,
            'samplesheet_xcom_key': SAMPLESHEET_XCOM_KEY,
        },
        python_callable=get_samplesheet_and_decide_flow_func,
    )

    ## TASK
    # NOTE(review): this task joins the lane tasks that follow a branch. With
    # the default trigger rule it will presumably be skipped whenever the
    # branch skips any lane -- confirm whether a 'none_failed'-style
    # trigger_rule is needed for the installed Airflow version.
    prepare_merged_report = PythonOperator(
        task_id='prepare_merged_report',
        queue=HPC_QUEUE,
        params={
            'output_path_xcom_task': BRANCH_TASK_ID,
            'samplesheet_xcom_key': SAMPLESHEET_XCOM_KEY,
            'output_path_xcom_key': OUTPUT_PATH_XCOM_KEY,
            'script_path': '/project/tgu/software/demultiplexing_report/scripts/generate_illumina_report.py',
            'template_path': '/project/tgu/software/demultiplexing_report/template/illumina_report_v1.html',
            'code_dir': '/project/tgu/software/demultiplexing_report/',
        },
        python_callable=prepare_merged_report_func,
    )

    ## TASK
    for lane_id in LANE_IDS:
        run_demult_for_lane = PythonOperator(
            task_id='{0}_{1}'.format(DEMULT_TASK_PREFIX, lane_id),
            queue=HPC_QUEUE,
            params={
                'lane_id': lane_id,
                'runinfo_xml_file_name': RUNINFO_XML_FILE_NAME,
                'threads': 1,
                'samplesheet_xcom_task': BRANCH_TASK_ID,
                'samplesheet_xcom_key': SAMPLESHEET_XCOM_KEY,
                'output_path_xcom_key': OUTPUT_PATH_XCOM_KEY,
            },
            python_callable=run_demultiplexing_func,
        )

        ## PIPELINE
        # Wired inside the loop so that every lane task sits between the
        # branch task and the merged report, not just the last one created.
        get_samplesheet_and_decide_flow >> run_demult_for_lane >> prepare_merged_report