A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
This interactive script demonstrates the Menezes-Vanstone-EC-Cryptosystem

Menezes-Vanstone-EC-Cryptosystem This interactive script demonstrates the Meneze

Nishaant Goswamy 1 Jan 02, 2022
Youtube Channel Website

Videos-By-Sanjeevi Youtube Channel Website YouTube Channel Website Features: Free Hosting using GitHub Pages and open-source code base in GitHub. It c

Sanjeevi Subramani 5 Mar 26, 2022
Игра реализована с помощью языке python3.9, библиотеки pygame

Игра в танки Игра реализована с помощью языке python3.9, библиотеки pygame. Игра имеет несколько уровней. Правила: есть танки, которые стреляют, есть

1 Jan 01, 2022
Stopmagic gives you the power of creating amazing Stop Motion animations faster and easier than ever before.

Stopmagic gives you the power of creating amazing Stop Motion animations faster and easier than ever before. This project is maintained by Aldrin Mathew.

Aldrin's Art Factory 67 Dec 31, 2022
An Airflow operator to call the main function from the dbt-core Python package

airflow-dbt-python An Airflow operator to call the main function from the dbt-core Python package Motivation Airflow running in a managed environment

Tomás Farías Santana 93 Jan 08, 2023
A script that convert WiiU BotW mods to Switch

UltimateBoTWConverter A script that convert WiiU BotW mods to Switch. It uses every resource I could find under the sun that allows for conversion, wi

11 Nov 08, 2022
A python package template that can be adapted for RAP projects

Warning - this repository is a snapshot of a repository internal to NHS Digital. This means that links to videos and some URLs may not work. Repositor

NHS Digital 3 Nov 08, 2022
A rough GSL work DynSAGE of my graduation project

DynSAGE Codes w.r.t DynSAGE-Diffuse can be found in function apply_dyn_model_v2 of src/utils.py. The training entrance is Line 144 - 155 of src/main.p

Yuhan Wang 3 Mar 22, 2022
Data and analysis relating to the 5.8M Melbourne quake of 2021

quake2021 Data and analysis relating to the 5.8M Melbourne quake of 2021 Monash University Woodside Living Lab Building The building is located here T

Colin Caprani 6 May 16, 2022
This code extracts line width of phonons from specular energy density (SED) calculated with LAMMPS.

This code extracts line width of phonons from specular energy density (SED) calculated with LAMMPS.

Masato Ohnishi 3 Jun 15, 2022
Meaningful and minimalist release notes for developers

Managing manual release notes is hard. Therefore, everyone tends to generate release notes from commit messages. But, you won't get a meaningful release note at the end.

codezri 31 Dec 30, 2022
1 May 12, 2022
A Unified Framework for Hydrology

Unified Framework for Hydrology The Python package unifhy (Unified Framework for Hydrology) is a hydrological modelling framework which combines inter

Unified Framefork for Hydrology - Community Organisation 6 Jan 01, 2023
vFuzzer is a tool developed for fuzzing buffer overflows, For now, It can be used for fuzzing plain vanilla stack based buffer overflows

vFuzzer vFuzzer is a tool developed for fuzzing buffer overflows, For now, It can be used for fuzzing plain vanilla stack based buffer overflows, The

Vedant Bhalgama 5 Nov 12, 2022
A python script for compiling and executing .cc files

Debug And Run A python script for compiling and executing .cc files Example dbrun fname.cc [DEBUG MODE] Compiling fname.cc with C++17 ------------

1 May 28, 2022
The most widely used Python to C compiler

Welcome to Cython! Cython is a language that makes writing C extensions for Python as easy as Python itself. Cython is based on Pyrex, but supports mo

7.6k Jan 03, 2023
ELF file deserializer and serializer library

elfo ELF file deserializer and serializer library. import elfo elf = elfo.ELF.from_path('main') elf ELF( header=ELFHeader( e_ident=e

Filipe Laíns 3 Aug 23, 2021
Automatically find solutions when your Python code encounters an issue.

What The Python?! Helping you find answers to the errors Python spits out. Installation You can find the source code on GitHub at: https://github.com/

What The Python?! 139 Dec 14, 2022
Converts a base copy of Pokemon BDSP's masterdatas into a more readable and editable Pokemon Showdown Format.

Showdown-BDSP-Converter Converts a base copy of Pokemon BDSP's masterdatas into a more readable and editable Pokemon Showdown Format. Download the lat

Alden Mo 2 Jan 02, 2022
One line Brainfuck interpreter in Python

One line Brainfuck interpreter in Python

16 Dec 21, 2022