forked from geekan/MetaGPT
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
431 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# -*- coding: utf-8 -*- | ||
# @Author : stellahong ([email protected]) | ||
# @Desc : |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
# -*- coding: utf-8 -*- | ||
# @Author : stellahong ([email protected]) | ||
# @Desc : | ||
import os | ||
from pathlib import Path | ||
|
||
from tqdm.auto import tqdm | ||
|
||
from data.inference.const import TESTBED | ||
from metagpt.logs import logger | ||
from swe_bench.make_datasets.make_instance import prompt_style_2_edits_only | ||
from swe_bench.utils.parse_diff import filter_changed_line | ||
from swe_bench.utils.repo_utils import EnvManager | ||
|
||
|
||
def reset_task_env(instance: dict = None):
    """Reset the local checkout of a task instance's repository.

    The checkout is looked up at ``<TESTBED>/<repo with "/" replaced by "__">``.
    When it exists, the process working directory is changed to it (the git
    commands issued by ``EnvManager.reset_task_env`` are run without an
    explicit ``cwd``) and the reset is delegated to ``EnvManager``.

    Args:
        instance: Task instance mapping. The keys ``"patch"``, ``"repo"`` and
            ``"version"`` are required here; the mapping is also forwarded
            unchanged to ``EnvManager.reset_task_env``.

    Returns:
        A ``(patch, repo, repo_path)`` tuple. ``repo_path`` is ``None`` when
        the checkout directory does not exist or the reset reports failure.

    Raises:
        KeyError: If ``"patch"``, ``"repo"`` or ``"version"`` is missing.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    if instance is None:
        instance = {}

    # reset the env via git reset and git checkout
    env_manager = EnvManager(testbed=TESTBED)

    patch = instance["patch"]
    repo = instance["repo"]
    # The version is not used here, but a missing key must still fail early,
    # exactly as the original bare ``instance["version"]`` lookup did.
    if "version" not in instance:
        raise KeyError("version")
    repo_prefix = repo.replace("/", "__")
    repo_path = os.path.join(env_manager.testbed, repo_prefix)

    if not os.path.exists(repo_path):
        return patch, repo, None
    os.chdir(repo_path)
    if not env_manager.reset_task_env(instance=instance):
        return patch, repo, None

    return patch, repo, repo_path
|
||
|
||
def reset_and_copy(instance: dict = None):
    """Reset a task instance's checkout, then copy it to a versioned directory.

    Args:
        instance: Task instance mapping; must provide the keys needed by
            ``reset_task_env`` (``"patch"``, ``"repo"``, ``"version"``).

    Returns:
        None. Nothing is copied when the environment could not be reset.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    if instance is None:
        instance = {}

    _, repo, repo_path = reset_task_env(instance)
    if repo_path is None:
        return

    env_manager = EnvManager(testbed=TESTBED)
    repo_prefix = repo.replace("/", "__")
    version = instance["version"]
    # NOTE(review): the destination is a sub-directory of the source checkout,
    # so the copy lands inside the tree being copied. It looks like this was
    # meant to be relative to the testbed root -- confirm before changing.
    destination_path = os.path.join(repo_path, f"{repo_prefix}__{version}")
    env_manager.copy_repo(source_path=repo_path, destination_path=destination_path)
|
||
|
||
def make_oracle_collapsed_instance(instance):
    """Build and log the edits-only oracle prompt for a single task instance.

    The task environment is reset first. The files named by the changed-line
    map of the instance's patch are then rendered into a prompt, which is
    written to the log.

    Returns:
        ``None`` when the environment could not be reset, otherwise an empty
        dict (placeholder until the output is persisted).
    """
    # for each instance, reset task env
    patch, _repo, checkout_dir = reset_task_env(instance)
    if checkout_dir is not None:
        changed_files = filter_changed_line(patch)
        oracle_prompt = prompt_style_2_edits_only(instance, Path(checkout_dir), [*changed_files.keys()])
        logger.info(oracle_prompt)
        # todo: save output
        return {}
    return None
