main
// Package writer provides WARC file writing functionality using the gowarc library.
//
// The WARCWriter is a pipeline.Processor implementation that converts
// RecordedURL objects into WARC records and writes them to disk. It uses
// the gowarc library's file rotator to manage file sizes and naming.
//
// Key features:
//   - Writes both a request record and a response record for each captured URL
//   - Supports gzip and zstd compression
//   - Rotates to a new file automatically when the size limit is reached
//   - Uses a configurable file-name prefix
//   - Uses spooled temp files to limit memory usage for large payloads
//
// The writer runs asynchronously via gowarc's file rotator, which receives
// record batches through a channel. This decouples proxy request handling
// from the slower disk I/O operations.
//
// WARC records follow the ISO 28500 standard format: each captured HTTP
// transaction produces a request record and response record pair.
20package writer
21
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sort"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/internetarchive/gowarc"
	"github.com/internetarchive/gowarc/pkg/spooledtempfile"
	"github.com/internetarchive/gowarcprox/internal/models"
	"github.com/internetarchive/gowarcprox/pkg/config"
)
36
37// WARCWriter is a pipeline processor that writes WARC records
38type WARCWriter struct {
39 config *config.Config
40 recordChan chan *warc.RecordBatch
41 doneChans []chan bool
42 rotatorSettings *warc.RotatorSettings
43 logger *slog.Logger
44 recordCount int64
45 bytesWritten int64
46}
47
48// NewWARCWriter creates a new WARC writer processor
49func NewWARCWriter(cfg *config.Config, logger *slog.Logger) (*WARCWriter, error) {
50 if logger == nil {
51 logger = slog.Default()
52 }
53
54 // Determine compression type
55 compression := ""
56 if cfg.WARCCompression == "gzip" {
57 compression = "gzip"
58 } else if cfg.WARCCompression == "zstd" {
59 compression = "zstd"
60 }
61
62 // Create rotator settings
63 rotatorSettings := &warc.RotatorSettings{
64 WarcinfoContent: warc.Header{
65 "software": "gowarcprox/0.1.0",
66 "format": "WARC File Format 1.1",
67 "conformsTo": "http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1-1_latestdraft.pdf",
68 "robots": "classic",
69 "description": "WARC created by gowarcprox",
70 },
71 Prefix: cfg.WARCPrefix,
72 Compression: compression,
73 WARCSize: float64(cfg.WARCSize) / 1000000, // Convert bytes to MB
74 WARCWriterPoolSize: cfg.WARCWriterThreads,
75 OutputDirectory: cfg.WARCDirectory + "/",
76 }
77
78 // Create WARC rotator (this starts goroutines)
79 recordChan, doneChans, err := rotatorSettings.NewWARCRotator()
80 if err != nil {
81 return nil, fmt.Errorf("failed to create WARC rotator: %w", err)
82 }
83
84 w := &WARCWriter{
85 config: cfg,
86 recordChan: recordChan,
87 doneChans: doneChans,
88 rotatorSettings: rotatorSettings,
89 logger: logger,
90 }
91
92 logger.Info("WARC writer initialized",
93 "prefix", cfg.WARCPrefix,
94 "directory", cfg.WARCDirectory,
95 "compression", compression,
96 "writers", cfg.WARCWriterThreads)
97
98 return w, nil
99}
100
101// Name returns the processor name
102func (w *WARCWriter) Name() string {
103 return "WARCWriter"
104}
105
106// Process writes a recorded URL to WARC format
107func (w *WARCWriter) Process(ctx context.Context, ru *models.RecordedURL) error {
108 // Create WARC records
109 requestRecord, err := w.createRequestRecord(ru)
110 if err != nil {
111 return fmt.Errorf("failed to create request record: %w", err)
112 }
113
114 responseRecord, err := w.createResponseRecord(ru)
115 if err != nil {
116 return fmt.Errorf("failed to create response record: %w", err)
117 }
118
119 // Create record batch
120 batch := &warc.RecordBatch{
121 CaptureTime: ru.Timestamp.Format(time.RFC3339),
122 Records: []*warc.Record{requestRecord, responseRecord},
123 }
124
125 // Send to rotator channel
126 select {
127 case w.recordChan <- batch:
128 w.recordCount += 2
129 w.bytesWritten += int64(len(ru.RequestBody)) + int64(len(ru.ResponseBody))
130
131 w.logger.Debug("wrote WARC records",
132 "url", ru.URL,
133 "response_id", ru.WARCRecordID)
134 case <-ctx.Done():
135 return fmt.Errorf("context cancelled")
136 }
137
138 return nil
139}
140
141// createRequestRecord creates a WARC request record
142func (w *WARCWriter) createRequestRecord(ru *models.RecordedURL) (*warc.Record, error) {
143 // Build HTTP request block
144 var buf bytes.Buffer
145 fmt.Fprintf(&buf, "%s %s HTTP/1.1\r\n", ru.Method, ru.URL)
146
147 // Write headers
148 for name, values := range ru.RequestHeader {
149 for _, value := range values {
150 fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
151 }
152 }
153 buf.WriteString("\r\n")
154
155 // Write body
156 buf.Write(ru.RequestBody)
157
158 // Create spooled temp file for content
159 content := spooledtempfile.NewSpooledTempFile("gowarcprox-req-", "", w.config.TmpFileMaxMemory, false, 0.5)
160 if _, err := content.Write(buf.Bytes()); err != nil {
161 return nil, fmt.Errorf("failed to write request content: %w", err)
162 }
163 if _, err := content.Seek(0, 0); err != nil {
164 return nil, fmt.Errorf("failed to seek request content: %w", err)
165 }
166
167 // Create record
168 record := &warc.Record{
169 Header: warc.Header{},
170 Content: content,
171 Version: "WARC/1.1",
172 }
173
174 // Set WARC headers
175 record.Header.Set("WARC-Type", "request")
176 record.Header.Set("WARC-Record-ID", "<urn:uuid:"+uuid.New().String()+">")
177 record.Header.Set("WARC-Date", ru.Timestamp.Format(time.RFC3339))
178 record.Header.Set("Content-Type", "application/http; msgtype=request")
179 record.Header.Set("WARC-Target-URI", ru.URL)
180 if ru.RemoteIP != "" {
181 record.Header.Set("WARC-IP-Address", ru.RemoteIP)
182 }
183
184 return record, nil
185}
186
187// createResponseRecord creates a WARC response record
188func (w *WARCWriter) createResponseRecord(ru *models.RecordedURL) (*warc.Record, error) {
189 // Build HTTP response block
190 var buf bytes.Buffer
191
192 // Status line
193 statusText := http.StatusText(ru.StatusCode)
194 if statusText == "" {
195 statusText = ru.StatusMessage
196 }
197 fmt.Fprintf(&buf, "HTTP/1.1 %d %s\r\n", ru.StatusCode, statusText)
198
199 // Write headers
200 for name, values := range ru.ResponseHeader {
201 for _, value := range values {
202 fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
203 }
204 }
205 buf.WriteString("\r\n")
206
207 // Write body
208 buf.Write(ru.ResponseBody)
209
210 // Create spooled temp file for content
211 content := spooledtempfile.NewSpooledTempFile("gowarcprox-resp-", "", w.config.TmpFileMaxMemory, false, 0.5)
212 if _, err := content.Write(buf.Bytes()); err != nil {
213 return nil, fmt.Errorf("failed to write response content: %w", err)
214 }
215 if _, err := content.Seek(0, 0); err != nil {
216 return nil, fmt.Errorf("failed to seek response content: %w", err)
217 }
218
219 // Generate UUID for this record
220 recordID := uuid.New().String()
221 ru.WARCRecordID = recordID // Store for later use (e.g., dedup)
222
223 // Create record
224 record := &warc.Record{
225 Header: warc.Header{},
226 Content: content,
227 Version: "WARC/1.1",
228 }
229
230 // Set WARC headers
231 record.Header.Set("WARC-Type", "response")
232 record.Header.Set("WARC-Record-ID", "<urn:uuid:"+recordID+">")
233 record.Header.Set("WARC-Date", ru.Timestamp.Format(time.RFC3339))
234 record.Header.Set("Content-Type", "application/http; msgtype=response")
235 record.Header.Set("WARC-Target-URI", ru.URL)
236 if ru.RemoteIP != "" {
237 record.Header.Set("WARC-IP-Address", ru.RemoteIP)
238 }
239
240 // Set payload digest
241 if ru.PayloadDigest != "" {
242 record.Header.Set("WARC-Payload-Digest", ru.PayloadDigest)
243 }
244
245 return record, nil
246}
247
248// Close closes the WARC writer
249func (w *WARCWriter) Close() error {
250 w.logger.Info("closing WARC writer",
251 "records_written", w.recordCount,
252 "bytes_written", w.bytesWritten)
253
254 // Close the record channel to signal rotator goroutines to finish
255 close(w.recordChan)
256
257 // Wait for all rotator goroutines to finish
258 for _, doneChan := range w.doneChans {
259 <-doneChan
260 }
261
262 return nil
263}
264
265// GetStats returns writer statistics
266func (w *WARCWriter) GetStats() (int64, int64) {
267 return w.recordCount, w.bytesWritten
268}