Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 193 additions & 15 deletions src/agents/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import os
import uuid
import json
import requests
from typing import Dict, Any, Optional
from pydantic import SecretStr
from openhands.sdk import LLM, Agent, Conversation, RemoteWorkspace

from openhands.tools.preset.default import get_default_tools

from config.config import Config
Expand All @@ -25,49 +28,224 @@ def __init__(self, config: Config, agent_id: str, workspace: RemoteWorkspace):
api_key=SecretStr(config.llm_api_key) if config.llm_api_key else None,
base_url=config.llm_base_url,
)

# Note: Secrets are created dynamically to avoid serialization issues

def _create_secrets(self) -> Dict[str, Any]:
"""Create secrets dictionary for conversation use."""
secrets = {}

# Add GITHUB_TOKEN if available
github_token = os.getenv('GITHUB_TOKEN')
if github_token:
secrets['GITHUB_TOKEN'] = github_token # Pass as string directly

# Add other common secrets as needed
# You can add more secrets here following the same pattern

return secrets

def create_workspace_directory(self, prefix: str) -> str:
"""Create a unique workspace directory."""
workspace_dir = f"/workspace/{prefix}_{self.agent_id}_{uuid.uuid4().hex[:8]}"
self.workspace.execute_command(f"mkdir -p {workspace_dir}")
return workspace_dir

def clone_repository(self, repo_url: str, target_dir: str) -> bool:
"""Clone a repository to the target directory."""
github_token = os.getenv('GITHUB_TOKEN')
if github_token and 'github.com' in repo_url:
# Use token for GitHub repositories
def clone_repository(self, repo_url: str, target_dir: str, conversation: Optional[Conversation] = None) -> bool:
"""Clone a repository to the target directory using secrets manager.

Args:
repo_url: Git URL of the repository
target_dir: Target directory for cloning
conversation: Optional conversation instance for secret injection

Returns:
True if clone was successful, False otherwise
"""
if 'github.com' in repo_url and os.getenv('GITHUB_TOKEN'):
# Use authenticated clone with token from secrets manager
if repo_url.startswith('https://github.com/'):
auth_url = repo_url.replace('https://github.com/', f'https://{github_token}@github.com/')
# The GITHUB_TOKEN will be automatically injected by the secrets manager
# when the conversation executes the git clone command
auth_url = repo_url.replace('https://github.com/', 'https://[email protected]/')
else:
auth_url = repo_url
clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {auth_url} {os.path.basename(target_dir)}")
else:
# Clone without authentication for public repos
clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {repo_url} {os.path.basename(target_dir)}")

if conversation:
# Use conversation to execute with automatic secret injection
conversation.send_message(f"cd {os.path.dirname(target_dir)} && git clone {auth_url} {os.path.basename(target_dir)}")
conversation.run()
# Check if directory was created successfully
check_result = self.workspace.execute_command(f"test -d {target_dir}")
return check_result.exit_code == 0
else:
# Fallback to direct workspace execution (less secure)
github_token = os.getenv('GITHUB_TOKEN')
if github_token:
auth_url = repo_url.replace('https://github.com/', f'https://{github_token}@github.com/')
clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {auth_url} {os.path.basename(target_dir)}")
return clone_result.exit_code == 0

# Clone without authentication for public repos
clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {repo_url} {os.path.basename(target_dir)}")
return clone_result.exit_code == 0

def create_agent_conversation(self) -> tuple[Agent, Conversation]:
"""Create an agent and conversation instance."""
"""Create an agent and conversation instance with secrets properly configured."""
agent = Agent(
llm=self.llm,
tools=get_default_tools(enable_browser=False),
)

# Create secrets dynamically to avoid serialization issues
secrets = self._create_secrets()

conversation = Conversation(
agent=agent,
workspace=self.workspace,
visualizer=None,
secrets=secrets, # Pass secrets to the conversation
)

# Update secrets in the conversation (this ensures they're properly registered)
if secrets:
conversation.update_secrets(secrets)

return agent, conversation