|
||
|
||
def make_oracle_collapsed_dataset(dataset):
    """Run ``make_oracle_collapsed_instance`` on every item of ``dataset``.

    Progress is reported through a tqdm bar labelled ``"Inference "``. The
    per-instance results are currently discarded.
    """
    progress = tqdm(dataset, desc="Inference ")
    for instance in progress:
        make_oracle_collapsed_instance(instance=instance)
    # todo: save output
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
from pathlib import Path | ||
|
||
import unidiff | ||
|
||
# Example unified diff that is embedded in prompts to show the expected patch
# format. Context lines keep their leading-space prefix and the blank context
# lines are present, so the hunk body agrees with the "@@ -1,27 +1,35 @@"
# header (27 source-side lines, 35 target-side lines).
PATCH_EXAMPLE = """--- a/file.py
+++ b/file.py
@@ -1,27 +1,35 @@
 def euclidean(a, b):
-    while b:
-        a, b = b, a % b
-    return a
+    if b == 0:
+        return a
+    return euclidean(b, a % b)
 
 
 def bresenham(x0, y0, x1, y1):
     points = []
     dx = abs(x1 - x0)
     dy = abs(y1 - y0)
-    sx = 1 if x0 < x1 else -1
-    sy = 1 if y0 < y1 else -1
-    err = dx - dy
+    x, y = x0, y0
+    sx = -1 if x0 > x1 else 1
+    sy = -1 if y0 > y1 else 1
 
-    while True:
-        points.append((x0, y0))
-        if x0 == x1 and y0 == y1:
-            break
-        e2 = 2 * err
-        if e2 > -dy:
+    if dx > dy:
+        err = dx / 2.0
+        while x != x1:
+            points.append((x, y))
             err -= dy
-            x0 += sx
-        if e2 < dx:
-            err += dx
-            y0 += sy
+            if err < 0:
+                y += sy
+                err += dx
+            x += sx
+    else:
+        err = dy / 2.0
+        while y != y1:
+            points.append((x, y))
+            err -= dx
+            if err < 0:
+                x += sx
+                err += dy
+            y += sy
 
+    points.append((x, y))
     return points"""
|
||
# Example of the "full file generation" answer format: every file is wrapped
# in "[start of <path>]" / "[end of <path>]" markers. The sample code is kept
# syntactically valid so it does not teach a broken pattern.
FULL_GENERATION_EXAMPLE = """[start of /src/this_file.py]
import os
def euclidean(a, b):
    if b == 0:
        return a
    return euclidean(b, a % b)
[end of /src/this_file.py]
[start of /src/another_file.py]
def bresenham(x0, y0, x1, y1):
    points = []
    dx = abs(x1 - x0)
    dy = abs(y1 - y0)
    x, y = x0, y0
    sx = -1 if x0 > x1 else 1
    sy = -1 if y0 > y1 else 1
    if dx > dy:
        err = dx / 2.0
        while x != x1:
            points.append((x, y))
            err -= dy
            if err < 0:
                y += sy
                err += dx
            x += sx
    else:
        err = dy / 2.0
        while y != y1:
            points.append((x, y))
            err -= dx
            if err < 0:
                x += sx
                err += dy
            y += sy
    points.append((x, y))
    return points
[end of /src/another_file.py]"""
|
||
|
||
def add_lines_list(content):
    """Split ``content`` on newlines and prefix each line with its number.

    Numbering starts at 1. Each returned entry has the form ``"<n> <line>"``.
    """
    return [f"{number} {text}" for number, text in enumerate(content.split("\n"), start=1)]
|
||
|
||
def add_lines(content):
    """Return ``content`` with every line prefixed by its 1-based line number.

    Lines are split and re-joined on ``"\\n"``; each becomes ``"<n> <line>"``.
    """
    numbered = (f"{number} {text}" for number, text in enumerate(content.split("\n"), start=1))
    return "\n".join(numbered)
