
Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increases RPS (Requests Per Second) of your service
  • Collects all optimisations in one library
  • Uses shared memory to transfer large data between processes (see the sketch below)
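
The shared-memory bullet deserves a short illustration. The sketch below uses only Python's standard multiprocessing.shared_memory module to show the underlying idea; it is a conceptual example, not Aqueduct's own API: a large numpy array is written into a named shared-memory block once, and another process can attach to it by name instead of receiving a pickled copy through a pipe.

import numpy as np
from multiprocessing import shared_memory

# "Producer" side: put a large array into a named shared-memory block.
data = np.arange(10_000_000, dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
producer_view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
producer_view[:] = data  # a single copy into shared memory, no pickling of the payload

# "Consumer" side (normally another process): attach by name; only the small
# name/shape/dtype metadata has to travel between processes.
attached = shared_memory.SharedMemory(name=shm.name)
consumer_view = np.ndarray(data.shape, dtype=data.dtype, buffer=attached.buf)
total = float(consumer_view.sum())

# Drop the numpy views before closing; the creator is responsible for unlink().
del producer_view, consumer_view
attached.close()
shm.close()
shm.unlink()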

Get started

A simple example of how to get started with Aqueduct using aiohttp. For more complete examples, see examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
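
With the app running (aiohttp listens on port 8080 by default), the endpoint from the example can be exercised with a small client script. This is just a usage sketch; the URL and payload are assumptions based on the view above.

import asyncio

from aiohttp import ClientSession


async def main():
    async with ClientSession() as session:
        # SumView reads the raw request body and converts it to int.
        async with session.post('http://localhost:8080/sum', data=b'1000000') as resp:
            print(await resp.json())  # {'result': 333332833333500000}


asyncio.run(main())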
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is only needed for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        Each image is represented by a one-dimensional array.
        The model spends less time per image when processing a batch, thanks to GPU optimizations;
        this is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[ArrayFieldTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no task has a result yet
assert not any(task.result for task in tasks_batch)
# batched handling takes about 0.16 s, which is less than the 0.22 s needed for sequential processing
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is larger than the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()
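
The statements above use top-level await, which only works in an asynchronous context such as a notebook or an async REPL. To run the same flow as a plain script, the calls can be wrapped in a coroutine and driven with asyncio.run; a minimal sketch reusing the definitions above:

async def main():
    tasks_batch = get_tasks_batch()
    flow = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
    flow.start()

    await asyncio.wait_for(
        process_tasks(flow, tasks_batch),
        timeout=CatDetector.BATCH_PROCESS_TIME,
    )
    assert all(task.result for task in tasks_batch)

    await flow.stop()


if __name__ == '__main__':
    asyncio.run(main())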

Sentry

Aqueduct allows you to receive logger events from both the workers and the main process. To integrate with Sentry, add something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


# os.getenv returns a string (or None), so compare against a string, not the boolean True
if os.getenv('SENTRY_ENABLED', 'false').lower() == 'true':
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
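
Note that raven has been deprecated in favour of sentry-sdk; an equivalent setup with the newer SDK might look roughly like this (a sketch, not part of the Aqueduct documentation):

import logging
import os

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

if os.getenv('SENTRY_ENABLED', 'false').lower() == 'true':
    sentry_sdk.init(
        dsn=os.getenv('SENTRY_DSN'),
        # Record INFO-level breadcrumbs and send ERROR-level events,
        # including those emitted by aqueduct.logger.log.
        integrations=[LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)],
    )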

Owner

avito.tech (avito.ru engineering team open source projects)