dynamic_batcher package

class BatchProcessor(batch_size: int = 64, batch_time: int = 2)[source]

Bases: object

A Client class for dynamic batch processing. A BatchProcessor tries to connect to a Redis server with connection info given by the following ENVVAR:

REDIS__HOST=localhost
REDIS__PORT=6379
REDIS__DB=0
REDIS__PASSWORD=

DYNAMIC_BATCHER__BATCH_SIZE=64
DYNAMIC_BATCHER__BATCH_TIME=2
Parameters:
  • batch_size (int) –

    Number of requests for a batch. Defaults to 64. If DYNAMIC_BATCHER__BATCH_SIZE is set, it overrides the argument's default value. If the argument value is passed explicitly, all other settings are ignored.

    Priority:

    values passed > ENVVAR > default value
    

  • batch_time (int) – Seconds to wait for requests before the batch deadline expires. Defaults to 2. If batch_time is too large, the processor waits too long for stragglers, which is not intended. If it is too small, the processor acts impatiently and dispatches batches before enough requests accumulate.
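The priority rule above can be sketched with a hypothetical resolver (resolve_batch_size is illustrative, not part of the package):

```python
import os
from typing import Optional

def resolve_batch_size(passed: Optional[int] = None, default: int = 64) -> int:
    # Priority: values passed > ENVVAR > default value.
    if passed is not None:
        return passed
    env_value = os.environ.get("DYNAMIC_BATCHER__BATCH_SIZE")
    if env_value is not None:
        return int(env_value)
    return default
```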

delay

Seconds between polls for incoming requests.

Type:

int

batch_size

Number of requests for a batch.

Type:

int

batch_time

Seconds to wait for requests before the batch deadline expires.

Type:

int
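The interaction of batch_size and batch_time can be sketched as a dispatch loop: a batch is handed off as soon as it fills, or when the deadline expires with whatever has arrived. This is a hypothetical illustration of the trade-off, not the package's internals:

```python
import asyncio
import time

async def collect_batch(queue: asyncio.Queue,
                        batch_size: int = 64,
                        batch_time: float = 2.0) -> list:
    # Gather up to batch_size items; stop early once batch_time elapses.
    batch = []
    deadline = time.monotonic() + batch_time
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch
```

A large batch_time makes the loop linger on the final `wait_for`; a small one returns partial batches quickly.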

Example

Create a processor:
>>> import asyncio
>>> from dynamic_batcher import BatchProcessor
>>> processor = BatchProcessor()
>>> asyncio.run(processor.start_daemon(lambda x: x))
Raises:

redis.exceptions.ConnectionError – if a Redis server is not available.

async start_daemon(func: Callable) → None[source]

Start a single batch process as a daemon. This concatenates incoming requests into one batch, calls func, and splits the result into the corresponding responses.

Parameters:

func (Callable) – A callable object, such as a function or method. func should take exactly one positional argument of type List, so it can handle a batch of variable size. It should also return a List, operating elementwise so that each input element maps to a corresponding output element. Both the argument and the return value must be JSON (de)serializable.

Returns:

None
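The contract expected of func can be sketched as a check (check_batch_contract is a hypothetical helper, assuming the elementwise one-output-per-input behavior described above):

```python
import json
from typing import Callable, List

def check_batch_contract(func: Callable[[List], List], batch: List) -> List:
    # Both the batch and the result must be JSON-serializable.
    json.dumps(batch)
    result = func(batch)
    # func must return a List with one element per input request,
    # so responses can be split back to their original callers.
    assert isinstance(result, list)
    assert len(result) == len(batch)
    json.dumps(result)
    return result
```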

Example

First, define a function to run:

>>> import asyncio
>>> from dynamic_batcher import BatchProcessor
>>> from typing import List, Dict
>>> body_list = [
...     {'values': [1, 2, 3]},
...     {'values': [4, 5, 6]}
... ]
>>> def sum_values(bodies: List[Dict]) -> List[Dict]:
...     result = []
...     for body in bodies:
...         result.append( { 'sum': sum(body['values']) } )
...     return result
>>> sum_values(body_list)
[{'sum': 6}, {'sum': 15}]

Then, run a BatchProcessor:

>>> import asyncio
>>> from dynamic_batcher import BatchProcessor
>>> batch_processor = BatchProcessor()
>>> asyncio.run(batch_processor.start_daemon(sum_values))
class DynamicBatcher(delay: float = 0.01, timeout: int = 100)[source]

Bases: object

A Client class for dynamic batch processing. A DynamicBatcher tries to connect to a Redis server with connection info given by the following ENVVAR:

REDIS__HOST=localhost
REDIS__PORT=6379
REDIS__DB=0
REDIS__PASSWORD=
Parameters:
  • delay (float) – Seconds between polls for the response corresponding to a sent request. Defaults to 0.01.

  • timeout (int) – Seconds to wait for a response before giving up. Defaults to 100. If timeout is too large, the client may block for too long, which is not intended. If it is too small, the client acts impatiently and gives up before the batch process has finished.

delay

Seconds between polls for the response corresponding to a sent request.

Type:

float

timeout

Seconds to wait for a response before giving up.

Type:

int

Example

Create a batcher:
>>> from dynamic_batcher import DynamicBatcher
>>> batcher = DynamicBatcher()
You can pass parameters:
>>> lazy_batcher = DynamicBatcher(delay=1)
Or, create a fail-fast batcher:
>>> fail_fast_batcher = DynamicBatcher(timeout=3)
Raises:

redis.exceptions.ConnectionError – if a Redis server is not available.

async asend(body: Dict | List, *args, **kwargs) → Dict | List | None[source]

Send a request and wait for a response, with JSON-serializable body.

Parameters:
  • body (Dict or List) – A JSON-serializable object, typically a Dict or List.

  • *args – Variable length argument list.

  • **kwargs – Arbitrary keyword arguments.

Returns:

The response body corresponding to the request, or None if no response arrives before timeout.

Return type:

Dict, List, or None

Example

>>> import time
>>> import uvicorn
>>> from typing import List
>>> from fastapi import FastAPI
>>> from pydantic import BaseModel
>>> from dynamic_batcher import DynamicBatcher
>>>
>>> app = FastAPI()
>>> batcher = DynamicBatcher()
>>> class RequestItem(BaseModel):
...     key: str
...     values: List[int] = [1, 5, 2]
>>>
>>> @app.post("/batch/{key}")
... async def run_batch(key: str, body: RequestItem):
...     start_time = time.time()
...     resp_body = await batcher.asend(body.model_dump())
...     result = {
...         "key": key,
...         "values": body.values,
...         "values_sum": resp_body,
...         "elapsed": time.time() - start_time
...     }
...     return result
>>>
>>> if __name__ == "__main__":
...     uvicorn.run(app)
INFO:     Started server process [27085]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
class PendingRequestStream(message_id: bytes, consumer: bytes, time_since_delivered: int, times_delivered: int)[source]

Bases: NamedTuple

(Internally used) DTO Class for Request.

Example

assign case 1:
>>> stream = PendingRequestStream(
...     message_id=b'1234',
...     consumer=b'consumer',
...     time_since_delivered=11234,
...     times_delivered=1,
... )
assign case 2:
>>> request = {
...     'message_id': b'1234',
...     'consumer': b'consumer',
...     'time_since_delivered': 11234,
...     'times_delivered': 1,
... }
>>> stream = PendingRequestStream(**request)
dump:
>>> stream = PendingRequestStream(
...     message_id=b'1234',
...     consumer=b'consumer',
...     time_since_delivered=11234,
...     times_delivered=1,
... )
>>> request = stream.model_dump()
>>> request
{'message_id': b'1234', 'consumer': b'consumer', 'time_since_delivered': 11234, 'times_delivered': 1}
consumer: bytes

Alias for field number 1

message_id: bytes

Alias for field number 0

time_since_delivered: int

Alias for field number 2

times_delivered: int

Alias for field number 3

class ResponseStream(stream_id: bytes, body: List | Dict)[source]

Bases: NamedTuple

(Internally used) DTO Class for Response.

Example

assign case 1:
>>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]})
assign case 2:
>>> response = {'stream_id': b'1234', 'body': {'values': [1, 2, 3]}}
>>> stream = ResponseStream(**response)
dump:
>>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]})
>>> response = stream.model_dump()
>>> response
{'stream_id': b'1234', 'body': {'values': [1, 2, 3]}}
body: List | Dict

Alias for field number 1

stream_id: bytes

Alias for field number 0
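Because ResponseStream is a NamedTuple, each field is also addressable by its index ("Alias for field number N"). A stdlib sketch with a stand-in class (ResponseStreamSketch is hypothetical; the stdlib _asdict plays the role of model_dump here):

```python
from typing import Dict, List, NamedTuple, Union

class ResponseStreamSketch(NamedTuple):
    # Stand-in mirroring ResponseStream's fields for illustration.
    stream_id: bytes         # field number 0
    body: Union[List, Dict]  # field number 1

stream = ResponseStreamSketch(stream_id=b'1234', body={'values': [1, 2, 3]})
dumped = stream._asdict()  # stdlib counterpart of the class's model_dump()
```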

Submodules