main
Raw Download raw file
  1// Package writer provides WARC file writing functionality using the gowarc library.
  2//
  3// The WARCWriter is a pipeline.Processor implementation that converts
  4// RecordedURL objects into WARC records and writes them to disk. It uses
  5// the gowarc library's file rotator to manage file sizes and naming.
  6//
  7// Key features:
  8//   - Writes both request and response records for each captured URL
  9//   - Supports gzip and zstd compression
 10//   - Automatic file rotation when size limits are reached
 11//   - Configurable file naming prefix
 12//   - Uses spooled temp files to minimize memory usage for large payloads
 13//
 14// The writer runs asynchronously via gowarc's file rotator, receiving
 15// record batches through a channel. This decouples the proxy request
 16// handling from the slower disk I/O operations.
 17//
 18// WARC records follow the ISO 28500 standard format, with each captured
 19// HTTP transaction resulting in a request record and response record pair.
 20package writer
 21
 22import (
 23	"bytes"
 24	"context"
 25	"fmt"
 26	"log/slog"
 27	"net/http"
 28	"time"
 29
 30	"github.com/google/uuid"
 31	"github.com/internetarchive/gowarc"
 32	"github.com/internetarchive/gowarc/pkg/spooledtempfile"
 33	"github.com/internetarchive/gowarcprox/internal/models"
 34	"github.com/internetarchive/gowarcprox/pkg/config"
 35)
 36
 37// WARCWriter is a pipeline processor that writes WARC records
 38type WARCWriter struct {
 39	config          *config.Config
 40	recordChan      chan *warc.RecordBatch
 41	doneChans       []chan bool
 42	rotatorSettings *warc.RotatorSettings
 43	logger          *slog.Logger
 44	recordCount     int64
 45	bytesWritten    int64
 46}
 47
 48// NewWARCWriter creates a new WARC writer processor
 49func NewWARCWriter(cfg *config.Config, logger *slog.Logger) (*WARCWriter, error) {
 50	if logger == nil {
 51		logger = slog.Default()
 52	}
 53
 54	// Determine compression type
 55	compression := ""
 56	if cfg.WARCCompression == "gzip" {
 57		compression = "gzip"
 58	} else if cfg.WARCCompression == "zstd" {
 59		compression = "zstd"
 60	}
 61
 62	// Create rotator settings
 63	rotatorSettings := &warc.RotatorSettings{
 64		WarcinfoContent: warc.Header{
 65			"software":    "gowarcprox/0.1.0",
 66			"format":      "WARC File Format 1.1",
 67			"conformsTo":  "http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1-1_latestdraft.pdf",
 68			"robots":      "classic",
 69			"description": "WARC created by gowarcprox",
 70		},
 71		Prefix:             cfg.WARCPrefix,
 72		Compression:        compression,
 73		WARCSize:           float64(cfg.WARCSize) / 1000000, // Convert bytes to MB
 74		WARCWriterPoolSize: cfg.WARCWriterThreads,
 75		OutputDirectory:    cfg.WARCDirectory + "/",
 76	}
 77
 78	// Create WARC rotator (this starts goroutines)
 79	recordChan, doneChans, err := rotatorSettings.NewWARCRotator()
 80	if err != nil {
 81		return nil, fmt.Errorf("failed to create WARC rotator: %w", err)
 82	}
 83
 84	w := &WARCWriter{
 85		config:          cfg,
 86		recordChan:      recordChan,
 87		doneChans:       doneChans,
 88		rotatorSettings: rotatorSettings,
 89		logger:          logger,
 90	}
 91
 92	logger.Info("WARC writer initialized",
 93		"prefix", cfg.WARCPrefix,
 94		"directory", cfg.WARCDirectory,
 95		"compression", compression,
 96		"writers", cfg.WARCWriterThreads)
 97
 98	return w, nil
 99}
