Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions .github/workflows/agentex-tutorials-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@ jobs:
id: get-tutorials
run: |
cd examples/tutorials
# Find all tutorials and exclude specific temporal ones
# Find all tutorials with a manifest.yaml
all_tutorials=$(find . -name "manifest.yaml" -exec dirname {} \; | sort | sed 's|^\./||')
# Filter out the specified temporal tutorials that are being updated
filtered_tutorials=$(echo "$all_tutorials" | grep -v -E "(temporal)")
# Include all tutorials (temporal tutorials are now included)
filtered_tutorials="$all_tutorials"
# Convert to JSON array
tutorials=$(echo "$filtered_tutorials" | jq -R -s -c 'split("\n") | map(select(length > 0))')
echo "tutorials=$tutorials" >> $GITHUB_OUTPUT
echo "All tutorials found: $(echo "$all_tutorials" | wc -l)"
echo "Filtered tutorials: $(echo "$filtered_tutorials" | wc -l)"
echo "Excluded tutorials:"
echo "$all_tutorials" | grep -E "(10_temporal/050_|10_temporal/070_|10_temporal/080_)" || echo " (none matched exclusion pattern)"
echo "Final tutorial list: $tutorials"
test-tutorial:
Expand All @@ -58,8 +55,20 @@ jobs:
- name: Pull latest AgentEx image
run: |
echo "🐳 Pulling latest Scale AgentEx Docker image..."
docker pull ghcr.io/scaleapi/scale-agentex/agentex:latest
echo "✅ Successfully pulled AgentEx Docker image"
max_attempts=3
attempt=1
while [ $attempt -le $max_attempts ]; do
echo "Attempt $attempt of $max_attempts..."
if docker pull ghcr.io/scaleapi/scale-agentex/agentex:latest; then
echo "✅ Successfully pulled AgentEx Docker image"
exit 0
fi
echo "❌ Pull failed, waiting before retry..."
sleep $((attempt * 10))
attempt=$((attempt + 1))
done
echo "❌ Failed to pull image after $max_attempts attempts"
exit 1
- name: Checkout scale-agentex repo
uses: actions/checkout@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,3 @@ def test_send_stream_message(self, client: Agentex, agent_name: str, agent_id: s

if __name__ == "__main__":
pytest.main([__file__, "-v"])

Original file line number Diff line number Diff line change
Expand Up @@ -73,31 +73,55 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
assert task is not None

# Poll for the initial task creation message
async for message in poll_messages(
client=client,
task_id=task.id,
timeout=30,
sleep_interval=1.0,
):
assert isinstance(message, TaskMessage)
if message.content and message.content.type == "text" and message.content.author == "agent":
assert "Hello! I've received your task" in message.content.content
break
task_creation_message_found = False

async def poll_for_task_creation() -> None:
nonlocal task_creation_message_found
async for message in poll_messages(
client=client,
task_id=task.id,
timeout=30,
sleep_interval=1.0,
):
assert isinstance(message, TaskMessage)
if message.content and message.content.type == "text" and message.content.author == "agent":
assert "Hello! I've received your task" in message.content.content
task_creation_message_found = True
break

try:
await asyncio.wait_for(poll_for_task_creation(), timeout=30)
Copy link
Contributor

@smoreinis smoreinis Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we wrap poll_messages in an asyncio.wait_for if poll_messages already has support for a timeout?

i can see you mentioned in the commit description that we now "break out of the loops when the expected messages have been processed" but it looks like at least in this test case we were also breaking out of the for loops when we found the expected messages before?

were we somehow polling for 30 seconds either way even if we found the task creation message?

except asyncio.TimeoutError:
pytest.fail("Polling timed out waiting for task creation message")

assert task_creation_message_found, "Task creation message not found"

# Send an event and poll for response
user_message = "Hello, this is a test message!"
async for message in send_event_and_poll_yielding(
client=client,
agent_id=agent_id,
task_id=task.id,
user_message=user_message,
timeout=30,
sleep_interval=1.0,
):
assert isinstance(message, TaskMessage)
if message.content and message.content.type == "text" and message.content.author == "agent":
assert "Hello! I've received your task" in message.content.content
break
agent_response_found = False

async def poll_for_response() -> None:
nonlocal agent_response_found
async for message in send_event_and_poll_yielding(
client=client,
agent_id=agent_id,
task_id=task.id,
user_message=user_message,
timeout=30,
sleep_interval=1.0,
):
assert isinstance(message, TaskMessage)
if message.content and message.content.type == "text" and message.content.author == "agent":
if "Hello! I've received your message" in message.content.content:
agent_response_found = True
break

try:
await asyncio.wait_for(poll_for_response(), timeout=30)
except asyncio.TimeoutError:
pytest.fail("Polling timed out waiting for agent response")

assert agent_response_found, "Agent response not found"


class TestStreamingEvents:
Expand All @@ -111,18 +135,26 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):