def setup_git_environment(self):
"""Set up git environment variables."""
def create_github_pr(self, repo_owner: str, repo_name: str, title: str, body: str,
head_branch: str, base_branch: str = "main") -> Dict[str, Any]:
"""Create a GitHub Pull Request using the GitHub API.

Args:
repo_owner: GitHub repository owner
repo_name: GitHub repository name
title: PR title
body: PR description
head_branch: Source branch for the PR
base_branch: Target branch for the PR (default: main)

Returns:
Dictionary with PR creation results
"""
github_token = os.getenv('GITHUB_TOKEN')
if github_token:
self.workspace.execute_command(f"export GITHUB_TOKEN={github_token}")
if not github_token:
return {
"success": False,
"error": "GITHUB_TOKEN not available",
"pr_url": None
}

url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/pulls"
headers = {
"Authorization": f"token {github_token}",
"Accept": "application/vnd.github.v3+json",
"Content-Type": "application/json"
}

data = {
"title": title,
"body": body,
"head": head_branch,
"base": base_branch
}

try:
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()

pr_data = response.json()
return {
"success": True,
"pr_url": pr_data.get("html_url"),
"pr_number": pr_data.get("number"),
"pr_data": pr_data
}
except requests.exceptions.RequestException as e:
return {
"success": False,
"error": f"Failed to create PR: {str(e)}",
"pr_url": None
}

def create_github_branch(self, conversation: Conversation, repo_dir: str,
branch_name: str, base_branch: str = "main") -> bool:
"""Create a new branch in the repository using the conversation.

Args:
conversation: Conversation instance with secrets configured
repo_dir: Repository directory path
branch_name: Name of the new branch
base_branch: Base branch to create from (default: main)

Returns:
True if branch creation was successful
"""
try:
# Create and checkout new branch
conversation.send_message(f"""
cd {repo_dir}
git checkout {base_branch}
git pull origin {base_branch}
git checkout -b {branch_name}
""")
conversation.run()

# Verify branch was created
check_result = self.workspace.execute_command(f"cd {repo_dir} && git branch --show-current")
return check_result.stdout.strip() == branch_name
except Exception:
return False

def push_changes_and_create_pr(self, conversation: Conversation, repo_dir: str,
repo_owner: str, repo_name: str, branch_name: str,
commit_message: str, pr_title: str, pr_body: str) -> Dict[str, Any]:
"""Push changes to GitHub and create a Pull Request.

Args:
conversation: Conversation instance with secrets configured
repo_dir: Repository directory path
repo_owner: GitHub repository owner
repo_name: GitHub repository name
branch_name: Branch name to push
commit_message: Git commit message
pr_title: Pull Request title
pr_body: Pull Request description

Returns:
Dictionary with operation results
"""
try:
# Stage, commit, and push changes
conversation.send_message(f"""
cd {repo_dir}
git add .
git commit -m "{commit_message}"
git push -u origin {branch_name}
""")
conversation.run()

# Create PR using GitHub API
pr_result = self.create_github_pr(
repo_owner=repo_owner,
repo_name=repo_name,
title=pr_title,
body=pr_body,
head_branch=branch_name
)

return {
"success": pr_result["success"],
"pr_url": pr_result.get("pr_url"),
"pr_number": pr_result.get("pr_number"),
"error": pr_result.get("error")
}
except Exception as e:
return {
"success": False,
"error": f"Failed to push changes and create PR: {str(e)}",
"pr_url": None
}

