Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 7 additions & 19 deletions src/prevayler_clj_aws/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,12 @@
unmarshal-fn))

(defn- read-snapshot [s3-cli s3-sdk-cli bucket snapshot-path]
(if (snapshot-exists? s3-cli bucket snapshot-path)
(let [v2-path (snapshot-v2-path snapshot-path)
snap1 (read-object s3-sdk-cli bucket snapshot-path unmarshal)]
(try
(if (snapshot-exists? s3-cli bucket v2-path)
(let [snap2 (read-object s3-sdk-cli bucket v2-path unmarshal-from-in)]
(println "Snapshot v1" (if (= snap1 snap2) "IS" "IS NOT") "equal to v2"))
(println v2-path "object not found in bucket."))
(catch Exception e
(.printStackTrace e)))
snap1)
{:partkey 0}))

(defn- save-snapshot! [s3-cli s3-sdk-cli bucket snapshot-path snapshot]
(util/aws-invoke s3-cli {:op :PutObject
:request {:Bucket bucket
:Key snapshot-path
:Body (marshal snapshot)}})
(let [v2-path (snapshot-v2-path snapshot-path)]
(if (snapshot-exists? s3-cli bucket v2-path)
(read-object s3-sdk-cli bucket v2-path unmarshal-from-in)
{:partkey 0})))

(defn- save-snapshot! [s3-sdk-cli bucket snapshot-path snapshot]
(try
(let [v2-path (snapshot-v2-path snapshot-path)
temp-file (java.io.File/createTempFile "snapshot" "")] ; We use an intermediary file to easily determine the length of the stream. Otherwise, to determine its length, Amazon's SDK would buffer the entire stream in RAM, defeating our purpose.
Expand Down Expand Up @@ -160,7 +148,7 @@
(println "Saving snapshot to bucket...")
; Since s3 update is atomic, if saving snapshot fails next prevayler will pick the previous state
; and restore events from the previous partkey
(save-snapshot! s3-client s3-sdk-cli s3-bucket snapshot-path {:state @state-atom
(save-snapshot! s3-sdk-cli s3-bucket snapshot-path {:state @state-atom
:partkey (inc @snapshot-index-atom)})
(println "Snapshot done.")
(swap! snapshot-index-atom inc)
Expand Down
17 changes: 3 additions & 14 deletions test/prevayler_clj_aws/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@
(testing "snapshot-v2 is the default snapshot file name"
(let [{{:keys [s3-client s3-bucket]} :aws-opts :as opts} (gen-opts)
_ (prev! opts)]
(is (match? [{:Key "snapshot"} {:Key "snapshot-v2"}] (list-objects s3-client s3-bucket)))))
(is (match? [{:Key "snapshot-v2"}] (list-objects s3-client s3-bucket)))))

(testing "can override snapshot file name"
(let [{{:keys [s3-client s3-bucket]} :aws-opts :as opts} (gen-opts :aws-opts {:snapshot-path "my-path"})
_ (prev! opts)]
(is (match? [{:Key "my-path"} {:Key "my-path-v2"}] (list-objects s3-client s3-bucket)))))
(is (match? [{:Key "my-path-v2"}] (list-objects s3-client s3-bucket)))))

(testing "default initial state is empty map"
(let [prevayler (prev! (gen-opts))]
Expand Down Expand Up @@ -154,15 +154,4 @@
(prevayler/snapshot! prev1)
(prevayler/snapshot! prev1)
(let [prev2 (prev! (assoc opts :business-fn (constantly "rubbish")))]
(is (= ["A" "B" "C" "D"] @prev2)))))

(testing "it generates snapshot v2"
(let [{{:keys [s3-client s3-sdk-cli s3-bucket]} :aws-opts :as opts} (gen-opts)
_ (util/aws-invoke s3-client {:op :PutObject
:request {:Bucket s3-bucket
:Key "snapshot"
:Body (#'core/marshal {:partkey 0
:state :state})}})
prev (prev! opts)] ;; saves snapshot-v2
(is (= :state @prev))
(is (= {:state :state :partkey 1} (#'core/read-object s3-sdk-cli s3-bucket "snapshot-v2" #'core/unmarshal-from-in))))))
(is (= ["A" "B" "C" "D"] @prev2))))))