assert task is not None
task_creation_found = False

# Poll for the initial task creation message
async for message in poll_messages(
client=client,
task_id=task.id,
timeout=30,
sleep_interval=1.0,
):
assert isinstance(message, TaskMessage)
if message.content and message.content.type == "text" and message.content.author == "agent":
assert "Hello! I've received your task" in message.content.content
task_creation_found = True
break
async def poll_for_task_creation() -> None:
nonlocal task_creation_found
async for message in poll_messages(
client=client,
task_id=task.id,
timeout=30,
sleep_interval=1.0,
):
assert isinstance(message, TaskMessage)
if message.content and message.content.type == "text" and message.content.author == "agent":
assert "Hello! I've received your task" in message.content.content
task_creation_found = True
break

try:
await asyncio.wait_for(poll_for_task_creation(), timeout=30)
except asyncio.TimeoutError:
pytest.fail("Polling timed out waiting for task creation message")

assert task_creation_found, "Task creation message not found in poll"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,48 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):

user_message = "Hello! Here is my test message"
messages = []
async for message in send_event_and_poll_yielding(
client=client,
agent_id=agent_id,
task_id=task.id,
user_message=user_message,
timeout=30,
sleep_interval=1.0,
):
messages.append(message)
if len(messages) == 1:
assert message.content == TextContent(
author="user",
content=user_message,
type="text",
)
else:
assert message.content is not None
assert message.content.author == "agent"
break

# Flags to track what we've received
user_message_found = False
agent_response_found = False

async def poll_for_messages() -> None:
nonlocal user_message_found, agent_response_found

async for message in send_event_and_poll_yielding(
client=client,
agent_id=agent_id,
task_id=task.id,
user_message=user_message,
timeout=30,
sleep_interval=1.0,
):
messages.append(message)

# Validate messages as they arrive
if message.content and hasattr(message.content, 'author'):
if message.content.author == "user" and message.content.content == user_message:
assert message.content == TextContent(
author="user",
content=user_message,
type="text",
)
user_message_found = True
elif message.content.author == "agent":
assert user_message_found, "Agent response arrived before user message"
agent_response_found = True

# Exit early if we've found all expected messages
if user_message_found and agent_response_found:
break

try:
await asyncio.wait_for(poll_for_messages(), timeout=30)
except asyncio.TimeoutError:
pytest.fail("Polling timed out after 30s waiting for expected messages")

assert user_message_found, "User message not found"
assert agent_response_found, "Agent response not found"

await asyncio.sleep(1) # wait for state to be updated
states = await client.states.list(agent_id=agent_id, task_id=task.id)
Expand Down
115 changes: 69 additions & 46 deletions examples/tutorials/10_async/00_base/020_streaming/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
)

from agentex import AsyncAgentex
from agentex.types import TaskMessage, TextContent
from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
from agentex.types.text_content_param import TextContentParam

