Quickstart

Release:

v1.0.6.1

Gathering API requests as one batch

First, run a redis server:

$ docker run --rm -p 6379:6379 -e ALLOW_EMPTY_PASSWORD=yes bitnami/redis:latest
redis 00:02:54.05
redis 00:02:54.06 Welcome to the Bitnami redis container
redis 00:02:54.06 Subscribe to project updates by watching https://github.com/bitnami/containers
redis 00:02:54.06 Submit issues and feature requests at https://github.com/bitnami/containers/issues
redis 00:02:54.06
redis 00:02:54.07 INFO  ==> ** Starting Redis setup **
redis 00:02:54.08 WARN  ==> You set the environment variable ALLOW_EMPTY_PASSWORD=yes. For safety reasons, do not use this flag in a production environment.
redis 00:02:54.09 INFO  ==> Initializing Redis
redis 00:02:54.11 INFO  ==> Setting Redis config file
redis 00:02:54.15 INFO  ==> ** Redis setup finished! **

redis 00:02:54.16 INFO  ==> ** Starting Redis **
1:C 20 Nov 2023 00:02:54.189 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
1:C 20 Nov 2023 00:02:54.189 # Redis version=7.0.12, bits=64, commit=00000000, modified=0, pid=1, just started
1:C 20 Nov 2023 00:02:54.189 # Configuration loaded
1:M 20 Nov 2023 00:02:54.189 * monotonic clock: POSIX clock_gettime
1:M 20 Nov 2023 00:02:54.191 * Running mode=standalone, port=6379.
1:M 20 Nov 2023 00:02:54.191 # Server initialized
1:M 20 Nov 2023 00:02:54.197 * Creating AOF base file appendonly.aof.1.base.rdb on server start
1:M 20 Nov 2023 00:02:54.200 * Creating AOF incr file appendonly.aof.1.incr.aof on server start
1:M 20 Nov 2023 00:02:54.200 * Ready to accept connections

Create an app with DynamicBatcher:

# app.py

import time
import uvicorn
from typing import List
from fastapi import FastAPI
from pydantic import BaseModel
from dynamic_batcher import DynamicBatcher

app = FastAPI()
batcher = DynamicBatcher()

class RequestItem(BaseModel):
    values: List[int] = [1, 5, 2]

@app.post("/batch/{key}")
async def run_batch(key: str, body: RequestItem):
    start_time = time.time()
    resp_body = await batcher.asend(body.model_dump())
    result = {
        "key": key,
        "values": body.values,
        "values_sum": resp_body,
        "elapsed": time.time() - start_time
    }
    return result

if __name__ == "__main__":
    uvicorn.run(app)
INFO:     Started server process [27085]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

Create a BatchProcessor:

Note

BatchProcessor should run with single process, as designed so.

# batch_func.py

## function_for_batch: `sum_values`
from typing import List, Dict
body_list = [
    {'values': [1, 2, 3]},
    {'values': [4, 5, 6]}
]
def sum_values(bodies: List[Dict]) -> List[Dict]:
    result = []
    for body in bodies:
        result.append( { 'sum': sum(body['values']) } )
    return result

# sum_values(body_list) -> [{'sum': 6}, {'sum': 15}]
$ python -m dynamic_batcher batch_func.sum_values --batch-size=64 --batch-time=2

Create a request:

$ curl -X POST localhost:8000/batch/single \
    -H 'Content-Type: application/json' \
    -d '{"key": "a", "values": [1, 3, 5]}'
{"key":"single","values":[1,3,5],"values_sum":{"sum":9},"elapsed":2.470838212966919}

Create requests simultaneously:

$ seq 1 17 | xargs -n1 -I {} -P20 curl -w "\n" -X POST localhost:8000/batch/{} \
    -H 'Content-Type: application/json' \
    -d '{"key": "a", "values": [1, 3, 5]}'
{"key":"2","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.026194095611572266}
{"key":"1","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.02919602394104004}
{"key":"4","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03496694564819336}
{"key":"3","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03712725639343262}
{"key":"5","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03885698318481445}
{"key":"6","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.07140994071960449}
{"key":"7","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.07084202766418457}
{"key":"8","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03500699996948242}
{"key":"9","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03584885597229004}
{"key":"10","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03512001037597656}
{"key":"11","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03521585464477539}
{"key":"13","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03747105598449707}
{"key":"12","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.0377802848815918}
{"key":"14","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.03834390640258789}
{"key":"16","values":[1,3,5],"values_sum":{"sum":9},"elapsed":0.039556026458740234}
{"key":"15","values":[1,3,5],"values_sum":{"sum":9},"elapsed":3.737711191177368}
{"key":"17","values":[1,3,5],"values_sum":{"sum":9},"elapsed":3.739470720291138}

The remainder(2) was delayed until batch_time is up.