From bab94ca4e775acc4ffc61e4cfbf094e05254e328 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Wed, 16 Apr 2025 21:56:53 -0500 Subject: [PATCH] Cleanup and note more work --- interface.go | 20 +++++++++++++------- udp/conn.go | 2 +- udp/udp_generic.go | 6 +++--- udp/udp_linux.go | 5 ++--- udp/udp_rio_windows.go | 2 +- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/interface.go b/interface.go index 545dd395..4dde159f 100644 --- a/interface.go +++ b/interface.go @@ -256,14 +256,14 @@ func (f *Interface) activate() error { func (f *Interface) run() (func(), error) { // Launch n queues to read packets from udp for i := 0; i < f.routines; i++ { - go f.listenOut(i) f.wg.Add(1) + go f.listenOut(i) } // Launch n queues to read packets from tun dev for i := 0; i < f.routines; i++ { - go f.listenIn(f.readers[i], i) f.wg.Add(1) + go f.listenIn(f.readers[i], i) } return f.wg.Wait, nil @@ -285,15 +285,21 @@ func (f *Interface) listenOut(i int) { fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) - li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) { + err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) { f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l)) }) - f.l.Errorf("udp reader %v is done", i) + if err != nil && !f.closed.Load() { + f.l.WithError(err).Error("Error while reading packet inbound packet, closing") + //TODO: Trigger Control to close + } + + f.l.Debugf("underlay reader %v is done", i) f.wg.Done() } func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { + runtime.LockOSThread() packet := make([]byte, mtu) out := make([]byte, mtu) fwPacket := &firewall.Packet{} @@ -305,8 +311,8 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { n, err := reader.Read(packet) if err != nil { if !f.closed.Load() { - //TODO: should we close? yes - f.l.WithError(err).Error("Error while reading outbound packet") + f.l.WithError(err).Error("Error while reading outbound packet, closing") + //TODO: Trigger Control to close } break } @@ -314,7 +320,7 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l)) } - f.l.Errorf("tun reader %v is done", i) + f.l.Debugf("overlay reader %v is done", i) f.wg.Done() } diff --git a/udp/conn.go b/udp/conn.go index 1ae585c2..1a275d19 100644 --- a/udp/conn.go +++ b/udp/conn.go @@ -16,7 +16,7 @@ type EncReader func( type Conn interface { Rebind() error LocalAddr() (netip.AddrPort, error) - ListenOut(r EncReader) + ListenOut(r EncReader) error WriteTo(b []byte, addr netip.AddrPort) error ReloadConfig(c *config.C) SupportsMultipleReaders() bool diff --git a/udp/udp_generic.go b/udp/udp_generic.go index e9dad6c5..398e7643 100644 --- a/udp/udp_generic.go +++ b/udp/udp_generic.go @@ -73,7 +73,7 @@ type rawMessage struct { Len uint32 } -func (u *GenericConn) ListenOut(r EncReader) { +func (u *GenericConn) ListenOut(r EncReader) error { buffer := make([]byte, MTU) var lastRecvErr time.Time @@ -82,9 +82,9 @@ func (u *GenericConn) ListenOut(r EncReader) { // Just read one packet at a time n, rua, err := u.ReadFromUDPAddrPort(buffer) if err != nil { + return err if errors.Is(err, net.ErrClosed) { - u.l.WithError(err).Debug("udp socket is closed, exiting read loop") - return + return err } // Dampen unexpected message warns to once per minute if lastRecvErr.IsZero() || time.Since(lastRecvErr) > time.Minute { diff --git a/udp/udp_linux.go b/udp/udp_linux.go index e0d7dc6e..888d46af 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -122,7 +122,7 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) { } } -func (u *StdConn) ListenOut(r EncReader) { +func (u *StdConn) ListenOut(r EncReader) error { var ip netip.Addr msgs, buffers, names := u.PrepareRawMessages(u.batch) @@ -134,8 +134,7 @@ func (u *StdConn) ListenOut(r EncReader) { for { n, err := read(msgs) if err != nil { - u.l.WithError(err).Debug("udp socket is closed, exiting read loop") - return + return err } for i := 0; i < n; i++ { diff --git a/udp/udp_rio_windows.go b/udp/udp_rio_windows.go index 3d60f34c..524c8a0b 100644 --- a/udp/udp_rio_windows.go +++ b/udp/udp_rio_windows.go @@ -140,7 +140,7 @@ func (u *RIOConn) bind(l *logrus.Logger, sa windows.Sockaddr) error { return nil } -func (u *RIOConn) ListenOut(r EncReader) { +func (u *RIOConn) ListenOut(r EncReader) error { buffer := make([]byte, MTU) var lastRecvErr time.Time