-
Notifications
You must be signed in to change notification settings - Fork 0
Ap 722 copy files #78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
26614ef
3055268
9588f7c
c22e5b4
0277210
a07c76c
592be13
b37c180
21756e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| """File copy DAG to transfer files from one location to another.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| import os | ||
|
|
||
| from pathlib import Path | ||
|
|
||
| from airflow.sdk import Param, dag, get_current_context, task | ||
| from airflow.sdk.exceptions import AirflowFailException | ||
|
|
||
| from mokelumne.util.storage import run_dir | ||
| from mokelumne.util.file_transfer import ( | ||
| build_file_manifest, | ||
| copy_files_from_manifest, | ||
| load_json, | ||
| save_json, | ||
| verify_file_manifest, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
@dag(
    description="Transfers files from one location to another",
    schedule=None,
    catchup=False,
    params={
        "source": Param(
            default="",
            title="Source Directory",
            description="Directory where source files are found",
            type="string",
        ),
        "destination": Param(
            default="",
            title="Destination Directory",
            description="Directory where source files will be copied to",
            type="string",
        ),
    },
    tags=["file-transfer"],
)
def copy_files():
    """Copy files from a source directory to an empty destination directory.

    Pipeline: validate the source, prepare the destination, write a manifest
    (relative path, size, SHA-256 per file) into the run directory, copy every
    file listed in the manifest, then verify the copies against the manifest
    and write a verification report next to it.
    """

    @task
    def validate_source() -> str:
        """Check that the ``source`` param names an existing directory.

        Returns:
            The source directory exactly as given in the DAG params.

        Raises:
            AirflowFailException: If the param is blank, the path does not
                exist, or the path is not a directory.
        """
        ctx = get_current_context()
        source = ctx["params"]["source"]

        # The param defaults to "", and Path("") is the current working
        # directory, which would sail through the checks below.
        if not source or not source.strip():
            raise AirflowFailException("Source directory was not provided")

        source_path = Path(source)
        if not source_path.exists():
            raise AirflowFailException(f"Source directory does not exist: {source_path}")

        if not source_path.is_dir():
            raise AirflowFailException(f"Source is not a directory: {source_path}")

        return source

    @task
    def prepare_destination() -> str:
        """Ensure the ``destination`` param names an existing, empty directory.

        The directory (and any missing parents) is created when absent. The
        run fails when the path is not a directory or already has entries.

        TODO: a requirement from Lynne is that the destination should be an
        ``incoming`` subdirectory of a directory on PA or DA, e.g.
        ``/srv/pa/aerial/ucb/incoming``. Still open: how to confirm that we
        are actually writing to the appropriate ``incoming`` subdirectory.
        (Failing when the directory exists and has files in it is handled
        below.)

        Returns:
            The destination directory exactly as given in the DAG params.

        Raises:
            AirflowFailException: If the param is blank, the path is not a
                directory, or the directory is not empty.
        """
        ctx = get_current_context()
        destination = ctx["params"]["destination"]

        # Same trap as for the source: "" would resolve to the working directory.
        if not destination or not destination.strip():
            raise AirflowFailException("Destination directory was not provided")

        destination_path = Path(destination)

        if not destination_path.exists():
            logger.info("Creating destination directory: %s", destination_path)
            destination_path.mkdir(parents=True)

        if not destination_path.is_dir():
            raise AirflowFailException(f"Destination is not a directory: {destination_path}")

        # Use scandir as a context manager so the directory handle is closed
        # even though only the first entry is inspected.
        with os.scandir(destination_path) as entries:
            has_entries = next(entries, None) is not None

        if has_entries:
            raise AirflowFailException(f"Destination directory contains files: {destination_path}")

        return destination

    @task
    def build_manifest(source: str) -> str:
        """Build a manifest of all files under the source directory.

        Returns:
            Path (as a string) of the ``manifest.json`` written to the run directory.

        Raises:
            AirflowFailException: If the source contains no files.
        """
        source_path = Path(source)
        manifest = build_file_manifest(source_path)

        # The manifest is a dict with "source_root" and "files", so it is
        # always truthy; the emptiness check has to look at the file list.
        files = manifest["files"]
        if not files:
            raise AirflowFailException(f"No files found in source: {source_path}")

        logger.info("Manifest contains %s file(s)", len(files))

        ctx = get_current_context()
        manifest_path = run_dir(ctx["run_id"]) / "manifest.json"

        save_json(manifest, manifest_path)
        logger.info("Manifest written to: %s", manifest_path)

        return str(manifest_path)

    @task
    def copy_manifest_files(source: str, destination: str, manifest_path: str) -> None:
        """Copy all files in manifest to destination directory.

        Raises:
            AirflowFailException: If any part of the copy fails; the original
                error is chained as the cause.
        """
        try:
            copy_files_from_manifest(
                Path(source),
                Path(destination),
                Path(manifest_path),
            )
        except Exception as ex:
            raise AirflowFailException(f"Manifest copy failed: {ex}") from ex

        manifest = load_json(Path(manifest_path))
        # Count the file entries, not the manifest's top-level keys.
        logger.info("Copied %s file(s)", len(manifest["files"]))

    @task
    def verify_manifest(destination: str, manifest_path: str) -> None:
        """Verify the copied files against the manifest and save a report.

        Raises:
            AirflowFailException: If verification fails; the original error
                is chained as the cause.
        """
        try:
            verification_report = verify_file_manifest(
                Path(destination),
                Path(manifest_path),
            )
        except Exception as ex:
            raise AirflowFailException(f"Manifest verification failed: {ex}") from ex

        ctx = get_current_context()
        verification_report_path = run_dir(ctx["run_id"]) / "verification_report.json"

        save_json(verification_report, verification_report_path)

        logger.info("Verification report written to: %s", verification_report_path)
        logger.info("Verified %s copied file(s)", len(verification_report))

    validated_source = validate_source()
    prepared_destination = prepare_destination()
    manifest = build_manifest(validated_source)
    copied_manifest_files = copy_manifest_files(
        validated_source,
        prepared_destination,
        manifest,
    )
    verified_manifest = verify_manifest(prepared_destination, manifest)

    # Data dependencies alone would let validate_source and prepare_destination
    # run in parallel; the explicit chain forces a strictly sequential run.
    (
        validated_source
        >> prepared_destination
        >> manifest
        >> copied_manifest_files
        >> verified_manifest
    )


copy_files()  # pyright: ignore[reportUnusedExpression]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| """Provides file transfer routines.""" | ||
|
|
||
| import hashlib | ||
| import json | ||
| import shutil | ||
|
|
||
| from pathlib import Path | ||
|
|
||
# One per-file record. As built by build_file_manifest: "path" (str, relative
# to the source root), "size" (int, bytes from stat) and "sha256" (str, hex).
ManifestEntry = dict[str, int | str]
# The manifest document: "source_root" (str) plus "files" (list of ManifestEntry).
Manifest = dict[str, str | list[ManifestEntry]]
|
|
||
def sha256_for_file(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at *path*.

    The file is consumed in 8 KiB chunks, so the whole file is never held in
    memory at once.
    """
    digest = hashlib.sha256()

    with open(path, "rb") as stream:
        # read() returns b"" at end of file, which ends the loop.
        while block := stream.read(8192):
            digest.update(block)

    return digest.hexdigest()
