Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions mokelumne/dags/copy_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""File copy DAG to transfer files from one location to another."""

from __future__ import annotations

import logging
import os

from pathlib import Path

from airflow.sdk import Param, dag, get_current_context, task
from airflow.sdk.exceptions import AirflowFailException

from mokelumne.util.storage import run_dir
from mokelumne.util.file_transfer import (
build_file_manifest,
copy_files_from_manifest,
load_json,
save_json,
verify_file_manifest,
)

logger = logging.getLogger(__name__)


@dag(
    description="Transfers files from one location to another",
    schedule=None,
    catchup=False,
    params={
        "source": Param(
            default="",
            title="Source Directory",
            description="Directory where source files are found",
            type="string",
        ),
        "destination": Param(
            default="",
            title="Destination Directory",
            description="Directory where source files will be copied to",
            type="string",
        ),
    },
    tags=["file-transfer"],
)
def copy_files():
    """Copy files from a source directory to an empty destination directory.

    The run validates the source, prepares the destination, writes a manifest
    of the source tree into the run directory, copies every manifest entry and
    finally verifies the copies against the manifest.
    """

    @task
    def validate_source() -> str:
        """Check that the ``source`` param names an existing directory.

        Returns:
            The source directory exactly as supplied in the DAG params.

        Raises:
            AirflowFailException: If the param is empty, or the path is
                missing or is not a directory.
        """
        ctx = get_current_context()
        source = ctx["params"]["source"]

        # Path("") is Path("."), so an unset param would otherwise validate
        # successfully against the worker's current working directory.
        if not source:
            raise AirflowFailException("Source directory was not specified")

        source_path = Path(source)
        if not source_path.exists():
            raise AirflowFailException(f"Source directory does not exist: {source_path}")

        if not source_path.is_dir():
            raise AirflowFailException(f"Source is not a directory: {source_path}")

        return source

    @task
    def prepare_destination() -> str:
        """Create the destination if needed and require it to be an empty directory.

        TODO: one of the requirements that we got from Lynne is that the
        destination should be an ``incoming`` subdirectory of any directory on
        PA or DA, e.g. ``/srv/pa/aerial/ucb/incoming``. This task does not yet
        confirm that we are writing to an appropriate ``incoming``
        subdirectory; it only fails the job when the destination already
        contains entries.

        Returns:
            The destination directory exactly as supplied in the DAG params.

        Raises:
            AirflowFailException: If the param is empty, the path is not a
                directory, or the directory is not empty.
        """
        ctx = get_current_context()
        destination = ctx["params"]["destination"]

        # Same reasoning as for the source: never fall back to the worker's
        # current working directory when the param was left blank.
        if not destination:
            raise AirflowFailException("Destination directory was not specified")

        destination_path = Path(destination)

        if not destination_path.exists():
            logger.info("Creating destination directory: %s", destination_path)
            destination_path.mkdir(parents=True)

        if not destination_path.is_dir():
            raise AirflowFailException(f"Destination is not a directory: {destination_path}")

        # Use scandir as a context manager so the directory handle is closed
        # promptly instead of being left to the garbage collector.
        with os.scandir(destination_path) as entries:
            has_entries = next(entries, None) is not None

        if has_entries:
            raise AirflowFailException(f"Destination directory contains files: {destination_path}")

        return destination

    @task
    def build_manifest(source: str) -> str:
        """Build a manifest of all files under the source directory.

        Returns:
            The path of the ``manifest.json`` written into the run directory.

        Raises:
            AirflowFailException: If the source tree contains no files.
        """
        source_path = Path(source)
        manifest = build_file_manifest(source_path)

        # The manifest is a dict that always carries "source_root" and "files",
        # so emptiness has to be judged on the file list, not on the dict.
        files = manifest["files"]
        if not files:
            raise AirflowFailException(f"No files found in source: {source_path}")

        logger.info("Manifest contains %s file(s)", len(files))

        ctx = get_current_context()
        manifest_path = run_dir(ctx["run_id"]) / "manifest.json"

        save_json(manifest, manifest_path)

        logger.info("Manifest written to: %s", manifest_path)

        return str(manifest_path)

    @task
    def copy_manifest_files(source: str, destination: str, manifest_path: str) -> None:
        """Copy all files in manifest to destination directory.

        Raises:
            AirflowFailException: If copying any manifest entry fails.
        """
        try:
            copy_files_from_manifest(
                Path(source),
                Path(destination),
                Path(manifest_path),
            )
        except Exception as ex:
            # Task boundary: surface any copy error as a non-retryable failure.
            raise AirflowFailException(
                f"Manifest copy failed: {ex}"
            ) from ex

        manifest = load_json(Path(manifest_path))
        # Count the file entries; len(manifest) would count the dict's keys.
        logger.info("Copied %s file(s)", len(manifest["files"]))

    @task
    def verify_manifest(destination: str, manifest_path: str) -> None:
        """Verify the copied files against the manifest and save a report.

        The report is written to ``verification_report.json`` in the run
        directory.

        Raises:
            AirflowFailException: If verification of any manifest entry fails.
        """
        try:
            verification_report = verify_file_manifest(
                Path(destination),
                Path(manifest_path),
            )
        except Exception as ex:
            # Task boundary: surface any verification error as a non-retryable failure.
            raise AirflowFailException(
                f"Manifest verification failed: {ex}"
            ) from ex

        ctx = get_current_context()
        run_path = run_dir(ctx["run_id"])

        verification_report_path = run_path / "verification_report.json"

        save_json(verification_report, verification_report_path)

        logger.info("Verification report written to: %s", verification_report_path)
        logger.info("Verified %s copied file(s)", len(verification_report))

    validated_source = validate_source()
    prepared_destination = prepare_destination()
    manifest = build_manifest(validated_source)
    copied_manifest_files = copy_manifest_files(
        validated_source,
        prepared_destination,
        manifest,
    )
    verified_manifest = verify_manifest(prepared_destination, manifest)

    # Force a strictly sequential run; data dependencies alone would let the
    # manifest be built in parallel with destination preparation.
    (
        validated_source
        >> prepared_destination
        >> manifest
        >> copied_manifest_files
        >> verified_manifest
    )


copy_files()  # pyright: ignore[reportUnusedExpression]
118 changes: 118 additions & 0 deletions mokelumne/util/file_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Provides file transfer routines."""

