main
1package pipeline
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "time"
9
10 "github.com/crash/upvs/internal/manifest"
11 "github.com/crash/upvs/internal/output"
12 "github.com/crash/upvs/internal/protect"
13 "github.com/crash/upvs/internal/retry"
14 "github.com/crash/upvs/internal/workerpool"
15)
16
// FetchConfig carries the tunable settings for the fetch phase.
type FetchConfig struct {
	// Workers is the size handed to workerpool.New for the download pool.
	Workers int
	// Retries is applied as the retry MaxAttempts value for each clip download.
	Retries int
}
22
// FetchResult summarizes the outcome of a fetch run.
type FetchResult struct {
	// Downloaded counts tasks that finished without an error.
	Downloaded int
	// Skipped counts clips already marked complete whose file is still on disk.
	Skipped int
	// Failed counts tasks that returned an error.
	Failed int
	// Errors collects the error returned by each failed task.
	Errors []error
}
30
31// Fetch downloads clips concurrently with retry support.
32func Fetch(ctx context.Context, client *protect.Client, layout *output.Layout, cfg FetchConfig) (*FetchResult, error) {
33 clipIndex, err := manifest.ReadClipIndex(layout.ClipIndexPath())
34 if err != nil {
35 return nil, fmt.Errorf("reading clip index: %w", err)
36 }
37
38 slog.Info("starting fetch",
39 slog.Int("clips", len(clipIndex.Clips)),
40 slog.Int("workers", cfg.Workers))
41
42 result := &FetchResult{}
43 tasks := make([]workerpool.Task, 0, len(clipIndex.Clips))
44 cameraID := clipIndex.CameraID
45
46 for _, clip := range clipIndex.Clips {
47 // Skip if already complete
48 if clip.Status == manifest.StatusComplete {
49 if fileExists(clip.FilePath) {
50 result.Skipped++
51 continue
52 }
53 }
54
55 task := func(ctx context.Context) error {
56 return downloadClip(ctx, client, cameraID, clip, cfg.Retries)
57 }
58 tasks = append(tasks, task)
59 }
60
61 if len(tasks) == 0 {
62 slog.Info("no clips to download")
63 return result, nil
64 }
65
66 pool := workerpool.New(cfg.Workers)
67 results := pool.RunWithProgress(ctx, tasks, func(completed, total int) {
68 slog.Info("fetch progress",
69 slog.Int("completed", completed),
70 slog.Int("total", total))
71 })
72
73 // Count results
74 for _, r := range results {
75 if r.Error != nil {
76 result.Failed++
77 result.Errors = append(result.Errors, r.Error)
78 } else {
79 result.Downloaded++
80 }
81 }
82
83 // Save updated clip index
84 err = clipIndex.Write(layout.ClipIndexPath())
85 if err != nil {
86 return result, fmt.Errorf("writing clip index: %w", err)
87 }
88
89 slog.Info("fetch complete",
90 slog.Int("downloaded", result.Downloaded),
91 slog.Int("skipped", result.Skipped),
92 slog.Int("failed", result.Failed))
93
94 return result, nil
95}
96
97func downloadClip(ctx context.Context, client *protect.Client, cameraID string, clip *manifest.ClipEntry, maxRetries int) error {
98 partialPath := clip.FilePath + ".partial"
99
100 // Clean up any existing partial file
101 os.Remove(partialPath)
102
103 clip.Status = manifest.StatusInProgress
104
105 retryCfg := retry.DefaultConfig()
106 retryCfg.MaxAttempts = maxRetries
107
108 err := retry.Do(ctx, retryCfg, func() error {
109 return doDownload(ctx, client, cameraID, clip, partialPath)
110 })
111
112 if err != nil {
113 clip.Status = manifest.StatusFailed
114 clip.Error = err.Error()
115 return fmt.Errorf("downloading clip (event_id=%s): %w", clip.EventID, err)
116 }
117
118 // Atomic rename
119 err = os.Rename(partialPath, clip.FilePath)
120 if err != nil {
121 clip.Status = manifest.StatusFailed
122 clip.Error = err.Error()
123 return fmt.Errorf("renaming clip (event_id=%s): %w", clip.EventID, err)
124 }
125
126 // Get file size
127 info, err := os.Stat(clip.FilePath)
128 if err == nil {
129 clip.FileSize = info.Size()
130 }
131
132 now := time.Now()
133 clip.DownloadedAt = &now
134 clip.Status = manifest.StatusComplete
135 clip.Error = ""
136
137 return nil
138}
139
140func doDownload(ctx context.Context, client *protect.Client, cameraID string, clip *manifest.ClipEntry, partialPath string) error {
141 f, err := os.Create(partialPath)
142 if err != nil {
143 return fmt.Errorf("creating file: %w", err)
144 }
145 defer f.Close()
146
147 err = client.DownloadClip(ctx, cameraID, clip.StartMs, clip.EndMs, f)
148 if err != nil {
149 return err
150 }
151
152 err = f.Sync()
153 if err != nil {
154 return fmt.Errorf("syncing file: %w", err)
155 }
156
157 return nil
158}
159
// fileExists reports whether path names a non-empty regular file.
//
// It returns false when the path cannot be stat'ed, when it refers to
// something other than a regular file (for example a directory, which can
// report a non-zero size), or when the file is empty.
func fileExists(path string) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return info.Mode().IsRegular() && info.Size() > 0
}