Source code for dynamic_batcher.types

from typing import List, Dict, NamedTuple


__all__ = [
    "ResponseStream",
    "PendingRequestStream",
]


[docs] class ResponseStream(NamedTuple): """(Internally used) DTO Class for Response. Example: assign case 1: >>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]}) assign case 2: >>> response = {'stream_id': b'1234', 'body': {'values': [1, 2, 3]}} >>> stream = ResponseStream(**response) dump: >>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]}) >>> response = stream.model_dump() >>> response {'stream_id': b'1234', 'body': {'values': [1, 2, 3]}} """ stream_id: bytes body: List | Dict
[docs] class PendingRequestStream(NamedTuple): """(Internally used) DTO Class for Request. Example: assign case 1: >>> stream = PendingRequestStream( ... message_id=b'1234', ... consumer=b'consumer', ... time_since_delivered=11234, ... times_delivered=1, ... ) assign case 2: >>> request = { ... 'message_id': b'1234', ... 'consumer': b'consumer', ... 'time_since_delivered': 11234, ... 'times_delivered': 1, ... } >>> stream = ResponseStream(**request) {'stream_id': b'1234', 'body': {'values': [1, 2, 3]}} >>> stream = ResponseStream(**request) dump: >>> stream = PendingRequestStream( ... message_id=b'1234', ... consumer=b'consumer', ... time_since_delivered=11234, ... times_delivered=1, ... ) >>> request = stream.model_dump() >>> request {'message_id': b'1234', 'consumer': b'consumer', 'time_since_delivered': 11234, 'times_delivered': 1} """ message_id: bytes consumer: bytes time_since_delivered: int times_delivered: int