16 changes: 8 additions & 8 deletions docs/api/components.md
@@ -17,7 +17,7 @@ These classes are Pydantic models with strict validation and are the
from asyncflow.components import (
Client,
Server,
ServerResources,
NodesResources,
LoadBalancer,
Endpoint,
Edge,
@@ -32,7 +32,7 @@ from asyncflow.enums import Distribution

```python
from asyncflow.components import (
Client, Server, ServerResources, LoadBalancer, Endpoint, Edge
Client, Server, NodesResources, LoadBalancer, Endpoint, Edge
)

# Nodes
@@ -49,7 +49,7 @@ endpoint = Endpoint(

server = Server(
id="srv-1",
server_resources=ServerResources(cpu_cores=2, ram_mb=2048),
server_resources=NodesResources(cpu_cores=2, ram_mb=2048),
endpoints=[endpoint],
)

@@ -103,18 +103,18 @@ Client(id: str)

---

### `ServerResources`
### `NodesResources`

```python
ServerResources(
NodesResources(
cpu_cores: int = 1,          # ≥ 1 (for now, must be fixed to 1)
ram_mb: int = 1024, # ≥ 256
db_connection_pool: int | None = None,
)
```

* Server capacity knobs used by the runtime (CPU tokens, RAM reservoir, optional DB pool).
* You may pass a **dict** instead of `ServerResources`; Pydantic will coerce it.
* You may pass a **dict** instead of `NodesResources`; Pydantic will coerce it.

**Bounds & defaults**

@@ -166,7 +166,7 @@ Each step is a dict with **exactly one** operation:
```python
Server(
id: str,
server_resources: ServerResources | dict,
server_resources: NodesResources | dict,
endpoints: list[Endpoint],
)
```
@@ -234,7 +234,7 @@ Edge(
## Type coercion & enums

* You may pass strings for enums (`kind`, `distribution`, etc.); they will be validated against the allowed values.
* For `ServerResources` and `Edge.latency` you can pass dictionaries; Pydantic will coerce them to typed models.
* For `NodesResources` and `Edge.latency` you can pass dictionaries; Pydantic will coerce them to typed models.
* If you prefer, you can import and use the enums:

```python
# … (rest of this example collapsed in the diff view)
```
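To make the rename concrete, here is a minimal usage sketch relying only on what the snippets above show: the `cpu_cores`/`ram_mb` fields, the optional `db_connection_pool`, and the dict coercion described in this file.

```python
from asyncflow.components import NodesResources

# explicit model
explicit = NodesResources(cpu_cores=2, ram_mb=2048)

# the same spec written as a plain dict, as Server(server_resources=...) would accept it
spec = {"cpu_cores": 2, "ram_mb": 2048}
coerced = NodesResources(**spec)

assert coerced == explicit
assert explicit.db_connection_pool is None  # optional DB pool defaults to None
```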
2 changes: 1 addition & 1 deletion docs/index.md
@@ -14,7 +14,7 @@ AsyncFlow is a discrete-event simulator for Python async backends (FastAPI/Uvico
## Public API (stable surface)

* **[High-Level API](api/high-level.md)** — The two entry points you’ll use most: `AsyncFlow` (builder) and `SimulationRunner` (orchestrator).
* **[Components](api/components.md)** — Public Pydantic models for topology: `Client`, `Server`, `Endpoint`, `Edge`, `LoadBalancer`, `ServerResources`.
* **[Components](api/components.md)** — Public Pydantic models for topology: `Client`, `Server`, `Endpoint`, `Edge`, `LoadBalancer`, `NodesResources`.
* **[Workload](api/workload.md)** — Traffic inputs: `RqsGenerator` and `RVConfig` (random variables).
* **[Settings](api/settings.md)** — Global controls: `SimulationSettings` (duration, sampling cadence, metrics).
* **[Enums](api/enums.md)** — Optional importable enums: distributions, step kinds/ops, metric names, node/edge types, LB algorithms.
2 changes: 1 addition & 1 deletion docs/internals/runtime-and-resources.md
@@ -88,7 +88,7 @@ AsyncFlow mirrors that physical constraint through the **Resource layer**, which

| Responsibility | Implementation detail |
| --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Discover capacity** | Walks the *validated* `TopologyGraph.nodes.servers`, reading `cpu_cores` and `ram_mb` from each `ServerResources` spec. |
| **Discover capacity** | Walks the *validated* `TopologyGraph.nodes.servers`, reading `cpu_cores` and `ram_mb` from each `NodesResources` spec. |
| **Mint containers** | Calls `build_containers(env, spec)` which returns<br>`{"CPU": simpy.Container(init=cpu_cores), "RAM": simpy.Container(init=ram_mb)}` — the containers start **full** so a server can immediately consume tokens. |
| **Registry map** | Stores them in a private dict `_by_server: dict[str, ServerContainers]`. |
| **Public API** | `registry[server_id] → ServerContainers` (raises `KeyError` if the ID is unknown). |
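The table above describes the resource registry in prose. Below is a condensed stand-in that assumes only what the table states (full CPU/RAM containers minted per server, dict-style lookup by id); the class name `ResourceRegistry` is illustrative, not necessarily the library's actual class.

```python
import simpy


class ResourceRegistry:
    """Sketch: walk validated servers and mint full CPU/RAM containers."""

    def __init__(self, env: simpy.Environment, servers: list) -> None:
        self._by_server = {
            srv.id: {
                "CPU": simpy.Container(env, init=srv.server_resources.cpu_cores),
                "RAM": simpy.Container(env, init=srv.server_resources.ram_mb),
            }
            for srv in servers
        }

    def __getitem__(self, server_id: str) -> dict[str, simpy.Container]:
        # raises KeyError if the id is unknown, as the table notes
        return self._by_server[server_id]
```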
18 changes: 9 additions & 9 deletions docs/internals/simulation-input.md
@@ -114,15 +114,15 @@ class Client(BaseModel):
# validator: type must equal SystemNodes.CLIENT
```

#### `ServerResources`
#### `NodesResources`

```python
class ServerResources(BaseModel):
cpu_cores: PositiveInt = Field(ServerResourcesDefaults.CPU_CORES,
ge=ServerResourcesDefaults.MINIMUM_CPU_CORES)
db_connection_pool: PositiveInt | None = Field(ServerResourcesDefaults.DB_CONNECTION_POOL)
ram_mb: PositiveInt = Field(ServerResourcesDefaults.RAM_MB,
ge=ServerResourcesDefaults.MINIMUM_RAM_MB)
class NodesResources(BaseModel):
cpu_cores: PositiveInt = Field(NodesResourcesDefaults.CPU_CORES,
ge=NodesResourcesDefaults.MINIMUM_CPU_CORES)
db_connection_pool: PositiveInt | None = Field(NodesResourcesDefaults.DB_CONNECTION_POOL)
ram_mb: PositiveInt = Field(NodesResourcesDefaults.RAM_MB,
ge=NodesResourcesDefaults.MINIMUM_RAM_MB)
```

Each attribute maps directly to a SimPy primitive (core tokens, RAM container, optional DB pool).
@@ -164,7 +164,7 @@ Canonical lowercase names avoid accidental duplicates by case.
class Server(BaseModel):
id: str
type: SystemNodes = SystemNodes.SERVER
server_resources: ServerResources
server_resources: NodesResources
endpoints: list[Endpoint]
# validator: type must equal SystemNodes.SERVER
```
@@ -302,7 +302,7 @@ class SimulationSettings(BaseModel):
### Nodes

* `Client.type == client`, `Server.type == server`, `LoadBalancer.type == load_balancer` (enforced).
* `ServerResources` obey lower bounds: `cpu_cores ≥ 1`, `ram_mb ≥ 256`.
* `NodesResources` obey lower bounds: `cpu_cores ≥ 1`, `ram_mb ≥ 256`.
* `TopologyNodes` contains **unique ids** across `client`, `servers[]`, and (optional) `load_balancer`. Duplicates → `ValueError`.
* `TopologyNodes` forbids unknown fields (`extra="forbid"`).

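To see the node bounds above in action, here is a small sketch assuming standard Pydantic validation behaviour; the values are illustrative.

```python
from pydantic import ValidationError

from asyncflow.components import NodesResources

try:
    NodesResources(cpu_cores=1, ram_mb=128)  # violates the documented ram_mb >= 256 bound
except ValidationError as err:
    print(err)

# the defaults respect the bounds: cpu_cores=1, ram_mb=1024, db_connection_pool=None
print(NodesResources())
```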
4 changes: 2 additions & 2 deletions src/asyncflow/components/__init__.py
@@ -7,8 +7,8 @@
from asyncflow.schemas.topology.nodes import (
Client,
LoadBalancer,
NodesResources,
Server,
ServerResources,
)

__all__ = [
@@ -17,8 +17,8 @@
"Endpoint",
"EventInjection",
"LoadBalancer",
"NodesResources",
"Server",
"ServerResources",
]
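A quick check of the re-export this hunk sets up: the public alias and the schema definition should resolve to the same class.

```python
from asyncflow.components import NodesResources as PublicNodesResources
from asyncflow.schemas.topology.nodes import NodesResources as SchemaNodesResources

# components/__init__.py simply re-exports the schema model
assert PublicNodesResources is SchemaNodesResources
```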


2 changes: 1 addition & 1 deletion src/asyncflow/config/constants.py
@@ -117,7 +117,7 @@ class StepOperation(StrEnum):
# CONSTANTS FOR THE RESOURCES OF A SERVER
# ======================================================================

class ServerResourcesDefaults:
class NodesResourcesDefaults:
"""Resources available for a single server"""

CPU_CORES = 1
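The rest of `NodesResourcesDefaults` is collapsed in this hunk. Based on the defaults and bounds documented in `docs/api/components.md` above (cpu_cores default 1, minimum 1; ram_mb default 1024, minimum 256; optional DB pool), a plausible shape is sketched below — an assumption for illustration, not the file's verbatim contents.

```python
class NodesResourcesDefaults:
    """Resources available for a single server (reconstructed sketch)."""

    CPU_CORES = 1
    MINIMUM_CPU_CORES = 1
    RAM_MB = 1024
    MINIMUM_RAM_MB = 256
    DB_CONNECTION_POOL = None
```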
8 changes: 4 additions & 4 deletions src/asyncflow/resources/server_containers.py
@@ -12,7 +12,7 @@
import simpy

from asyncflow.config.constants import ServerResourceName
from asyncflow.schemas.topology.nodes import ServerResources
from asyncflow.schemas.topology.nodes import NodesResources

# ==============================================================
# DICT FOR THE REGISTRY TO INITIALIZE RESOURCES FOR EACH SERVER
@@ -33,12 +33,12 @@ class ServerContainers(TypedDict):
# Central function to initialize the dictionary with the RAM and CPU containers
def build_containers(
env: simpy.Environment,
spec: ServerResources,
spec: NodesResources,
) -> ServerContainers:
"""
Construct and return a mapping of SimPy Containers for a server's CPU and RAM.

Given a SimPy environment and a validated ServerResources spec, this function
Given a SimPy environment and a validated NodesResources spec, this function
initializes one simpy.Container for CPU (with capacity equal to cpu_cores)
and one for RAM (with capacity equal to ram_mb), then returns them in a
ServerContainers TypedDict keyed by "CPU" and "RAM".
@@ -47,7 +47,7 @@ def build_containers(
----------
env : simpy.Environment
The simulation environment in which the Containers will be created.
spec : ServerResources
spec : NodesResources
A Pydantic model instance defining the server's cpu_cores and ram_mb.

Returns
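A short usage sketch of `build_containers`, assuming the module path shown in the diff header and the `"CPU"`/`"RAM"` keys described in the docstring.

```python
import simpy

from asyncflow.components import NodesResources
from asyncflow.resources.server_containers import build_containers

env = simpy.Environment()
spec = NodesResources(cpu_cores=2, ram_mb=2048)

containers = build_containers(env, spec)
assert containers["CPU"].level == 2      # containers start full
assert containers["RAM"].level == 2048
```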
21 changes: 18 additions & 3 deletions src/asyncflow/runtime/actors/server.py
@@ -196,7 +196,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901

for step in selected_endpoint.steps:

if step.kind in EndpointStepCPU:
if isinstance(step.kind, EndpointStepCPU):
# the boolean lets us avoid redundantly re-acquiring the core
# on a given step, for example when there are two consecutive
# cpu-bound steps
Expand Down Expand Up @@ -232,7 +232,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901

# step.kind is an Enum member, so we check whether it belongs
# to the I/O step enum
elif step.kind in EndpointStepIO:
elif isinstance(step.kind, EndpointStepIO):
# define the io time
io_time = step.step_operation[StepOperation.IO_WAITING_TIME]

@@ -244,7 +244,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901
if not is_in_io_queue:
is_in_io_queue = True
self._el_io_queue_len += 1

# here is a safety check: the first step should always be
# cpu-bound (parsing of the request); if a user starts with
# an I/O step, this keeps the flow from breaking
@@ -305,6 +305,21 @@ def _dispatcher(self) -> Generator[simpy.Event, None, None]:
The main dispatcher loop. It pulls requests from the inbox and
spawns a new '_handle_request' process for each one.
"""
# in the current model we assume a one-to-one correspondence
# between cpu cores and worker processes; before entering the
# loop, the current implementation reserves the RAM needed to
# run those processes
if self.server_config.ram_per_process:
processes_ram = (
self.server_config.ram_per_process *
self.server_config.server_resources.cpu_cores
)

yield self.server_resources[
ServerResourceName.RAM.value
].get(processes_ram)


while True:
# Wait for a request to arrive in the server's inbox
raw_state = yield self.server_box.get()
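The hunks above replace membership tests such as `step.kind in EndpointStepCPU` with `isinstance(step.kind, EndpointStepCPU)`. Assuming the step-kind enums are `StrEnum`s like `StepOperation` shown earlier, the sketch below illustrates why the two checks differ; `CpuStep` and `IoStep` are hypothetical stand-ins, since the real members are not shown in this diff.

```python
from enum import StrEnum  # Python 3.11+; the `in` behaviour shown is 3.12 semantics


class CpuStep(StrEnum):
    CPU_BOUND = "cpu_bound_operation"


class IoStep(StrEnum):
    IO_WAIT = "io_wait"


kind = IoStep.IO_WAIT

print("io_wait" in IoStep)            # True  — a bare string matches by value (3.12)
print(isinstance("io_wait", IoStep))  # False — only real members pass the type check
print(isinstance(kind, CpuStep))      # False — wrong enum class, as intended
```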
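The new `_dispatcher` block reserves the workers' RAM once, before the dispatch loop starts. A self-contained sketch of the same pattern is below; `RAM_PER_PROCESS` and `CPU_CORES` are made-up numbers, not AsyncFlow defaults.

```python
import simpy

RAM_MB = 2048
RAM_PER_PROCESS = 256
CPU_CORES = 2  # one worker process per core, as the comment in the diff assumes


def dispatcher(env: simpy.Environment, ram: simpy.Container):
    # reserve ram_per_process * cpu_cores up front, then start dispatching
    processes_ram = RAM_PER_PROCESS * CPU_CORES
    yield ram.get(processes_ram)
    print(f"reserved {processes_ram} MB, {ram.level} MB left at t={env.now}")


env = simpy.Environment()
ram = simpy.Container(env, capacity=RAM_MB, init=RAM_MB)
env.run(env.process(dispatcher(env, ram)))
```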