|
||
|
||
def make_code_text(files_dict, add_line_numbers=True):
    """Concatenate files into one text, each wrapped in start/end markers.

    Files are emitted in sorted order of ``files_dict.items()``. Each one is
    rendered as ``[start of <name>]``, its contents (line-numbered through
    ``add_lines`` when ``add_line_numbers`` is true), then ``[end of <name>]``.
    Leading and trailing newlines of the combined text are stripped.
    """
    sections = []
    for filename, contents in sorted(files_dict.items()):
        body = add_lines(contents) if add_line_numbers else contents
        sections.append(f"[start of {filename}]\n" + body + f"\n[end of {filename}]\n")
    return "".join(sections).strip("\n")
|
||
|
||
def make_code_text_edits_only(files_dict, patch, root_path, add_line_numbers=True):
    """Render only the parts of each file that lie around the patch's hunks.

    For every hunk, a window is sliced out of the file's numbered lines. The
    slice starts at list index ``hunk.source_start - 15`` (clamped to 0) and
    stops before index ``hunk.source_start + hunk.source_length``. A ``...``
    line is emitted before a window that does not start at the top of the
    file and after a window that does not reach the end of the file.

    Args:
        files_dict: Mapping of file path to full file text. Each key must
            equal ``root_path / <source path of a file in the patch>``.
        patch: Unified diff text; parsed with ``unidiff.PatchSet``.
        root_path: ``Path`` that the patch's file paths are relative to.
        add_line_numbers: Currently not consulted; lines are always numbered.

    Returns:
        The concatenated ``[start of ...]`` / ``[end of ...]`` sections with
        leading and trailing newlines stripped.

    Raises:
        KeyError: If a key of ``files_dict`` does not correspond to a file in
            ``patch``.
    """
    context = 15
    windows = dict()
    for patched_file in unidiff.PatchSet(patch):
        relative = patched_file.source_file
        # Drop only a leading "a/" prefix; splitting on the first "a/" found
        # anywhere would truncate paths such as "src/data/x.py".
        if relative.startswith("a/"):
            relative = relative[2:]
        spans = list()
        for hunk in patched_file:
            raw_start = hunk.source_start - context
            end = raw_start + hunk.source_length + context
            # Clamp to 0: a negative slice start would count from the end of
            # the file and select the wrong (often empty) region.
            spans.append((max(raw_start, 0), end))
        windows[root_path / relative] = spans

    sections = []
    for filename, content in files_dict.items():
        content_with_lines = add_lines_list(content)
        section = f"[start of {filename}]\n"
        for start, end in windows[filename]:
            if start > 0:
                section += "...\n"
            section += "\n".join(content_with_lines[start:end])
            section += "\n"
            if end < len(content_with_lines):
                section += "...\n"
        sections.append(section.rstrip("\n") + f"\n[end of {filename}]\n")
    return "".join(sections).strip("\n")
|
||
|
||
def prompt_style_2_edits_only(instance, root_path, filenames):
    """Assemble the edits-only prompt for one task instance.

    The prompt consists of a premise, the issue text, the repository's readme
    files plus the patch-adjacent regions of ``filenames``, the instructions
    and an example patch, joined with newlines.

    Side effect: stores the ingested readme and source contents on
    ``instance`` under ``"readmes"`` and ``"file_contents"``.

    Args:
        instance: Task instance mapping; ``"patch"`` and
            ``"problem_statement"`` are read.
        root_path: ``Path`` of the repository checkout.
        filenames: Paths (relative to ``root_path``) of the files to include.

    Returns:
        The complete prompt text.
    """
    readme_paths = [root_path / readme for readme in get_readme_files(root_path)]
    instance["readmes"] = ingest_files(readme_paths)
    readmes_text = make_code_text(instance["readmes"])

    source_paths = [root_path / filename for filename in filenames]
    instance["file_contents"] = ingest_files(source_paths)
    code_text = make_code_text_edits_only(instance["file_contents"], instance["patch"], root_path)

    sections = (
        "You will be provided with a partial code base and an issue statement explaining a problem to resolve.",
        "<issue>",
        instance["problem_statement"],
        "</issue>",
        "<code>",
        readmes_text,
        code_text,
        "</code>",
        "I need you to solve this issue by generating a single patch file that I can apply "
        "directly to this repository using git apply. Please respond with a single patch "
        "file in the following format.",
        "<patch>",
        PATCH_EXAMPLE,
        "</patch>",
    )
    return "\n".join(sections)
