diff --git a/src/agents/base_agent.py b/src/agents/base_agent.py index e57d5ab..e206daa 100644 --- a/src/agents/base_agent.py +++ b/src/agents/base_agent.py @@ -2,9 +2,12 @@ import os import uuid +import json +import requests from typing import Dict, Any, Optional from pydantic import SecretStr from openhands.sdk import LLM, Agent, Conversation, RemoteWorkspace + from openhands.tools.preset.default import get_default_tools from config.config import Config @@ -25,6 +28,22 @@ def __init__(self, config: Config, agent_id: str, workspace: RemoteWorkspace): api_key=SecretStr(config.llm_api_key) if config.llm_api_key else None, base_url=config.llm_base_url, ) + + # Note: Secrets are created dynamically to avoid serialization issues + + def _create_secrets(self) -> Dict[str, Any]: + """Create secrets dictionary for conversation use.""" + secrets = {} + + # Add GITHUB_TOKEN if available + github_token = os.getenv('GITHUB_TOKEN') + if github_token: + secrets['GITHUB_TOKEN'] = github_token # Pass as string directly + + # Add other common secrets as needed + # You can add more secrets here following the same pattern + + return secrets def create_workspace_directory(self, prefix: str) -> str: """Create a unique workspace directory.""" @@ -32,42 +51,201 @@ def create_workspace_directory(self, prefix: str) -> str: self.workspace.execute_command(f"mkdir -p {workspace_dir}") return workspace_dir - def clone_repository(self, repo_url: str, target_dir: str) -> bool: - """Clone a repository to the target directory.""" - github_token = os.getenv('GITHUB_TOKEN') - if github_token and 'github.com' in repo_url: - # Use token for GitHub repositories + def clone_repository(self, repo_url: str, target_dir: str, conversation: Optional[Conversation] = None) -> bool: + """Clone a repository to the target directory using secrets manager. + + Args: + repo_url: Git URL of the repository + target_dir: Target directory for cloning + conversation: Optional conversation instance for secret injection + + Returns: + True if clone was successful, False otherwise + """ + if 'github.com' in repo_url and os.getenv('GITHUB_TOKEN'): + # Use authenticated clone with token from secrets manager if repo_url.startswith('https://github.com/'): - auth_url = repo_url.replace('https://github.com/', f'https://{github_token}@github.com/') + # The GITHUB_TOKEN will be automatically injected by the secrets manager + # when the conversation executes the git clone command + auth_url = repo_url.replace('https://github.com/', 'https://$GITHUB_TOKEN@github.com/') else: auth_url = repo_url - clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {auth_url} {os.path.basename(target_dir)}") - else: - # Clone without authentication for public repos - clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {repo_url} {os.path.basename(target_dir)}") + + if conversation: + # Use conversation to execute with automatic secret injection + conversation.send_message(f"cd {os.path.dirname(target_dir)} && git clone {auth_url} {os.path.basename(target_dir)}") + conversation.run() + # Check if directory was created successfully + check_result = self.workspace.execute_command(f"test -d {target_dir}") + return check_result.exit_code == 0 + else: + # Fallback to direct workspace execution (less secure) + github_token = os.getenv('GITHUB_TOKEN') + if github_token: + auth_url = repo_url.replace('https://github.com/', f'https://{github_token}@github.com/') + clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {auth_url} {os.path.basename(target_dir)}") + return clone_result.exit_code == 0 + # Clone without authentication for public repos + clone_result = self.workspace.execute_command(f"cd {os.path.dirname(target_dir)} && git clone {repo_url} {os.path.basename(target_dir)}") return clone_result.exit_code == 0 def create_agent_conversation(self) -> tuple[Agent, Conversation]: - """Create an agent and conversation instance.""" + """Create an agent and conversation instance with secrets properly configured.""" agent = Agent( llm=self.llm, tools=get_default_tools(enable_browser=False), ) + # Create secrets dynamically to avoid serialization issues + secrets = self._create_secrets() + conversation = Conversation( agent=agent, workspace=self.workspace, visualizer=None, + secrets=secrets, # Pass secrets to the conversation ) + # Update secrets in the conversation (this ensures they're properly registered) + if secrets: + conversation.update_secrets(secrets) + return agent, conversation - def setup_git_environment(self): - """Set up git environment variables.""" + def create_github_pr(self, repo_owner: str, repo_name: str, title: str, body: str, + head_branch: str, base_branch: str = "main") -> Dict[str, Any]: + """Create a GitHub Pull Request using the GitHub API. + + Args: + repo_owner: GitHub repository owner + repo_name: GitHub repository name + title: PR title + body: PR description + head_branch: Source branch for the PR + base_branch: Target branch for the PR (default: main) + + Returns: + Dictionary with PR creation results + """ github_token = os.getenv('GITHUB_TOKEN') - if github_token: - self.workspace.execute_command(f"export GITHUB_TOKEN={github_token}") + if not github_token: + return { + "success": False, + "error": "GITHUB_TOKEN not available", + "pr_url": None + } + + url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/pulls" + headers = { + "Authorization": f"token {github_token}", + "Accept": "application/vnd.github.v3+json", + "Content-Type": "application/json" + } + + data = { + "title": title, + "body": body, + "head": head_branch, + "base": base_branch + } + + try: + response = requests.post(url, headers=headers, json=data) + response.raise_for_status() + + pr_data = response.json() + return { + "success": True, + "pr_url": pr_data.get("html_url"), + "pr_number": pr_data.get("number"), + "pr_data": pr_data + } + except requests.exceptions.RequestException as e: + return { + "success": False, + "error": f"Failed to create PR: {str(e)}", + "pr_url": None + } + + def create_github_branch(self, conversation: Conversation, repo_dir: str, + branch_name: str, base_branch: str = "main") -> bool: + """Create a new branch in the repository using the conversation. + + Args: + conversation: Conversation instance with secrets configured + repo_dir: Repository directory path + branch_name: Name of the new branch + base_branch: Base branch to create from (default: main) + + Returns: + True if branch creation was successful + """ + try: + # Create and checkout new branch + conversation.send_message(f""" +cd {repo_dir} +git checkout {base_branch} +git pull origin {base_branch} +git checkout -b {branch_name} +""") + conversation.run() + + # Verify branch was created + check_result = self.workspace.execute_command(f"cd {repo_dir} && git branch --show-current") + return check_result.stdout.strip() == branch_name + except Exception: + return False + + def push_changes_and_create_pr(self, conversation: Conversation, repo_dir: str, + repo_owner: str, repo_name: str, branch_name: str, + commit_message: str, pr_title: str, pr_body: str) -> Dict[str, Any]: + """Push changes to GitHub and create a Pull Request. + + Args: + conversation: Conversation instance with secrets configured + repo_dir: Repository directory path + repo_owner: GitHub repository owner + repo_name: GitHub repository name + branch_name: Branch name to push + commit_message: Git commit message + pr_title: Pull Request title + pr_body: Pull Request description + + Returns: + Dictionary with operation results + """ + try: + # Stage, commit, and push changes + conversation.send_message(f""" +cd {repo_dir} +git add . +git commit -m "{commit_message}" +git push -u origin {branch_name} +""") + conversation.run() + + # Create PR using GitHub API + pr_result = self.create_github_pr( + repo_owner=repo_owner, + repo_name=repo_name, + title=pr_title, + body=pr_body, + head_branch=branch_name + ) + + return { + "success": pr_result["success"], + "pr_url": pr_result.get("pr_url"), + "pr_number": pr_result.get("pr_number"), + "error": pr_result.get("error") + } + except Exception as e: + return { + "success": False, + "error": f"Failed to push changes and create PR: {str(e)}", + "pr_url": None + } def cleanup_directory(self, directory: str): """Clean up a workspace directory.""" diff --git a/src/agents/cve_solver.py b/src/agents/cve_solver.py index 50c8430..db55e36 100644 --- a/src/agents/cve_solver.py +++ b/src/agents/cve_solver.py @@ -50,22 +50,19 @@ def solve_vulnerability(self, vulnerability: Dict[str, Any], repo_url: str, port if activity_callback: activity_callback("Setting up workspace") - # Clone repository to workspace + # Create agent with default tools and secrets if activity_callback: - activity_callback("Cloning repository") + activity_callback("Initializing AI agent with secrets") - if not self.clone_repository(repo_url, repo_dir): - result["error"] = "Failed to clone repository" - return result + agent, conversation = self.create_agent_conversation() - # Create agent with default tools + # Clone repository to workspace using conversation (for secret injection) if activity_callback: - activity_callback("Initializing AI agent") - - agent, conversation = self.create_agent_conversation() + activity_callback("Cloning repository with authentication") - # Set up environment variables for the agent - self.setup_git_environment() + if not self.clone_repository(repo_url, repo_dir, conversation): + result["error"] = "Failed to clone repository" + return result # Create fix instructions fix_message = self.prompt_manager.get_solver_main_prompt( @@ -88,7 +85,7 @@ def solve_vulnerability(self, vulnerability: Dict[str, Any], repo_url: str, port result = self._get_fix_summary(conversation, solver_dir, result, activity_callback) # Final step - ensure PR creation - self._finalize_pull_request(conversation, vulnerability, activity_callback) + self._finalize_pull_request(conversation, vulnerability, activity_callback, repo_url, repo_dir) except Exception as e: result["error"] = str(e) @@ -147,11 +144,78 @@ def _get_fix_summary(self, conversation, solver_dir: str, result: Dict[str, Any] return result - def _finalize_pull_request(self, conversation, vulnerability: Dict[str, Any], activity_callback): - """Ensure pull request is created.""" + def _finalize_pull_request(self, conversation, vulnerability: Dict[str, Any], activity_callback, repo_url: str = None, repo_dir: str = None): + """Create pull request using GitHub API.""" if activity_callback: - activity_callback("Finalizing pull request") - pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability) - conversation.send_message(pr_message) + activity_callback("Creating pull request via GitHub API") + + if not repo_url or not repo_dir: + # Fallback to old method if repo info not available + pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability) + conversation.send_message(pr_message) + self._run_with_activity_updates(conversation, activity_callback) + return - self._run_with_activity_updates(conversation, activity_callback) \ No newline at end of file + try: + # Extract repo owner and name from URL + repo_parts = repo_url.replace('https://github.com/', '').replace('.git', '').split('/') + if len(repo_parts) >= 2: + repo_owner, repo_name = repo_parts[0], repo_parts[1] + + # Create branch name + branch_name = f"fix/cve-{vulnerability.get('id', 'unknown')}" + + # Create and push branch with fixes + if activity_callback: + activity_callback("Creating feature branch and pushing changes") + + # Create branch + self.create_github_branch(conversation, repo_dir, branch_name) + + # Push changes and create PR + pr_result = self.push_changes_and_create_pr( + conversation=conversation, + repo_dir=repo_dir, + repo_owner=repo_owner, + repo_name=repo_name, + branch_name=branch_name, + commit_message=f"Fix {vulnerability.get('id', 'CVE')}: {vulnerability.get('title', 'Security vulnerability')}", + pr_title=f"Security Fix: {vulnerability.get('title', 'CVE Vulnerability')}", + pr_body=f"""## Summary +This PR fixes the security vulnerability {vulnerability.get('id', 'identified by our scanner')}. + +## Vulnerability Details +- **CVE ID**: {vulnerability.get('id', 'N/A')} +- **Severity**: {vulnerability.get('severity', 'Unknown')} +- **Description**: {vulnerability.get('description', 'Security vulnerability detected')} + +## Changes Made +The OpenHands agent has automatically analyzed and applied the necessary security fixes. + +## Testing +Please review the changes and run your security tests to verify the fix. + +--- +*This PR was created automatically by OpenHands CVE Solver Agent* +""" + ) + + if pr_result.get("success"): + if activity_callback: + activity_callback(f"Pull request created: {pr_result.get('pr_url')}") + else: + if activity_callback: + activity_callback(f"Failed to create PR: {pr_result.get('error')}") + else: + # Fallback to old method + pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability) + conversation.send_message(pr_message) + self._run_with_activity_updates(conversation, activity_callback) + + except Exception as e: + if activity_callback: + activity_callback(f"PR creation failed: {str(e)}") + # Fallback to old method + pr_message = self.prompt_manager.get_solver_pr_finalize_prompt(vulnerability=vulnerability) + conversation.send_message(pr_message) + self._run_with_activity_updates(conversation, activity_callback) \ No newline at end of file