import hashlib
import json
import shutil

from pathlib import Path

# One per-file record: string keys mapped to int or str values
# (build_file_manifest writes "path", "size" and "sha256").
ManifestEntry = dict[str, int | str]
# A whole manifest document: string keys mapped to either a str or a list of
# ManifestEntry records (build_file_manifest writes "source_root" and "files").
Manifest = dict[str, str | list[ManifestEntry]]

def sha256_for_file(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at ``path``."""
    digest = hashlib.sha256()

    with open(path, "rb") as stream:
        # Feed the hash in 8192-byte reads so the whole file is never held
        # in memory; an empty read signals end of file.
        while block := stream.read(8192):
            digest.update(block)

    return digest.hexdigest()


def build_file_manifest(source_path: Path) -> Manifest:
    """Build a manifest describing every file found under ``source_path``.

    Each entry's ``path`` is recorded relative to ``source_path`` so that the
    manifest describes the directory tree being transferred rather than one
    absolute location. The root itself is stored under ``source_root`` as
    context for diagnosing issues.

    Entries are sorted by path so that repeated runs over an unchanged tree
    produce the same manifest regardless of filesystem enumeration order.

    Args:
        source_path: Root directory to scan recursively.

    Returns:
        A dict with ``source_root`` (``str(source_path)``) and ``files``, a
        list of entries holding ``path``, ``size`` and ``sha256``.
    """
    files: list[ManifestEntry] = [
        {
            "path": str(item.relative_to(source_path)),
            "size": item.stat().st_size,
            "sha256": sha256_for_file(item),
        }
        for item in sorted(source_path.rglob("*"))
        if item.is_file()
    ]

    return {
        "source_root": str(source_path),
        "files": files,
    }


def save_json(data: Manifest | list[ManifestEntry], path: Path) -> None:
    """Write ``data`` to ``path`` as UTF-8 JSON indented by two spaces.

    Args:
        data: A manifest dict or a list of entries (such as a verification
            report) to serialize.
        path: File to write; any existing file is overwritten.
    """
    # Create missing parent directories so the first write into a fresh
    # output directory does not fail with FileNotFoundError.
    path.parent.mkdir(parents=True, exist_ok=True)

    with open(path, "w", encoding="utf-8") as json_file:
        json.dump(data, json_file, indent=2)


def load_json(path: Path) -> Manifest:
    """Read the UTF-8 JSON file at ``path`` and return the decoded value."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)


def verify_file_manifest(destination_path: Path, manifest_path: Path) -> list[ManifestEntry]:
    """Verify that every manifest entry was copied intact to the destination.

    For each entry the destination file must exist, be a regular file, and
    match the size and SHA-256 checksum recorded in the manifest. Verification
    stops at the first failing entry.

    Args:
        destination_path: Root directory the files were copied into.
        manifest_path: JSON manifest whose ``files`` entries are checked.

    Returns:
        One report entry per verified file, holding ``path``, ``status``
        (always ``"verified"``), ``size`` and ``sha256``.

    Raises:
        FileNotFoundError: If a manifest entry is missing at the destination.
        ValueError: If a destination path is not a file, or its size or
            checksum differs from the manifest.
    """
    verification_report: list[ManifestEntry] = []
    manifest = load_json(manifest_path)

    for entry in manifest["files"]:
        relative_path = Path(str(entry["path"]))
        expected_size = entry["size"]
        expected_sha256 = entry["sha256"]

        destination_file = destination_path / relative_path

        if not destination_file.exists():
            raise FileNotFoundError(f"File not found: {destination_file}")

        if not destination_file.is_file():
            raise ValueError(f"Path exists but is not a file: {destination_file}")

        # Compare sizes first: a mismatch is detected without reading the file.
        actual_size = destination_file.stat().st_size

        if actual_size != expected_size:
            raise ValueError(
                f"Size mismatch for {destination_file}: "
                f"expected {expected_size}, got {actual_size}"
            )

        actual_sha256 = sha256_for_file(destination_file)

        if actual_sha256 != expected_sha256:
            raise ValueError(
                f"Checksum mismatch for {destination_file}: "
                f"expected {expected_sha256}, got {actual_sha256}"
            )

        verification_report.append(
            {
                "path": str(relative_path),
                "status": "verified",
                "size": expected_size,
                "sha256": expected_sha256,
            }
        )

    return verification_report


def copy_files_from_manifest(source_path: Path, destination_path: Path, manifest_path: Path) -> None:
    """Copy each file listed in the manifest from the source tree to the destination.

    Entry paths are resolved relative to both roots, and missing parent
    directories are created under the destination before each copy.

    Raises:
        FileNotFoundError: If a manifest entry no longer exists in the source.
        ValueError: If a manifest entry exists in the source but is not a file.
    """
    for record in load_json(manifest_path)["files"]:
        rel = Path(str(record["path"]))
        origin = source_path / rel
        target = destination_path / rel

        if not origin.exists():
            raise FileNotFoundError(f"Manifest file missing from source: {origin}")
        if not origin.is_file():
            raise ValueError(f"Manifest path is not a file: {origin}")

        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(origin, target)
Loading
Loading