|
10 | 10 | from typing import TYPE_CHECKING, Final, Iterable |
11 | 11 | from urllib.parse import urlparse |
12 | 12 |
|
| 13 | +import git |
13 | 14 | import httpx |
14 | 15 | from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND |
15 | 16 |
|
@@ -50,6 +51,9 @@ def is_github_host(url: str) -> bool: |
50 | 51 | async def run_command(*args: str) -> tuple[bytes, bytes]: |
51 | 52 | """Execute a shell command asynchronously and return (stdout, stderr) bytes. |
52 | 53 |
|
| 54 | + This function is kept for backward compatibility with non-git commands. |
| 55 | + Git operations should use GitPython directly. |
| 56 | +
|
53 | 57 | Parameters |
54 | 58 | ---------- |
55 | 59 | *args : str |
@@ -92,21 +96,26 @@ async def ensure_git_installed() -> None: |
92 | 96 |
|
93 | 97 | """ |
94 | 98 | try: |
95 | | - await run_command("git", "--version") |
96 | | - except RuntimeError as exc: |
| 99 | + # Use GitPython to check git availability |
| 100 | + git.Git().version() |
| 101 | + except git.GitCommandError as exc: |
| 102 | + msg = "Git is not installed or not accessible. Please install Git first." |
| 103 | + raise RuntimeError(msg) from exc |
| 104 | + except Exception as exc: |
97 | 105 | msg = "Git is not installed or not accessible. Please install Git first." |
98 | 106 | raise RuntimeError(msg) from exc |
| 107 | + |
99 | 108 | if sys.platform == "win32": |
100 | 109 | try: |
101 | | - stdout, _ = await run_command("git", "config", "core.longpaths") |
102 | | - if stdout.decode().strip().lower() != "true": |
| 110 | + longpaths_value = git.Git().config("core.longpaths") |
| 111 | + if longpaths_value.lower() != "true": |
103 | 112 | logger.warning( |
104 | 113 | "Git clone may fail on Windows due to long file paths. " |
105 | 114 | "Consider enabling long path support with: 'git config --global core.longpaths true'. " |
106 | 115 | "Note: This command may require administrator privileges.", |
107 | 116 | extra={"platform": "windows", "longpaths_enabled": False}, |
108 | 117 | ) |
109 | | - except RuntimeError: |
| 118 | + except git.GitCommandError: |
110 | 119 | # Ignore if checking 'core.longpaths' fails. |
111 | 120 | pass |
112 | 121 |
|
@@ -222,61 +231,73 @@ async def fetch_remote_branches_or_tags(url: str, *, ref_type: str, token: str | |
222 | 231 | msg = f"Invalid fetch type: {ref_type}" |
223 | 232 | raise ValueError(msg) |
224 | 233 |
|
225 | | - cmd = ["git"] |
226 | | - |
227 | | - # Add authentication if needed |
228 | | - if token and is_github_host(url): |
229 | | - cmd += ["-c", create_git_auth_header(token, url=url)] |
230 | | - |
231 | | - cmd += ["ls-remote"] |
232 | | - |
233 | | - fetch_tags = ref_type == "tags" |
234 | | - to_fetch = "tags" if fetch_tags else "heads" |
235 | | - |
236 | | - cmd += [f"--{to_fetch}"] |
237 | | - |
238 | | - # `--refs` filters out the peeled tag objects (those ending with "^{}") (for tags) |
239 | | - if fetch_tags: |
240 | | - cmd += ["--refs"] |
241 | | - |
242 | | - cmd += [url] |
243 | | - |
244 | 234 | await ensure_git_installed() |
245 | | - stdout, _ = await run_command(*cmd) |
246 | | - # For each line in the output: |
247 | | - # - Skip empty lines and lines that don't contain "refs/{to_fetch}/" |
248 | | - # - Extract the branch or tag name after "refs/{to_fetch}/" |
249 | | - return [ |
250 | | - line.split(f"refs/{to_fetch}/", 1)[1] |
251 | | - for line in stdout.decode().splitlines() |
252 | | - if line.strip() and f"refs/{to_fetch}/" in line |
253 | | - ] |
| 235 | + |
| 236 | + # Use GitPython to get remote references |
| 237 | + try: |
| 238 | + git_cmd = git.Git() |
| 239 | + |
| 240 | + # Prepare environment with authentication if needed |
| 241 | + env = None |
| 242 | + if token and is_github_host(url): |
| 243 | + auth_url = _add_token_to_url(url, token) |
| 244 | + url = auth_url |
| 245 | + |
| 246 | + fetch_tags = ref_type == "tags" |
| 247 | + to_fetch = "tags" if fetch_tags else "heads" |
| 248 | + |
| 249 | + # Build ls-remote command |
| 250 | + cmd_args = ["ls-remote", f"--{to_fetch}"] |
| 251 | + if fetch_tags: |
| 252 | + cmd_args.append("--refs") # Filter out peeled tag objects |
| 253 | + cmd_args.append(url) |
| 254 | + |
| 255 | + # Run the command |
| 256 | + output = git_cmd.execute(cmd_args, env=env) |
| 257 | + |
| 258 | + # Parse output |
| 259 | + return [ |
| 260 | + line.split(f"refs/{to_fetch}/", 1)[1] |
| 261 | + for line in output.splitlines() |
| 262 | + if line.strip() and f"refs/{to_fetch}/" in line |
| 263 | + ] |
| 264 | + except git.GitCommandError as exc: |
| 265 | + msg = f"Failed to fetch {ref_type} from {url}: {exc}" |
| 266 | + raise RuntimeError(msg) from exc |
254 | 267 |
|
255 | 268 |
|
256 | | -def create_git_command(base_cmd: list[str], local_path: str, url: str, token: str | None = None) -> list[str]: |
257 | | - """Create a git command with authentication if needed. |
def create_git_repo(local_path: str, url: str, token: str | None = None) -> git.Repo:
    """Open the repository at ``local_path`` and attach authentication if needed.

    When a token is supplied for a GitHub host, the HTTP authorization header
    is registered as a persistent ``-c`` option on the returned repo's ``git``
    command wrapper, so it is passed on the command line of every subsequent
    ``repo.git.<subcommand>(...)`` call. It is deliberately *not* stored with
    ``git config``: that would write the token into the repository's
    ``.git/config`` file on disk.

    Parameters
    ----------
    local_path : str
        The local path where the git repository is located.
    url : str
        The repository URL to check if it's a GitHub repository.
    token : str | None
        GitHub personal access token (PAT) for accessing private repositories.

    Returns
    -------
    git.Repo
        A GitPython Repo object configured with authentication.

    Raises
    ------
    ValueError
        If ``local_path`` does not exist or is not a git repository.

    Notes
    -----
    Persistent options are only applied to commands issued through the
    ``repo.git`` attribute-style API (e.g. ``repo.git.sparse_checkout(...)``).
    A raw ``repo.git.execute([...])`` call runs the given argument list as-is
    and therefore does not receive them.

    """
    # Keep the ``try`` body to the single call that can raise these errors.
    try:
        repo = git.Repo(local_path)
    except (git.InvalidGitRepositoryError, git.NoSuchPathError) as exc:
        msg = f"Invalid git repository at {local_path}"
        raise ValueError(msg) from exc

    if token and is_github_host(url):
        # ``create_git_auth_header`` returns a ``key=value`` string that is
        # already in the form expected by ``git -c``; hand it over unchanged so
        # the secret never touches the on-disk repository configuration.
        repo.git.set_persistent_git_options(c=create_git_auth_header(token, url=url))

    return repo
280 | 301 |
|
281 | 302 |
|
282 | 303 | def create_git_auth_header(token: str, url: str = "https://github.com") -> str: |
@@ -343,8 +364,13 @@ async def checkout_partial_clone(config: CloneConfig, token: str | None) -> None |
343 | 364 | if config.blob: |
344 | 365 | # Remove the file name from the subpath when ingesting from a file url (e.g. blob/branch/path/file.txt) |
345 | 366 | subpath = str(Path(subpath).parent.as_posix()) |
346 | | - checkout_cmd = create_git_command(["git"], config.local_path, config.url, token) |
347 | | - await run_command(*checkout_cmd, "sparse-checkout", "set", subpath) |
| 367 | + |
| 368 | + try: |
| 369 | + repo = create_git_repo(config.local_path, config.url, token) |
| 370 | + repo.git.execute(["sparse-checkout", "set", subpath]) |
| 371 | + except git.GitCommandError as exc: |
| 372 | + msg = f"Failed to configure sparse-checkout: {exc}" |
| 373 | + raise RuntimeError(msg) from exc |
348 | 374 |
|
349 | 375 |
|
350 | 376 | async def resolve_commit(config: CloneConfig, token: str | None) -> str: |
@@ -400,20 +426,27 @@ async def _resolve_ref_to_sha(url: str, pattern: str, token: str | None = None) |
400 | 426 | If the ref does not exist in the remote repository. |
401 | 427 |
|
402 | 428 | """ |
403 | | - # Build: git [-c http.<host>/.extraheader=Auth...] ls-remote <url> <pattern> |
404 | | - cmd: list[str] = ["git"] |
405 | | - if token and is_github_host(url): |
406 | | - cmd += ["-c", create_git_auth_header(token, url=url)] |
407 | | - |
408 | | - cmd += ["ls-remote", url, pattern] |
409 | | - stdout, _ = await run_command(*cmd) |
410 | | - lines = stdout.decode().splitlines() |
411 | | - sha = _pick_commit_sha(lines) |
412 | | - if not sha: |
413 | | - msg = f"{pattern!r} not found in {url}" |
414 | | - raise ValueError(msg) |
415 | | - |
416 | | - return sha |
| 429 | + try: |
| 430 | + git_cmd = git.Git() |
| 431 | + |
| 432 | + # Prepare authentication if needed |
| 433 | + auth_url = url |
| 434 | + if token and is_github_host(url): |
| 435 | + auth_url = _add_token_to_url(url, token) |
| 436 | + |
| 437 | + # Execute ls-remote command |
| 438 | + output = git_cmd.execute(["ls-remote", auth_url, pattern]) |
| 439 | + lines = output.splitlines() |
| 440 | + |
| 441 | + sha = _pick_commit_sha(lines) |
| 442 | + if not sha: |
| 443 | + msg = f"{pattern!r} not found in {url}" |
| 444 | + raise ValueError(msg) |
| 445 | + |
| 446 | + return sha |
| 447 | + except git.GitCommandError as exc: |
| 448 | + msg = f"Failed to resolve {pattern} in {url}: {exc}" |
| 449 | + raise ValueError(msg) from exc |
417 | 450 |
|
418 | 451 |
|
419 | 452 | def _pick_commit_sha(lines: Iterable[str]) -> str | None: |
@@ -449,3 +482,37 @@ def _pick_commit_sha(lines: Iterable[str]) -> str | None: |
449 | 482 | first_non_peeled = sha |
450 | 483 |
|
451 | 484 | return first_non_peeled # branch or lightweight tag (or None) |
| 485 | + |
| 486 | + |
| 487 | +def _add_token_to_url(url: str, token: str) -> str: |
| 488 | + """Add authentication token to GitHub URL. |
| 489 | +
|
| 490 | + Parameters |
| 491 | + ---------- |
| 492 | + url : str |
| 493 | + The original GitHub URL. |
| 494 | + token : str |
| 495 | + The GitHub token to add. |
| 496 | +
|
| 497 | + Returns |
| 498 | + ------- |
| 499 | + str |
| 500 | + The URL with embedded authentication. |
| 501 | +
|
| 502 | + """ |
| 503 | + from urllib.parse import urlparse, urlunparse |
| 504 | + |
| 505 | + parsed = urlparse(url) |
| 506 | + # Add token as username in URL (GitHub supports this) |
| 507 | + netloc = f"x-oauth-basic:{token}@{parsed.hostname}" |
| 508 | + if parsed.port: |
| 509 | + netloc += f":{parsed.port}" |
| 510 | + |
| 511 | + return urlunparse(( |
| 512 | + parsed.scheme, |
| 513 | + netloc, |
| 514 | + parsed.path, |
| 515 | + parsed.params, |
| 516 | + parsed.query, |
| 517 | + parsed.fragment |
| 518 | + )) |
0 commit comments