diff --git a/udp/udp_linux.go b/udp/udp_linux.go index ab52ec4a..dc39cc65 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -23,9 +23,10 @@ import ( var readTimeout = unix.NsecToTimeval(int64(time.Millisecond * 500)) const ( - defaultGSOMaxSegments = 8 - defaultGSOFlushTimeout = 150 * time.Microsecond - maxGSOBatchBytes = 0xFFFF + defaultGSOMaxSegments = 8 + defaultGSOFlushTimeout = 150 * time.Microsecond + defaultGROReadBufferSize = MTU * defaultGSOMaxSegments + maxGSOBatchBytes = 0xFFFF ) var ( @@ -51,6 +52,8 @@ type StdConn struct { gsoMaxBytes int gsoFlushTimeout time.Duration gsoTimer *time.Timer + + groBufSize int } func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) { @@ -103,6 +106,7 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in gsoMaxSegments: defaultGSOMaxSegments, gsoMaxBytes: MTU * defaultGSOMaxSegments, gsoFlushTimeout: defaultGSOFlushTimeout, + groBufSize: MTU, }, err } @@ -158,13 +162,20 @@ func (u *StdConn) ListenOut(r EncReader) error { controls [][]byte ) - msgs, buffers, names := u.PrepareRawMessages(u.batch) + bufSize := u.readBufferSize() + msgs, buffers, names := u.PrepareRawMessages(u.batch, bufSize) read := u.ReadMulti if u.batch == 1 { read = u.ReadSingle } for { + desired := u.readBufferSize() + if len(buffers) == 0 || cap(buffers[0]) < desired { + msgs, buffers, names = u.PrepareRawMessages(u.batch, desired) + controls = nil + } + if u.enableGRO { if controls == nil { controls = make([][]byte, len(msgs)) @@ -197,9 +208,44 @@ func (u *StdConn) ListenOut(r EncReader) error { addr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])) payload := buffers[i][:msgs[i].Len] + if u.enableGRO && u.l.IsLevelEnabled(logrus.DebugLevel) { + ctrlLen := getRawMessageControlLen(&msgs[i]) + msgFlags := getRawMessageFlags(&msgs[i]) + u.l.WithFields(logrus.Fields{ + "tag": "gro-debug", + "stage": "recv", + "payload_len": len(payload), + "ctrl_len": ctrlLen, + "msg_flags": msgFlags, + }).Debug("gro batch data") + if controls != nil && ctrlLen > 0 { + maxDump := ctrlLen + if maxDump > 16 { + maxDump = 16 + } + u.l.WithFields(logrus.Fields{ + "tag": "gro-debug", + "stage": "control-bytes", + "control_hex": fmt.Sprintf("%x", controls[i][:maxDump]), + "datalen": ctrlLen, + }).Debug("gro control dump") + } + } + + sawControl := false if controls != nil { if ctrlLen := getRawMessageControlLen(&msgs[i]); ctrlLen > 0 { - if segSize, segCount := parseGROControl(controls[i][:ctrlLen]); segCount > 1 && segSize > 0 { + if segSize, segCount := parseGROControl(controls[i][:ctrlLen]); segSize > 0 { + sawControl = true + if u.l.IsLevelEnabled(logrus.DebugLevel) { + u.l.WithFields(logrus.Fields{ + "tag": "gro-debug", + "stage": "control", + "seg_size": segSize, + "seg_count": segCount, + "payloadLen": len(payload), + }).Debug("gro control parsed") + } segSize = normalizeGROSegSize(segSize, segCount, len(payload)) if segSize > 0 && segSize < len(payload) { if u.emitGROSegments(r, addr, payload, segSize) { @@ -210,11 +256,31 @@ func (u *StdConn) ListenOut(r EncReader) error { } } + if u.enableGRO && len(payload) > MTU { + if !sawControl && u.l.IsLevelEnabled(logrus.DebugLevel) { + u.l.WithFields(logrus.Fields{ + "tag": "gro-debug", + "stage": "fallback", + "payload_len": len(payload), + }).Debug("gro control missing; splitting payload by MTU") + } + if u.emitGROSegments(r, addr, payload, MTU) { + continue + } + } + r(addr, payload) } } } +func (u *StdConn) readBufferSize() int { + if u.enableGRO && u.groBufSize > MTU { + return u.groBufSize + } + return MTU +} + func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) { for { n, _, err := unix.Syscall6( @@ -378,12 +444,22 @@ func (u *StdConn) ReloadConfig(c *config.C) { } } - u.configureGRO(c.GetBool("listen.enable_gro", false)) + u.configureGRO(c) u.configureGSO(c) } -func (u *StdConn) configureGRO(enable bool) { +func (u *StdConn) configureGRO(c *config.C) { + if c == nil { + return + } + + enable := c.GetBool("listen.enable_gro", false) if enable == u.enableGRO { + if enable { + if size := c.GetInt("listen.gro_read_buffer", 0); size > 0 { + u.setGROBufferSize(size) + } + } return } @@ -393,7 +469,8 @@ func (u *StdConn) configureGRO(enable bool) { return } u.enableGRO = true - u.l.Info("UDP GRO enabled") + u.setGROBufferSize(c.GetInt("listen.gro_read_buffer", defaultGROReadBufferSize)) + u.l.WithField("buffer_size", u.groBufSize).Info("UDP GRO enabled") return } @@ -401,6 +478,7 @@ func (u *StdConn) configureGRO(enable bool) { u.l.WithError(err).Warn("Failed to disable UDP GRO") } u.enableGRO = false + u.groBufSize = MTU } func (u *StdConn) configureGSO(c *config.C) { @@ -434,6 +512,16 @@ func (u *StdConn) configureGSO(c *config.C) { u.gsoFlushTimeout = timeout } +func (u *StdConn) setGROBufferSize(size int) { + if size < MTU { + size = defaultGROReadBufferSize + } + if size > maxGSOBatchBytes { + size = maxGSOBatchBytes + } + u.groBufSize = size +} + func (u *StdConn) disableGSO() { u.gsoMu.Lock() defer u.gsoMu.Unlock() @@ -547,7 +635,7 @@ func (u *StdConn) sendSegmented(payload []byte, addr netip.AddrPort, segSize int hdr.Level = unix.SOL_UDP hdr.Type = unix.UDP_SEGMENT setCmsgLen(hdr, unix.CmsgLen(2)) - binary.LittleEndian.PutUint16(control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(segSize)) + binary.NativeEndian.PutUint16(control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(segSize)) var sa unix.Sockaddr if addr.Addr().Is4() { @@ -627,10 +715,10 @@ func parseGROControl(control []byte) (int, int) { for _, c := range cmsgs { if c.Header.Level == unix.SOL_UDP && c.Header.Type == unix.UDP_GRO && len(c.Data) >= 2 { - segSize := int(binary.LittleEndian.Uint16(c.Data[:2])) + segSize := int(binary.NativeEndian.Uint16(c.Data[:2])) segCount := 0 if len(c.Data) >= 4 { - segCount = int(binary.LittleEndian.Uint16(c.Data[2:4])) + segCount = int(binary.NativeEndian.Uint16(c.Data[2:4])) } return segSize, segCount } @@ -640,7 +728,7 @@ func parseGROControl(control []byte) (int, int) { } func (u *StdConn) emitGROSegments(r EncReader, addr netip.AddrPort, payload []byte, segSize int) bool { - if segSize <= 0 || segSize >= len(payload) { + if segSize <= 0 { return false } @@ -649,27 +737,39 @@ func (u *StdConn) emitGROSegments(r EncReader, addr netip.AddrPort, payload []by if end > len(payload) { end = len(payload) } - r(addr, payload[offset:end]) + segment := make([]byte, end-offset) + copy(segment, payload[offset:end]) + r(addr, segment) } return true } func normalizeGROSegSize(segSize, segCount, total int) int { - if segCount > 1 && total > 0 { - avg := total / segCount - if avg > 0 { - if segSize > avg { - if segSize-8 == avg { - segSize = avg - } else if segSize > total { - segSize = avg - } - } + if segSize <= 0 || total <= 0 { + return segSize + } + + if segSize > total && segCount > 0 { + segSize = total / segCount + if segSize == 0 { + segSize = total } } - if segSize > total { - segSize = total + + if segCount <= 1 && segSize > 0 && total > segSize { + calculated := total / segSize + if calculated <= 1 { + calculated = (total + segSize - 1) / segSize + } + if calculated > 1 { + segCount = calculated + } } + + if segSize > MTU { + return MTU + } + return segSize } diff --git a/udp/udp_linux_32.go b/udp/udp_linux_32.go index 3204776b..89e80024 100644 --- a/udp/udp_linux_32.go +++ b/udp/udp_linux_32.go @@ -30,13 +30,16 @@ type rawMessage struct { Len uint32 } -func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { +func (u *StdConn) PrepareRawMessages(n int, bufSize int) ([]rawMessage, [][]byte, [][]byte) { + if bufSize <= 0 { + bufSize = MTU + } msgs := make([]rawMessage, n) buffers := make([][]byte, n) names := make([][]byte, n) for i := range msgs { - buffers[i] = make([]byte, MTU) + buffers[i] = make([]byte, bufSize) names[i] = make([]byte, unix.SizeofSockaddrInet6) vs := []iovec{ @@ -67,6 +70,10 @@ func getRawMessageControlLen(msg *rawMessage) int { return int(msg.Hdr.Controllen) } +func getRawMessageFlags(msg *rawMessage) int { + return int(msg.Hdr.Flags) +} + func setCmsgLen(h *unix.Cmsghdr, l int) { h.Len = uint32(l) } diff --git a/udp/udp_linux_64.go b/udp/udp_linux_64.go index a09173d7..83e1b349 100644 --- a/udp/udp_linux_64.go +++ b/udp/udp_linux_64.go @@ -33,13 +33,16 @@ type rawMessage struct { Pad0 [4]byte } -func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { +func (u *StdConn) PrepareRawMessages(n int, bufSize int) ([]rawMessage, [][]byte, [][]byte) { + if bufSize <= 0 { + bufSize = MTU + } msgs := make([]rawMessage, n) buffers := make([][]byte, n) names := make([][]byte, n) for i := range msgs { - buffers[i] = make([]byte, MTU) + buffers[i] = make([]byte, bufSize) names[i] = make([]byte, unix.SizeofSockaddrInet6) vs := []iovec{ @@ -70,6 +73,10 @@ func getRawMessageControlLen(msg *rawMessage) int { return int(msg.Hdr.Controllen) } +func getRawMessageFlags(msg *rawMessage) int { + return int(msg.Hdr.Flags) +} + func setCmsgLen(h *unix.Cmsghdr, l int) { h.Len = uint64(l) }