diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py new file mode 100644 index 0000000..c2810ce --- /dev/null +++ b/mokelumne/dags/copy_files.py @@ -0,0 +1,174 @@ +"""File copy DAG to transfer files from one location to another.""" + +from __future__ import annotations + +import logging +import os + +from pathlib import Path + +from airflow.sdk import Param, dag, get_current_context, task +from airflow.sdk.exceptions import AirflowFailException + +from mokelumne.util.storage import run_dir +from mokelumne.util.file_transfer import ( + build_file_manifest, + copy_files_from_manifest, + load_json, + save_json, + verify_file_manifest, +) + +logger = logging.getLogger(__name__) + + +@dag( + description="Transfers files from one location to another", + schedule=None, + catchup=False, + params={ + "source": Param( + default="", + title="Source Directory", + description="Directory where source files are found", + type="string", + ), + "destination": Param( + default="", + title="Destination Directory", + description="Directory where source files will be copied to", + type="string", + ), + }, + tags=["file-transfer"], +) +def copy_files(): + """Copy files from a source directory to an empty destination directory.""" + + @task + def validate_source() -> str: + """Checks that the source is a valid directory.""" + + ctx = get_current_context() + source = ctx["params"]["source"] + + source_path = Path(source) + if not source_path.exists(): + raise AirflowFailException(f"Source directory does not exist: {source_path}") + + if not source_path.is_dir(): + raise AirflowFailException(f"Source is not a directory: {source_path}") + + return source + + @task + def prepare_destination() -> str: + """ + Prepare the destination directory. + TODO: one of the requirements that we got from Lynne is that the destination + should be an incoming subdirectory of any directory on PA or DA, + e.g. /srv/pa/aerial/ucb/incoming. i don't have a good answer for how to + approach this, but i'm curious about how you think we should confirm that 1) + we're actually writing to the appropriate incoming subdirectory, and if the + incoming directory exists and has files in it, we should fail the job. + """ + + ctx = get_current_context() + destination = ctx["params"]["destination"] + + destination_path = Path(destination) + + if not destination_path.exists(): + logger.info("Creating destination directory: %s", destination_path) + destination_path.mkdir(parents=True) + + if not destination_path.is_dir(): + raise AirflowFailException(f"Destination is not a directory: {destination_path}") + + if next(os.scandir(destination_path), None) is not None: + raise AirflowFailException(f"Destination directory contains files: {destination_path}") + + return destination + + @task + def build_manifest(source: str) -> str: + """Build a manifest of all files under the source directory.""" + + source_path = Path(source) + manifest = build_file_manifest(source_path) + + if not manifest: + raise AirflowFailException(f"No files found in source: {source_path}") + + logger.info("Manifest contains %s file(s)", len(manifest)) + + ctx = get_current_context() + manifest_path = run_dir(ctx["run_id"]) / "manifest.json" + + save_json(manifest, manifest_path) + + logger.info("Manifest written to: %s", manifest_path) + + return str(manifest_path) + + @task + def copy_manifest_files(source: str, destination: str, manifest_path: str) -> None: + """Copy all files in manifest to destination directory.""" + + try: + copy_files_from_manifest( + Path(source), + Path(destination), + Path(manifest_path), + ) + except Exception as ex: + raise AirflowFailException( + f"Manifest copy failed: {ex}" + ) from ex + + manifest = load_json(Path(manifest_path)) + logger.info("Copied %s file(s)", len(manifest)) + + @task + def verify_manifest(destination: str, manifest_path: str) -> None: + """Verify all copied files exist at the destination.""" + + try: + verification_report = verify_file_manifest( + Path(destination), + Path(manifest_path), + ) + except Exception as ex: + raise AirflowFailException( + f"Manifest verification failed: {ex}" + ) from ex + + ctx = get_current_context() + run_path = run_dir(ctx["run_id"]) + + verification_report_path = run_path / "verification_report.json" + + save_json(verification_report, verification_report_path) + + logger.info("Verification report written to: %s", verification_report_path) + logger.info("Verified %s copied file(s)", len(verification_report)) + + validated_source = validate_source() + prepared_destination = prepare_destination() + manifest = build_manifest(validated_source) + copied_manifest_files = copy_manifest_files( + validated_source, + prepared_destination, + manifest, + ) + verified_manifest = verify_manifest(prepared_destination, manifest) + + ( + validated_source + >> prepared_destination + >> manifest + >> copied_manifest_files + >> verified_manifest + ) + +copy_files() # pyright: ignore[reportUnusedExpression] diff --git a/mokelumne/util/file_transfer.py b/mokelumne/util/file_transfer.py new file mode 100644 index 0000000..d8b4f5b --- /dev/null +++ b/mokelumne/util/file_transfer.py @@ -0,0 +1,118 @@ +"""Provides file transfer routines.""" + +import hashlib +import json +import shutil + +from pathlib import Path + +ManifestEntry = dict[str, int | str] +Manifest = dict[str, str | list[ManifestEntry]] + +def sha256_for_file(path: Path) -> str: + """Calculate the SHA256 checksum for a file.""" + sha256 = hashlib.sha256() + + with open(path, "rb") as file: + for chunk in iter(lambda: file.read(8192), b""): + sha256.update(chunk) + + return sha256.hexdigest() + + +def build_file_manifest(source_path: Path) -> Manifest: + files = [] + + for item in source_path.rglob("*"): + if item.is_file(): + relative_path = item.relative_to(source_path) + files.append( + { + "path": str(relative_path), + "size": item.stat().st_size, + "sha256": sha256_for_file(item), + } + ) + + return { + "source_root": str(source_path), + "files": files, + } + + +def save_json(data: Manifest, path: Path) -> None: + with open(path, "w", encoding="utf-8") as json_file: + json.dump(data, json_file, indent=2) + + +def load_json(path: Path) -> Manifest: + with open(path, "r", encoding="utf-8") as json_file: + return json.load(json_file) + + +def verify_file_manifest(destination_path: Path, manifest_path: Path) -> list[ManifestEntry]: + """Verify all copied files exist at the destination.""" + + verification_report: ManifestEntry = [] + manifest = load_json(manifest_path) + + for entry in manifest["files"]: + relative_path = Path(str(entry["path"])) + expected_size = entry["size"] + expected_sha256 = entry["sha256"] + + destination_file = destination_path / relative_path + + if not destination_file.exists(): + raise FileNotFoundError(f"File not found: {destination_file}") + + if not destination_file.is_file(): + raise ValueError(f"Path exists but is not a file: {destination_file}") + + actual_size = destination_file.stat().st_size + + if actual_size != expected_size: + raise ValueError( + f"Size mismatch for {destination_file}: " + f"expected {expected_size}, got {actual_size}" + ) + + actual_sha256 = sha256_for_file(destination_file) + + if actual_sha256 != expected_sha256: + raise ValueError( + f"Checksum mismatch for {destination_file}: " + f"expected {expected_sha256}, got {actual_sha256}" + ) + + verification_report.append( + { + "path": str(relative_path), + "status": "verified", + "size": expected_size, + "sha256": expected_sha256, + } + ) + + return verification_report + + +def copy_files_from_manifest(source_path: Path, destination_path: Path, manifest_path: Path) -> None: + """Copy all files in manifest to destination directory.""" + + manifest = load_json(manifest_path) + + for entry in manifest["files"]: + relative_path = Path(str(entry["path"])) + source_file = source_path / relative_path + destination_file = destination_path / relative_path + + if not source_file.exists(): + raise FileNotFoundError(f"Manifest file missing from source: {source_file}") + + if not source_file.is_file(): + raise ValueError(f"Manifest path is not a file: {source_file}") + + destination_file.parent.mkdir(parents=True, exist_ok=True) + + shutil.copy2(source_file, destination_file) diff --git a/test/unit/test_file_transfer.py b/test/unit/test_file_transfer.py new file mode 100644 index 0000000..dd6b40b --- /dev/null +++ b/test/unit/test_file_transfer.py @@ -0,0 +1,152 @@ +"""PyTest cases for the mokelumne.util.file_transfer module.""" + +from pathlib import Path + +import pytest + +from mokelumne.util import file_transfer + + +class TestFileTransfer: + """Tests for the Mokelumne file transfer module.""" + + def test_sha256_for_file(self, tmp_path: Path): + """Ensure that sha256_for_file returns the expected checksum.""" + test_file = tmp_path / "test.txt" + test_file.write_text("hello", encoding="utf-8") + + result = file_transfer.sha256_for_file(test_file) + + assert result == ( + "2cf24dba5fb0a30e26e83b2ac5b9e" + "29e1b161e5c1fa7425e73043362938b9824" + ) + + def test_save_and_load_json(self, tmp_path: Path): + """Ensure that JSON data can be saved and loaded.""" + data = [ + { + "path": "test.txt", + "size": 5, + "sha256": "fake-checksum", + } + ] + + json_file_path = tmp_path / "data.json" + + file_transfer.save_json(data, json_file_path) + result = file_transfer.load_json(json_file_path) + + assert result == data + + def test_build_manifest_empty_directory(self, tmp_path: Path): + """Ensure that build_manifest returns an empty list for an empty directory.""" + result = file_transfer.build_file_manifest(tmp_path) + + assert result == { + "source_root": str(tmp_path), + "files": [], + } + + def test_build_manifest_includes_nested_files(self, tmp_path: Path): + """Ensure that build_manifest includes files recursively.""" + file_one = tmp_path / "file_one.txt" + file_one.write_text("hello", encoding="utf-8") + + subdir = tmp_path / "subdir" + subdir.mkdir() + + file_two = subdir / "file_two.txt" + file_two.write_text("goodbye", encoding="utf-8") + + result = file_transfer.build_file_manifest(tmp_path) + + paths = {entry["path"] for entry in result["files"]} + + assert paths == { + "file_one.txt", + "subdir/file_two.txt", + } + assert len(result) == 2 + + def test_verify_manifest_success(self, tmp_path: Path): + """Ensure verify_manifest succeeds for valid files.""" + + test_file = tmp_path / "test.txt" + test_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_file_manifest(tmp_path) + + manifest_path = tmp_path / "manifest.json" + file_transfer.save_json(manifest, manifest_path) + + result = file_transfer.verify_file_manifest(tmp_path, manifest_path) + + assert result == [ + { + "path": "test.txt", + "status": "verified", + "size": 5, + "sha256": file_transfer.sha256_for_file(test_file), + } + ] + + def test_verify_manifest_missing_file(self, tmp_path: Path): + """Ensure verify_manifest fails when a file is missing.""" + + test_file = tmp_path / "test.txt" + test_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_file_manifest(tmp_path) + + manifest_path = tmp_path / "manifest.json" + file_transfer.save_json(manifest, manifest_path) + + test_file.unlink() + + with pytest.raises(FileNotFoundError): + file_transfer.verify_file_manifest(tmp_path, manifest_path) + + def test_copy_manifest_files(self, tmp_path: Path): + """Ensure copy_manifest_files copies files from source to destination.""" + source = tmp_path / "source" + source.mkdir() + + destination = tmp_path / "destination" + destination.mkdir() + + source_file = source / "test.txt" + source_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_file_manifest(source) + + manifest_path = tmp_path / "manifest.json" + file_transfer.save_json(manifest, manifest_path) + + file_transfer.copy_files_from_manifest(source, destination, manifest_path) + + destination_file = destination / "test.txt" + + assert destination_file.exists() + assert destination_file.read_text(encoding="utf-8") == "hello" + + def test_copy_manifest_files_missing_source_file(self, tmp_path: Path): + """Ensure copy_manifest_files fails if a manifest file is missing from source.""" + source = tmp_path / "source" + source.mkdir() + + destination = tmp_path / "destination" + destination.mkdir() + + source_file = source / "test.txt" + source_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_file_manifest(source) + + manifest_path = tmp_path / "manifest.json" + file_transfer.save_json(manifest, manifest_path) + + source_file.unlink() + + with pytest.raises(FileNotFoundError): + file_transfer.copy_files_from_manifest(source, destination, manifest_path)