|
|
||
|
|
||
def build_file_manifest(source_path: Path) -> Manifest:
    """Build a manifest describing every file beneath *source_path*.

    The directory tree is walked recursively; directories themselves are not
    listed. Each file contributes one entry with its path relative to
    *source_path*, its size in bytes and its SHA-256 checksum.

    Args:
        source_path: Root directory to scan.

    Returns:
        A dict with ``"source_root"`` (``str(source_path)``) and ``"files"``
        (the list of per-file entries, sorted by path). ``"files"`` is empty
        when no files are found.
    """
    # Entries use paths relative to the root so the manifest describes the
    # tree being transferred rather than one absolute location; the root is
    # recorded separately as diagnostic context.
    files = [
        {
            "path": str(item.relative_to(source_path)),
            "size": item.stat().st_size,
            "sha256": sha256_for_file(item),
        }
        # Traversal order varies by filesystem; sorting keeps the manifest
        # reproducible from run to run.
        for item in sorted(source_path.rglob("*"))
        if item.is_file()
    ]

    return {
        "source_root": str(source_path),
        "files": files,
    }
|
|
||
|
|
||
def save_json(data: Manifest | list[ManifestEntry], path: Path) -> None:
    """Write *data* to *path* as indented, UTF-8 encoded JSON.

    Args:
        data: A manifest dict or a list of report entries; anything the
            ``json`` module can serialize is written as-is.
        path: File to create or overwrite.

    Raises:
        TypeError: If *data* contains a value that is not JSON serializable.
    """
    # Serialize before opening the file: opening with "w" truncates it, so a
    # serialization error must surface first to avoid leaving a partial file.
    payload = json.dumps(data, indent=2)

    with open(path, "w", encoding="utf-8") as json_file:
        json_file.write(payload)