100
101// Name returns the processor name
102func (w *WARCWriter) Name() string {
103	return "WARCWriter"
104}
105
106// Process writes a recorded URL to WARC format
107func (w *WARCWriter) Process(ctx context.Context, ru *models.RecordedURL) error {
108	// Create WARC records
109	requestRecord, err := w.createRequestRecord(ru)
110	if err != nil {
111		return fmt.Errorf("failed to create request record: %w", err)
112	}
113
114	responseRecord, err := w.createResponseRecord(ru)
115	if err != nil {
116		return fmt.Errorf("failed to create response record: %w", err)
117	}
118
119	// Create record batch
120	batch := &warc.RecordBatch{
121		CaptureTime: ru.Timestamp.Format(time.RFC3339),
122		Records:     []*warc.Record{requestRecord, responseRecord},
123	}
124
125	// Send to rotator channel
126	select {
127	case w.recordChan <- batch:
128		w.recordCount += 2
129		w.bytesWritten += int64(len(ru.RequestBody)) + int64(len(ru.ResponseBody))
130
131		w.logger.Debug("wrote WARC records",
132			"url", ru.URL,
133			"response_id", ru.WARCRecordID)
134	case <-ctx.Done():
135		return fmt.Errorf("context cancelled")
136	}
137
138	return nil
139}
140
141// createRequestRecord creates a WARC request record
142func (w *WARCWriter) createRequestRecord(ru *models.RecordedURL) (*warc.Record, error) {
143	// Build HTTP request block
144	var buf bytes.Buffer
145	fmt.Fprintf(&buf, "%s %s HTTP/1.1\r\n", ru.Method, ru.URL)
146
147	// Write headers
148	for name, values := range ru.RequestHeader {
149		for _, value := range values {
150			fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
151		}
152	}
153	buf.WriteString("\r\n")
154
155	// Write body
156	buf.Write(ru.RequestBody)
157
158	// Create spooled temp file for content
159	content := spooledtempfile.NewSpooledTempFile("gowarcprox-req-", "", w.config.TmpFileMaxMemory, false, 0.5)
160	if _, err := content.Write(buf.Bytes()); err != nil {
161		return nil, fmt.Errorf("failed to write request content: %w", err)
162	}
163	if _, err := content.Seek(0, 0); err != nil {
164		return nil, fmt.Errorf("failed to seek request content: %w", err)
165	}
166
167	// Create record
168	record := &warc.Record{
169		Header:  warc.Header{},
170		Content: content,
171		Version: "WARC/1.1",
172	}
173
174	// Set WARC headers
175	record.Header.Set("WARC-Type", "request")
176	record.Header.Set("WARC-Record-ID", "<urn:uuid:"+uuid.New().String()+">")
177	record.Header.Set("WARC-Date", ru.Timestamp.Format(time.RFC3339))
178	record.Header.Set("Content-Type", "application/http; msgtype=request")
179	record.Header.Set("WARC-Target-URI", ru.URL)
180	if ru.RemoteIP != "" {
181		record.Header.Set("WARC-IP-Address", ru.RemoteIP)
182	}
183
184	return record, nil
185}
186
187// createResponseRecord creates a WARC response record
188func (w *WARCWriter) createResponseRecord(ru *models.RecordedURL) (*warc.Record, error) {
189	// Build HTTP response block
190	var buf bytes.Buffer
191
192	// Status line
193	statusText := http.StatusText(ru.StatusCode)
194	if statusText == "" {
195		statusText = ru.StatusMessage
196	}
197	fmt.Fprintf(&buf, "HTTP/1.1 %d %s\r\n", ru.StatusCode, statusText)
198
199	// Write headers
200	for name, values := range ru.ResponseHeader {
201		for _, value := range values {
202			fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
203		}
204	}
205	buf.WriteString("\r\n")
206
207	// Write body
208	buf.Write(ru.ResponseBody)
209
210	// Create spooled temp file for content
211	content := spooledtempfile.NewSpooledTempFile("gowarcprox-resp-", "", w.config.TmpFileMaxMemory, false, 0.5)
212	if _, err := content.Write(buf.Bytes()); err != nil {
213		return nil, fmt.Errorf("failed to write response content: %w", err)
214	}
215	if _, err := content.Seek(0, 0); err != nil {
216		return nil, fmt.Errorf("failed to seek response content: %w", err)
217	}
218
219	// Generate UUID for this record
220	recordID := uuid.New().String()
221	ru.WARCRecordID = recordID // Store for later use (e.g., dedup)
222
223	// Create record
224	record := &warc.Record{
225		Header:  warc.Header{},
226		Content: content,
227		Version: "WARC/1.1",
228	}
229
230	// Set WARC headers
231	record.Header.Set("WARC-Type", "response")
232	record.Header.Set("WARC-Record-ID", "<urn:uuid:"+recordID+">")
233	record.Header.Set("WARC-Date", ru.Timestamp.Format(time.RFC3339))
234	record.Header.Set("Content-Type", "application/http; msgtype=response")
235	record.Header.Set("WARC-Target-URI", ru.URL)
236	if ru.RemoteIP != "" {
237		record.Header.Set("WARC-IP-Address", ru.RemoteIP)
238	}
239
240	// Set payload digest
241	if ru.PayloadDigest != "" {
242		record.Header.Set("WARC-Payload-Digest", ru.PayloadDigest)
243	}
244
245	return record, nil
246}
247
248// Close closes the WARC writer
249func (w *WARCWriter) Close() error {
250	w.logger.Info("closing WARC writer",
251		"records_written", w.recordCount,
252		"bytes_written", w.bytesWritten)
253
254	// Close the record channel to signal rotator goroutines to finish
255	close(w.recordChan)
256
257	// Wait for all rotator goroutines to finish
258	for _, doneChan := range w.doneChans {
259		<-doneChan
260	}
261
262	return nil
263}
264
265// GetStats returns writer statistics
266func (w *WARCWriter) GetStats() (int64, int64) {
267	return w.recordCount, w.bytesWritten
268}