Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer large data between processes
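
To illustrate the shared memory idea from the list above, here is a minimal sketch using only the Python standard library. It is not aqueduct's internal implementation; the helper names `write_payload` and `read_payload` are hypothetical, and both sides run in one process here to keep the example short. The point is that only a short block name and a size need to be passed between processes, while the large payload stays in shared memory.

```python
from multiprocessing import shared_memory


def write_payload(payload: bytes) -> shared_memory.SharedMemory:
    """Copy payload into a new shared memory block and return the block."""
    block = shared_memory.SharedMemory(create=True, size=len(payload))
    block.buf[:len(payload)] = payload
    return block


def read_payload(name: str, size: int) -> bytes:
    """Attach to an existing block by name and copy out `size` bytes."""
    block = shared_memory.SharedMemory(name=name)
    try:
        # The block may be rounded up to a page size, so slice explicitly.
        return bytes(block.buf[:size])
    finally:
        block.close()


payload = b"x" * 1_000_000  # stand-in for a large image or tensor
producer_block = write_payload(payload)
try:
    # In a real pipeline only the name and size would cross the process boundary.
    received = read_payload(producer_block.name, len(payload))
finally:
    producer_block.close()
    producer_block.unlink()  # release the block once every reader is done
```

The block must be unlinked exactly once after all readers have finished, otherwise the memory stays allocated until the system cleans it up.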

Get started

A simple example of getting started with aqueduct using aiohttp. For more complete examples, see the project's examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """Example of a CPU-bound model."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap your model in a handler."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in the child process, so the model consumes no memory in the parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """Receives several tasks at once because the step may be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct can process tasks in batches. The default batch size is one.

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is needed only for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[ArrayFieldTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


async def main():
    tasks_batch = get_tasks_batch()
    flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
    flow_with_batch_handler.start()

    # no results have been set yet
    assert not any(task.result for task in tasks_batch)
    # batched handling takes about 0.16 s, less than the 0.22 s needed to process the tasks one by one
    await asyncio.wait_for(
        process_tasks(flow_with_batch_handler, tasks_batch),
        timeout=CatDetector.BATCH_PROCESS_TIME,
    )
    # all results have been set
    assert all(task.result for task in tasks_batch)

    await flow_with_batch_handler.stop()

    # if the batch size is larger than the number of tasks, batch_timeout limits
    # how long the step waits for the batch to fill, which bounds the processing time
    tasks_batch = get_tasks_batch()
    flow_with_batch_handler = Flow(
        FlowStep(CatDetectorHandler(), batch_size=2 * TASKS_BATCH_SIZE, batch_timeout=0.01)
    )
    flow_with_batch_handler.start()

    await asyncio.wait_for(
        process_tasks(flow_with_batch_handler, tasks_batch),
        timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
    )

    await flow_with_batch_handler.stop()


# top-level await only works in an async REPL or test, so the example runs through asyncio.run
if __name__ == '__main__':
    asyncio.run(main())
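
The interplay of batch_size and batch_timeout in the example above can be pictured with a plain asyncio sketch. This is only an illustration of the general technique (collect items until the batch is full or a deadline passes), not aqueduct's actual implementation; the `collect_batch` function and the queue are hypothetical names introduced for the demo.

```python
import asyncio
from typing import Any, List


async def collect_batch(queue: asyncio.Queue, batch_size: int, batch_timeout: float) -> List[Any]:
    """Wait for the first item, then gather more until the batch is full or time runs out."""
    batch = [await queue.get()]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + batch_timeout
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            # The deadline passed before the batch filled up: send what we have.
            break
    return batch


async def demo() -> List[List[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in range(5):
        queue.put_nowait(item)
    # The first batch fills up immediately; the second one is cut short by the timeout.
    first = await collect_batch(queue, batch_size=3, batch_timeout=0.05)
    second = await collect_batch(queue, batch_size=3, batch_timeout=0.05)
    return [first, second]


batches = asyncio.run(demo())
```

With five queued items and a batch size of three, the second batch holds only two items and is released once the timeout expires, which is the trade-off batch_timeout controls: a smaller value lowers latency for partial batches, a larger one gives batches more time to fill.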

Sentry

Aqueduct lets you receive logger events from both the workers and the main process. To integrate with Sentry, write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


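# os.getenv returns a string or None, never the boolean True, so compare text values
# (which values count as "enabled" is an assumption made for this example)
if os.getenv('SENTRY_ENABLED', '').lower() in ('1', 'true'):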
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
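
The mechanics behind this integration can be tried without Sentry installed. The sketch below uses only the standard logging module: a stand-in handler (the hypothetical `CollectingHandler`) is attached at ERROR level to a logger with a made-up name, and an `env_flag` helper (also hypothetical) shows one way to read an enable flag, since environment variables are always strings. None of these names come from aqueduct or raven.

```python
import logging
from typing import List, Optional

TRUE_VALUES = {"1", "true", "yes", "on"}  # assumed set of "enabled" spellings


def env_flag(value: Optional[str]) -> bool:
    """Interpret an environment variable string as a boolean flag."""
    return value is not None and value.strip().lower() in TRUE_VALUES


class CollectingHandler(logging.Handler):
    """Stand-in for a Sentry handler: stores the messages it receives."""

    def __init__(self, level: int = logging.NOTSET) -> None:
        super().__init__(level)
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


log = logging.getLogger("example_pipeline")  # hypothetical logger name
log.setLevel(logging.DEBUG)
log.propagate = False  # keep the demo from printing to the root logger

handler = CollectingHandler(level=logging.ERROR)
if env_flag("true"):  # in real code: env_flag(os.getenv("SENTRY_ENABLED"))
    log.addHandler(handler)

log.info("worker started")         # below ERROR, so the handler ignores it
log.error("model failed to load")  # reaches the handler
```

Only records at the handler's level or above are forwarded, so routine worker messages stay out of the error tracker.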

Owner

avito.tech: avito.ru engineering team open source projects