main
Raw Download raw file
  1package pipeline
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log/slog"
  7	"os"
  8	"time"
  9
 10	"github.com/crash/upvs/internal/manifest"
 11	"github.com/crash/upvs/internal/output"
 12	"github.com/crash/upvs/internal/protect"
 13	"github.com/crash/upvs/internal/retry"
 14	"github.com/crash/upvs/internal/workerpool"
 15)
 16
// FetchConfig holds configuration for the fetch phase.
type FetchConfig struct {
	// Workers is the value passed to workerpool.New when Fetch builds its pool.
	Workers int
	// Retries is copied into the retry config's MaxAttempts for each clip download.
	Retries int
}
 22
// FetchResult holds the output of the fetch phase.
type FetchResult struct {
	// Downloaded counts download tasks that finished without an error.
	Downloaded int
	// Skipped counts clips already marked complete whose file is present and non-empty.
	Skipped int
	// Failed counts download tasks that returned an error.
	Failed int
	// Errors collects the error of every failed task; len(Errors) == Failed.
	Errors []error
}
 30
 31// Fetch downloads clips concurrently with retry support.
 32func Fetch(ctx context.Context, client *protect.Client, layout *output.Layout, cfg FetchConfig) (*FetchResult, error) {
 33	clipIndex, err := manifest.ReadClipIndex(layout.ClipIndexPath())
 34	if err != nil {
 35		return nil, fmt.Errorf("reading clip index: %w", err)
 36	}
 37
 38	slog.Info("starting fetch",
 39		slog.Int("clips", len(clipIndex.Clips)),
 40		slog.Int("workers", cfg.Workers))
 41
 42	result := &FetchResult{}
 43	tasks := make([]workerpool.Task, 0, len(clipIndex.Clips))
 44	cameraID := clipIndex.CameraID
 45
 46	for _, clip := range clipIndex.Clips {
 47		// Skip if already complete
 48		if clip.Status == manifest.StatusComplete {
 49			if fileExists(clip.FilePath) {
 50				result.Skipped++
 51				continue
 52			}
 53		}
 54
 55		task := func(ctx context.Context) error {
 56			return downloadClip(ctx, client, cameraID, clip, cfg.Retries)
 57		}
 58		tasks = append(tasks, task)
 59	}
 60
 61	if len(tasks) == 0 {
 62		slog.Info("no clips to download")
 63		return result, nil
 64	}
 65
 66	pool := workerpool.New(cfg.Workers)
 67	results := pool.RunWithProgress(ctx, tasks, func(completed, total int) {
 68		slog.Info("fetch progress",
 69			slog.Int("completed", completed),
 70			slog.Int("total", total))
 71	})
 72
 73	// Count results
 74	for _, r := range results {
 75		if r.Error != nil {
 76			result.Failed++
 77			result.Errors = append(result.Errors, r.Error)
 78		} else {
 79			result.Downloaded++
 80		}
 81	}
 82
 83	// Save updated clip index
 84	err = clipIndex.Write(layout.ClipIndexPath())
 85	if err != nil {
 86		return result, fmt.Errorf("writing clip index: %w", err)
 87	}
 88
 89	slog.Info("fetch complete",
 90		slog.Int("downloaded", result.Downloaded),
 91		slog.Int("skipped", result.Skipped),
 92		slog.Int("failed", result.Failed))
 93
 94	return result, nil
 95}
 96
 97func downloadClip(ctx context.Context, client *protect.Client, cameraID string, clip *manifest.ClipEntry, maxRetries int) error {
 98	partialPath := clip.FilePath + ".partial"
 99
100	// Clean up any existing partial file
101	os.Remove(partialPath)
102
103	clip.Status = manifest.StatusInProgress
104
105	retryCfg := retry.DefaultConfig()
106	retryCfg.MaxAttempts = maxRetries
107
108	err := retry.Do(ctx, retryCfg, func() error {
109		return doDownload(ctx, client, cameraID, clip, partialPath)
110	})
111
112	if err != nil {
113		clip.Status = manifest.StatusFailed
114		clip.Error = err.Error()
115		return fmt.Errorf("downloading clip (event_id=%s): %w", clip.EventID, err)
116	}
117
118	// Atomic rename
119	err = os.Rename(partialPath, clip.FilePath)
120	if err != nil {
121		clip.Status = manifest.StatusFailed
122		clip.Error = err.Error()
123		return fmt.Errorf("renaming clip (event_id=%s): %w", clip.EventID, err)
124	}
125
126	// Get file size
127	info, err := os.Stat(clip.FilePath)
128	if err == nil {
129		clip.FileSize = info.Size()
130	}
131
132	now := time.Now()
133	clip.DownloadedAt = &now
134	clip.Status = manifest.StatusComplete
135	clip.Error = ""
136
137	return nil
138}
139
140func doDownload(ctx context.Context, client *protect.Client, cameraID string, clip *manifest.ClipEntry, partialPath string) error {
141	f, err := os.Create(partialPath)
142	if err != nil {
143		return fmt.Errorf("creating file: %w", err)
144	}
145	defer f.Close()
146
147	err = client.DownloadClip(ctx, cameraID, clip.StartMs, clip.EndMs, f)
148	if err != nil {
149		return err
150	}
151
152	err = f.Sync()
153	if err != nil {
154		return fmt.Errorf("syncing file: %w", err)
155	}
156
157	return nil
158}
159
// fileExists reports whether path names a non-empty regular file.
//
// Any os.Stat error (including a missing path) yields false, as does an empty
// file, so a zero-length leftover is treated as not downloaded. Directories
// and other non-regular entries also yield false: a directory reports a
// platform-dependent size and must not be mistaken for a finished clip.
func fileExists(path string) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return info.Mode().IsRegular() && info.Size() > 0
}