def cleanup_directory(self, directory: str):
"""Clean up a workspace directory."""
Expand Down
100 changes: 82 additions & 18 deletions src/agents/cve_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,19 @@ def solve_vulnerability(self, vulnerability: Dict[str, Any], repo_url: str, port
if activity_callback:
activity_callback("Setting up workspace")

# Clone repository to workspace
# Create agent with default tools and secrets
if activity_callback:
activity_callback("Cloning repository")
activity_callback("Initializing AI agent with secrets")

if not self.clone_repository(repo_url, repo_dir):
result["error"] = "Failed to clone repository"
return result
agent, conversation = self.create_agent_conversation()

# Create agent with default tools
# Clone repository to workspace using conversation (for secret injection)
if activity_callback:
activity_callback("Initializing AI agent")

agent, conversation = self.create_agent_conversation()
activity_callback("Cloning repository with authentication")

# Set up environment variables for the agent
self.setup_git_environment()
if not self.clone_repository(repo_url, repo_dir, conversation):
result["error"] = "Failed to clone repository"
return result

# Create fix instructions
fix_message = self.prompt_manager.get_solver_main_prompt(
Expand All @@ -88,7 +85,7 @@ def solve_vulnerability(self, vulnerability: Dict[str, Any], repo_url: str, port
result = self._get_fix_summary(conversation, solver_dir, result, activity_callback)

# Final step - ensure PR creation
self._finalize_pull_request(conversation, vulnerability, activity_callback)
self._finalize_pull_request(conversation, vulnerability, activity_callback, repo_url, repo_dir)

except Exception as e:
result["error"] = str(e)
Expand Down Expand Up @@ -147,11 +144,78 @@ def _get_fix_summary(self, conversation, solver_dir: str, result: Dict[str, Any]

return result

def _finalize_pull_request(self, conversation, vulnerability: Dict[str, Any], activity_callback):
"""Ensure pull request is created."""
def _finalize_pull_request(self, conversation, vulnerability: Dict[str, Any], activity_callback, repo_url: str = None, repo_dir: str = None):
"""Create pull request using GitHub API."""
if activity_callback:
activity_callback("Finalizing pull request")
pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability)
conversation.send_message(pr_message)
activity_callback("Creating pull request via GitHub API")

if not repo_url or not repo_dir:
# Fallback to old method if repo info not available
pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability)
conversation.send_message(pr_message)
self._run_with_activity_updates(conversation, activity_callback)
return

self._run_with_activity_updates(conversation, activity_callback)
try:
# Extract repo owner and name from URL
repo_parts = repo_url.replace('https://github.com/', '').replace('.git', '').split('/')
if len(repo_parts) >= 2:
repo_owner, repo_name = repo_parts[0], repo_parts[1]

# Create branch name
branch_name = f"fix/cve-{vulnerability.get('id', 'unknown')}"

# Create and push branch with fixes
if activity_callback:
activity_callback("Creating feature branch and pushing changes")

# Create branch
self.create_github_branch(conversation, repo_dir, branch_name)

# Push changes and create PR
pr_result = self.push_changes_and_create_pr(
conversation=conversation,
repo_dir=repo_dir,
repo_owner=repo_owner,
repo_name=repo_name,
branch_name=branch_name,
commit_message=f"Fix {vulnerability.get('id', 'CVE')}: {vulnerability.get('title', 'Security vulnerability')}",
pr_title=f"Security Fix: {vulnerability.get('title', 'CVE Vulnerability')}",
pr_body=f"""## Summary
This PR fixes the security vulnerability {vulnerability.get('id', 'identified by our scanner')}.

## Vulnerability Details
- **CVE ID**: {vulnerability.get('id', 'N/A')}
- **Severity**: {vulnerability.get('severity', 'Unknown')}
- **Description**: {vulnerability.get('description', 'Security vulnerability detected')}

## Changes Made
The OpenHands agent has automatically analyzed and applied the necessary security fixes.

## Testing
Please review the changes and run your security tests to verify the fix.

---
*This PR was created automatically by OpenHands CVE Solver Agent*
"""
)

if pr_result.get("success"):
if activity_callback:
activity_callback(f"Pull request created: {pr_result.get('pr_url')}")
else:
if activity_callback:
activity_callback(f"Failed to create PR: {pr_result.get('error')}")
else:
# Fallback to old method
pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability)
conversation.send_message(pr_message)
self._run_with_activity_updates(conversation, activity_callback)

except Exception as e:
if activity_callback:
activity_callback(f"PR creation failed: {str(e)}")
# Fallback to old method
pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability)
conversation.send_message(pr_message)
self._run_with_activity_updates(conversation, activity_callback)