|
||
|
||
def ingest_files(file_paths):
    """Read every given file as UTF-8 text.

    Args:
        file_paths: Iterable of file paths; ``pathlib.Path`` objects and plain
            strings are both accepted.

    Returns:
        A dict mapping each path, exactly as it was passed in, to the file's
        text content.

    Raises:
        OSError: If a file cannot be opened or read.
        UnicodeDecodeError: If a file is not valid UTF-8.
    """
    # Wrapping in Path() lets str paths work; the unbound Path.read_text call
    # only worked when the argument already was a Path instance.
    return {file_path: Path(file_path).read_text(encoding="utf-8") for file_path in file_paths}
|
||
|
||
def get_readme_files(repo_path):
    """List the readme files located directly inside ``repo_path``.

    Returns:
        The names (not full paths) of the regular files whose name starts
        with "readme", compared case-insensitively, in directory iteration
        order.
    """
    readme_names = []
    for entry in Path(repo_path).iterdir():
        if not entry.is_file():
            continue
        # Check whether the file name starts with "readme", ignoring case.
        if entry.name.lower().startswith("readme"):
            readme_names.append(entry.name)
    return readme_names
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import os | ||
import shutil | ||
import subprocess | ||
from pathlib import Path | ||
from typing import Dict | ||
|
||
import git | ||
from git.exc import GitError | ||
|
||
from metagpt.logs import logger | ||
from metagpt.utils.exceptions import handle_exception | ||
|
||
# Name of the "instance_id" field of a task instance mapping.
# NOTE(review): not referenced in the visible code -- confirm it is still needed.
KEY_INSTANCE_ID = "instance_id"
# Marker text for a failed environment reset.
# NOTE(review): not referenced in the visible code -- presumably meant for log output; confirm.
RESET_FAILED = ">>>>> Reset Failed"
|
||
|
||
class ExecWrapper:
    """Callable that runs commands via ``subprocess.run`` with preset keyword arguments."""

    def __init__(self, subprocess_args: Dict = None):
        # Use an empty mapping when no (or an empty) set of defaults is given.
        self.subprocess_args = subprocess_args if subprocess_args else {}

    @handle_exception(exception_type=subprocess.CalledProcessError)
    def __call__(self, cmd, raise_error=True, **kwargs):
        """Run ``cmd`` and return the result of ``subprocess.run``.

        Args:
            cmd: Command passed straight to ``subprocess.run``.
            raise_error: Accepted by the signature but not read in this body.
            **kwargs: Per-call ``subprocess.run`` arguments; they take
                precedence over the defaults stored on the instance.
        """
        run_kwargs = dict(self.subprocess_args)
        run_kwargs.update(kwargs)
        return subprocess.run(cmd, **run_kwargs)
