Commit 76ef639
Changed files (3)
internal/proxy/proxy.go
@@ -11,21 +11,23 @@ import (
"github.com/internetarchive/gowarcprox/internal/certauth"
"github.com/internetarchive/gowarcprox/internal/pipeline"
+ "github.com/internetarchive/gowarcprox/internal/writer"
"github.com/internetarchive/gowarcprox/pkg/config"
)
// Server represents the proxy server
type Server struct {
- config *config.Config
- listener net.Listener
- server *http.Server
- handler *Handler
- certAuth *certauth.CertificateAuthority
- pipeline *pipeline.Pipeline
- wg sync.WaitGroup
- ctx context.Context
- cancel context.CancelFunc
- logger *slog.Logger
+ config *config.Config
+ listener net.Listener
+ server *http.Server
+ handler *Handler
+ certAuth *certauth.CertificateAuthority
+ pipeline *pipeline.Pipeline
+ warcWriter *writer.WARCWriter
+ wg sync.WaitGroup
+ ctx context.Context
+ cancel context.CancelFunc
+ logger *slog.Logger
}
// NewServer creates a new proxy server
@@ -59,6 +61,17 @@ func NewServer(cfg *config.Config, logger *slog.Logger) (*Server, error) {
// Initialize processing pipeline
s.pipeline = pipeline.NewPipeline(cfg, logger)
+ // Initialize WARC writer
+ warcWriter, err := writer.NewWARCWriter(cfg, logger)
+ if err != nil {
+ cancel()
+ return nil, fmt.Errorf("failed to initialize WARC writer: %w", err)
+ }
+ s.warcWriter = warcWriter
+
+ // Add WARC writer to pipeline
+ s.pipeline.AddProcessor(warcWriter)
+
// Create the HTTP handler
s.handler = NewHandler(s, cfg, logger)
@@ -129,6 +142,11 @@ func (s *Server) Stop() error {
s.logger.Error("pipeline stop error", "error", err)
}
+ // Close the WARC writer
+ if err := s.warcWriter.Close(); err != nil {
+ s.logger.Error("WARC writer close error", "error", err)
+ }
+
s.logger.Info("proxy server stopped")
return nil
}
internal/writer/writer.go
@@ -0,0 +1,249 @@
+package writer
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "log/slog"
+ "net/http"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/internetarchive/gowarc"
+ "github.com/internetarchive/gowarc/pkg/spooledtempfile"
+ "github.com/internetarchive/gowarcprox/internal/models"
+ "github.com/internetarchive/gowarcprox/pkg/config"
+)
+
+// WARCWriter is a pipeline processor that writes WARC records
+type WARCWriter struct {
+ config *config.Config
+ recordChan chan *warc.RecordBatch
+ doneChans []chan bool
+ rotatorSettings *warc.RotatorSettings
+ logger *slog.Logger
+ recordCount int64
+ bytesWritten int64
+}
+
+// NewWARCWriter creates a new WARC writer processor
+func NewWARCWriter(cfg *config.Config, logger *slog.Logger) (*WARCWriter, error) {
+ if logger == nil {
+ logger = slog.Default()
+ }
+
+ // Determine compression type
+ compression := ""
+ if cfg.WARCCompression == "gzip" {
+ compression = "gzip"
+ } else if cfg.WARCCompression == "zstd" {
+ compression = "zstd"
+ }
+
+ // Create rotator settings
+ rotatorSettings := &warc.RotatorSettings{
+ WarcinfoContent: warc.Header{
+ "software": "gowarcprox/0.1.0",
+ "format": "WARC File Format 1.1",
+ "conformsTo": "http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1-1_latestdraft.pdf",
+ "robots": "classic",
+ "description": "WARC created by gowarcprox",
+ },
+ Prefix: cfg.WARCPrefix,
+ Compression: compression,
+ WARCSize: float64(cfg.WARCSize) / 1000000, // Convert bytes to MB
+ WARCWriterPoolSize: cfg.WARCWriterThreads,
+ OutputDirectory: cfg.WARCDirectory + "/",
+ }
+
+ // Create WARC rotator (this starts goroutines)
+ recordChan, doneChans, err := rotatorSettings.NewWARCRotator()
+ if err != nil {
+ return nil, fmt.Errorf("failed to create WARC rotator: %w", err)
+ }
+
+ w := &WARCWriter{
+ config: cfg,
+ recordChan: recordChan,
+ doneChans: doneChans,
+ rotatorSettings: rotatorSettings,
+ logger: logger,
+ }
+
+ logger.Info("WARC writer initialized",
+ "prefix", cfg.WARCPrefix,
+ "directory", cfg.WARCDirectory,
+ "compression", compression,
+ "writers", cfg.WARCWriterThreads)
+
+ return w, nil
+}
+
+// Name returns the processor name
+func (w *WARCWriter) Name() string {
+ return "WARCWriter"
+}
+
+// Process writes a recorded URL to WARC format
+func (w *WARCWriter) Process(ctx context.Context, ru *models.RecordedURL) error {
+ // Create WARC records
+ requestRecord, err := w.createRequestRecord(ru)
+ if err != nil {
+ return fmt.Errorf("failed to create request record: %w", err)
+ }
+
+ responseRecord, err := w.createResponseRecord(ru)
+ if err != nil {
+ return fmt.Errorf("failed to create response record: %w", err)
+ }
+
+ // Create record batch
+ batch := &warc.RecordBatch{
+ CaptureTime: ru.Timestamp.Format(time.RFC3339),
+ Records: []*warc.Record{requestRecord, responseRecord},
+ }
+
+ // Send to rotator channel
+ select {
+ case w.recordChan <- batch:
+ w.recordCount += 2
+ w.bytesWritten += int64(len(ru.RequestBody)) + int64(len(ru.ResponseBody))
+
+ w.logger.Debug("wrote WARC records",
+ "url", ru.URL,
+ "response_id", ru.WARCRecordID)
+ case <-ctx.Done():
+ return fmt.Errorf("context cancelled")
+ }
+
+ return nil
+}
+
+// createRequestRecord creates a WARC request record
+func (w *WARCWriter) createRequestRecord(ru *models.RecordedURL) (*warc.Record, error) {
+ // Build HTTP request block
+ var buf bytes.Buffer
+ fmt.Fprintf(&buf, "%s %s HTTP/1.1\r\n", ru.Method, ru.URL)
+
+ // Write headers
+ for name, values := range ru.RequestHeader {
+ for _, value := range values {
+ fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
+ }
+ }
+ buf.WriteString("\r\n")
+
+ // Write body
+ buf.Write(ru.RequestBody)
+
+ // Create spooled temp file for content
+ content := spooledtempfile.NewSpooledTempFile("gowarcprox-req-", "", w.config.TmpFileMaxMemory, false, 0.5)
+ if _, err := content.Write(buf.Bytes()); err != nil {
+ return nil, fmt.Errorf("failed to write request content: %w", err)
+ }
+ if _, err := content.Seek(0, 0); err != nil {
+ return nil, fmt.Errorf("failed to seek request content: %w", err)
+ }
+
+ // Create record
+ record := &warc.Record{
+ Header: warc.Header{},
+ Content: content,
+ Version: "WARC/1.1",
+ }
+
+ // Set WARC headers
+ record.Header.Set("WARC-Type", "request")
+ record.Header.Set("WARC-Record-ID", "<urn:uuid:"+uuid.New().String()+">")
+ record.Header.Set("WARC-Date", ru.Timestamp.Format(time.RFC3339))
+ record.Header.Set("Content-Type", "application/http; msgtype=request")
+ record.Header.Set("WARC-Target-URI", ru.URL)
+ if ru.RemoteIP != "" {
+ record.Header.Set("WARC-IP-Address", ru.RemoteIP)
+ }
+
+ return record, nil
+}
+
+// createResponseRecord creates a WARC response record
+func (w *WARCWriter) createResponseRecord(ru *models.RecordedURL) (*warc.Record, error) {
+ // Build HTTP response block
+ var buf bytes.Buffer
+
+ // Status line
+ statusText := http.StatusText(ru.StatusCode)
+ if statusText == "" {
+ statusText = ru.StatusMessage
+ }
+ fmt.Fprintf(&buf, "HTTP/1.1 %d %s\r\n", ru.StatusCode, statusText)
+
+ // Write headers
+ for name, values := range ru.ResponseHeader {
+ for _, value := range values {
+ fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
+ }
+ }
+ buf.WriteString("\r\n")
+
+ // Write body
+ buf.Write(ru.ResponseBody)
+
+ // Create spooled temp file for content
+ content := spooledtempfile.NewSpooledTempFile("gowarcprox-resp-", "", w.config.TmpFileMaxMemory, false, 0.5)
+ if _, err := content.Write(buf.Bytes()); err != nil {
+ return nil, fmt.Errorf("failed to write response content: %w", err)
+ }
+ if _, err := content.Seek(0, 0); err != nil {
+ return nil, fmt.Errorf("failed to seek response content: %w", err)
+ }
+
+ // Generate UUID for this record
+ recordID := uuid.New().String()
+ ru.WARCRecordID = recordID // Store for later use (e.g., dedup)
+
+ // Create record
+ record := &warc.Record{
+ Header: warc.Header{},
+ Content: content,
+ Version: "WARC/1.1",
+ }
+
+ // Set WARC headers
+ record.Header.Set("WARC-Type", "response")
+ record.Header.Set("WARC-Record-ID", "<urn:uuid:"+recordID+">")
+ record.Header.Set("WARC-Date", ru.Timestamp.Format(time.RFC3339))
+ record.Header.Set("Content-Type", "application/http; msgtype=response")
+ record.Header.Set("WARC-Target-URI", ru.URL)
+ if ru.RemoteIP != "" {
+ record.Header.Set("WARC-IP-Address", ru.RemoteIP)
+ }
+
+ // Set payload digest
+ if ru.PayloadDigest != "" {
+ record.Header.Set("WARC-Payload-Digest", ru.PayloadDigest)
+ }
+
+ return record, nil
+}
+
+// Close closes the WARC writer
+func (w *WARCWriter) Close() error {
+ w.logger.Info("closing WARC writer",
+ "records_written", w.recordCount,
+ "bytes_written", w.bytesWritten)
+
+ // Close the record channel to signal rotator goroutines to finish
+ close(w.recordChan)
+
+ // Wait for all rotator goroutines to finish
+ for _, doneChan := range w.doneChans {
+ <-doneChan
+ }
+
+ return nil
+}
+
+// GetStats returns writer statistics
+func (w *WARCWriter) GetStats() (int64, int64) {
+ return w.recordCount, w.bytesWritten
+}
.gitignore
@@ -5,3 +5,4 @@
warcprox-ca/
warcprox-ca.pem
*.sqlite
+warcs/