Skip to content

Commit c66eeb9

Browse files
authored
Added edges without network latency plus tests (#28)
1 parent 6ea44ff commit c66eeb9

File tree

28 files changed

+630
-358
lines changed

28 files changed

+630
-358
lines changed

asyncflow_queue_limit/asyncflow_mm1.ipynb

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
},
2020
{
2121
"cell_type": "code",
22-
"execution_count": 34,
22+
"execution_count": 1,
2323
"id": "c3a69413",
2424
"metadata": {},
2525
"outputs": [],
@@ -34,7 +34,7 @@
3434
"from asyncflow import AsyncFlow, SimulationRunner\n",
3535
"from asyncflow.analysis import MMc, ResultsAnalyzer, SweepAnalyzer\n",
3636
"from asyncflow.components import (\n",
37-
" Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
37+
" Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
3838
")\n",
3939
"from asyncflow.settings import SimulationSettings\n",
4040
"\n",
@@ -43,18 +43,22 @@
4343
},
4444
{
4545
"cell_type": "code",
46-
"execution_count": 35,
46+
"execution_count": null,
4747
"metadata": {
4848
"tags": [
4949
"imports"
5050
]
5151
},
5252
"outputs": [
5353
{
54-
"name": "stdout",
55-
"output_type": "stream",
56-
"text": [
57-
"Imports OK.\n"
54+
"ename": "ImportError",
55+
"evalue": "cannot import name 'Edge' from 'asyncflow.components' (/home/gioele/projects/AsyncFlow/src/asyncflow/components/__init__.py)",
56+
"output_type": "error",
57+
"traceback": [
58+
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
59+
"\u001b[31mImportError\u001b[39m Traceback (most recent call last)",
60+
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 6\u001b[39m\n\u001b[32m 4\u001b[39m \u001b[38;5;66;03m# Public AsyncFlow API\u001b[39;00m\n\u001b[32m 5\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m AsyncFlow, SimulationRunner, Sweep\n\u001b[32m----> \u001b[39m\u001b[32m6\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01mcomponents\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m Client, Server, Edge, Endpoint, ArrivalsGenerator\n\u001b[32m 7\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01msettings\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m SimulationSettings\n\u001b[32m 8\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01manalysis\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m MMc, ResultsAnalyzer, SweepAnalyzer\n",
61+
"\u001b[31mImportError\u001b[39m: cannot import name 'Edge' from 'asyncflow.components' (/home/gioele/projects/AsyncFlow/src/asyncflow/components/__init__.py)"
5862
]
5963
}
6064
],
@@ -64,7 +68,7 @@
6468
"\n",
6569
"# Public AsyncFlow API\n",
6670
"from asyncflow import AsyncFlow, SimulationRunner, Sweep\n",
67-
"from asyncflow.components import Client, Server, Edge, Endpoint, ArrivalsGenerator\n",
71+
"from asyncflow.components import Client, Server, LinkEdge, Endpoint, ArrivalsGenerator\n",
6872
"from asyncflow.settings import SimulationSettings\n",
6973
"from asyncflow.analysis import MMc, ResultsAnalyzer, SweepAnalyzer\n",
7074
"from asyncflow.enums import Distribution\n",
@@ -103,7 +107,7 @@
103107
},
104108
{
105109
"cell_type": "code",
106-
"execution_count": 36,
110+
"execution_count": null,
107111
"metadata": {
108112
"tags": [
109113
"build"
@@ -139,9 +143,9 @@
139143
" endpoints=[endpoint],\n",
140144
" )\n",
141145
"\n",
142-
" e_gen_client = Edge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n",
143-
" e_client_app = Edge(id=\"client-app\", source=\"client-1\", target=\"app-1\", latency=0.0001, dropout_rate=0.0)\n",
144-
" e_app_client = Edge(id=\"app-client\", source=\"app-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n",
146+
" e_gen_client = LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\")\n",
147+
" e_client_app = LinkEdge(id=\"client-app\", source=\"client-1\", target=\"app-1\")\n",
148+
" e_app_client = LinkEdge(id=\"app-client\", source=\"app-1\", target=\"client-1\")\n",
145149
"\n",
146150
" settings = SimulationSettings(\n",
147151
" total_simulation_time=2400,\n",
@@ -168,7 +172,7 @@
168172
},
169173
{
170174
"cell_type": "code",
171-
"execution_count": 37,
175+
"execution_count": null,
172176
"metadata": {
173177
"tags": [
174178
"run"
@@ -296,12 +300,12 @@
296300
"\\end{cases}\n",
297301
"$$\n",
298302
"\n",
299-
"> **Why small deltas appear:** warm-up effects, the user-sampling window (piecewise-constant rate), finite simulation horizon, and a (small) deterministic network latency naturally introduce small Theory vs Observed gaps. Increasing the simulation time and reducing network latency typically shrinks these deltas.\n"
303+
"> **Why small deltas appear:** warm-up effects, the user-sampling window (piecewise-constant rate), finite simulation horizon. Increasing the simulation time typically shrinks these deltas.\n"
300304
]
301305
},
302306
{
303307
"cell_type": "code",
304-
"execution_count": 38,
308+
"execution_count": null,
305309
"metadata": {
306310
"tags": [
307311
"mm1"
@@ -378,7 +382,7 @@
378382
},
379383
{
380384
"cell_type": "code",
381-
"execution_count": 39,
385+
"execution_count": null,
382386
"metadata": {
383387
"tags": [
384388
"plots"
@@ -467,7 +471,7 @@
467471
},
468472
{
469473
"cell_type": "code",
470-
"execution_count": 40,
474+
"execution_count": null,
471475
"id": "c9063bbe",
472476
"metadata": {},
473477
"outputs": [
@@ -514,7 +518,7 @@
514518
},
515519
{
516520
"cell_type": "code",
517-
"execution_count": 41,
521+
"execution_count": null,
518522
"id": "48716bc8",
519523
"metadata": {},
520524
"outputs": [
@@ -564,7 +568,7 @@
564568
},
565569
{
566570
"cell_type": "code",
567-
"execution_count": 42,
571+
"execution_count": null,
568572
"id": "9b9f0236",
569573
"metadata": {},
570574
"outputs": [

asyncflow_queue_limit/asyncflow_mmc_split.ipynb

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
},
2121
{
2222
"cell_type": "code",
23-
"execution_count": 1,
23+
"execution_count": null,
2424
"id": "b8a94d93",
2525
"metadata": {},
2626
"outputs": [],
@@ -36,7 +36,7 @@
3636
"from asyncflow import AsyncFlow, SimulationRunner\n",
3737
"from asyncflow.analysis import MMc, ResultsAnalyzer\n",
3838
"from asyncflow.components import (\n",
39-
" Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
39+
" Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
4040
")\n",
4141
"from asyncflow.settings import SimulationSettings\n",
4242
"\n",
@@ -46,7 +46,7 @@
4646
},
4747
{
4848
"cell_type": "code",
49-
"execution_count": 2,
49+
"execution_count": null,
5050
"id": "d1b7ad7d",
5151
"metadata": {},
5252
"outputs": [
@@ -64,7 +64,7 @@
6464
"\n",
6565
"# Public AsyncFlow API\n",
6666
"from asyncflow import AsyncFlow, SimulationRunner, Sweep\n",
67-
"from asyncflow.components import Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
67+
"from asyncflow.components import Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
6868
"from asyncflow.settings import SimulationSettings\n",
6969
"from asyncflow.analysis import ResultsAnalyzer, SweepAnalyzer, MMc\n",
7070
"from asyncflow.enums import Distribution\n",
@@ -94,7 +94,7 @@
9494
" Arrivals are produced by the same **two-stage, windowed Poisson sampler**: in each user-sampling window \\$\\Delta\\$, we draw the active users \\$U\\$ (Poisson or Normal, per config).\n",
9595
" Within the window, arrivals are a **homogeneous Poisson process** with rate \\$\\Lambda = U \\cdot \\lambda\\_r/60\\$.\n",
9696
"\n",
97-
" With **small \\$\\Delta\\$**, **Poisson users**, **long runs**, and **tiny edge latency**, the aggregate arrivals seen by the load balancer approximate a global Poisson input, yielding a good empirical match to the M/M/c model.\n",
97+
" \n",
9898
"\n",
9999
"---\n",
100100
"\n",
@@ -128,7 +128,7 @@
128128
},
129129
{
130130
"cell_type": "code",
131-
"execution_count": 3,
131+
"execution_count": null,
132132
"id": "ba93587a",
133133
"metadata": {},
134134
"outputs": [],
@@ -173,12 +173,12 @@
173173
" )\n",
174174
"\n",
175175
" edges = [\n",
176-
" Edge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n",
177-
" Edge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\", latency=0.00001, dropout_rate=0),\n",
178-
" Edge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", latency=0.00001, dropout_rate=0),\n",
179-
" Edge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", latency=0.00001, dropout_rate=0),\n",
180-
" Edge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n",
181-
" Edge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n",
176+
" LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\",),\n",
177+
" LinkEdge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\", ),\n",
178+
" LinkEdge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", ),\n",
179+
" LinkEdge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", ),\n",
180+
" LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\",),\n",
181+
" LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\",),\n",
182182
" ]\n",
183183
"\n",
184184
" settings = SimulationSettings(\n",

src/asyncflow/builder/asyncflow_builder.py

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
from typing import Self
66

7-
from asyncflow.config.enums import EventDescription
7+
from asyncflow.config.enums import EventDescription, SystemEdges
88
from asyncflow.schemas.arrivals.generator import ArrivalsGenerator
99
from asyncflow.schemas.events.injection import End, EventInjection, Start
1010
from asyncflow.schemas.payload import SimulationPayload
1111
from asyncflow.schemas.settings.simulation import SimulationSettings
12-
from asyncflow.schemas.topology.edges import Edge
12+
from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge
1313
from asyncflow.schemas.topology.graph import TopologyGraph
1414
from asyncflow.schemas.topology.nodes import (
1515
Client,
@@ -27,10 +27,12 @@ def __init__(self) -> None:
2727
self._arrivals: ArrivalsGenerator | None = None
2828
self._client: Client | None = None
2929
self._servers: list[Server] | None = None
30-
self._edges: list[Edge] | None = None
30+
self._net_edges: list[NetworkEdge] | None = None
31+
self._link_edges: list[LinkEdge] | None = None
3132
self._sim_settings: SimulationSettings | None = None
3233
self._load_balancer: LoadBalancer | None = None
3334
self._events: list[EventInjection] = []
35+
self._edges_kind: SystemEdges | None = None
3436

3537
def add_arrivals_generator(
3638
self,
@@ -64,18 +66,48 @@ def add_servers(self, *servers: Server) -> Self:
6466
self._servers.append(server)
6567
return self
6668

67-
def add_edges(self, *edges: Edge) -> Self:
68-
"""Method to instantiate the list of edges"""
69-
if self._edges is None:
70-
self._edges = []
69+
def add_edges(self, *edges: NetworkEdge | LinkEdge) -> Self:
70+
"""Add edges; enforces homogeneous type (all NetworkEdge or all LinkEdge)."""
71+
if not edges:
72+
return self
73+
74+
if self._edges_kind is None:
75+
first = edges[0]
76+
if isinstance(first, NetworkEdge):
77+
self._edges_kind = SystemEdges.NETWORK_CONNECTION
78+
self._net_edges = []
79+
elif isinstance(first, LinkEdge):
80+
self._edges_kind = SystemEdges.LINK_CONNECTION
81+
self._link_edges = []
82+
else:
83+
msg = "Edges must be NetworkEdge or LinkEdge."
84+
raise TypeError(msg)
85+
86+
assert self._edges_kind is not None
7187

72-
for edge in edges:
73-
if not isinstance(edge, Edge):
74-
msg = "All the instances must be of the type Edge"
88+
if self._edges_kind == SystemEdges.NETWORK_CONNECTION:
89+
assert self._net_edges is not None
90+
if any(not isinstance(e, NetworkEdge) for e in edges):
91+
msg = "Cannot mix LinkEdge with NetworkEdge."
7592
raise TypeError(msg)
76-
self._edges.append(edge)
93+
# ⬇️ Build a typed batch so mypy is happy
94+
net_batch: list[NetworkEdge] = [
95+
e for e in edges if isinstance(e, NetworkEdge)
96+
]
97+
self._net_edges.extend(net_batch)
98+
else:
99+
assert self._link_edges is not None
100+
if any(not isinstance(e, LinkEdge) for e in edges):
101+
msg = "Cannot mix NetworkEdge with LinkEdge."
102+
raise TypeError(msg)
103+
# ⬇️ Typed batch for LinkEdge
104+
link_batch: list[LinkEdge] = [e for e in edges if isinstance(e, LinkEdge)]
105+
self._link_edges.extend(link_batch)
106+
77107
return self
78108

109+
110+
79111
def add_simulation_settings(self, sim_settings: SimulationSettings) -> Self:
80112
"""Method to instantiate the settings for the simulation"""
81113
if not isinstance(sim_settings, SimulationSettings):
@@ -146,17 +178,31 @@ def add_server_outage(
146178
def build_payload(self) -> SimulationPayload:
147179
"""Method to build the payload for the simulation"""
148180
if self._arrivals is None:
149-
msg = "The generator input must be instantiated before the simulation"
181+
msg = "The arrivals generator must be instantiated before the simulation"
150182
raise ValueError(msg)
151183
if self._client is None:
152184
msg = "The client input must be instantiated before the simulation"
153185
raise ValueError(msg)
154186
if not self._servers:
155187
msg = "You must instantiate at least one server before the simulation"
156188
raise ValueError(msg)
157-
if not self._edges:
158-
msg = "You must instantiate edges before the simulation"
189+
if self._edges_kind is None:
190+
msg = "You must instantiate edges before the simulation."
159191
raise ValueError(msg)
192+
193+
# mypy facilitator
194+
edges_u: list[NetworkEdge] | list[LinkEdge]
195+
if self._edges_kind == SystemEdges.NETWORK_CONNECTION:
196+
if not self._net_edges:
197+
msg = "You must instantiate edges before the simulation."
198+
raise ValueError(msg)
199+
edges_u = self._net_edges
200+
else:
201+
if not self._link_edges:
202+
msg = "You must instantiate edges before the simulation."
203+
raise ValueError(msg)
204+
edges_u = self._link_edges
205+
160206
if self._sim_settings is None:
161207
msg = "The simulation settings must be instantiated before the simulation"
162208
raise ValueError(msg)
@@ -169,7 +215,7 @@ def build_payload(self) -> SimulationPayload:
169215

170216
graph = TopologyGraph(
171217
nodes = nodes,
172-
edges=self._edges,
218+
edges=edges_u,
173219
)
174220

175221
return SimulationPayload.model_validate({

src/asyncflow/components/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from asyncflow.schemas.arrivals.generator import ArrivalsGenerator
55
from asyncflow.schemas.events.injection import EventInjection
6-
from asyncflow.schemas.topology.edges import Edge
6+
from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge
77
from asyncflow.schemas.topology.endpoint import Endpoint
88
from asyncflow.schemas.topology.nodes import (
99
Client,
@@ -15,10 +15,11 @@
1515
__all__ = [
1616
"ArrivalsGenerator",
1717
"Client",
18-
"Edge",
1918
"Endpoint",
2019
"EventInjection",
20+
"LinkEdge",
2121
"LoadBalancer",
22+
"NetworkEdge",
2223
"NodesResources",
2324
"Server",
2425
]

src/asyncflow/config/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ class SystemEdges(StrEnum):
162162
"""
163163

164164
NETWORK_CONNECTION = "network_connection"
165+
LINK_CONNECTION = "link_connection"
165166

166167
# ======================================================================
167168
# CONSTANTS FOR THE EVENT TO INJECT IN THE SIMULATION

0 commit comments

Comments
 (0)