|
|
||
|
|
||
def load_json(path: Path) -> Manifest:
    """Read the UTF-8 encoded JSON document at *path* and return the parsed value."""
    with open(path, mode="r", encoding="utf-8") as handle:
        raw_text = handle.read()

    return json.loads(raw_text)
|
|
||
|
|
||
def verify_file_manifest(destination_path: Path, manifest_path: Path) -> list[ManifestEntry]:
    """Verify that every file listed in the manifest was copied intact.

    For each entry under the manifest's ``"files"`` key, the file at
    ``destination_path / entry["path"]`` must exist, be a file, and match the
    recorded size and SHA-256 checksum. Verification stops at the first
    failure.

    Args:
        destination_path: Root directory the files were copied into.
        manifest_path: JSON manifest to verify against.

    Returns:
        One report entry per verified file, carrying ``"path"``, ``"status"``
        (always ``"verified"``), ``"size"`` and ``"sha256"``.

    Raises:
        FileNotFoundError: If a listed file is missing from the destination.
        ValueError: If a listed path is not a file, or its size or checksum
            differs from the manifest.
    """
    verification_report: list[ManifestEntry] = []
    manifest = load_json(manifest_path)

    for entry in manifest["files"]:
        relative_path = Path(str(entry["path"]))
        expected_size = entry["size"]
        expected_sha256 = entry["sha256"]

        destination_file = destination_path / relative_path

        if not destination_file.exists():
            raise FileNotFoundError(f"File not found: {destination_file}")

        if not destination_file.is_file():
            raise ValueError(f"Path exists but is not a file: {destination_file}")

        # Compare sizes first: a stat call is cheap and catches truncated
        # copies before the whole file has to be read for hashing.
        actual_size = destination_file.stat().st_size

        if actual_size != expected_size:
            raise ValueError(
                f"Size mismatch for {destination_file}: "
                f"expected {expected_size}, got {actual_size}"
            )

        actual_sha256 = sha256_for_file(destination_file)

        if actual_sha256 != expected_sha256:
            raise ValueError(
                f"Checksum mismatch for {destination_file}: "
                f"expected {expected_sha256}, got {actual_sha256}"
            )

        verification_report.append(
            {
                "path": str(relative_path),
                "status": "verified",
                "size": expected_size,
                "sha256": expected_sha256,
            }
        )

    return verification_report
|
|
||
|
|
||
def copy_files_from_manifest(source_path: Path, destination_path: Path, manifest_path: Path) -> None:
    """Copy every file listed in the manifest from the source tree to the destination tree.

    Each entry's relative path is resolved against *source_path* and
    *destination_path*; missing parent directories are created under the
    destination. Copying stops at the first entry that fails.

    Raises:
        FileNotFoundError: If a listed file is missing from the source.
        ValueError: If a listed source path is not a file.
    """
    entries = load_json(manifest_path)["files"]

    for record in entries:
        rel = Path(str(record["path"]))
        src = source_path / rel
        dst = destination_path / rel

        if not src.exists():
            raise FileNotFoundError(f"Manifest file missing from source: {src}")
        if not src.is_file():
            raise ValueError(f"Manifest path is not a file: {src}")

        # Nested files need their directory chain created before the copy.
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)
Uh oh!
There was an error while loading. Please reload this page.