|
||
|
||
class EnvManager:
    """Helper for cloning, copying and resetting repository checkouts in a testbed directory."""

    def __init__(self, testbed):
        """Store the testbed location and prepare a command runner.

        Args:
            testbed: Testbed directory; stored as-is on ``self.testbed``.
        """
        # Snapshot of the current process environment, handed to every command.
        shellenv = os.environ.copy()
        self.testbed = testbed

        # Default subprocess.run arguments: fail on non-zero exit status, no
        # shell, and capture stdout/stderr as text.
        self.exec = ExecWrapper(
            subprocess_args={
                "check": True,
                "shell": False,
                "capture_output": True,
                "text": True,
                "env": shellenv,
            }
        )

    @handle_exception(exception_type=GitError)
    def clone_repo(self, repo_name: str, path: str, token: str = None):
        """Clone the ``swe-bench`` GitHub mirror of ``repo_name`` into ``path``.

        Args:
            repo_name: Repository name; ``/`` is replaced by ``__`` to form
                the mirror's name.
            path: Target directory; created if it does not exist.
            token: Credential placed in the clone URL. When ``None``, the
                ``GITHUB_TOKEN`` environment variable is used, falling back
                to the literal ``"git"``.

        Raises:
            ValueError: If the resulting token is empty.

        NOTE(review): GitError is routed through ``handle_exception``;
        presumably it is handled there rather than propagated -- confirm.
        """
        if token is None:
            token = os.environ.get("GITHUB_TOKEN", "git")
        if not token:
            raise ValueError("GitHub token is required for cloning repositories.")

        repo_url = f"https://{token}@github.com/swe-bench/{repo_name.replace('/', '__')}.git"
        os.makedirs(path, exist_ok=True)

        # Clone the repository
        git.Repo.clone_from(repo_url, path)
        logger.info(f"Repository '{repo_name}' cloned successfully.")

    @handle_exception(exception_type=Exception)  # Using a broad exception type for the example
    def copy_repo(self, source_path: str, destination_path: str):
        """Copy the directory tree at ``source_path`` into ``destination_path``.

        Args:
            source_path: Existing directory to copy from.
            destination_path: Directory to copy into; created if missing.

        Raises:
            ValueError: If ``source_path`` is not a directory, or if the
                fallback path is taken and the destination is not empty.

        NOTE(review): every Exception is routed through ``handle_exception``,
        so these errors may not reach the caller -- confirm.
        """
        if not os.path.isdir(source_path):
            raise ValueError("Source path does not exist or is not a directory.")

        os.makedirs(destination_path, exist_ok=True)

        # Copy the repository
        try:
            shutil.copytree(
                source_path, destination_path, dirs_exist_ok=True
            )  # For Python 3.8+, dirs_exist_ok handles existing directories
        except TypeError:
            # Fallback for Python < 3.8, where dirs_exist_ok is not available
            if os.listdir(destination_path):  # If destination is not empty
                raise ValueError("Destination directory is not empty and dirs_exist_ok is not supported.")
            # NOTE(review): destination_path was already created above, and
            # copytree without dirs_exist_ok rejects an existing destination,
            # so this call likely cannot succeed -- confirm.
            shutil.copytree(source_path, destination_path)

        logger.info(f"Repository contents from '{source_path}' copied successfully to '{destination_path}'.")

    @handle_exception(exception_type=Exception, default_return=False)
    def reset_task_env(self, instance: Dict):
        """
        Reset task environment + testbed and checkout base commit of given task instance

        All git commands run in the current working directory (no ``cwd`` is
        passed), so the caller must have changed into the repository first.

        Args:
            instance: Task instance mapping; ``"base_commit"`` and
                ``"instance_id"`` are read.

        Returns:
            ``True`` once every command has been issued.
            NOTE(review): presumably ``False`` (the decorator's
            ``default_return``) when an exception is handled -- confirm.
        """
        gitignore_path = Path(".gitignore")
        if gitignore_path.exists():
            # The ignored-file listing is requested but its output is not used yet.
            self.exec(["git", "ls-files", "--ignored", "--exclude-standard", "-o", "-z"], raise_error=False)
            # fixme: need detect platform and change this cmd
            # self.exec(["xargs", "-0", "-r", "rm", "-rf"], input=gitignore_path.read_text())

        # Discard working-tree changes, unstage everything, and remove
        # untracked and ignored files before switching to the base commit.
        self.exec(["git", "restore", "."])
        self.exec(["git", "reset", "HEAD", "."])
        self.exec(["git", "clean", "-fdx"])
        self.exec(["git", "-c", "advice.detachedHead=false", "checkout", instance["base_commit"]])
        logger.info(f"[{instance['instance_id']}] Reset task environment to {instance['base_commit']}")
        return True
Oops, something went wrong.