dynamic_batcher.types module

class PendingRequestStream(message_id: bytes, consumer: bytes, time_since_delivered: int, times_delivered: int)[source]

Bases: NamedTuple

(Internally used) DTO Class for Request.

Example

assign case 1:
>>> stream = PendingRequestStream(
...     message_id=b'1234',
...     consumer=b'consumer',
...     time_since_delivered=11234,
...     times_delivered=1,
... )
assign case 2:
>>> request = {
...     'message_id': b'1234',
...     'consumer': b'consumer',
...     'time_since_delivered': 11234,
...     'times_delivered': 1,
... }
>>> stream = ResponseStream(**request)
{'stream_id': b'1234', 'body': {'values': [1, 2, 3]}}
>>> stream = ResponseStream(**request)
dump:
>>> stream = PendingRequestStream(
...     message_id=b'1234',
...     consumer=b'consumer',
...     time_since_delivered=11234,
...     times_delivered=1,
... )
>>> request = stream.model_dump()
>>> request
{'message_id': b'1234', 'consumer': b'consumer', 'time_since_delivered': 11234, 'times_delivered': 1}
consumer: bytes

Alias for field number 1

message_id: bytes

Alias for field number 0

time_since_delivered: int

Alias for field number 2

times_delivered: int

Alias for field number 3

class ResponseStream(stream_id: bytes, body: List | Dict)[source]

Bases: NamedTuple

(Internally used) DTO Class for Response.

Example

assign case 1:
>>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]})
assign case 2:
>>> response = {'stream_id': b'1234', 'body': {'values': [1, 2, 3]}}
>>> stream = ResponseStream(**response)
dump:
>>> stream = ResponseStream(stream_id=b'1234', body={'values': [1, 2, 3]})
>>> response = stream.model_dump()
>>> response
{'stream_id': b'1234', 'body': {'values': [1, 2, 3]}}
body: List | Dict

Alias for field number 1

stream_id: bytes

Alias for field number 0