dynamic_batcher package
- class BatchProcessor(batch_size: int = 64, batch_time: int = 2)[source]
Bases:
object
A client class for dynamic batch processing. A BatchProcessor tries to connect to a Redis server using connection info given by the following environment variables:
REDIS__HOST=localhost REDIS__PORT=6379 REDIS__DB=0 REDIS__PASSWORD= DYNAMIC_BATCHER__BATCH_SIZE=64 DYNAMIC_BATCHER__BATCH_TIME=2
- Parameters:
batch_size (int) – Number of requests per batch. Defaults to 64. If DYNAMIC_BATCHER__BATCH_SIZE is set, it overrides the default. When the argument is passed explicitly, all other settings are ignored. Priority: passed value > ENVVAR > default value.
batch_time (int) – Seconds of deadline to wait for requests. Defaults to 2. If batch_time is too large, the processor may be stuck waiting too long; if too small, it may act impatiently and process before the batch has filled.
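The configuration priority above (passed value > ENVVAR > default value) can be sketched as follows. `resolve_batch_size` is a hypothetical helper written only to illustrate the documented order; it is not part of the package:

```python
import os

# Hypothetical helper illustrating the documented resolution order:
# passed value > ENVVAR > default value. Not part of dynamic_batcher.
def resolve_batch_size(passed=None, default=64):
    if passed is not None:                      # explicit argument wins
        return passed
    env = os.environ.get("DYNAMIC_BATCHER__BATCH_SIZE")
    if env is not None:                         # then the environment variable
        return int(env)
    return default                              # finally the built-in default

os.environ.pop("DYNAMIC_BATCHER__BATCH_SIZE", None)
print(resolve_batch_size())            # 64: built-in default
os.environ["DYNAMIC_BATCHER__BATCH_SIZE"] = "128"
print(resolve_batch_size())            # 128: ENVVAR overrides the default
print(resolve_batch_size(passed=32))   # 32: explicit argument overrides both
```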
- delay
Polling interval in seconds for parsing requests.
- Type:
int
- batch_size
Number of requests for a batch.
- Type:
int
- batch_time
Seconds of deadline to wait for requests.
- Type:
int
Example
- Create a processor:
>>> import asyncio
>>> from dynamic_batcher import BatchProcessor
>>> processor = BatchProcessor()
>>> asyncio.run(processor.start_daemon(lambda x: x))
- Raises:
redis.exceptions.ConnectionError – raised when the Redis server is not available.
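Since construction can fail with redis.exceptions.ConnectionError, a caller may want to retry before giving up. A minimal retry sketch; `ConnectionFailed` and `make_processor` are stand-ins for redis.exceptions.ConnectionError and `BatchProcessor()`, so the snippet runs without a Redis server or the redis package installed:

```python
import time

# Stand-in for redis.exceptions.ConnectionError.
class ConnectionFailed(Exception):
    pass

attempts = {"count": 0}

# Stand-in for BatchProcessor(): fails twice, then succeeds.
def make_processor():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionFailed("redis server is not available")
    return "processor"

def connect_with_retry(factory, retries=5, delay=0.01):
    """Retry factory() on connection failure, re-raising on the last try."""
    for attempt in range(retries):
        try:
            return factory()
        except ConnectionFailed:
            if attempt == retries - 1:
                raise
            time.sleep(delay)

print(connect_with_retry(make_processor))   # succeeds on the third attempt
```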
- async start_daemon(func: Callable) None [source]
Start a single batch process as a daemon. It concatenates incoming requests into one batch, calls func, and splits the result into the corresponding responses.
- Parameters:
func (Callable) – A callable object, such as a function or method. func should take exactly one positional argument of type List, so it can handle a batch of variable size. Both the argument and the return value should be of type List, processed elementwise, and both should be JSON-(de)serializable.
- Returns:
None
Example
First, define a function to run:
>>> from typing import List, Dict
>>> body_list = [
...     {'values': [1, 2, 3]},
...     {'values': [4, 5, 6]},
... ]
>>> def sum_values(bodies: List[Dict]) -> List[Dict]:
...     result = []
...     for body in bodies:
...         result.append({'sum': sum(body['values'])})
...     return result
>>> sum_values(body_list)
[{'sum': 6}, {'sum': 15}]
Then, run a BatchProcessor:
>>> import asyncio
>>> from dynamic_batcher import BatchProcessor
>>> batch_processor = BatchProcessor()
>>> asyncio.run(batch_processor.start_daemon(sum_values))
- class DynamicBatcher(delay: float = 0.01, timeout: int = 100)[source]
Bases:
object
A client class for dynamic batch processing. A DynamicBatcher tries to connect to a Redis server using connection info given by the following environment variables:
REDIS__HOST=localhost REDIS__PORT=6379 REDIS__DB=0 REDIS__PASSWORD=
- Parameters:
delay (float) – Polling interval in seconds for checking the response to a sent request. Defaults to 0.01.
timeout (int) – Seconds of deadline to wait for a response. Defaults to 100. If timeout is too large, the batcher may be stuck waiting too long; if too small, it may give up before batch processing has finished.
- delay
Polling interval in seconds for checking the response to a sent request.
- Type:
float
- timeout
Seconds of deadline to wait for a response.
- Type:
int
Example
- Create a batcher:
>>> from dynamic_batcher import DynamicBatcher
>>> batcher = DynamicBatcher()
- You can give some parameters:
>>> lazy_batcher = DynamicBatcher(delay=1)
- Or, create a fail-fast batcher:
>>> fail_fast_batcher = DynamicBatcher(timeout=3)
- Raises:
redis.exceptions.ConnectionError – raised when the Redis server is not available.
- async asend(body: Dict | List, *args, **kwargs) Dict | List | None [source]
Send a request and wait for a response, with JSON-serializable body.
- Parameters:
body (Dict or List) – A JSON-serializable object, typically a Dict or a List.
*args – Variable length argument list.
**kwargs – Arbitrary keyword arguments.
- Returns:
The response body corresponding to the request, or None if no response arrived within timeout.
- Return type:
Dict | List | None
Example
>>> import time
>>> import uvicorn
>>> from typing import List
>>> from fastapi import FastAPI
>>> from pydantic import BaseModel
>>> from dynamic_batcher import DynamicBatcher
>>>
>>> app = FastAPI()
>>> batcher = DynamicBatcher()
>>> class RequestItem(BaseModel):
...     key: str
...     values: List[int] = [1, 5, 2]
>>>
>>> @app.post("/batch/{key}")
... async def run_batch(key: str, body: RequestItem):
...     start_time = time.time()
...     resp_body = await batcher.asend(body.model_dump())
...     result = {
...         "key": key,
...         "values": body.values,
...         "values_sum": resp_body,
...         "elapsed": time.time() - start_time,
...     }
...     return result
>>>
>>> if __name__ == "__main__":
...     uvicorn.run(app)
INFO:     Started server process [27085]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
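Given the Dict | List | None return type, a caller should also handle the case where asend returns no response (assumed here to indicate that the timeout elapsed before the batch processor replied). A sketch of that check, using a hypothetical `asend_stub` in place of the real method so it runs without Redis:

```python
import asyncio

# asend_stub is a hypothetical stand-in for DynamicBatcher.asend: based on
# the Dict | List | None return annotation, None is assumed to mean the
# timeout elapsed before the batch processor replied.
async def asend_stub(body, succeed):
    return {"sum": sum(body["values"])} if succeed else None

async def run_batch(body, succeed):
    resp_body = await asend_stub(body, succeed)
    if resp_body is None:                  # fail-fast path on timeout
        return {"error": "batch timeout"}
    return resp_body

print(asyncio.run(run_batch({"values": [1, 2, 3]}, succeed=True)))   # {'sum': 6}
print(asyncio.run(run_batch({"values": [1, 2, 3]}, succeed=False)))  # {'error': 'batch timeout'}
```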
- class PendingRequestStream(message_id: bytes, consumer: bytes, time_since_delivered: int, times_delivered: int)[source]
Bases:
NamedTuple
(Internally used) DTO class for a request.
Example
- assign case 1:
>>> stream = PendingRequestStream(
...     message_id=b'1234',
...     consumer=b'consumer',
...     time_since_delivered=11234,
...     times_delivered=1,
... )
- assign case 2:
>>> request = {
...     'message_id': b'1234',
...     'consumer': b'consumer',
...     'time_since_delivered': 11234,
...     'times_delivered': 1,
... }
>>> stream = PendingRequestStream(**request)
- dump:
>>> stream = PendingRequestStream(
...     message_id=b'1234',
...     consumer=b'consumer',
...     time_since_delivered=11234,
...     times_delivered=1,
... )
>>> request = stream.model_dump()
>>> request
{'message_id': b'1234', 'consumer': b'consumer', 'time_since_delivered': 11234, 'times_delivered': 1}
- consumer: bytes
Alias for field number 1
- message_id: bytes
Alias for field number 0
- time_since_delivered: int
Alias for field number 2
- times_delivered: int
Alias for field number 3
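The "Alias for field number N" entries follow plain NamedTuple semantics: each attribute and its tuple index refer to the same value. A minimal re-creation of the PendingRequestStream shape (assuming standard NamedTuple behavior, without the package's extra methods):

```python
from typing import NamedTuple

# Minimal re-creation of the PendingRequestStream shape: attribute access
# and positional indexing return the same field values.
class PendingRequestStream(NamedTuple):
    message_id: bytes
    consumer: bytes
    time_since_delivered: int
    times_delivered: int

stream = PendingRequestStream(b'1234', b'consumer', 11234, 1)
print(stream[0] == stream.message_id)            # True: field number 0
print(stream[2] == stream.time_since_delivered)  # True: field number 2
```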
- class ResponseStream(stream_id: bytes, body: List | Dict)[source]
Bases:
NamedTuple
(Internally used) DTO class for a response.
Example
- assign case 1:
>>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]})
- assign case 2:
>>> response = {'stream_id': b'1234', 'body': {'values': [1, 2, 3]}}
>>> stream = ResponseStream(**response)
- dump:
>>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]})
>>> response = stream.model_dump()
>>> response
{'stream_id': b'1234', 'body': {'values': [1, 2, 3]}}
- body: List | Dict
Alias for field number 1
- stream_id: bytes
Alias for field number 0