main
1package main
2
3import (
4 "errors"
5 "fmt"
6 "io"
7 "log/slog"
8 "net"
9 "time"
10
11 "github.com/dustin/go-humanize"
12)
13
14func (c connection) Proxy() error {
15 c.proxyData(send)
16 c.proxyData(recv)
17 return c.errgroup.Wait()
18}
19
20func (c connection) proxyData(d direction) {
21
22 c.errgroup.Go(func() error {
23
24 // send/recv direction
25 id := c.Id()
26 name := d.String()
27 src, dst := c.sConn, c.dConn
28 if d == recv {
29 src, dst = c.dConn, c.sConn
30 }
31
32 start := time.Now()
33 lastByte := time.Now()
34 bytes := uint64(0)
35
36 defer func() {
37 c.cancel()
38 speed := uint64(0)
39 elapsed := time.Since(start).Round(time.Second)
40 if elapsed > 0 {
41 speed = bytes / uint64(elapsed.Seconds())
42 }
43 elapsedLB := time.Since(lastByte).Round(time.Second)
44 if elapsedLB > 0 {
45 speed = bytes / uint64(elapsedLB.Seconds())
46 }
47 human_speed := humanize.Bytes(speed)
48 slog.Info("complete",
49 slog.String("conn", d.String()),
50 slog.String("speed", human_speed+"/s"),
51 slog.Uint64("bytes", bytes),
52 slog.Duration("duration", elapsed),
53 slog.Duration("durationLB", elapsedLB))
54 }()
55
56 buf := make([]byte, bufSize)
57 for {
58 select {
59 case <-c.ctx.Done():
60 err := src.Close()
61 if err != nil {
62 return fmt.Errorf("proxy: %s failed to close id=%q: %w",
63 name, id, err)
64 }
65 return nil
66 default:
67 src.SetReadDeadline(time.Now().Add(timeout))
68 n, err := src.Read(buf)
69 if err != nil {
70 if errors.Is(err, io.EOF) {
71 return nil
72 }
73 opErr, ok := err.(*net.OpError)
74 if ok && opErr.Timeout() {
75 return fmt.Errorf("proxy: %s timeout id=%q: %w",
76 name, id, err)
77 }
78 return fmt.Errorf("proxy: %s failed to read id=%q: %w",
79 name, id, err)
80 }
81 lastByte = time.Now()
82 bytes += uint64(n)
83 dst.SetWriteDeadline(time.Now().Add(timeout))
84 _, err = dst.Write(buf[:n])
85 if err != nil {
86 return fmt.Errorf("proxy: %s failed to write id=%q: %w",
87 name, id, err)
88 }
89 }
90 }
91 })
92}