# dag17_create_transcriptome_ref.py
import os
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.bash_operator import BashOperator

from igf_airflow.utils.dag17_create_transcriptome_ref_utils import download_gtf_file_func
from igf_airflow.utils.dag17_create_transcriptome_ref_utils import create_star_index_func
  10. args = {
  11. 'owner': 'airflow',
  12. 'start_date': days_ago(2),
  13. 'retries': 1,
  14. 'retry_delay': timedelta(minutes=5),
  15. 'provide_context': True,
  16. 'email_on_failure': False,
  17. 'email_on_retry': False,
  18. 'catchup': False,
  19. 'max_active_runs': 1}
  20. DAG_ID = \
  21. os.path.basename(__file__).\
  22. replace(".pyc", "").\
  23. replace(".py", "")
  24. dag = \
  25. DAG(
  26. dag_id=DAG_ID,
  27. schedule_interval=None,
  28. default_args=args,
  29. tags=['hpc'])
  30. with dag:
  31. ## TASK
  32. download_gtf_file = \
  33. PythonOperator(
  34. task_id="download_gtf_file",
  35. dag=dag,
  36. queue='hpc_4G',
  37. params={
  38. 'gtf_xcom_key': 'gtf_file'
  39. },
  40. python_callable=download_gtf_file_func)
  41. ## TASK
  42. create_star_index = \
  43. PythonOperator(
  44. task_id="create_star_index",
  45. dag=dag,
  46. queue='hpc_32G8t',
  47. params={
  48. 'gtf_xcom_task': 'download_gtf_file',
  49. 'gtf_xcom_key': 'gtf_file',
  50. 'star_ref_xcom_key': 'star_ref',
  51. 'threads': 8,
  52. 'star_options': [
  53. '--sjdbOverhang', 149]
  54. },
  55. python_callable=create_star_index_func)