diff --git a/docs/api/components.md b/docs/api/components.md index 15f97c2..d775dc5 100644 --- a/docs/api/components.md +++ b/docs/api/components.md @@ -17,7 +17,7 @@ These classes are Pydantic models with strict validation and are the from asyncflow.components import ( Client, Server, - ServerResources, + NodesResources, LoadBalancer, Endpoint, Edge, @@ -32,7 +32,7 @@ from asyncflow.enums import Distribution ```python from asyncflow.components import ( - Client, Server, ServerResources, LoadBalancer, Endpoint, Edge + Client, Server, NodesResources, LoadBalancer, Endpoint, Edge ) # Nodes @@ -49,7 +49,7 @@ endpoint = Endpoint( server = Server( id="srv-1", - server_resources=ServerResources(cpu_cores=2, ram_mb=2048), + server_resources=NodesResources(cpu_cores=2, ram_mb=2048), endpoints=[endpoint], ) @@ -103,10 +103,10 @@ Client(id: str) --- -### `ServerResources` +### `NodesResources` ```python -ServerResources( +NodesResources( cpu_cores: int = 1, # ≥ 1 NOW MUST BE FIXED TO ONE ram_mb: int = 1024, # ≥ 256 db_connection_pool: int | None = None, @@ -114,7 +114,7 @@ ServerResources( ``` * Server capacity knobs used by the runtime (CPU tokens, RAM reservoir, optional DB pool). -* You may pass a **dict** instead of `ServerResources`; Pydantic will coerce it. +* You may pass a **dict** instead of `NodesResources`; Pydantic will coerce it. **Bounds & defaults** @@ -166,7 +166,7 @@ Each step is a dict with **exactly one** operation: ```python Server( id: str, - server_resources: ServerResources | dict, + server_resources: NodesResources | dict, endpoints: list[Endpoint], ) ``` @@ -234,7 +234,7 @@ Edge( ## Type coercion & enums * You may pass strings for enums (`kind`, `distribution`, etc.); they will be validated against the allowed values. -* For `ServerResources` and `Edge.latency` you can pass dictionaries; Pydantic will coerce them to typed models. +* For `NodesResources` and `Edge.latency` you can pass dictionaries; Pydantic will coerce them to typed models. * If you prefer, you can import and use the enums: ```python diff --git a/docs/index.md b/docs/index.md index a4affc2..cdb8768 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,7 +14,7 @@ AsyncFlow is a discrete-event simulator for Python async backends (FastAPI/Uvico ## Public API (stable surface) * **[High-Level API](api/high-level.md)** — The two entry points you’ll use most: `AsyncFlow` (builder) and `SimulationRunner` (orchestrator). -* **[Components](api/components.md)** — Public Pydantic models for topology: `Client`, `Server`, `Endpoint`, `Edge`, `LoadBalancer`, `ServerResources`. +* **[Components](api/components.md)** — Public Pydantic models for topology: `Client`, `Server`, `Endpoint`, `Edge`, `LoadBalancer`, `NodesResources`. * **[Workload](api/workload.md)** — Traffic inputs: `RqsGenerator` and `RVConfig` (random variables). * **[Settings](api/settings.md)** — Global controls: `SimulationSettings` (duration, sampling cadence, metrics). * **[Enums](api/enums.md)** — Optional importable enums: distributions, step kinds/ops, metric names, node/edge types, LB algorithms. 
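A minimal usage sketch of the renamed surface (illustrative names; the `Step` and enum imports mirror `tests/unit/helpers.py` added further down in this diff):

```python
from asyncflow.components import Endpoint, NodesResources, Server
from asyncflow.config.constants import EndpointStepCPU, StepOperation
from asyncflow.schemas.topology.endpoint import Step

# One CPU-bound step so the endpoint list satisfies the new
# `min_length=1` constraint on `Server.endpoints`.
endpoint = Endpoint(
    endpoint_name="ep-hello",
    steps=[
        Step(
            kind=EndpointStepCPU.CPU_BOUND_OPERATION,
            step_operation={StepOperation.CPU_TIME: 0.002},
        ),
    ],
)

# Typed resources object ...
server = Server(
    id="srv-1",
    server_resources=NodesResources(cpu_cores=2, ram_mb=2048),
    endpoints=[endpoint],
)

# ... or a plain dict, which Pydantic coerces to NodesResources as documented above.
server_from_dict = Server(
    id="srv-2",
    server_resources={"cpu_cores": 2, "ram_mb": 2048},
    endpoints=[endpoint],
)
```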
diff --git a/docs/internals/runtime-and-resources.md b/docs/internals/runtime-and-resources.md index 32f1611..903fddf 100644 --- a/docs/internals/runtime-and-resources.md +++ b/docs/internals/runtime-and-resources.md @@ -88,7 +88,7 @@ AsyncFlow mirrors that physical constraint through the **Resource layer**, which | Responsibility | Implementation detail | | --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| **Discover capacity** | Walks the *validated* `TopologyGraph.nodes.servers`, reading `cpu_cores` and `ram_mb` from each `ServerResources` spec. | +| **Discover capacity** | Walks the *validated* `TopologyGraph.nodes.servers`, reading `cpu_cores` and `ram_mb` from each `NodesResources` spec. | | **Mint containers** | Calls `build_containers(env, spec)` which returns
`{"CPU": simpy.Container(init=cpu_cores), "RAM": simpy.Container(init=ram_mb)}` — the containers start **full** so a server can immediately consume tokens. | | **Registry map** | Stores them in a private dict `_by_server: dict[str, ServerContainers]`. | | **Public API** | `registry[server_id] → ServerContainers` (raises `KeyError` if the ID is unknown). | diff --git a/docs/internals/simulation-input.md b/docs/internals/simulation-input.md index f6bbbcd..aa9a8c5 100644 --- a/docs/internals/simulation-input.md +++ b/docs/internals/simulation-input.md @@ -114,15 +114,15 @@ class Client(BaseModel): # validator: type must equal SystemNodes.CLIENT ``` -#### `ServerResources` +#### `NodesResources` ```python -class ServerResources(BaseModel): - cpu_cores: PositiveInt = Field(ServerResourcesDefaults.CPU_CORES, - ge=ServerResourcesDefaults.MINIMUM_CPU_CORES) - db_connection_pool: PositiveInt | None = Field(ServerResourcesDefaults.DB_CONNECTION_POOL) - ram_mb: PositiveInt = Field(ServerResourcesDefaults.RAM_MB, - ge=ServerResourcesDefaults.MINIMUM_RAM_MB) +class NodesResources(BaseModel): + cpu_cores: PositiveInt = Field(NodesResourcesDefaults.CPU_CORES, + ge=NodesResourcesDefaults.MINIMUM_CPU_CORES) + db_connection_pool: PositiveInt | None = Field(NodesResourcesDefaults.DB_CONNECTION_POOL) + ram_mb: PositiveInt = Field(NodesResourcesDefaults.RAM_MB, + ge=NodesResourcesDefaults.MINIMUM_RAM_MB) ``` Each attribute maps directly to a SimPy primitive (core tokens, RAM container, optional DB pool). @@ -164,7 +164,7 @@ Canonical lowercase names avoid accidental duplicates by case. class Server(BaseModel): id: str type: SystemNodes = SystemNodes.SERVER - server_resources: ServerResources + server_resources: NodesResources endpoints: list[Endpoint] # validator: type must equal SystemNodes.SERVER ``` @@ -302,7 +302,7 @@ class SimulationSettings(BaseModel): ### Nodes * `Client.type == client`, `Server.type == server`, `LoadBalancer.type == load_balancer` (enforced). -* `ServerResources` obey lower bounds: `cpu_cores ≥ 1`, `ram_mb ≥ 256`. +* `NodesResources` obey lower bounds: `cpu_cores ≥ 1`, `ram_mb ≥ 256`. * `TopologyNodes` contains **unique ids** across `client`, `servers[]`, and (optional) `load_balancer`. Duplicates → `ValueError`. * `TopologyNodes` forbids unknown fields (`extra="forbid"`). 
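A minimal sketch of the Resource-layer behaviour described above, assuming the container levels mirror the `init=` values quoted in the registry table:

```python
import simpy

from asyncflow.config.constants import ServerResourceName
from asyncflow.resources.server_containers import build_containers
from asyncflow.schemas.topology.nodes import NodesResources

env = simpy.Environment()
spec = NodesResources(cpu_cores=4, ram_mb=2048)

containers = build_containers(env, spec)

# Containers start full, so a server can draw tokens immediately.
cpu = containers[ServerResourceName.CPU.value]
ram = containers[ServerResourceName.RAM.value]
assert cpu.level == spec.cpu_cores
assert ram.level == spec.ram_mb
```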
diff --git a/src/asyncflow/components/__init__.py b/src/asyncflow/components/__init__.py index 12e9e9e..9ba1da3 100644 --- a/src/asyncflow/components/__init__.py +++ b/src/asyncflow/components/__init__.py @@ -7,8 +7,8 @@ from asyncflow.schemas.topology.nodes import ( Client, LoadBalancer, + NodesResources, Server, - ServerResources, ) __all__ = [ @@ -17,8 +17,8 @@ "Endpoint", "EventInjection", "LoadBalancer", + "NodesResources", "Server", - "ServerResources", ] diff --git a/src/asyncflow/config/constants.py b/src/asyncflow/config/constants.py index 29b2229..fb23820 100644 --- a/src/asyncflow/config/constants.py +++ b/src/asyncflow/config/constants.py @@ -117,7 +117,7 @@ class StepOperation(StrEnum): # CONSTANTS FOR THE RESOURCES OF A SERVER # ====================================================================== -class ServerResourcesDefaults: +class NodesResourcesDefaults: """Resources available for a single server""" CPU_CORES = 1 diff --git a/src/asyncflow/resources/server_containers.py b/src/asyncflow/resources/server_containers.py index 1401247..e92f3e5 100644 --- a/src/asyncflow/resources/server_containers.py +++ b/src/asyncflow/resources/server_containers.py @@ -12,7 +12,7 @@ import simpy from asyncflow.config.constants import ServerResourceName -from asyncflow.schemas.topology.nodes import ServerResources +from asyncflow.schemas.topology.nodes import NodesResources # ============================================================== # DICT FOR THE REGISTRY TO INITIALIZE RESOURCES FOR EACH SERVER @@ -33,12 +33,12 @@ class ServerContainers(TypedDict): # Central funcrion to initialize the dictionary with ram and cpu container def build_containers( env: simpy.Environment, - spec: ServerResources, + spec: NodesResources, ) -> ServerContainers: """ Construct and return a mapping of SimPy Containers for a server's CPU and RAM. - Given a SimPy environment and a validated ServerResources spec, this function + Given a SimPy environment and a validated NodesResources spec, this function initializes one simpy.Container for CPU (with capacity equal to cpu_cores) and one for RAM (with capacity equal to ram_mb), then returns them in a ServerContainers TypedDict keyed by "CPU" and "RAM". @@ -47,7 +47,7 @@ def build_containers( ---------- env : simpy.Environment The simulation environment in which the Containers will be created. - spec : ServerResources + spec : NodesResources A Pydantic model instance defining the server's cpu_cores and ram_mb. 
Returns diff --git a/src/asyncflow/runtime/actors/server.py b/src/asyncflow/runtime/actors/server.py index d83f94d..d610a9a 100644 --- a/src/asyncflow/runtime/actors/server.py +++ b/src/asyncflow/runtime/actors/server.py @@ -196,7 +196,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 for step in selected_endpoint.steps: - if step.kind in EndpointStepCPU: + if isinstance(step.kind, EndpointStepCPU): # with the boolean we avoid redundant operation of asking # the core multiple time on a given step # for example if we have two consecutive cpu bound step @@ -232,7 +232,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 # since the object is of an Enum class we check if the step.kind # is one member of enum - elif step.kind in EndpointStepIO: + elif isinstance(step.kind, EndpointStepIO): # define the io time io_time = step.step_operation[StepOperation.IO_WAITING_TIME] @@ -244,7 +244,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 if not is_in_io_queue: is_in_io_queue = True self._el_io_queue_len += 1 - + # here is a sage check: the first step should always # be a cpu bound (parsing of the request), if an user # start with a I/O this allow to don't break the flux @@ -305,6 +305,21 @@ def _dispatcher(self) -> Generator[simpy.Event, None, None]: The main dispatcher loop. It pulls requests from the inbox and spawns a new '_handle_request' process for each one. """ + # We assume a one-to-one correspondence between CPU cores + # and worker processes in the current model, so before + # entering the loop we reserve, up front, the RAM those + # worker processes need. + if self.server_config.ram_per_process: + processes_ram = ( + self.server_config.ram_per_process * + self.server_config.server_resources.cpu_cores + ) + + yield self.server_resources[ + ServerResourceName.RAM.value + ].get(processes_ram) + + while True: # Wait for a request to arrive in the server's inbox raw_state = yield self.server_box.get() diff --git a/src/asyncflow/schemas/topology/nodes.py b/src/asyncflow/schemas/topology/nodes.py index 5ada69b..fee6876 100644 --- a/src/asyncflow/schemas/topology/nodes.py +++ b/src/asyncflow/schemas/topology/nodes.py @@ -17,16 +17,42 @@ from asyncflow.config.constants import ( LbAlgorithmsName, - ServerResourcesDefaults, + NodesResourcesDefaults, SystemNodes, ) from asyncflow.schemas.topology.endpoint import Endpoint #------------------------------------------------------------- # Definition of the nodes structure for the graph representing -# the topoogy of the system defined for the simulation +# the topology of the system defined for the simulation #------------------------------------------------------------- +# ------------------------------------------------------------- +# Resources you may assign to a node +# ------------------------------------------------------------- + +class NodesResources(BaseModel): + """ + Quantifiable resources available on a node (server/LB/client). + Each attribute maps to a SimPy resource primitive or container.
+ """ + + cpu_cores: PositiveInt = Field( + NodesResourcesDefaults.CPU_CORES, + ge = NodesResourcesDefaults.MINIMUM_CPU_CORES, + description="Number of CPU cores available for processing.", + ) + + db_connection_pool: PositiveInt | None = Field( + NodesResourcesDefaults.DB_CONNECTION_POOL, + description="Size of the database connection pool, if applicable.", + ) + + ram_mb: PositiveInt = Field( + NodesResourcesDefaults.RAM_MB, + ge = NodesResourcesDefaults.MINIMUM_RAM_MB, + description="Total available RAM in megabytes.") + # ------------------------------------------------------------- # CLIENT # ------------------------------------------------------------- @@ -37,43 +63,36 @@ class Client(BaseModel): id: str type: SystemNodes = SystemNodes.CLIENT + # A client may be hosted on a virtual machine + # and technically has resources. + # At this stage, client-side bottlenecks + # are not modeled, so resources are optional. + + client_resources: NodesResources | None = None + ram_per_process: PositiveInt | None = None + @field_validator("type", mode="after") def ensure_type_is_standard(cls, v: SystemNodes) -> SystemNodes: # noqa: N805 - """Ensure the type of the client is standard""" + """Ensure the node type is CLIENT.""" if v != SystemNodes.CLIENT: msg = f"The type should have a standard value: {SystemNodes.CLIENT}" raise ValueError(msg) return v -# ------------------------------------------------------------- -# SERVER RESOURCES -# ------------------------------------------------------------- - -class ServerResources(BaseModel): - """ - Defines the quantifiable resources available on a server node. - Each attribute maps directly to a SimPy resource primitive. - """ + @model_validator(mode="after") # type: ignore[arg-type] + def ram_and_ram_per_process_are_coherent( + cls, # noqa: N805 + model: "Client", + ) -> "Client": + """Check that if ram per process exist, ram is assigned to the client""" + if model.ram_per_process and not model.client_resources: + msg = ("To reserve per-process RAM for the client " + f"'{model.id}', define resources in 'client_resources'.") + raise ValueError(msg) - cpu_cores: PositiveInt = Field( - ServerResourcesDefaults.CPU_CORES, - ge = ServerResourcesDefaults.MINIMUM_CPU_CORES, - description="Number of CPU cores available for processing.", - ) - db_connection_pool: PositiveInt | None = Field( - ServerResourcesDefaults.DB_CONNECTION_POOL, - description="Size of the database connection pool, if applicable.", - ) + return model - # Risorse modellate come simpy.Container (livello) - ram_mb: PositiveInt = Field( - ServerResourcesDefaults.RAM_MB, - ge = ServerResourcesDefaults.MINIMUM_RAM_MB, - description="Total available RAM in Megabytes.") - # for the future - # disk_iops_limit: PositiveInt | None = None - # network_throughput_mbps: PositiveInt | None = None # ------------------------------------------------------------- # SERVER @@ -84,20 +103,20 @@ class Server(BaseModel): definition of the server class: - id: is the server identifier - type: is the type of node in the structure - - server resources: is a dictionary to define the resources + - nodes resources: is a dictionary to define the resources of the machine where the server is living - endpoints: is the list of all endpoints in a server """ id: str type: SystemNodes = SystemNodes.SERVER - #Later define a valide structure for the keys of server resources - server_resources : ServerResources - endpoints : list[Endpoint] + server_resources: NodesResources + endpoints: list[Endpoint] = Field(min_length=1) + 
ram_per_process: PositiveInt | None = None @field_validator("type", mode="after") def ensure_type_is_standard(cls, v: SystemNodes) -> SystemNodes: # noqa: N805 - """Ensure the type of the server is standard""" + """Ensure the node type is SERVER.""" if v != SystemNodes.SERVER: msg = f"The type should have a standard value: {SystemNodes.SERVER}" raise ValueError(msg) @@ -116,16 +135,35 @@ class LoadBalancer(BaseModel): algorithms: LbAlgorithmsName = LbAlgorithmsName.ROUND_ROBIN server_covered: set[str] = Field(default_factory=set) + # In the next release, once the new network model is introduced, + # we will monitor resource-related bottlenecks that can occur at the LB, + # especially RAM pressure. Until then, we keep this optional to maintain + # compatibility with the current public API. + lb_resources: NodesResources | None = None + ram_per_process: PositiveInt | None = None @field_validator("type", mode="after") def ensure_type_is_standard(cls, v: SystemNodes) -> SystemNodes: # noqa: N805 - """Ensure the type of the server is standard""" + """Ensure the node type is LOAD_BALANCER.""" if v != SystemNodes.LOAD_BALANCER: msg = f"The type should have a standard value: {SystemNodes.LOAD_BALANCER}" raise ValueError(msg) return v + @model_validator(mode="after") # type: ignore[arg-type] + def ram_and_ram_per_process_are_coherent( + cls, # noqa: N805 + model: "LoadBalancer", + ) -> "LoadBalancer": + """Check that if ram per process exist, ram is assigned to LB""" + if model.ram_per_process and not model.lb_resources: + msg = ("To reserve per-process RAM for the load balancer " + f"'{model.id}', define resources in 'lb_resources'.") + raise ValueError(msg) + + return model + # ------------------------------------------------------------- # NODES CLASS WITH ALL POSSIBLE OBJECTS REPRESENTED BY A NODE @@ -141,8 +179,8 @@ class TopologyNodes(BaseModel): servers: list[Server] client: Client - # Right now we accept just one LB, in the future we - # will change this + + # For now we accept a single LB; this may change in the future. load_balancer: LoadBalancer | None = None @model_validator(mode="after") # type: ignore[arg-type] @@ -150,7 +188,7 @@ def unique_ids( cls, # noqa: N805 model: "TopologyNodes", ) -> "TopologyNodes": - """Check that all id are unique""" + """Ensure that all node IDs are unique.""" ids = [server.id for server in model.servers] + [model.client.id] if model.load_balancer is not None: @@ -159,8 +197,54 @@ def unique_ids( counter = Counter(ids) duplicate = [node_id for node_id, value in counter.items() if value > 1] if duplicate: - msg = f"The following node ids are duplicate {duplicate}" + msg = f"Duplicate node IDs detected: {duplicate}" raise ValueError(msg) return model + @model_validator(mode="after") # type: ignore[arg-type] + def ensure_servers_covered_by_lb_exist( + cls, # noqa: N805 + model: "TopologyNodes", + ) -> "TopologyNodes": + """Ensure that all servers covered by the LB exist.""" + if not model.load_balancer: + return model + + server_ids = {server.id for server in model.servers} + + for server_id in model.load_balancer.server_covered: + if server_id not in server_ids: + msg = ( + f"Load balancer '{model.load_balancer.id}' " + f"references unknown server '{server_id}'. " + "Define it under 'servers' or remove it from 'server_covered'." 
+ ) + raise ValueError(msg) + + return model + + @model_validator(mode="after") # type: ignore[arg-type] + def ensure_ram_and_ram_per_process_is_valid( + cls, # noqa: N805 + model: "TopologyNodes", + ) -> "TopologyNodes": + """Ensure the total ram for processes is not higher than the ram available""" + for server in model.servers: + if server.ram_per_process: + total_ram_for_processes = ( + server.server_resources.cpu_cores * + server.ram_per_process + ) + if total_ram_for_processes >= server.server_resources.ram_mb: + msg = (f"Server '{server.id}': " + f"per-process RAM total ({total_ram_for_processes} MB) " + f"exceeds or is equal to total RAM " + f"({server.server_resources.ram_mb} MB)." + ) + raise ValueError(msg) + + return model + + + # Reject unknown fields to keep schemas strict and predictable. model_config = ConfigDict(extra="forbid") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..c75472e --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""package to ensure smooth import""" diff --git a/tests/conftest.py b/tests/conftest.py index 6834764..cc54097 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -160,3 +160,4 @@ def payload_base( def env() -> simpy.Environment: """Return a fresh SimPy environment per test.""" return simpy.Environment() + diff --git a/tests/integration/event_injection/lb_two_servers.py b/tests/integration/event_injection/lb_two_servers.py index 4272719..3c29e95 100644 --- a/tests/integration/event_injection/lb_two_servers.py +++ b/tests/integration/event_injection/lb_two_servers.py @@ -31,8 +31,8 @@ from asyncflow.schemas.topology.nodes import ( Client, LoadBalancer, + NodesResources, Server, - ServerResources, TopologyNodes, ) from asyncflow.schemas.workload.rqs_generator import RqsGenerator @@ -42,7 +42,7 @@ def _server(sid: str) -> Server: - return Server(id=sid, server_resources=ServerResources(), endpoints=[]) + return Server(id=sid, server_resources=NodesResources(), endpoints=[]) def _edge(eid: str, src: str, tgt: str, mean: float = 0.002) -> Edge: diff --git a/tests/integration/event_injection/single_server.py b/tests/integration/event_injection/single_server.py index 1698305..4431786 100644 --- a/tests/integration/event_injection/single_server.py +++ b/tests/integration/event_injection/single_server.py @@ -30,8 +30,8 @@ from asyncflow.schemas.topology.nodes import ( Client, LoadBalancer, + NodesResources, Server, - ServerResources, TopologyNodes, ) from asyncflow.schemas.workload.rqs_generator import RqsGenerator @@ -41,7 +41,7 @@ def _server(sid: str) -> Server: - return Server(id=sid, server_resources=ServerResources(), endpoints=[]) + return Server(id=sid, server_resources=NodesResources(), endpoints=[]) def _edge(eid: str, src: str, tgt: str, mean: float = 0.002) -> Edge: diff --git a/tests/integration/load_balancer/test_lb_basic.py b/tests/integration/load_balancer/test_lb_basic.py index 293f5ef..0bcebad 100644 --- a/tests/integration/load_balancer/test_lb_basic.py +++ b/tests/integration/load_balancer/test_lb_basic.py @@ -37,8 +37,8 @@ from asyncflow.schemas.topology.nodes import ( Client, LoadBalancer, + NodesResources, Server, - ServerResources, TopologyNodes, ) from asyncflow.schemas.workload.rqs_generator import RqsGenerator @@ -60,7 +60,7 @@ def _server(server_id: str) -> Server: ) return Server( id=server_id, - server_resources=ServerResources(), # defaults are fine + server_resources=NodesResources(), # defaults are fine endpoints=[ep], ) diff --git a/tests/unit/__init__.py 
b/tests/unit/__init__.py index e0310a0..c75472e 100644 --- a/tests/unit/__init__.py +++ b/tests/unit/__init__.py @@ -1 +1 @@ -"""Unit tests.""" +"""package to ensure smooth import""" diff --git a/tests/unit/helpers.py b/tests/unit/helpers.py new file mode 100644 index 0000000..2b7c0e2 --- /dev/null +++ b/tests/unit/helpers.py @@ -0,0 +1,14 @@ +from asyncflow.config.constants import EndpointStepCPU, StepOperation +from asyncflow.schemas.topology.endpoint import Endpoint, Step + + +def make_min_ep(ep_id: str = "ep-1", cpu_time: float = 0.1) -> Endpoint: + return Endpoint( + endpoint_name=ep_id, + steps=[ + Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: cpu_time}, + ), + ], + ) diff --git a/tests/unit/public_api/test_import.py b/tests/unit/public_api/test_import.py index 2bea333..0a2401f 100644 --- a/tests/unit/public_api/test_import.py +++ b/tests/unit/public_api/test_import.py @@ -16,8 +16,8 @@ Endpoint, EventInjection, LoadBalancer, + NodesResources, Server, - ServerResources, ) from asyncflow.settings import SimulationSettings from asyncflow.workload import RqsGenerator, RVConfig @@ -47,7 +47,7 @@ def test_components_public_symbols() -> None: "EventInjection", "LoadBalancer", "Server", - "ServerResources", + "NodesResources", ] _assert_all_equals("asyncflow.components", expected) @@ -62,7 +62,7 @@ def test_components_symbols_are_importable_classes() -> None: (EventInjection, "EventInjection"), (LoadBalancer, "LoadBalancer"), (Server, "Server"), - (ServerResources, "ServerResources"), + (NodesResources, "NodesResources"), ]: assert isinstance(cls, type), f"{name} should be a class type" assert cls.__name__ == name diff --git a/tests/unit/resources/test_registry.py b/tests/unit/resources/test_registry.py index 6581ae0..eae6afd 100644 --- a/tests/unit/resources/test_registry.py +++ b/tests/unit/resources/test_registry.py @@ -11,15 +11,15 @@ from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.topology.nodes import ( Client, + NodesResources, Server, - ServerResources, TopologyNodes, ) def _minimal_server(server_id: str, cores: int, ram: int) -> Server: """Create a Server with a dummy endpoint and resource spec.""" - res = ServerResources(cpu_cores=cores, ram_mb=ram) + res = NodesResources(cpu_cores=cores, ram_mb=ram) dummy_ep = Endpoint(endpoint_name="/ping", steps=[]) return Server(id=server_id, server_resources=res, endpoints=[dummy_ep]) diff --git a/tests/unit/resources/test_server_containers.py b/tests/unit/resources/test_server_containers.py index b7a8243..bae0587 100644 --- a/tests/unit/resources/test_server_containers.py +++ b/tests/unit/resources/test_server_containers.py @@ -4,12 +4,12 @@ from asyncflow.config.constants import ServerResourceName from asyncflow.resources.server_containers import build_containers -from asyncflow.schemas.topology.nodes import ServerResources +from asyncflow.schemas.topology.nodes import NodesResources def test_containers_start_full() -> None: env = simpy.Environment() - spec = ServerResources(cpu_cores=4, ram_mb=2048) + spec = NodesResources(cpu_cores=4, ram_mb=2048) containers = build_containers(env, spec) cpu = containers[ServerResourceName.CPU.value] diff --git a/tests/unit/runtime/actors/test_server.py b/tests/unit/runtime/actors/test_server.py index f5ff2ef..5251ffe 100644 --- a/tests/unit/runtime/actors/test_server.py +++ b/tests/unit/runtime/actors/test_server.py @@ -34,7 +34,7 @@ from asyncflow.runtime.rqs_state import RequestState from 
asyncflow.schemas.settings.simulation import SimulationSettings from asyncflow.schemas.topology.endpoint import Endpoint, Step -from asyncflow.schemas.topology.nodes import Server, ServerResources +from asyncflow.schemas.topology.nodes import NodesResources, Server if TYPE_CHECKING: from collections.abc import Generator, Iterable @@ -96,7 +96,7 @@ def _make_server_runtime( steps: Iterable[Step] | None = None, ) -> tuple[ServerRuntime, simpy.Store]: """Return a (ServerRuntime, sink) ready for injection tests.""" - res_spec = ServerResources(cpu_cores=cpu_cores, ram_mb=ram_mb) + res_spec = NodesResources(cpu_cores=cpu_cores, ram_mb=ram_mb) containers = build_containers(env, res_spec) endpoint = _mk_endpoint(steps if steps is not None else _default_steps()) @@ -319,7 +319,7 @@ def test_ram_gating_blocks_before_ready() -> None: """When RAM is scarce, blocks on RAM and must NOT inflate ready.""" env = simpy.Environment() - # Respect ServerResources(min RAM = 256). + # Respect NodesResources(min RAM = 256). # Endpoint needs 256 MB → second request waits on RAM (not in ready). steps = ( Step( diff --git a/tests/unit/runtime/events/test_injection_servers.py b/tests/unit/runtime/events/test_injection_servers.py index 7347fb3..0080214 100644 --- a/tests/unit/runtime/events/test_injection_servers.py +++ b/tests/unit/runtime/events/test_injection_servers.py @@ -7,6 +7,7 @@ import pytest import simpy +from tests.unit.helpers import make_min_ep from asyncflow.config.constants import EventDescription from asyncflow.runtime.actors.edge import EdgeRuntime @@ -14,7 +15,7 @@ from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.topology.edges import Edge -from asyncflow.schemas.topology.nodes import Server, ServerResources +from asyncflow.schemas.topology.nodes import NodesResources, Server if TYPE_CHECKING: from asyncflow.schemas.settings.simulation import SimulationSettings @@ -24,6 +25,8 @@ # Helpers # # --------------------------------------------------------------------------- # + + def _edge(edge_id: str, source: str, target: str) -> Edge: """Create a minimal LB→server edge with negligible latency.""" return Edge( @@ -38,8 +41,8 @@ def _srv(server_id: str) -> Server: """Create a minimal, fully-typed Server instance for tests.""" return Server( id=server_id, - server_resources=ServerResources(), # uses defaults - endpoints=[], # empty list is valid + server_resources=NodesResources(), # uses defaults + endpoints=[make_min_ep()], ) def _srv_event( diff --git a/tests/unit/runtime/events/test_injection_servers_edges.py b/tests/unit/runtime/events/test_injection_servers_edges.py index 966a9f5..fcf2e69 100644 --- a/tests/unit/runtime/events/test_injection_servers_edges.py +++ b/tests/unit/runtime/events/test_injection_servers_edges.py @@ -7,6 +7,7 @@ import pytest import simpy +from tests.unit.helpers import make_min_ep from asyncflow.config.constants import EventDescription from asyncflow.runtime.actors.edge import EdgeRuntime @@ -14,7 +15,7 @@ from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.topology.edges import Edge -from asyncflow.schemas.topology.nodes import Server, ServerResources +from asyncflow.schemas.topology.nodes import NodesResources, Server if TYPE_CHECKING: from asyncflow.schemas.settings.simulation import SimulationSettings @@ -31,7 +32,9 @@ def _edge(edge_id: str, source: str, target: str) -> Edge: 
def _srv(server_id: str) -> Server: """Create a minimal, fully-typed Server instance for tests.""" - return Server(id=server_id, server_resources=ServerResources(), endpoints=[]) + return Server( + id=server_id, server_resources=NodesResources(), endpoints=[make_min_ep()], + ) def _spike_event( diff --git a/tests/unit/runtime/test_simulation_runner.py b/tests/unit/runtime/test_simulation_runner.py index 9ec9299..66c25cb 100644 --- a/tests/unit/runtime/test_simulation_runner.py +++ b/tests/unit/runtime/test_simulation_runner.py @@ -13,6 +13,7 @@ import pytest import simpy import yaml +from tests.unit.helpers import make_min_ep from asyncflow.config.constants import Distribution, EventDescription from asyncflow.runtime.simulation_runner import SimulationRunner @@ -25,8 +26,8 @@ from asyncflow.schemas.topology.nodes import ( Client, LoadBalancer, + NodesResources, Server, - ServerResources, TopologyNodes, ) @@ -149,7 +150,10 @@ def _payload_with_lb_one_server_and_edges( ) -> SimulationPayload: """Build a small payload with LB → server wiring and one net edge.""" client = Client(id="client-1") - server = Server(id="srv-1", server_resources=ServerResources(), endpoints=[]) + server = Server( + id="srv-1", server_resources=NodesResources(), + endpoints=[make_min_ep()], + ) lb = LoadBalancer(id="lb-1") nodes = TopologyNodes(servers=[server], client=client, load_balancer=lb) diff --git a/tests/unit/schemas/test_payload.py b/tests/unit/schemas/test_payload.py index 8547f83..b712996 100644 --- a/tests/unit/schemas/test_payload.py +++ b/tests/unit/schemas/test_payload.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING import pytest +from tests.unit.helpers import make_min_ep from asyncflow.config.constants import Distribution, EventDescription from asyncflow.schemas.common.random_variables import RVConfig @@ -96,8 +97,14 @@ def _topology_with_two_servers_and_edge() -> TopologyGraph: """Create a topology with two servers and a minimal edge.""" client = Client(id="client-1") servers = [ - Server(id="srv-1", server_resources={"cpu_cores": 1}, endpoints=[]), - Server(id="srv-2", server_resources={"cpu_cores": 1}, endpoints=[]), + Server( + id="srv-1", server_resources={"cpu_cores": 1}, + endpoints=[make_min_ep()], + ), + Server( + id="srv-2", server_resources={"cpu_cores": 1}, + endpoints=[make_min_ep()], + ), ] edge = Edge( id="gen-to-client", diff --git a/tests/unit/schemas/test_topology.py b/tests/unit/schemas/test_topology.py index 0ef53e0..e9c7f67 100644 --- a/tests/unit/schemas/test_topology.py +++ b/tests/unit/schemas/test_topology.py @@ -1,4 +1,4 @@ -"""Unit-tests for topology schemas (Client, ServerResources, Edge, …)""" +"""Unit-tests for topology schemas (Client, NodesResources, Edge, …)""" from __future__ import annotations @@ -8,7 +8,7 @@ from asyncflow.config.constants import ( EndpointStepCPU, NetworkParameters, - ServerResourcesDefaults, + NodesResourcesDefaults, StepOperation, SystemEdges, SystemNodes, @@ -20,8 +20,8 @@ from asyncflow.schemas.topology.nodes import ( Client, LoadBalancer, + NodesResources, Server, - ServerResources, TopologyNodes, ) @@ -43,22 +43,22 @@ def test_invalid_client_type() -> None: # --------------------------------------------------------------------------- # -# ServerResources # +# NodesResources # # --------------------------------------------------------------------------- # def test_server_resources_defaults() -> None: """All defaults match constant table.""" - res = ServerResources() - assert res.cpu_cores == ServerResourcesDefaults.CPU_CORES - assert 
res.ram_mb == ServerResourcesDefaults.RAM_MB - assert res.db_connection_pool is ServerResourcesDefaults.DB_CONNECTION_POOL + res = NodesResources() + assert res.cpu_cores == NodesResourcesDefaults.CPU_CORES + assert res.ram_mb == NodesResourcesDefaults.RAM_MB + assert res.db_connection_pool is NodesResourcesDefaults.DB_CONNECTION_POOL def test_server_resources_min_constraints() -> None: """Values below minimum trigger validation failure.""" with pytest.raises(ValidationError): - ServerResources(cpu_cores=0, ram_mb=128) # too small + NodesResources(cpu_cores=0, ram_mb=128) # too small # --------------------------------------------------------------------------- # @@ -80,7 +80,7 @@ def test_valid_server() -> None: srv = Server( id="api-1", type=SystemNodes.SERVER, - server_resources=ServerResources(cpu_cores=2, ram_mb=1024), + server_resources=NodesResources(cpu_cores=2, ram_mb=1024), endpoints=[_dummy_endpoint()], ) assert srv.id == "api-1" @@ -92,7 +92,7 @@ def test_invalid_server_type() -> None: Server( id="bad-srv", type=SystemNodes.CLIENT, - server_resources=ServerResources(), + server_resources=NodesResources(), endpoints=[_dummy_endpoint()], ) @@ -118,7 +118,7 @@ def _single_node_topology() -> TopologyNodes: """Helper returning one server + one client topology.""" srv = Server( id="svc-A", - server_resources=ServerResources(), + server_resources=NodesResources(), endpoints=[_dummy_endpoint()], ) cli = Client(id="browser")
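A hedged sketch of the per-process RAM rule added in `TopologyNodes` above (assuming Pydantic surfaces the validator's `ValueError` as a `ValidationError`; ids and values are illustrative):

```python
from pydantic import ValidationError

from asyncflow.config.constants import EndpointStepCPU, StepOperation
from asyncflow.schemas.topology.endpoint import Endpoint, Step
from asyncflow.schemas.topology.nodes import (
    Client,
    NodesResources,
    Server,
    TopologyNodes,
)

ep = Endpoint(
    endpoint_name="ep-1",
    steps=[
        Step(
            kind=EndpointStepCPU.CPU_BOUND_OPERATION,
            step_operation={StepOperation.CPU_TIME: 0.1},
        ),
    ],
)

# 2 cores x 1024 MB per process = 2048 MB, equal to total RAM -> rejected.
server = Server(
    id="srv-1",
    server_resources=NodesResources(cpu_cores=2, ram_mb=2048),
    endpoints=[ep],
    ram_per_process=1024,
)

try:
    TopologyNodes(servers=[server], client=Client(id="client-1"))
except ValidationError as exc:
    print(exc)  # per-process RAM total (2048 MB) exceeds or is equal to total RAM
```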