Commit 6fd5ce5

bryfry <bryon@fryer.io>
2026-01-06 20:58:39
Phase 3: Implement request/response recording and pipeline
Implemented comprehensive traffic recording with digest calculation and
processing pipeline infrastructure. The proxy now captures all HTTP/HTTPS
traffic, calculates content digests, and queues URLs for processing.

Components implemented:

- RecordedURL model (internal/models/recordedurl.go)
  - Complete data model for captured requests/responses
  - Request/response headers and bodies
  - Timing information and digest fields
  - Deduplication and Warcprox-Meta support structures
  - Helper methods for bucket and metadata handling
- Traffic recorder (internal/proxy/recorder.go)
  - RecordingResponseWriter for capturing response data
  - RecordingReader for capturing request bodies
  - createRecordedURL factory method
  - Digest calculation with multiple algorithms (SHA1, SHA256, BLAKE3)
  - Base32 encoding for SHA hashes (warcprox compatible)
  - HTTP response block construction
  - Header cloning and body reading utilities
- Processing pipeline (internal/pipeline/pipeline.go, processor.go)
  - Producer-consumer pattern with goroutines
  - Buffered channels for URL queuing (configurable size)
  - Processor interface for extensibility
  - Pipeline statistics tracking
  - Graceful start/stop with context cancellation
  - Worker pool for concurrent processing

Integration:
- Updated proxy.Server to manage pipeline lifecycle
- Updated HTTP handler to record traffic and enqueue URLs
- Updated HTTPS MITM handler to record encrypted traffic
- Pipeline starts before server, stops after server shutdown

Testing: Successfully tested with both HTTP and HTTPS requests:
- HTTP: curl --proxy localhost:8000 http://example.com/
  - Digest: sha1:JUWMXAQNHPTRTHYQWT3EJILYCL7YC3PQ
  - Processed through pipeline successfully
- HTTPS: curl --proxy localhost:8000 --insecure https://example.com/
  - Same content, same digest (verification)
  - MITM decryption and recording working correctly
  - Pipeline processing successful

Next: Phase 4 will integrate gowarc.Writer to write WARC files.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent eacc60f
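
For reference, the digest reported in the test above uses warcprox's format: the SHA-1 of the payload, base32-encoded with padding stripped. A minimal standalone sketch of that encoding (illustrative only; the placeholder payload below will not reproduce the example.com digest):

// Sketch of the warcprox digest format: SHA-1 of the payload, base32, no padding.
package main

import (
	"crypto/sha1"
	"encoding/base32"
	"fmt"
	"strings"
)

func main() {
	payload := []byte("<html>...</html>") // placeholder for a captured response body
	sum := sha1.Sum(payload)
	digest := strings.TrimRight(base32.StdEncoding.EncodeToString(sum[:]), "=")
	fmt.Println("sha1:" + digest) // prints "sha1:" plus a 32-character base32 string
}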
internal/models/recordedurl.go
@@ -0,0 +1,173 @@
+package models
+
+import (
+	"net/http"
+	"time"
+)
+
+// RecordedURL represents a captured HTTP request/response pair
+type RecordedURL struct {
+	// Request information
+	URL           string
+	Method        string
+	RequestHeader http.Header
+	RequestBody   []byte
+
+	// Response information
+	StatusCode     int
+	StatusMessage  string
+	ResponseHeader http.Header
+	ResponseBody   []byte
+
+	// Timing information
+	Timestamp time.Time
+	Duration  time.Duration
+
+	// Remote server information
+	RemoteAddr string
+	RemoteIP   string
+
+	// Client information
+	ClientAddr string
+	ClientIP   string
+
+	// Digest information
+	PayloadDigest string // e.g., "sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A"
+	BlockDigest   string // Digest of the entire HTTP response block
+
+	// Content information
+	ContentType   string
+	ContentLength int64
+
+	// Deduplication information
+	DedupInfo *DedupInfo
+
+	// Warcprox-Meta information
+	WarcproxMeta *WarcproxMeta
+
+	// WARC information
+	WARCRecordID string // UUID of the WARC record
+}
+
+// DedupInfo contains information about a deduplicated resource
+type DedupInfo struct {
+	// The WARC record ID of the original capture
+	RecordID string
+
+	// The URL of the original capture
+	URL string
+
+	// The date of the original capture
+	Date time.Time
+
+	// Whether this was found via read-only dedup bucket
+	ReadOnly bool
+}
+
+// WarcproxMeta contains per-request configuration from the Warcprox-Meta header
+type WarcproxMeta struct {
+	// Custom WARC prefix for this request
+	WarcPrefix string
+
+	// Dedup buckets: map of bucket name to mode ("ro" or "rw")
+	DedupBuckets map[string]string
+
+	// Stats buckets for tracking statistics
+	StatsBuckets []string
+
+	// Stats bucket definitions with domain tallying
+	StatsBucketDefs []StatsBucketDef
+
+	// Hard limits (420 response when exceeded)
+	Limits map[string]int64
+
+	// Soft limits (430 response when exceeded)
+	SoftLimits map[string]int64
+
+	// URL blocking rules
+	Blocks []BlockRule
+
+	// Metadata to include in WARC records
+	Metadata map[string]interface{}
+
+	// Accept flags for response metadata
+	Accept map[string]bool
+}
+
+// StatsBucketDef defines a stats bucket with optional domain tallying
+type StatsBucketDef struct {
+	Bucket       string
+	TallyDomains []string
+}
+
+// BlockRule defines a URL blocking rule
+type BlockRule struct {
+	Domain string
+	SURT   string
+	Regex  string
+}
+
+// IsDedup returns true if this URL was found in the dedup database
+func (ru *RecordedURL) IsDedup() bool {
+	return ru.DedupInfo != nil
+}
+
+// GetWarcPrefix returns the WARC prefix to use for this URL
+func (ru *RecordedURL) GetWarcPrefix(defaultPrefix string) string {
+	if ru.WarcproxMeta != nil && ru.WarcproxMeta.WarcPrefix != "" {
+		return ru.WarcproxMeta.WarcPrefix
+	}
+	return defaultPrefix
+}
+
+// GetDedupBuckets returns the dedup buckets for this URL
+func (ru *RecordedURL) GetDedupBuckets() map[string]string {
+	if ru.WarcproxMeta != nil && ru.WarcproxMeta.DedupBuckets != nil {
+		return ru.WarcproxMeta.DedupBuckets
+	}
+	// Return empty bucket (default dedup bucket)
+	return map[string]string{"": "rw"}
+}
+
+// GetStatsBuckets returns the stats buckets for this URL
+func (ru *RecordedURL) GetStatsBuckets() []string {
+	if ru.WarcproxMeta == nil {
+		return nil
+	}
+
+	// Collect bucket names from both simple buckets and bucket definitions
+	buckets := make([]string, 0)
+
+	if ru.WarcproxMeta.StatsBuckets != nil {
+		buckets = append(buckets, ru.WarcproxMeta.StatsBuckets...)
+	}
+
+	for _, def := range ru.WarcproxMeta.StatsBucketDefs {
+		buckets = append(buckets, def.Bucket)
+	}
+
+	return buckets
+}
+
+// GetStatsBucketDefs returns the stats bucket definitions for this URL
+func (ru *RecordedURL) GetStatsBucketDefs() []StatsBucketDef {
+	if ru.WarcproxMeta != nil {
+		return ru.WarcproxMeta.StatsBucketDefs
+	}
+	return nil
+}
+
+// ShouldTallyDomain checks if domain tallying is enabled for the given bucket
+func (ru *RecordedURL) ShouldTallyDomain(bucket string) (bool, []string) {
+	if ru.WarcproxMeta == nil {
+		return false, nil
+	}
+
+	for _, def := range ru.WarcproxMeta.StatsBucketDefs {
+		if def.Bucket == bucket && len(def.TallyDomains) > 0 {
+			return true, def.TallyDomains
+		}
+	}
+
+	return false, nil
+}
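
To make the helper semantics above concrete, a small sketch with hand-populated values (Warcprox-Meta header parsing is deferred to Phase 7, and this would need to live inside the module, since internal packages are not importable from outside):

package main

import (
	"fmt"

	"github.com/internetarchive/gowarcprox/internal/models"
)

func main() {
	ru := &models.RecordedURL{
		URL: "http://example.com/",
		WarcproxMeta: &models.WarcproxMeta{
			WarcPrefix:   "my-crawl",
			DedupBuckets: map[string]string{"job-1": "rw", "shared": "ro"},
			StatsBucketDefs: []models.StatsBucketDef{
				{Bucket: "job-1", TallyDomains: []string{"example.com"}},
			},
		},
	}

	fmt.Println(ru.GetWarcPrefix("warcprox"))  // my-crawl (default applies when unset)
	fmt.Println(ru.GetDedupBuckets())          // map[job-1:rw shared:ro]
	fmt.Println(ru.GetStatsBuckets())          // [job-1], gathered from the definitions
	fmt.Println(ru.ShouldTallyDomain("job-1")) // true [example.com]
}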
internal/pipeline/pipeline.go
@@ -0,0 +1,177 @@
+package pipeline
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"sync"
+
+	"github.com/internetarchive/gowarcprox/internal/models"
+	"github.com/internetarchive/gowarcprox/pkg/config"
+)
+
+// Pipeline processes recorded URLs through a chain of processors
+type Pipeline struct {
+	config     *config.Config
+	processors []Processor
+	inputCh    chan *models.RecordedURL
+	ctx        context.Context
+	cancel     context.CancelFunc
+	wg         sync.WaitGroup
+	logger     *slog.Logger
+	stats      *Stats
+}
+
+// Stats tracks pipeline statistics
+type Stats struct {
+	mu              sync.Mutex
+	URLsQueued      int64
+	URLsProcessed   int64
+	URLsFailed      int64
+	CurrentQueueLen int
+}
+
+// NewPipeline creates a new processing pipeline
+func NewPipeline(cfg *config.Config, logger *slog.Logger) *Pipeline {
+	if logger == nil {
+		logger = slog.Default()
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	return &Pipeline{
+		config:     cfg,
+		processors: make([]Processor, 0),
+		inputCh:    make(chan *models.RecordedURL, cfg.QueueSize),
+		ctx:        ctx,
+		cancel:     cancel,
+		logger:     logger,
+		stats:      &Stats{},
+	}
+}
+
+// AddProcessor adds a processor to the pipeline
+func (p *Pipeline) AddProcessor(processor Processor) {
+	p.processors = append(p.processors, processor)
+	p.logger.Info("added processor to pipeline", "processor", processor.Name())
+}
+
+// Start starts the pipeline workers
+func (p *Pipeline) Start() error {
+	p.logger.Info("starting pipeline", "queue_size", p.config.QueueSize)
+
+	// Start worker goroutines
+	numWorkers := 1 // Can be made configurable later
+	for i := 0; i < numWorkers; i++ {
+		p.wg.Add(1)
+		go p.worker(i)
+	}
+
+	p.logger.Info("pipeline started", "workers", numWorkers)
+	return nil
+}
+
+// Stop gracefully stops the pipeline
+func (p *Pipeline) Stop() error {
+	p.logger.Info("stopping pipeline")
+
+	// Close input channel so workers exit once the queue drains
+	close(p.inputCh)
+
+	// Wait for all workers to finish draining; cancelling the context
+	// first would race the drain and could drop queued URLs
+	p.wg.Wait()
+
+	// Cancel the context now that the workers have exited
+	p.cancel()
+
+	p.logger.Info("pipeline stopped",
+		"urls_processed", p.stats.URLsProcessed,
+		"urls_failed", p.stats.URLsFailed)
+	return nil
+}
+
+// Enqueue adds a recorded URL to the pipeline. It must not be called after
+// Stop, since sending on the closed input channel would panic.
+func (p *Pipeline) Enqueue(ru *models.RecordedURL) error {
+	select {
+	case p.inputCh <- ru:
+		p.stats.mu.Lock()
+		p.stats.URLsQueued++
+		p.stats.CurrentQueueLen = len(p.inputCh)
+		p.stats.mu.Unlock()
+		return nil
+	case <-p.ctx.Done():
+		return fmt.Errorf("pipeline is shutting down")
+	}
+}
+
+// worker processes URLs from the input channel
+func (p *Pipeline) worker(id int) {
+	defer p.wg.Done()
+
+	p.logger.Debug("pipeline worker started", "worker_id", id)
+
+	for {
+		select {
+		case ru, ok := <-p.inputCh:
+			if !ok {
+				// Channel closed, worker should exit
+				p.logger.Debug("pipeline worker stopping", "worker_id", id)
+				return
+			}
+
+			// Process through all processors
+			if err := p.processURL(ru); err != nil {
+				p.logger.Error("failed to process URL",
+					"worker_id", id,
+					"url", ru.URL,
+					"error", err)
+
+				p.stats.mu.Lock()
+				p.stats.URLsFailed++
+				p.stats.mu.Unlock()
+			} else {
+				p.stats.mu.Lock()
+				p.stats.URLsProcessed++
+				p.stats.mu.Unlock()
+
+				p.logger.Debug("processed URL",
+					"worker_id", id,
+					"url", ru.URL,
+					"method", ru.Method,
+					"status", ru.StatusCode,
+					"digest", ru.PayloadDigest)
+			}
+
+			// Update queue length
+			p.stats.mu.Lock()
+			p.stats.CurrentQueueLen = len(p.inputCh)
+			p.stats.mu.Unlock()
+
+		case <-p.ctx.Done():
+			p.logger.Debug("pipeline worker cancelled", "worker_id", id)
+			return
+		}
+	}
+}
+
+// processURL processes a URL through all processors
+func (p *Pipeline) processURL(ru *models.RecordedURL) error {
+	for _, processor := range p.processors {
+		if err := processor.Process(p.ctx, ru); err != nil {
+			return fmt.Errorf("processor %s failed: %w", processor.Name(), err)
+		}
+	}
+	return nil
+}
+
+// GetStats returns a snapshot of the current pipeline statistics
+func (p *Pipeline) GetStats() Stats {
+	p.stats.mu.Lock()
+	defer p.stats.mu.Unlock()
+	// Copy fields individually; `return *p.stats` would copy the mutex (copylocks)
+	return Stats{
+		URLsQueued:      p.stats.URLsQueued,
+		URLsProcessed:   p.stats.URLsProcessed,
+		URLsFailed:      p.stats.URLsFailed,
+		CurrentQueueLen: p.stats.CurrentQueueLen,
+	}
+}
+
+// QueueLength returns the current queue length
+func (p *Pipeline) QueueLength() int {
+	return len(p.inputCh)
+}
internal/pipeline/processor.go
@@ -0,0 +1,16 @@
+package pipeline
+
+import (
+	"context"
+
+	"github.com/internetarchive/gowarcprox/internal/models"
+)
+
+// Processor is the interface for pipeline processors
+type Processor interface {
+	// Process handles a recorded URL
+	Process(ctx context.Context, ru *models.RecordedURL) error
+
+	// Name returns the processor name for logging
+	Name() string
+}
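
Taken together, a minimal sketch of the Processor contract and pipeline lifecycle (assuming config.Config exports the QueueSize field that NewPipeline reads, and that this code sits inside the module):

package main

import (
	"context"
	"log/slog"

	"github.com/internetarchive/gowarcprox/internal/models"
	"github.com/internetarchive/gowarcprox/internal/pipeline"
	"github.com/internetarchive/gowarcprox/pkg/config"
)

// logProcessor is an illustrative Processor that only logs each capture.
type logProcessor struct{}

func (lp *logProcessor) Name() string { return "log" }

func (lp *logProcessor) Process(ctx context.Context, ru *models.RecordedURL) error {
	slog.Info("capture", "url", ru.URL, "digest", ru.PayloadDigest)
	return nil
}

func main() {
	cfg := &config.Config{QueueSize: 1000}
	p := pipeline.NewPipeline(cfg, slog.Default())
	p.AddProcessor(&logProcessor{})

	_ = p.Start()
	_ = p.Enqueue(&models.RecordedURL{URL: "http://example.com/", Method: "GET"})
	_ = p.Stop() // closes the queue, waits for workers to drain it, then cancels
}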
internal/proxy/handler.go
@@ -6,6 +6,7 @@ import (
 	"log/slog"
 	"net/http"
 	"strings"
+	"time"
 
 	"github.com/internetarchive/gowarcprox/pkg/config"
 )
@@ -62,6 +63,8 @@ func (h *Handler) handleConnect(w http.ResponseWriter, r *http.Request) {
 
 // handleHTTP handles regular HTTP proxy requests
 func (h *Handler) handleHTTP(w http.ResponseWriter, r *http.Request) {
+	startTime := time.Now()
+
 	// For proxy requests, the URL should be absolute
 	// Make sure we have a valid URL
 	if r.URL.Scheme == "" {
@@ -70,6 +73,14 @@ func (h *Handler) handleHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	// Read and record request body
+	reqBody, err := readRequestBody(r)
+	if err != nil {
+		h.logger.Error("failed to read request body", "error", err)
+		http.Error(w, "Bad Request", http.StatusBadRequest)
+		return
+	}
+
 	// Create a new request to the remote server
 	outReq, err := http.NewRequest(r.Method, r.URL.String(), r.Body)
 	if err != nil {
@@ -110,6 +121,16 @@ func (h *Handler) handleHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	defer resp.Body.Close()
 
+	// Read response body for recording
+	respBody, err := io.ReadAll(resp.Body)
+	if err != nil {
+		h.logger.Error("failed to read response body",
+			"url", r.URL.String(),
+			"error", err)
+		http.Error(w, "Bad Gateway", http.StatusBadGateway)
+		return
+	}
+
 	// Copy response headers
 	for name, values := range resp.Header {
 		if isHopByHopHeader(name) {
@@ -123,20 +144,35 @@ func (h *Handler) handleHTTP(w http.ResponseWriter, r *http.Request) {
 	// Write status code
 	w.WriteHeader(resp.StatusCode)
 
-	// Copy response body
-	written, err := io.Copy(w, resp.Body)
+	// Write response body to client
+	written, err := w.Write(respBody)
 	if err != nil {
-		h.logger.Error("failed to copy response body",
+		h.logger.Error("failed to write response body",
 			"url", r.URL.String(),
 			"error", err)
 		return
 	}
 
+	// Get remote address. Note: net/http does not set RemoteAddr on
+	// outgoing client requests, so this is typically empty; capturing the
+	// dialed address would need an httptrace hook (future work)
+	remoteAddr := ""
+	if resp.Request != nil && resp.Request.RemoteAddr != "" {
+		remoteAddr = resp.Request.RemoteAddr
+	}
+
+	// Create RecordedURL and enqueue to pipeline
+	ru := h.createRecordedURL(r, resp, reqBody, respBody, startTime, remoteAddr)
+	if err := h.server.pipeline.Enqueue(ru); err != nil {
+		h.logger.Error("failed to enqueue recorded URL",
+			"url", r.URL.String(),
+			"error", err)
+	}
+
 	h.logger.Info("proxied request",
 		"method", r.Method,
 		"url", r.URL.String(),
 		"status", resp.StatusCode,
-		"bytes", written)
+		"bytes", written,
+		"digest", ru.PayloadDigest)
 }
 
 // isHopByHopHeader checks if a header is hop-by-hop
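
isHopByHopHeader itself lies outside this hunk; for context, a sketch of what such a predicate conventionally looks like, per the RFC 7230 §6.1 connection-specific headers (illustrative only; the repo's actual implementation may differ):

package proxy

import "net/http"

// hopByHop lists headers that apply to a single connection and must
// not be forwarded by a proxy (RFC 7230 §6.1).
var hopByHop = map[string]struct{}{
	"Connection":          {},
	"Keep-Alive":          {},
	"Proxy-Authenticate":  {},
	"Proxy-Authorization": {},
	"Te":                  {},
	"Trailer":             {},
	"Transfer-Encoding":   {},
	"Upgrade":             {},
}

func isHopByHop(name string) bool {
	_, ok := hopByHop[http.CanonicalHeaderKey(name)]
	return ok
}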
internal/proxy/mitm.go
@@ -2,12 +2,14 @@ package proxy
 
 import (
 	"bufio"
+	"bytes"
 	"crypto/tls"
 	"fmt"
 	"io"
 	"net"
 	"net/http"
 	"strings"
+	"time"
 )
 
 // handleConnectMITM handles the HTTPS MITM proxy logic
@@ -87,6 +89,8 @@ func (h *Handler) handleConnectMITM(w http.ResponseWriter, r *http.Request) erro
 
 	h.logger.Debug("TLS handshake successful", "hostname", hostname)
 
+	startTime := time.Now()
+
 	// Now read the actual HTTP request from the encrypted connection
 	reader := bufio.NewReader(tlsClientConn)
 	req, err := http.ReadRequest(reader)
@@ -100,6 +104,13 @@ func (h *Handler) handleConnectMITM(w http.ResponseWriter, r *http.Request) erro
 		return nil // Don't return error for EOF
 	}
 
+	// Read request body for recording
+	reqBody, err := readRequestBody(req)
+	if err != nil {
+		h.logger.Error("failed to read request body", "error", err)
+		return fmt.Errorf("failed to read request body: %w", err)
+	}
+
 	// Fix up the request URL
 	// In HTTPS proxy mode, the request URL doesn't have scheme/host
 	if req.URL.Scheme == "" {
@@ -161,6 +172,18 @@ func (h *Handler) handleConnectMITM(w http.ResponseWriter, r *http.Request) erro
 	}
 	defer resp.Body.Close()
 
+	// Read response body for recording
+	respBody, err := io.ReadAll(resp.Body)
+	if err != nil {
+		h.logger.Error("failed to read response body",
+			"url", req.URL.String(),
+			"error", err)
+		return fmt.Errorf("failed to read response body: %w", err)
+	}
+
+	// Restore the consumed response body from the buffer so resp.Write
+	// below can still send it to the client
+	resp.Body = io.NopCloser(bytes.NewReader(respBody))
+
 	// Send response back to client
 	if err := resp.Write(tlsClientConn); err != nil {
 		h.logger.Error("failed to write response to client",
@@ -169,10 +192,19 @@ func (h *Handler) handleConnectMITM(w http.ResponseWriter, r *http.Request) erro
 		return fmt.Errorf("failed to write response: %w", err)
 	}
 
+	// Create RecordedURL and enqueue to pipeline
+	ru := h.createRecordedURL(req, resp, reqBody, respBody, startTime, host)
+	if err := h.server.pipeline.Enqueue(ru); err != nil {
+		h.logger.Error("failed to enqueue recorded URL",
+			"url", req.URL.String(),
+			"error", err)
+	}
+
 	h.logger.Info("proxied HTTPS request",
 		"method", req.Method,
 		"url", req.URL.String(),
-		"status", resp.StatusCode)
+		"status", resp.StatusCode,
+		"digest", ru.PayloadDigest)
 
 	// If there's buffered data from the hijack, it should be handled
 	// but for CONNECT, there usually isn't any
internal/proxy/proxy.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/internetarchive/gowarcprox/internal/certauth"
+	"github.com/internetarchive/gowarcprox/internal/pipeline"
 	"github.com/internetarchive/gowarcprox/pkg/config"
 )
 
@@ -20,6 +21,7 @@ type Server struct {
 	server   *http.Server
 	handler  *Handler
 	certAuth *certauth.CertificateAuthority
+	pipeline *pipeline.Pipeline
 	wg       sync.WaitGroup
 	ctx      context.Context
 	cancel   context.CancelFunc
@@ -54,6 +56,9 @@ func NewServer(cfg *config.Config, logger *slog.Logger) (*Server, error) {
 	}
 	s.certAuth = certAuth
 
+	// Initialize processing pipeline
+	s.pipeline = pipeline.NewPipeline(cfg, logger)
+
 	// Create the HTTP handler
 	s.handler = NewHandler(s, cfg, logger)
 
@@ -82,6 +87,11 @@ func (s *Server) Start() error {
 		"address", s.config.Address,
 		"port", s.config.Port)
 
+	// Start the processing pipeline
+	if err := s.pipeline.Start(); err != nil {
+		return fmt.Errorf("failed to start pipeline: %w", err)
+	}
+
 	// Start accepting connections
 	s.wg.Add(1)
 	go func() {
@@ -114,6 +124,11 @@ func (s *Server) Stop() error {
 	// Wait for all goroutines to finish
 	s.wg.Wait()
 
+	// Stop the pipeline
+	if err := s.pipeline.Stop(); err != nil {
+		s.logger.Error("pipeline stop error", "error", err)
+	}
+
 	s.logger.Info("proxy server stopped")
 	return nil
 }
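
For context, the intended lifecycle from a caller's perspective, as a sketch (assuming config.Config exports Address, Port, and QueueSize as string/int fields, and a cmd package inside the module):

package main

import (
	"log/slog"
	"os"
	"os/signal"

	"github.com/internetarchive/gowarcprox/internal/proxy"
	"github.com/internetarchive/gowarcprox/pkg/config"
)

func main() {
	cfg := &config.Config{Address: "localhost", Port: 8000, QueueSize: 1000}
	logger := slog.Default()

	srv, err := proxy.NewServer(cfg, logger)
	if err != nil {
		logger.Error("failed to create server", "error", err)
		os.Exit(1)
	}

	// Start brings up the pipeline first, then the listener
	if err := srv.Start(); err != nil {
		logger.Error("failed to start server", "error", err)
		os.Exit(1)
	}

	// Run until interrupted, then stop: the listener drains first, the pipeline last
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig

	if err := srv.Stop(); err != nil {
		logger.Error("shutdown error", "error", err)
	}
}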
internal/proxy/recorder.go
@@ -0,0 +1,232 @@
+package proxy
+
+import (
+	"bytes"
+	"crypto/sha1"
+	"crypto/sha256"
+	"encoding/base32"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/internetarchive/gowarcprox/internal/models"
+	"github.com/zeebo/blake3"
+)
+
+// RecordingResponseWriter wraps http.ResponseWriter to capture response data
+type RecordingResponseWriter struct {
+	http.ResponseWriter
+	statusCode int
+	body       *bytes.Buffer
+	written    int64
+}
+
+// NewRecordingResponseWriter creates a new recording response writer
+func NewRecordingResponseWriter(w http.ResponseWriter) *RecordingResponseWriter {
+	return &RecordingResponseWriter{
+		ResponseWriter: w,
+		statusCode:     http.StatusOK, // Default status
+		body:           &bytes.Buffer{},
+	}
+}
+
+// WriteHeader captures the status code and forwards to underlying writer
+func (r *RecordingResponseWriter) WriteHeader(statusCode int) {
+	r.statusCode = statusCode
+	r.ResponseWriter.WriteHeader(statusCode)
+}
+
+// Write captures the response body and forwards to underlying writer
+func (r *RecordingResponseWriter) Write(data []byte) (int, error) {
+	// Write to buffer for recording
+	r.body.Write(data)
+
+	// Write to actual response
+	n, err := r.ResponseWriter.Write(data)
+	r.written += int64(n)
+	return n, err
+}
+
+// StatusCode returns the captured status code
+func (r *RecordingResponseWriter) StatusCode() int {
+	return r.statusCode
+}
+
+// Body returns the captured response body
+func (r *RecordingResponseWriter) Body() []byte {
+	return r.body.Bytes()
+}
+
+// RecordingReader wraps io.Reader to capture data while reading
+type RecordingReader struct {
+	reader io.Reader
+	buffer *bytes.Buffer
+}
+
+// NewRecordingReader creates a new recording reader
+func NewRecordingReader(r io.Reader) *RecordingReader {
+	return &RecordingReader{
+		reader: r,
+		buffer: &bytes.Buffer{},
+	}
+}
+
+// Read reads from the underlying reader and captures data
+func (r *RecordingReader) Read(p []byte) (n int, err error) {
+	n, err = r.reader.Read(p)
+	if n > 0 {
+		r.buffer.Write(p[:n])
+	}
+	return n, err
+}
+
+// Bytes returns the captured data
+func (r *RecordingReader) Bytes() []byte {
+	return r.buffer.Bytes()
+}
+
+// createRecordedURL creates a RecordedURL from request and response data
+func (h *Handler) createRecordedURL(
+	req *http.Request,
+	resp *http.Response,
+	reqBody []byte,
+	respBody []byte,
+	startTime time.Time,
+	remoteAddr string,
+) *models.RecordedURL {
+
+	duration := time.Since(startTime)
+
+	// Create RecordedURL
+	ru := &models.RecordedURL{
+		URL:            req.URL.String(),
+		Method:         req.Method,
+		RequestHeader:  cloneHeaders(req.Header),
+		RequestBody:    reqBody,
+		StatusCode:     resp.StatusCode,
+		StatusMessage:  resp.Status,
+		ResponseHeader: cloneHeaders(resp.Header),
+		ResponseBody:   respBody,
+		Timestamp:      startTime,
+		Duration:       duration,
+		RemoteAddr:     remoteAddr,
+		ClientAddr:     req.RemoteAddr,
+		ContentType:    resp.Header.Get("Content-Type"),
+		ContentLength:  int64(len(respBody)),
+	}
+
+	// Extract client IP (net.SplitHostPort also unwraps bracketed IPv6
+	// literals, which a LastIndex(":") split would mangle)
+	if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
+		ru.ClientIP = host
+	} else {
+		ru.ClientIP = req.RemoteAddr
+	}
+
+	// Extract remote IP
+	if host, _, err := net.SplitHostPort(remoteAddr); err == nil {
+		ru.RemoteIP = host
+	} else {
+		ru.RemoteIP = remoteAddr
+	}
+
+	// Calculate payload digest (just the response body)
+	ru.PayloadDigest = h.calculateDigest(respBody)
+
+	// Calculate block digest (entire HTTP response block)
+	blockData := h.buildResponseBlock(resp, respBody)
+	ru.BlockDigest = h.calculateDigest(blockData)
+
+	// Parse Warcprox-Meta header if present
+	if metaHeader := req.Header.Get("Warcprox-Meta"); metaHeader != "" {
+		// TODO: Parse Warcprox-Meta JSON in Phase 7
+		h.logger.Debug("Warcprox-Meta header present (parsing not yet implemented)",
+			"header", metaHeader)
+	}
+
+	return ru
+}
+
+// calculateDigest calculates the digest of data using the configured algorithm
+func (h *Handler) calculateDigest(data []byte) string {
+	algorithm := h.config.DigestAlgorithm
+
+	switch strings.ToLower(algorithm) {
+	case "sha1":
+		hash := sha1.Sum(data)
+		encoded := base32.StdEncoding.EncodeToString(hash[:])
+		// Remove padding
+		encoded = strings.TrimRight(encoded, "=")
+		return "sha1:" + encoded
+
+	case "sha256":
+		hash := sha256.Sum256(data)
+		encoded := base32.StdEncoding.EncodeToString(hash[:])
+		encoded = strings.TrimRight(encoded, "=")
+		return "sha256:" + encoded
+
+	case "blake3":
+		hash := blake3.Sum256(data)
+		encoded := fmt.Sprintf("%x", hash)
+		return "blake3:" + encoded
+
+	default:
+		// Default to SHA1
+		hash := sha1.Sum(data)
+		encoded := base32.StdEncoding.EncodeToString(hash[:])
+		encoded = strings.TrimRight(encoded, "=")
+		return "sha1:" + encoded
+	}
+}
+
+// buildResponseBlock builds the complete HTTP response block (headers + body)
+func (h *Handler) buildResponseBlock(resp *http.Response, body []byte) []byte {
+	var buf bytes.Buffer
+
+	// Write status line
+	fmt.Fprintf(&buf, "%s %s\r\n", resp.Proto, resp.Status)
+
+	// Write headers. Map iteration order is nondeterministic, so the block
+	// digest is not byte-reproducible across runs; digesting the raw bytes
+	// as received would be more faithful (candidate for a later phase)
+	for name, values := range resp.Header {
+		for _, value := range values {
+			fmt.Fprintf(&buf, "%s: %s\r\n", name, value)
+		}
+	}
+
+	// Write blank line separating headers from body
+	buf.WriteString("\r\n")
+
+	// Write body
+	buf.Write(body)
+
+	return buf.Bytes()
+}
+
+// cloneHeaders creates a deep copy of HTTP headers
+func cloneHeaders(src http.Header) http.Header {
+	dst := make(http.Header, len(src))
+	for k, vv := range src {
+		vv2 := make([]string, len(vv))
+		copy(vv2, vv)
+		dst[k] = vv2
+	}
+	return dst
+}
+
+// readRequestBody reads and returns the request body, replacing it with a buffer
+func readRequestBody(req *http.Request) ([]byte, error) {
+	if req.Body == nil {
+		return nil, nil
+	}
+
+	body, err := io.ReadAll(req.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read request body: %w", err)
+	}
+
+	// Replace body with a buffer so it can be read again
+	req.Body = io.NopCloser(bytes.NewReader(body))
+
+	return body, nil
+}
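
RecordingResponseWriter and RecordingReader are defined here but not yet wired into the handlers, which buffer bodies directly. A sketch of how the writer could be used as middleware; withRecording and the record callback are hypothetical names, not part of this commit:

package proxy

import "net/http"

// withRecording is an illustrative middleware showing how
// RecordingResponseWriter could capture a handler's output.
func withRecording(next http.Handler, record func(status int, body []byte)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rw := NewRecordingResponseWriter(w)
		next.ServeHTTP(rw, r)
		// Status and body are available once the wrapped handler returns
		record(rw.StatusCode(), rw.Body())
	})
}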