Expand Down Expand Up @@ -89,32 +88,48 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):

user_message = "Hello! Here is my test message"
messages = []
async for message in send_event_and_poll_yielding(
client=client,
agent_id=agent_id,
task_id=task.id,
user_message=user_message,
timeout=30,
sleep_interval=1.0,
yield_updates=False,
):

messages.append(message)

assert len(messages) > 0
# the first message should be the agent re-iterating what the user sent
assert isinstance(messages, List)
assert len(messages) == 2
first_message: TaskMessage = messages[0]
assert first_message.content == TextContent(
author="user",
content=user_message,
type="text",
)

second_message: TaskMessage = messages[1]
assert second_message.content is not None
assert second_message.content.author == "agent"

# Flags to track what we've received
user_message_found = False
agent_response_found = False

async def poll_for_messages() -> None:
nonlocal user_message_found, agent_response_found

async for message in send_event_and_poll_yielding(
client=client,
agent_id=agent_id,
task_id=task.id,
user_message=user_message,
timeout=30,
sleep_interval=1.0,
yield_updates=False,
):
messages.append(message)

# Validate messages as they come in
if message.content and hasattr(message.content, 'author'):
if message.content.author == "user" and message.content.content == user_message:
user_message_found = True
elif message.content.author == "agent":
# Agent response should come after user message
assert user_message_found, "Agent response arrived before user message"
agent_response_found = True

# Exit early if we've found all expected messages
if user_message_found and agent_response_found:
break

# Run polling with timeout
try:
await asyncio.wait_for(poll_for_messages(), timeout=30)
except asyncio.TimeoutError:
pytest.fail("Polling timed out after 30s waiting for expected messages")

# Validate we received expected messages
assert len(messages) >= 2, "Expected at least 2 messages (user + agent)"
assert user_message_found, "User message not found"
assert agent_response_found, "Agent response not found"

# assert the state has been updated
await asyncio.sleep(1) # wait for state to be updated
Expand Down Expand Up @@ -158,41 +173,49 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):
# Collect events from stream
all_events = []

# Flags to track what we've received
user_message_found = False
full_agent_message_found = False
delta_messages_found = False

async def stream_messages() -> None:
nonlocal user_message_found, full_agent_message_found, delta_messages_found

async for event in stream_agent_response(
client=client,
task_id=task.id,
timeout=15,
):
all_events.append(event)

# Check events as they arrive
event_type = event.get("type")
if event_type == "full":
content = event.get("content", {})
if content.get("content") == user_message and content.get("author") == "user":
user_message_found = True
elif content.get("author") == "agent":
full_agent_message_found = True
elif event_type == "delta":
delta_messages_found = True

# Exit early if we've found all expected messages
if user_message_found and full_agent_message_found and delta_messages_found:
break

stream_task = asyncio.create_task(stream_messages())

event_content = TextContentParam(type="text", author="user", content=user_message)
await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content})

# Wait for streaming to complete
await stream_task
# Wait for streaming to complete (with timeout)
try:
await asyncio.wait_for(stream_task, timeout=15)
except asyncio.TimeoutError:
pytest.fail("Stream timed out after 15s waiting for expected messages")

# Validate we received events
assert len(all_events) > 0, "No events received in streaming response"

# Check for user message, full agent response, and delta messages
user_message_found = False
full_agent_message_found = False
delta_messages_found = False

for event in all_events:
event_type = event.get("type")
if event_type == "full":
content = event.get("content", {})
if content.get("content") == user_message and content.get("author") == "user":
user_message_found = True
elif content.get("author") == "agent":
full_agent_message_found = True
elif event_type == "delta":
delta_messages_found = True

assert user_message_found, "User message not found in stream"
assert full_agent_message_found, "Full agent message not found in stream"
assert delta_messages_found, "Delta messages not found in stream (streaming response expected)"
Expand Down
Loading
Loading