From 26614efeb25d0168cb4e694f6a50daed9a5bd905 Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Thu, 18 Jun 2026 16:56:52 -0700 Subject: [PATCH 1/9] First version of copy_files DAG --- mokelumne/dags/copy_files.py | 169 +++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 mokelumne/dags/copy_files.py diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py new file mode 100644 index 0000000..ca0f919 --- /dev/null +++ b/mokelumne/dags/copy_files.py @@ -0,0 +1,169 @@ +""" +File copy DAG to transfer files from one location to another +""" + +from __future__ import annotations + +import logging +import shutil + +from pathlib import Path + +from airflow.sdk import Param, dag, get_current_context, task +from airflow.sdk.exceptions import AirflowFailException + +logger = logging.getLogger(__name__) + + +@dag( + description="Transfers files from one location to another", + schedule=None, + catchup=False, + params={ + "source": Param( + default="", + title="Source Directory", + description="Directory where source files are found", + type="string", + ), + "destination": Param( + default="", + title="Destination Directory", + description="Directory where source files will be copied to", + type="string", + ), + }, + tags=["file-transfer"], +) +def copy_files(): + """ + Copy files from a source directory to an empty destination directory. + """ + + @task + def validate_source() -> str: + """ + Checks that the source is a valid directory + """ + logger.info("VALIDATE_SOURCE - VERSION 2") + + ctx = get_current_context() + source = ctx["params"]["source"] + + if not source.strip(): + raise AirflowFailException("Source directory is required") + + source_path = Path(source) + if not source_path.exists(): + raise AirflowFailException(f"Source directory does not exist: {source_path}") + + if not source_path.is_dir(): + raise AirflowFailException(f"Source is not a directory: {source_path}") + + logger.info("SOURCE IS: %s", source) + + return source + + @task + def prepare_destination() -> str: + """ + Prepare the destination directory + + If directory does not exist, create it + If it exists and it contains files in it, fail! + """ + logger.info("PREPARE_DESTINATION - VERSION 1") + + ctx = get_current_context() + destination = ctx["params"]["destination"] + + if not destination.strip(): + raise AirflowFailException("Destination directory is required") + + destination_path = Path(destination) + + if not destination_path.exists(): + logger.info("Creating destination directory: %s", destination_path) + destination_path.mkdir(parents=True) + + if not destination_path.is_dir(): + raise AirflowFailException(f"Destination is not a directory: {destination_path}") + + if any(destination_path.iterdir()): + raise AirflowFailException(f"Destination directory contains files: {destination_path}") + + logger.info("DESTINATION IS: %s", destination) + + return destination + + @task + def copy_source_files(source: str, destination: str) -> list[str]: + """ + Copy all files and subdirectories from source directory to destination directory + """ + logger.info("COPY_SOURCE_FILES - VERSION 3") + + source_path = Path(source) + destination_path = Path(destination) + + copied_files = [] + + for item in source_path.rglob("*"): + relative_path = item.relative_to(source_path) + destination_item = destination_path / relative_path + + if item.is_dir(): + logger.info("Creating destination subdirectory: %s", destination_item) + destination_item.mkdir(parents=True, exist_ok=True) + elif item.is_file(): + destination_item.parent.mkdir(parents=True, exist_ok=True) + + logger.info("Copying %s to %s", item, destination_item) + shutil.copy2(item, destination_item) + + copied_files.append(str(relative_path)) + + if not copied_files: + raise AirflowFailException(f"No files found to copy in source: {source_path}") + + logger.info("Copied %s file(s)", len(copied_files)) + + return copied_files + + + @task + def verify_copy(destination: str, copied_files: list[str]) -> None: + """ + Verify all copied files exist at the destination. + """ + logger.info("VERIFY_COPY - VERSION 1") + + destination_path = Path(destination) + + for copied_file in copied_files: + destination_file = destination_path / copied_file + + if not destination_file.exists(): + raise AirflowFailException( + f"Copied file missing from destination: {destination_file}" + ) + + if not destination_file.is_file(): + raise AirflowFailException( + f"Copied path exists but is not a file: {destination_file}" + ) + + logger.info("Verified copied file exists: %s", destination_file) + + logger.info("Verified %s copied file(s)", len(copied_files)) + + + + validated_source = validate_source() + prepared_destination = prepare_destination() + copied_files = copy_source_files(validated_source, prepared_destination) + verified_copy = verify_copy(prepared_destination, copied_files) + + validated_source >> prepared_destination >> copied_files >> verified_copy + +copy_files() # pyright: ignore[reportUnusedExpression] From 305526858ccf0c514d117ab12071fe934bbdf958 Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Mon, 22 Jun 2026 14:09:07 -0700 Subject: [PATCH 2/9] Refactor to use a manifest --- mokelumne/dags/copy_files.py | 76 ++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 17 deletions(-) diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py index ca0f919..2650090 100644 --- a/mokelumne/dags/copy_files.py +++ b/mokelumne/dags/copy_files.py @@ -1,5 +1,12 @@ """ File copy DAG to transfer files from one location to another +Change this to use a manifest rather than just copying from one dir to another.... + +1. validate_source +2. prepare_destination +3. build_manifest <--- need to create the manifest from the source +4. copy_from_manifest <--- need to refactor +5. verify_manifest """ from __future__ import annotations @@ -97,31 +104,65 @@ def prepare_destination() -> str: return destination @task - def copy_source_files(source: str, destination: str) -> list[str]: + def build_manifest(source: str) -> list[dict[str, int | str]]: + """ + Build a manifest of all files under the source directory. + """ + logger.info("BUILD_MANIFEST - VERSION 4") + + source_path = Path(source) + manifest = [] + + for item in source_path.rglob("*"): + if item.is_file(): + relative_path = item.relative_to(source_path) + + manifest.append( + { + "path": str(relative_path), + "size": item.stat().st_size, + } + ) + + if not manifest: + raise AirflowFailException(f"No files found in source: {source_path}") + + logger.info("Manifest contains %s file(s)", len(manifest)) + + return manifest + + + @task + def copy_manifest_files(source: str, destination: str, manifest: list[dict[str, int | str]]) -> list[str]: """ - Copy all files and subdirectories from source directory to destination directory + Copy all files in manifest to destination directory """ - logger.info("COPY_SOURCE_FILES - VERSION 3") + logger.info("COPY_SOURCE_FILES - VERSION 4") source_path = Path(source) destination_path = Path(destination) copied_files = [] - for item in source_path.rglob("*"): - relative_path = item.relative_to(source_path) - destination_item = destination_path / relative_path - - if item.is_dir(): - logger.info("Creating destination subdirectory: %s", destination_item) - destination_item.mkdir(parents=True, exist_ok=True) - elif item.is_file(): - destination_item.parent.mkdir(parents=True, exist_ok=True) + for entry in manifest: + relative_path = Path(str(entry["path"])) + source_file = source_path / relative_path + destination_file = destination_path / relative_path + + if not source_file.exists(): + raise AirflowFailException(f"Manifest file missing from source: {source_file}") + + if not source_file.is_file(): + raise AirflowFailException(f"Manifest path is not a file: {source_file}") + + + destination_file.parent.mkdir(parents=True, exist_ok=True) - logger.info("Copying %s to %s", item, destination_item) - shutil.copy2(item, destination_item) + logger.info("Copying %s to %s", source_file, destination_file) + shutil.copy2(source_file, destination_file) + + copied_files.append(str(relative_path)) - copied_files.append(str(relative_path)) if not copied_files: raise AirflowFailException(f"No files found to copy in source: {source_path}") @@ -161,9 +202,10 @@ def verify_copy(destination: str, copied_files: list[str]) -> None: validated_source = validate_source() prepared_destination = prepare_destination() - copied_files = copy_source_files(validated_source, prepared_destination) + manifest = build_manifest(validated_source) + copied_files = copy_manifest_files(validated_source, prepared_destination, manifest) verified_copy = verify_copy(prepared_destination, copied_files) - validated_source >> prepared_destination >> copied_files >> verified_copy + validated_source >> prepared_destination >> manifest >> copied_files >> verified_copy copy_files() # pyright: ignore[reportUnusedExpression] From 9588f7c4c1edbe9d79fb67db30e30ee3b848bdf4 Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Mon, 22 Jun 2026 17:48:00 -0700 Subject: [PATCH 3/9] Add sha256 check --- mokelumne/dags/copy_files.py | 57 ++++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py index 2650090..758bb7f 100644 --- a/mokelumne/dags/copy_files.py +++ b/mokelumne/dags/copy_files.py @@ -11,6 +11,7 @@ from __future__ import annotations +import hashlib import logging import shutil @@ -21,6 +22,17 @@ logger = logging.getLogger(__name__) +def sha256_for_file(path: Path) -> str: + """ + Calculate the checksum + """ + sha256 = hashlib.sha256() + + with open(path, "rb") as file: + for chunk in iter(lambda: file.read(8192), b""): + sha256.update(chunk) + + return sha256.hexdigest() @dag( description="Transfers files from one location to another", @@ -108,7 +120,7 @@ def build_manifest(source: str) -> list[dict[str, int | str]]: """ Build a manifest of all files under the source directory. """ - logger.info("BUILD_MANIFEST - VERSION 4") + logger.info("BUILD_MANIFEST - VERSION 5") source_path = Path(source) manifest = [] @@ -121,6 +133,7 @@ def build_manifest(source: str) -> list[dict[str, int | str]]: { "path": str(relative_path), "size": item.stat().st_size, + "sha256": sha256_for_file(item), } ) @@ -137,7 +150,7 @@ def copy_manifest_files(source: str, destination: str, manifest: list[dict[str, """ Copy all files in manifest to destination directory """ - logger.info("COPY_SOURCE_FILES - VERSION 4") + logger.info("COPY_SOURCE_FILES - VERSION 5") source_path = Path(source) destination_path = Path(destination) @@ -173,38 +186,50 @@ def copy_manifest_files(source: str, destination: str, manifest: list[dict[str, @task - def verify_copy(destination: str, copied_files: list[str]) -> None: + def verify_copy( + destination: str, + manifest: list[dict[str, int | str]] + ) -> None: """ Verify all copied files exist at the destination. """ - logger.info("VERIFY_COPY - VERSION 1") + logger.info("VERIFY_COPY - VERSION 4") destination_path = Path(destination) - for copied_file in copied_files: - destination_file = destination_path / copied_file + for entry in manifest: + relative_path = Path(str(entry["path"])) + expected_size = entry["size"] + expected_sha256 = entry['sha256'] + + destination_file = destination_path / relative_path if not destination_file.exists(): - raise AirflowFailException( - f"Copied file missing from destination: {destination_file}" - ) + raise AirflowFailException(f"Copied file missing from destination: {destination_file}") if not destination_file.is_file(): - raise AirflowFailException( - f"Copied path exists but is not a file: {destination_file}" - ) - - logger.info("Verified copied file exists: %s", destination_file) + raise AirflowFailException(f"Copied path exists but is not a file: {destination_file}") + + actual_size = destination_file.stat().st_size - logger.info("Verified %s copied file(s)", len(copied_files)) + if actual_size != expected_size: + raise AirflowFailException(f"Copied file size does not match original") + actual_sha256 = sha256_for_file(destination_file) + + if actual_sha256 != expected_sha256: + raise AirflowFailException(f"Copied file checksum does not match original: {destination_file}") + + logger.info("Verified copied file exists, size matches, and checksum matches: %s", destination_file) + + logger.info("Verified %s copied file(s)", len(manifest)) validated_source = validate_source() prepared_destination = prepare_destination() manifest = build_manifest(validated_source) copied_files = copy_manifest_files(validated_source, prepared_destination, manifest) - verified_copy = verify_copy(prepared_destination, copied_files) + verified_copy = verify_copy(prepared_destination, manifest) validated_source >> prepared_destination >> manifest >> copied_files >> verified_copy From c22e5b4445294796bb7f2d318c8c914b6d631389 Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Mon, 22 Jun 2026 18:18:04 -0700 Subject: [PATCH 4/9] Rename some vars and tasks clean up comments --- mokelumne/dags/copy_files.py | 40 +++++++++++++++--------------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py index 758bb7f..798540c 100644 --- a/mokelumne/dags/copy_files.py +++ b/mokelumne/dags/copy_files.py @@ -1,11 +1,10 @@ """ File copy DAG to transfer files from one location to another -Change this to use a manifest rather than just copying from one dir to another.... 1. validate_source 2. prepare_destination -3. build_manifest <--- need to create the manifest from the source -4. copy_from_manifest <--- need to refactor +3. build_manifest +4. copy_from_manifest 5. verify_manifest """ @@ -24,7 +23,7 @@ def sha256_for_file(path: Path) -> str: """ - Calculate the checksum + Calculate the SHA256 checksum for a file. """ sha256 = hashlib.sha256() @@ -146,17 +145,15 @@ def build_manifest(source: str) -> list[dict[str, int | str]]: @task - def copy_manifest_files(source: str, destination: str, manifest: list[dict[str, int | str]]) -> list[str]: + def copy_manifest_files(source: str, destination: str, manifest: list[dict[str, int | str]]) -> None: """ Copy all files in manifest to destination directory """ - logger.info("COPY_SOURCE_FILES - VERSION 5") + logger.info("COPY_MANIFEST_FILES - VERSION 1") source_path = Path(source) destination_path = Path(destination) - copied_files = [] - for entry in manifest: relative_path = Path(str(entry["path"])) source_file = source_path / relative_path @@ -174,26 +171,18 @@ def copy_manifest_files(source: str, destination: str, manifest: list[dict[str, logger.info("Copying %s to %s", source_file, destination_file) shutil.copy2(source_file, destination_file) - copied_files.append(str(relative_path)) - - - if not copied_files: - raise AirflowFailException(f"No files found to copy in source: {source_path}") - - logger.info("Copied %s file(s)", len(copied_files)) + logger.info("Copied %s file(s)", len(manifest)) - return copied_files - @task - def verify_copy( + def verify_manifest( destination: str, manifest: list[dict[str, int | str]] ) -> None: """ Verify all copied files exist at the destination. """ - logger.info("VERIFY_COPY - VERSION 4") + logger.info("VERIFY_MANIFEST - VERSION 1") destination_path = Path(destination) @@ -213,12 +202,15 @@ def verify_copy( actual_size = destination_file.stat().st_size if actual_size != expected_size: - raise AirflowFailException(f"Copied file size does not match original") + raise AirflowFailException(f"Size mismatch for {destination_file}: expected {expected_size}, got {actual_size}") actual_sha256 = sha256_for_file(destination_file) if actual_sha256 != expected_sha256: - raise AirflowFailException(f"Copied file checksum does not match original: {destination_file}") + raise AirflowFailException( + f"Checksum mismatch for {destination_file}: " + f"expected {expected_sha256}, got {actual_sha256}" + ) logger.info("Verified copied file exists, size matches, and checksum matches: %s", destination_file) @@ -228,9 +220,9 @@ def verify_copy( validated_source = validate_source() prepared_destination = prepare_destination() manifest = build_manifest(validated_source) - copied_files = copy_manifest_files(validated_source, prepared_destination, manifest) - verified_copy = verify_copy(prepared_destination, manifest) + copied_manifest_files = copy_manifest_files(validated_source, prepared_destination, manifest) + verified_manifest = verify_manifest(prepared_destination, manifest) - validated_source >> prepared_destination >> manifest >> copied_files >> verified_copy + validated_source >> prepared_destination >> manifest >> copied_manifest_files >> verified_manifest copy_files() # pyright: ignore[reportUnusedExpression] From 02772103d427f69901df133982dbfbfe225c65b8 Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Wed, 24 Jun 2026 13:28:07 -0700 Subject: [PATCH 5/9] Add file transfer utilities and tests --- mokelumne/dags/copy_files.py | 191 ++++++++++---------------------- mokelumne/util/file_transfer.py | 103 +++++++++++++++++ test/unit/test_file_transfer.py | 155 ++++++++++++++++++++++++++ 3 files changed, 319 insertions(+), 130 deletions(-) create mode 100644 mokelumne/util/file_transfer.py create mode 100644 test/unit/test_file_transfer.py diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py index 798540c..5fc31f1 100644 --- a/mokelumne/dags/copy_files.py +++ b/mokelumne/dags/copy_files.py @@ -1,37 +1,25 @@ -""" -File copy DAG to transfer files from one location to another - -1. validate_source -2. prepare_destination -3. build_manifest -4. copy_from_manifest -5. verify_manifest -""" +"""File copy DAG to transfer files from one location to another.""" from __future__ import annotations -import hashlib import logging -import shutil from pathlib import Path from airflow.sdk import Param, dag, get_current_context, task from airflow.sdk.exceptions import AirflowFailException -logger = logging.getLogger(__name__) +from mokelumne.util.storage import run_dir +from mokelumne.util.file_transfer import ( + build_manifest as build_file_manifest, + copy_manifest_files as copy_files_from_manifest, + read_manifest, + verify_manifest as verify_file_manifest, + write_manifest, +) -def sha256_for_file(path: Path) -> str: - """ - Calculate the SHA256 checksum for a file. - """ - sha256 = hashlib.sha256() +logger = logging.getLogger(__name__) - with open(path, "rb") as file: - for chunk in iter(lambda: file.read(8192), b""): - sha256.update(chunk) - - return sha256.hexdigest() @dag( description="Transfers files from one location to another", @@ -54,59 +42,48 @@ def sha256_for_file(path: Path) -> str: tags=["file-transfer"], ) def copy_files(): - """ - Copy files from a source directory to an empty destination directory. - """ + """Copy files from a source directory to an empty destination directory.""" @task def validate_source() -> str: - """ - Checks that the source is a valid directory - """ - logger.info("VALIDATE_SOURCE - VERSION 2") - + """Checks that the source is a valid directory.""" + ctx = get_current_context() source = ctx["params"]["source"] if not source.strip(): raise AirflowFailException("Source directory is required") - + source_path = Path(source) if not source_path.exists(): raise AirflowFailException(f"Source directory does not exist: {source_path}") - + if not source_path.is_dir(): raise AirflowFailException(f"Source is not a directory: {source_path}") logger.info("SOURCE IS: %s", source) return source - + @task def prepare_destination() -> str: - """ - Prepare the destination directory - - If directory does not exist, create it - If it exists and it contains files in it, fail! - """ - logger.info("PREPARE_DESTINATION - VERSION 1") + """Prepare the destination directory.""" ctx = get_current_context() destination = ctx["params"]["destination"] if not destination.strip(): raise AirflowFailException("Destination directory is required") - + destination_path = Path(destination) if not destination_path.exists(): logger.info("Creating destination directory: %s", destination_path) destination_path.mkdir(parents=True) - + if not destination_path.is_dir(): raise AirflowFailException(f"Destination is not a directory: {destination_path}") - + if any(destination_path.iterdir()): raise AirflowFailException(f"Destination directory contains files: {destination_path}") @@ -115,114 +92,68 @@ def prepare_destination() -> str: return destination @task - def build_manifest(source: str) -> list[dict[str, int | str]]: - """ - Build a manifest of all files under the source directory. - """ - logger.info("BUILD_MANIFEST - VERSION 5") + def build_manifest(source: str) -> str: + """Build a manifest of all files under the source directory.""" source_path = Path(source) - manifest = [] - - for item in source_path.rglob("*"): - if item.is_file(): - relative_path = item.relative_to(source_path) - - manifest.append( - { - "path": str(relative_path), - "size": item.stat().st_size, - "sha256": sha256_for_file(item), - } - ) - + manifest = build_file_manifest(source_path) + if not manifest: raise AirflowFailException(f"No files found in source: {source_path}") - - logger.info("Manifest contains %s file(s)", len(manifest)) - - return manifest - - - @task - def copy_manifest_files(source: str, destination: str, manifest: list[dict[str, int | str]]) -> None: - """ - Copy all files in manifest to destination directory - """ - logger.info("COPY_MANIFEST_FILES - VERSION 1") - source_path = Path(source) - destination_path = Path(destination) + logger.info("Manifest contains %s file(s)", len(manifest)) - for entry in manifest: - relative_path = Path(str(entry["path"])) - source_file = source_path / relative_path - destination_file = destination_path / relative_path + ctx = get_current_context() + manifest_path = run_dir(ctx["run_id"]) / "manifest.json" - if not source_file.exists(): - raise AirflowFailException(f"Manifest file missing from source: {source_file}") + write_manifest(manifest, manifest_path) - if not source_file.is_file(): - raise AirflowFailException(f"Manifest path is not a file: {source_file}") - + logger.info("Manifest written to: %s", manifest_path) - destination_file.parent.mkdir(parents=True, exist_ok=True) - - logger.info("Copying %s to %s", source_file, destination_file) - shutil.copy2(source_file, destination_file) + return str(manifest_path) + @task + def copy_manifest_files(source: str, destination: str, manifest_path: str) -> None: + """Copy all files in manifest to destination directory.""" + + try: + copy_files_from_manifest( + Path(source), + Path(destination), + Path(manifest_path), + ) + except Exception as ex: + raise AirflowFailException( + f"Manifest copy failed: {ex}" + ) from ex + + manifest = read_manifest(Path(manifest_path)) logger.info("Copied %s file(s)", len(manifest)) - @task - def verify_manifest( - destination: str, - manifest: list[dict[str, int | str]] - ) -> None: - """ - Verify all copied files exist at the destination. - """ - logger.info("VERIFY_MANIFEST - VERSION 1") - - destination_path = Path(destination) - - for entry in manifest: - relative_path = Path(str(entry["path"])) - expected_size = entry["size"] - expected_sha256 = entry['sha256'] - - destination_file = destination_path / relative_path - - if not destination_file.exists(): - raise AirflowFailException(f"Copied file missing from destination: {destination_file}") + def verify_manifest(destination: str, manifest_path: str) -> None: + """Verify all copied files exist at the destination.""" - if not destination_file.is_file(): - raise AirflowFailException(f"Copied path exists but is not a file: {destination_file}") - - actual_size = destination_file.stat().st_size - - if actual_size != expected_size: - raise AirflowFailException(f"Size mismatch for {destination_file}: expected {expected_size}, got {actual_size}") - - actual_sha256 = sha256_for_file(destination_file) - - if actual_sha256 != expected_sha256: - raise AirflowFailException( - f"Checksum mismatch for {destination_file}: " - f"expected {expected_sha256}, got {actual_sha256}" - ) - - logger.info("Verified copied file exists, size matches, and checksum matches: %s", destination_file) + try: + verify_file_manifest(Path(destination), Path(manifest_path)) + except Exception as ex: + raise AirflowFailException( + f"Manifest verification failed: {ex}" + ) from ex + manifest = read_manifest(Path(manifest_path)) logger.info("Verified %s copied file(s)", len(manifest)) - validated_source = validate_source() prepared_destination = prepare_destination() manifest = build_manifest(validated_source) - copied_manifest_files = copy_manifest_files(validated_source, prepared_destination, manifest) + copied_manifest_files = copy_manifest_files( + validated_source, + prepared_destination, + manifest, + ) verified_manifest = verify_manifest(prepared_destination, manifest) validated_source >> prepared_destination >> manifest >> copied_manifest_files >> verified_manifest -copy_files() # pyright: ignore[reportUnusedExpression] +copy_files() # pyright: ignore[reportUnusedExpression] diff --git a/mokelumne/util/file_transfer.py b/mokelumne/util/file_transfer.py new file mode 100644 index 0000000..f8e53a0 --- /dev/null +++ b/mokelumne/util/file_transfer.py @@ -0,0 +1,103 @@ +"""Provides file transfer routines.""" + +import hashlib +import json +import shutil + +from pathlib import Path + +def sha256_for_file(path: Path) -> str: + """Calculate the SHA256 checksum for a file.""" + sha256 = hashlib.sha256() + + with open(path, "rb") as file: + for chunk in iter(lambda: file.read(8192), b""): + sha256.update(chunk) + + return sha256.hexdigest() + + +def build_manifest(source_path: Path) -> list[dict[str, int | str]]: + manifest = [] + + for item in source_path.rglob("*"): + if item.is_file(): + relative_path = item.relative_to(source_path) + manifest.append( + { + "path": str(relative_path), + "size": item.stat().st_size, + "sha256": sha256_for_file(item), + } + ) + + return manifest + + +def write_manifest(manifest: list[dict[str, int | str]], manifest_path: Path) -> None: + with open(manifest_path, "w", encoding="utf-8") as manifest_file: + json.dump(manifest, manifest_file, indent=2) + + +def read_manifest(manifest_path: Path) -> list[dict[str, int | str]]: + with open(manifest_path, "r", encoding="utf-8") as manifest_file: + return json.load(manifest_file) + + +def verify_manifest(destination_path: Path, manifest_path: Path) -> None: + """Verify all copied files exist at the destination.""" + + manifest = read_manifest(manifest_path) + + for entry in manifest: + relative_path = Path(str(entry["path"])) + expected_size = entry["size"] + expected_sha256 = entry["sha256"] + + destination_file = destination_path / relative_path + + if not destination_file.exists(): + raise FileNotFoundError(f"File not found: {destination_file}") + + if not destination_file.is_file(): + raise ValueError(f"Path exists but is not a file: {destination_file}") + + actual_size = destination_file.stat().st_size + + if actual_size != expected_size: + raise ValueError( + f"Size mismatch for {destination_file}: " + f"expected {expected_size}, got {actual_size}" + ) + + actual_sha256 = sha256_for_file(destination_file) + + if actual_sha256 != expected_sha256: + raise ValueError( + f"Checksum mismatch for {destination_file}: " + f"expected {expected_sha256}, got {actual_sha256}" + ) + + +def copy_manifest_files(source_path: Path, destination_path: Path, manifest_path: Path) -> None: + """Copy all files in manifest to destination directory""" + + manifest = read_manifest(manifest_path) + + for entry in manifest: + relative_path = Path(str(entry["path"])) + source_file = source_path / relative_path + destination_file = destination_path / relative_path + + if not source_file.exists(): + raise FileNotFoundError(f"Manifest file missing from source: {source_file}") + + if not source_file.is_file(): + raise ValueError(f"Manifest path is not a file: {source_file}") + + + destination_file.parent.mkdir(parents=True, exist_ok=True) + + # logger.info("Copying %s to %s", source_file, destination_file) + shutil.copy2(source_file, destination_file) + diff --git a/test/unit/test_file_transfer.py b/test/unit/test_file_transfer.py new file mode 100644 index 0000000..ae9b45d --- /dev/null +++ b/test/unit/test_file_transfer.py @@ -0,0 +1,155 @@ +"""PyTest cases for the mokelumne.util.file_transfer module.""" + +from pathlib import Path +import pytest + +from mokelumne.util import file_transfer + +class TestFileTransfer: + """Tests for the Mokelumne file transfer module.""" + + def test_sha256_for_file(self, tmp_path: Path): + """Ensure that sha256_for_file returns the expected checksum.""" + test_file = tmp_path / "test.txt" + test_file.write_text("hello", encoding="utf-8") + + result = file_transfer.sha256_for_file(test_file) + + assert result == "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + + def test_write_and_read_manifest(self, tmp_path: Path): + """Ensure that a manifest can be written and read.""" + manifest = [ + { + "path": "test.txt", + "size": 5, + "sha256": "fake-checksum", + } + ] + + manifest_path = tmp_path / "manifest.json" + + file_transfer.write_manifest(manifest, manifest_path) + result = file_transfer.read_manifest(manifest_path) + + assert result == manifest + + def test_build_manifest(self, tmp_path: Path): + """Ensure that build_manifest invludes files recursively.""" + file_one = tmp_path / "file_one.txt" + file_one.write_text("Hello", encoding="utf-8") + + subdir = tmp_path / "subdir" + subdir.mkdir() + + file_two = subdir / "file_two.txt" + file_two.write_text("goodbye", encoding="utf-8") + + result = file_transfer.build_manifest(tmp_path) + + paths = {entry["path"] for entry in result} + + assert paths == { + "file_one.txt", + "subdir/file_two.txt", + } + + def test_build_manifest_empty_directory(self, tmp_path: Path): + """Ensure that build_manifest returns an empty list for an empty directory.""" + result = file_transfer.build_manifest(tmp_path) + + assert result == [] + + def test_build_manifest_includes_nested_files(self, tmp_path: Path): + """Ensure that build_manifest includes files recursively.""" + file_one = tmp_path / "file_one.txt" + file_one.write_text("hello", encoding="utf-8") + + subdir = tmp_path / "subdir" + subdir.mkdir() + + file_two = subdir / "file_two.txt" + file_two.write_text("goodbye", encoding="utf-8") + + result = file_transfer.build_manifest(tmp_path) + + paths = {entry["path"] for entry in result} + + assert paths == { + "file_one.txt", + "subdir/file_two.txt", + } + assert len(result) == 2 + + def test_verify_manifest_success(self, tmp_path: Path): + """Ensure verify_manifest succeeds for valid files.""" + + test_file = tmp_path / "test.txt" + test_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_manifest(tmp_path) + + manifest_path = tmp_path / "manifest.json" + file_transfer.write_manifest(manifest, manifest_path) + + file_transfer.verify_manifest(tmp_path, manifest_path) + + def test_verify_manifest_missing_file(self, tmp_path: Path): + """Ensure verify_manifest fails when a file is missing.""" + + test_file = tmp_path / "test.txt" + test_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_manifest(tmp_path) + + manifest_path = tmp_path / "manifest.json" + file_transfer.write_manifest(manifest, manifest_path) + + test_file.unlink() + + with pytest.raises(FileNotFoundError): + file_transfer.verify_manifest(tmp_path, manifest_path) + + def test_copy_manifest_files(self, tmp_path: Path): + """Ensure copy_manifest_files copies files from source to destination.""" + source = tmp_path / "source" + source.mkdir() + + destination = tmp_path / "destination" + destination.mkdir() + + source_file = source / "test.txt" + source_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_manifest(source) + + manifest_path = tmp_path / "manifest.json" + file_transfer.write_manifest(manifest, manifest_path) + + file_transfer.copy_manifest_files(source, destination, manifest_path) + + destination_file = destination / "test.txt" + + assert destination_file.exists() + assert destination_file.read_text(encoding="utf-8") == "hello" + + def test_copy_manifest_files_missing_source_file(self, tmp_path: Path): + """Ensure copy_manifest_files fails if a manifest file is missing from source.""" + source = tmp_path / "source" + source.mkdir() + + destination = tmp_path / "destination" + destination.mkdir() + + source_file = source / "test.txt" + source_file.write_text("hello", encoding="utf-8") + + manifest = file_transfer.build_manifest(source) + + manifest_path = tmp_path / "manifest.json" + file_transfer.write_manifest(manifest, manifest_path) + + source_file.unlink() + + with pytest.raises(FileNotFoundError): + file_transfer.copy_manifest_files(source, destination, manifest_path) \ No newline at end of file From a07c76c3129f66cb596b5ab6da631fe9175599b7 Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Thu, 25 Jun 2026 17:17:35 -0700 Subject: [PATCH 6/9] Add verification report some small refactoring --- mokelumne/dags/copy_files.py | 32 +++++++++++++----- mokelumne/util/file_transfer.py | 36 +++++++++++++------- test/unit/test_file_transfer.py | 60 +++++++++++++++------------------ 3 files changed, 74 insertions(+), 54 deletions(-) diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py index 5fc31f1..207a903 100644 --- a/mokelumne/dags/copy_files.py +++ b/mokelumne/dags/copy_files.py @@ -13,9 +13,9 @@ from mokelumne.util.file_transfer import ( build_manifest as build_file_manifest, copy_manifest_files as copy_files_from_manifest, - read_manifest, + load_json, + save_json, verify_manifest as verify_file_manifest, - write_manifest, ) logger = logging.getLogger(__name__) @@ -106,7 +106,7 @@ def build_manifest(source: str) -> str: ctx = get_current_context() manifest_path = run_dir(ctx["run_id"]) / "manifest.json" - write_manifest(manifest, manifest_path) + save_json(manifest, manifest_path) logger.info("Manifest written to: %s", manifest_path) @@ -127,7 +127,7 @@ def copy_manifest_files(source: str, destination: str, manifest_path: str) -> No f"Manifest copy failed: {ex}" ) from ex - manifest = read_manifest(Path(manifest_path)) + manifest = load_json(Path(manifest_path)) logger.info("Copied %s file(s)", len(manifest)) @task @@ -135,14 +135,24 @@ def verify_manifest(destination: str, manifest_path: str) -> None: """Verify all copied files exist at the destination.""" try: - verify_file_manifest(Path(destination), Path(manifest_path)) + verification_report = verify_file_manifest( + Path(destination), + Path(manifest_path), + ) except Exception as ex: raise AirflowFailException( f"Manifest verification failed: {ex}" ) from ex - manifest = read_manifest(Path(manifest_path)) - logger.info("Verified %s copied file(s)", len(manifest)) + ctx = get_current_context() + run_path = run_dir(ctx["run_id"]) + + verification_report_path = run_path / "verification_report.json" + + save_json(verification_report, verification_report_path) + + logger.info("Verification report written to: %s", verification_report_path) + logger.info("Verified %s copied file(s)", len(verification_report)) validated_source = validate_source() prepared_destination = prepare_destination() @@ -154,6 +164,12 @@ def verify_manifest(destination: str, manifest_path: str) -> None: ) verified_manifest = verify_manifest(prepared_destination, manifest) - validated_source >> prepared_destination >> manifest >> copied_manifest_files >> verified_manifest + ( + validated_source + >> prepared_destination + >> manifest + >> copied_manifest_files + >> verified_manifest + ) copy_files() # pyright: ignore[reportUnusedExpression] diff --git a/mokelumne/util/file_transfer.py b/mokelumne/util/file_transfer.py index f8e53a0..a128e39 100644 --- a/mokelumne/util/file_transfer.py +++ b/mokelumne/util/file_transfer.py @@ -6,6 +6,7 @@ from pathlib import Path + def sha256_for_file(path: Path) -> str: """Calculate the SHA256 checksum for a file.""" sha256 = hashlib.sha256() @@ -34,20 +35,21 @@ def build_manifest(source_path: Path) -> list[dict[str, int | str]]: return manifest -def write_manifest(manifest: list[dict[str, int | str]], manifest_path: Path) -> None: - with open(manifest_path, "w", encoding="utf-8") as manifest_file: - json.dump(manifest, manifest_file, indent=2) +def save_json(data: list[dict[str, int | str]], path: Path) -> None: + with open(path, "w", encoding="utf-8") as json_file: + json.dump(data, json_file, indent=2) -def read_manifest(manifest_path: Path) -> list[dict[str, int | str]]: - with open(manifest_path, "r", encoding="utf-8") as manifest_file: - return json.load(manifest_file) +def load_json(path: Path) -> list[dict[str, int | str]]: + with open(path, "r", encoding="utf-8") as json_file: + return json.load(json_file) -def verify_manifest(destination_path: Path, manifest_path: Path) -> None: +def verify_manifest(destination_path: Path, manifest_path: Path) -> list[dict[str, int | str]]: """Verify all copied files exist at the destination.""" - manifest = read_manifest(manifest_path) + verification_report: list[dict[str, int | str]] = [] + manifest = load_json(manifest_path) for entry in manifest: relative_path = Path(str(entry["path"])) @@ -77,12 +79,23 @@ def verify_manifest(destination_path: Path, manifest_path: Path) -> None: f"Checksum mismatch for {destination_file}: " f"expected {expected_sha256}, got {actual_sha256}" ) + + verification_report.append( + { + "path": str(relative_path), + "status": "verified", + "size": expected_size, + "sha256": expected_sha256, + } + ) + + return verification_report def copy_manifest_files(source_path: Path, destination_path: Path, manifest_path: Path) -> None: - """Copy all files in manifest to destination directory""" + """Copy all files in manifest to destination directory.""" - manifest = read_manifest(manifest_path) + manifest = load_json(manifest_path) for entry in manifest: relative_path = Path(str(entry["path"])) @@ -95,9 +108,6 @@ def copy_manifest_files(source_path: Path, destination_path: Path, manifest_path if not source_file.is_file(): raise ValueError(f"Manifest path is not a file: {source_file}") - destination_file.parent.mkdir(parents=True, exist_ok=True) - # logger.info("Copying %s to %s", source_file, destination_file) shutil.copy2(source_file, destination_file) - diff --git a/test/unit/test_file_transfer.py b/test/unit/test_file_transfer.py index ae9b45d..cf4ddc8 100644 --- a/test/unit/test_file_transfer.py +++ b/test/unit/test_file_transfer.py @@ -1,10 +1,12 @@ """PyTest cases for the mokelumne.util.file_transfer module.""" from pathlib import Path + import pytest from mokelumne.util import file_transfer + class TestFileTransfer: """Tests for the Mokelumne file transfer module.""" @@ -15,11 +17,14 @@ def test_sha256_for_file(self, tmp_path: Path): result = file_transfer.sha256_for_file(test_file) - assert result == "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + assert result == ( + "2cf24dba5fb0a30e26e83b2ac5b9e" + "29e1b161e5c1fa7425e73043362938b9824" + ) - def test_write_and_read_manifest(self, tmp_path: Path): - """Ensure that a manifest can be written and read.""" - manifest = [ + def test_save_and_load_json(self, tmp_path: Path): + """Ensure that JSON data can be saved and loaded.""" + data = [ { "path": "test.txt", "size": 5, @@ -27,32 +32,12 @@ def test_write_and_read_manifest(self, tmp_path: Path): } ] - manifest_path = tmp_path / "manifest.json" - - file_transfer.write_manifest(manifest, manifest_path) - result = file_transfer.read_manifest(manifest_path) - - assert result == manifest - - def test_build_manifest(self, tmp_path: Path): - """Ensure that build_manifest invludes files recursively.""" - file_one = tmp_path / "file_one.txt" - file_one.write_text("Hello", encoding="utf-8") - - subdir = tmp_path / "subdir" - subdir.mkdir() - - file_two = subdir / "file_two.txt" - file_two.write_text("goodbye", encoding="utf-8") - - result = file_transfer.build_manifest(tmp_path) + json_file_path = tmp_path / "data.json" - paths = {entry["path"] for entry in result} + file_transfer.save_json(data, json_file_path) + result = file_transfer.load_json(json_file_path) - assert paths == { - "file_one.txt", - "subdir/file_two.txt", - } + assert result == data def test_build_manifest_empty_directory(self, tmp_path: Path): """Ensure that build_manifest returns an empty list for an empty directory.""" @@ -90,9 +75,18 @@ def test_verify_manifest_success(self, tmp_path: Path): manifest = file_transfer.build_manifest(tmp_path) manifest_path = tmp_path / "manifest.json" - file_transfer.write_manifest(manifest, manifest_path) + file_transfer.save_json(manifest, manifest_path) - file_transfer.verify_manifest(tmp_path, manifest_path) + result = file_transfer.verify_manifest(tmp_path, manifest_path) + + assert result == [ + { + "path": "test.txt", + "status": "verified", + "size": 5, + "sha256": file_transfer.sha256_for_file(test_file), + } + ] def test_verify_manifest_missing_file(self, tmp_path: Path): """Ensure verify_manifest fails when a file is missing.""" @@ -103,7 +97,7 @@ def test_verify_manifest_missing_file(self, tmp_path: Path): manifest = file_transfer.build_manifest(tmp_path) manifest_path = tmp_path / "manifest.json" - file_transfer.write_manifest(manifest, manifest_path) + file_transfer.save_json(manifest, manifest_path) test_file.unlink() @@ -124,7 +118,7 @@ def test_copy_manifest_files(self, tmp_path: Path): manifest = file_transfer.build_manifest(source) manifest_path = tmp_path / "manifest.json" - file_transfer.write_manifest(manifest, manifest_path) + file_transfer.save_json(manifest, manifest_path) file_transfer.copy_manifest_files(source, destination, manifest_path) @@ -147,7 +141,7 @@ def test_copy_manifest_files_missing_source_file(self, tmp_path: Path): manifest = file_transfer.build_manifest(source) manifest_path = tmp_path / "manifest.json" - file_transfer.write_manifest(manifest, manifest_path) + file_transfer.save_json(manifest, manifest_path) source_file.unlink() From 592be1380917843e81d4adcd3df1745394732855 Mon Sep 17 00:00:00 2001 From: Steve Sullivan <113538232+steve-sullivan@users.noreply.github.com> Date: Mon, 29 Jun 2026 10:55:08 -0700 Subject: [PATCH 7/9] Update mokelumne/dags/copy_files.py Co-authored-by: Anna Wilcox --- mokelumne/dags/copy_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py index 207a903..8e4ce2d 100644 --- a/mokelumne/dags/copy_files.py +++ b/mokelumne/dags/copy_files.py @@ -84,7 +84,7 @@ def prepare_destination() -> str: if not destination_path.is_dir(): raise AirflowFailException(f"Destination is not a directory: {destination_path}") - if any(destination_path.iterdir()): + if next(os.scandir(destination_path), None) is not None: raise AirflowFailException(f"Destination directory contains files: {destination_path}") logger.info("DESTINATION IS: %s", destination) From b37c1807d4a4198cd0a06b40448c5df4668dbfdd Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Mon, 29 Jun 2026 13:28:46 -0700 Subject: [PATCH 8/9] Refactor some duplication and unneeded code --- mokelumne/dags/copy_files.py | 27 +++++++++++++-------------- mokelumne/util/file_transfer.py | 8 +++++--- test/unit/test_file_transfer.py | 20 ++++++++++---------- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/mokelumne/dags/copy_files.py b/mokelumne/dags/copy_files.py index 8e4ce2d..c2810ce 100644 --- a/mokelumne/dags/copy_files.py +++ b/mokelumne/dags/copy_files.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import os from pathlib import Path @@ -11,11 +12,11 @@ from mokelumne.util.storage import run_dir from mokelumne.util.file_transfer import ( - build_manifest as build_file_manifest, - copy_manifest_files as copy_files_from_manifest, + build_file_manifest, + copy_files_from_manifest, load_json, save_json, - verify_manifest as verify_file_manifest, + verify_file_manifest, ) logger = logging.getLogger(__name__) @@ -51,9 +52,6 @@ def validate_source() -> str: ctx = get_current_context() source = ctx["params"]["source"] - if not source.strip(): - raise AirflowFailException("Source directory is required") - source_path = Path(source) if not source_path.exists(): raise AirflowFailException(f"Source directory does not exist: {source_path}") @@ -61,20 +59,23 @@ def validate_source() -> str: if not source_path.is_dir(): raise AirflowFailException(f"Source is not a directory: {source_path}") - logger.info("SOURCE IS: %s", source) - return source @task def prepare_destination() -> str: - """Prepare the destination directory.""" + """ + Prepare the destination directory. + TODO: one of the requirements that we got from Lynne is that the destination + should be an incoming subdirectory of any directory on PA or DA, + e.g. /srv/pa/aerial/ucb/incoming. i don't have a good answer for how to + approach this, but i'm curious about how you think we should confirm that 1) + we're actually writing to the appropriate incoming subdirectory, and if the + incoming directory exists and has files in it, we should fail the job. + """ ctx = get_current_context() destination = ctx["params"]["destination"] - if not destination.strip(): - raise AirflowFailException("Destination directory is required") - destination_path = Path(destination) if not destination_path.exists(): @@ -87,8 +88,6 @@ def prepare_destination() -> str: if next(os.scandir(destination_path), None) is not None: raise AirflowFailException(f"Destination directory contains files: {destination_path}") - logger.info("DESTINATION IS: %s", destination) - return destination @task diff --git a/mokelumne/util/file_transfer.py b/mokelumne/util/file_transfer.py index a128e39..2e169b7 100644 --- a/mokelumne/util/file_transfer.py +++ b/mokelumne/util/file_transfer.py @@ -18,9 +18,11 @@ def sha256_for_file(path: Path) -> str: return sha256.hexdigest() -def build_manifest(source_path: Path) -> list[dict[str, int | str]]: +def build_file_manifest(source_path: Path) -> list[dict[str, int | str]]: manifest = [] + # TODO: Decide if we want to change relative_path to absolute_path + # or save the root path once for item in source_path.rglob("*"): if item.is_file(): relative_path = item.relative_to(source_path) @@ -45,7 +47,7 @@ def load_json(path: Path) -> list[dict[str, int | str]]: return json.load(json_file) -def verify_manifest(destination_path: Path, manifest_path: Path) -> list[dict[str, int | str]]: +def verify_file_manifest(destination_path: Path, manifest_path: Path) -> list[dict[str, int | str]]: """Verify all copied files exist at the destination.""" verification_report: list[dict[str, int | str]] = [] @@ -92,7 +94,7 @@ def verify_manifest(destination_path: Path, manifest_path: Path) -> list[dict[st return verification_report -def copy_manifest_files(source_path: Path, destination_path: Path, manifest_path: Path) -> None: +def copy_files_from_manifest(source_path: Path, destination_path: Path, manifest_path: Path) -> None: """Copy all files in manifest to destination directory.""" manifest = load_json(manifest_path) diff --git a/test/unit/test_file_transfer.py b/test/unit/test_file_transfer.py index cf4ddc8..2ff795d 100644 --- a/test/unit/test_file_transfer.py +++ b/test/unit/test_file_transfer.py @@ -41,7 +41,7 @@ def test_save_and_load_json(self, tmp_path: Path): def test_build_manifest_empty_directory(self, tmp_path: Path): """Ensure that build_manifest returns an empty list for an empty directory.""" - result = file_transfer.build_manifest(tmp_path) + result = file_transfer.build_file_manifest(tmp_path) assert result == [] @@ -56,7 +56,7 @@ def test_build_manifest_includes_nested_files(self, tmp_path: Path): file_two = subdir / "file_two.txt" file_two.write_text("goodbye", encoding="utf-8") - result = file_transfer.build_manifest(tmp_path) + result = file_transfer.build_file_manifest(tmp_path) paths = {entry["path"] for entry in result} @@ -72,12 +72,12 @@ def test_verify_manifest_success(self, tmp_path: Path): test_file = tmp_path / "test.txt" test_file.write_text("hello", encoding="utf-8") - manifest = file_transfer.build_manifest(tmp_path) + manifest = file_transfer.build_file_manifest(tmp_path) manifest_path = tmp_path / "manifest.json" file_transfer.save_json(manifest, manifest_path) - result = file_transfer.verify_manifest(tmp_path, manifest_path) + result = file_transfer.verify_file_manifest(tmp_path, manifest_path) assert result == [ { @@ -94,7 +94,7 @@ def test_verify_manifest_missing_file(self, tmp_path: Path): test_file = tmp_path / "test.txt" test_file.write_text("hello", encoding="utf-8") - manifest = file_transfer.build_manifest(tmp_path) + manifest = file_transfer.build_file_manifest(tmp_path) manifest_path = tmp_path / "manifest.json" file_transfer.save_json(manifest, manifest_path) @@ -102,7 +102,7 @@ def test_verify_manifest_missing_file(self, tmp_path: Path): test_file.unlink() with pytest.raises(FileNotFoundError): - file_transfer.verify_manifest(tmp_path, manifest_path) + file_transfer.verify_file_manifest(tmp_path, manifest_path) def test_copy_manifest_files(self, tmp_path: Path): """Ensure copy_manifest_files copies files from source to destination.""" @@ -115,12 +115,12 @@ def test_copy_manifest_files(self, tmp_path: Path): source_file = source / "test.txt" source_file.write_text("hello", encoding="utf-8") - manifest = file_transfer.build_manifest(source) + manifest = file_transfer.build_file_manifest(source) manifest_path = tmp_path / "manifest.json" file_transfer.save_json(manifest, manifest_path) - file_transfer.copy_manifest_files(source, destination, manifest_path) + file_transfer.copy_files_from_manifest(source, destination, manifest_path) destination_file = destination / "test.txt" @@ -138,7 +138,7 @@ def test_copy_manifest_files_missing_source_file(self, tmp_path: Path): source_file = source / "test.txt" source_file.write_text("hello", encoding="utf-8") - manifest = file_transfer.build_manifest(source) + manifest = file_transfer.build_file_manifest(source) manifest_path = tmp_path / "manifest.json" file_transfer.save_json(manifest, manifest_path) @@ -146,4 +146,4 @@ def test_copy_manifest_files_missing_source_file(self, tmp_path: Path): source_file.unlink() with pytest.raises(FileNotFoundError): - file_transfer.copy_manifest_files(source, destination, manifest_path) \ No newline at end of file + file_transfer.copy_files_from_manifest(source, destination, manifest_path) From 21756e54ea36c99ff2372898870b8c8cb21a9062 Mon Sep 17 00:00:00 2001 From: Steve Sullivan Date: Mon, 29 Jun 2026 16:21:33 -0700 Subject: [PATCH 9/9] Add source root to manifest --- mokelumne/util/file_transfer.py | 27 +++++++++++++++------------ test/unit/test_file_transfer.py | 7 +++++-- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/mokelumne/util/file_transfer.py b/mokelumne/util/file_transfer.py index 2e169b7..d8b4f5b 100644 --- a/mokelumne/util/file_transfer.py +++ b/mokelumne/util/file_transfer.py @@ -6,6 +6,8 @@ from pathlib import Path +ManifestEntry = dict[str, int | str] +Manifest = dict[str, str | list[ManifestEntry]] def sha256_for_file(path: Path) -> str: """Calculate the SHA256 checksum for a file.""" @@ -18,15 +20,13 @@ def sha256_for_file(path: Path) -> str: return sha256.hexdigest() -def build_file_manifest(source_path: Path) -> list[dict[str, int | str]]: - manifest = [] +def build_file_manifest(source_path: Path) -> Manifest: + files = [] - # TODO: Decide if we want to change relative_path to absolute_path - # or save the root path once for item in source_path.rglob("*"): if item.is_file(): relative_path = item.relative_to(source_path) - manifest.append( + files.append( { "path": str(relative_path), "size": item.stat().st_size, @@ -34,26 +34,29 @@ def build_file_manifest(source_path: Path) -> list[dict[str, int | str]]: } ) - return manifest + return { + "source_root": str(source_path), + "files": files, + } -def save_json(data: list[dict[str, int | str]], path: Path) -> None: +def save_json(data: Manifest, path: Path) -> None: with open(path, "w", encoding="utf-8") as json_file: json.dump(data, json_file, indent=2) -def load_json(path: Path) -> list[dict[str, int | str]]: +def load_json(path: Path) -> Manifest: with open(path, "r", encoding="utf-8") as json_file: return json.load(json_file) -def verify_file_manifest(destination_path: Path, manifest_path: Path) -> list[dict[str, int | str]]: +def verify_file_manifest(destination_path: Path, manifest_path: Path) -> list[ManifestEntry]: """Verify all copied files exist at the destination.""" - verification_report: list[dict[str, int | str]] = [] + verification_report: ManifestEntry = [] manifest = load_json(manifest_path) - for entry in manifest: + for entry in manifest["files"]: relative_path = Path(str(entry["path"])) expected_size = entry["size"] expected_sha256 = entry["sha256"] @@ -99,7 +102,7 @@ def copy_files_from_manifest(source_path: Path, destination_path: Path, manifest manifest = load_json(manifest_path) - for entry in manifest: + for entry in manifest["files"]: relative_path = Path(str(entry["path"])) source_file = source_path / relative_path destination_file = destination_path / relative_path diff --git a/test/unit/test_file_transfer.py b/test/unit/test_file_transfer.py index 2ff795d..dd6b40b 100644 --- a/test/unit/test_file_transfer.py +++ b/test/unit/test_file_transfer.py @@ -43,7 +43,10 @@ def test_build_manifest_empty_directory(self, tmp_path: Path): """Ensure that build_manifest returns an empty list for an empty directory.""" result = file_transfer.build_file_manifest(tmp_path) - assert result == [] + assert result == { + "source_root": str(tmp_path), + "files": [], + } def test_build_manifest_includes_nested_files(self, tmp_path: Path): """Ensure that build_manifest includes files recursively.""" @@ -58,7 +61,7 @@ def test_build_manifest_includes_nested_files(self, tmp_path: Path): result = file_transfer.build_file_manifest(tmp_path) - paths = {entry["path"] for entry in result} + paths = {entry["path"] for entry in result["files"]} assert paths == { "file_one.txt",