Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ringbuf read on timeout #1111

Merged
merged 3 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/unix/types_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
BPF_FS_MAGIC = linux.BPF_FS_MAGIC
TRACEFS_MAGIC = linux.TRACEFS_MAGIC
DEBUGFS_MAGIC = linux.DEBUGFS_MAGIC
BPF_RB_NO_WAKEUP = linux.BPF_RB_NO_WAKEUP
BPF_RB_FORCE_WAKEUP = linux.BPF_RB_FORCE_WAKEUP
)

type Statfs_t = linux.Statfs_t
Expand Down
2 changes: 2 additions & 0 deletions internal/unix/types_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ const (
BPF_FS_MAGIC
TRACEFS_MAGIC
DEBUGFS_MAGIC
BPF_RB_NO_WAKEUP
BPF_RB_FORCE_WAKEUP
)

type Statfs_t struct {
Expand Down
11 changes: 9 additions & 2 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func (r *Reader) SetDeadline(t time.Time) {
// Read the next record from the BPF ringbuf.
//
// Returns os.ErrClosed if Close is called on the Reader, or os.ErrDeadlineExceeded
// if a deadline was set.
// if a deadline was set and no valid entry was present. A producer might use BPF_RB_NO_WAKEUP
// which may cause the deadline to expire but a valid entry will be present.
func (r *Reader) Read() (Record, error) {
var rec Record
return rec, r.ReadInto(&rec)
Expand All @@ -204,6 +205,11 @@ func (r *Reader) ReadInto(rec *Record) error {
for {
if !r.haveData {
_, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline)
if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() {
// Ignore the timeout when the ring still holds data so that the entry can be read.
// This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP.
err = nil
}
if err != nil {
return err
}
Expand All @@ -212,14 +218,15 @@ func (r *Reader) ReadInto(rec *Record) error {

for {
err := readRecord(r.ring, rec, r.header)
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy || err == errDiscard {
continue
}
if err == errEOR {
r.haveData = false
break
}

return err
}
}
Expand Down
88 changes: 69 additions & 19 deletions ringbuf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ import (
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/testutils"
"github.com/cilium/ebpf/internal/testutils/fdtrace"
"github.com/cilium/ebpf/internal/unix"
"github.com/google/go-cmp/cmp"
)

// sampleMessage describes one sample emitted by the test BPF program.
type sampleMessage struct {
// size is the number of bytes reserved in the ring buffer for the sample.
size int
// flags is passed as the flags argument to the ringbuf submit/discard helper.
flags int32
}

// TestMain hands control of this package's test run to fdtrace.TestMain.
// NOTE(review): presumably this enables file descriptor leak tracing — confirm.
func TestMain(m *testing.M) {
fdtrace.TestMain(m)
}
Expand All @@ -24,19 +30,19 @@ func TestRingbufReader(t *testing.T) {

readerTests := []struct {
name string
messages []int
messages []sampleMessage
want map[int][]byte
}{
{
name: "send one short sample",
messages: []int{5},
messages: []sampleMessage{{size: 5}},
want: map[int][]byte{
5: {1, 2, 3, 4, 4},
},
},
{
name: "send three short samples, the second is discarded",
messages: []int{5, 10, 15},
messages: []sampleMessage{{size: 5}, {size: 10}, {size: 15}},
want: map[int][]byte{
5: {1, 2, 3, 4, 4},
15: {1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2},
Expand All @@ -45,7 +51,7 @@ func TestRingbufReader(t *testing.T) {
}
for _, tt := range readerTests {
t.Run(tt.name, func(t *testing.T) {
prog, events := mustOutputSamplesProg(t, 0, tt.messages...)
prog, events := mustOutputSamplesProg(t, tt.messages...)

rd, err := NewReader(events)
if err != nil {
Expand Down Expand Up @@ -80,7 +86,7 @@ func TestRingbufReader(t *testing.T) {
}
}

func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map, error) {
func outputSamplesProg(sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map, error) {
events, err := ebpf.NewMap(&ebpf.MapSpec{
Type: ebpf.RingBuf,
MaxEntries: 4096,
Expand All @@ -90,9 +96,9 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
}

var maxSampleSize int
for _, sampleSize := range sampleSizes {
if sampleSize > maxSampleSize {
maxSampleSize = sampleSize
for _, sampleMessage := range sampleMessages {
if sampleMessage.size > maxSampleSize {
maxSampleSize = sampleMessage.size
}
}

Expand All @@ -108,16 +114,16 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
)
}

for sampleIdx, sampleSize := range sampleSizes {
for sampleIdx, sampleMessage := range sampleMessages {
insns = append(insns,
asm.LoadMapPtr(asm.R1, events.FD()),
asm.Mov.Imm(asm.R2, int32(sampleSize)),
asm.Mov.Imm(asm.R2, int32(sampleMessage.size)),
asm.Mov.Imm(asm.R3, int32(0)),
asm.FnRingbufReserve.Call(),
asm.JEq.Imm(asm.R0, 0, "exit"),
asm.Mov.Reg(asm.R5, asm.R0),
)
for i := 0; i < sampleSize; i++ {
for i := 0; i < sampleMessage.size; i++ {
insns = append(insns,
asm.LoadMem(asm.R4, asm.RFP, int16(i+1)*-1, asm.Byte),
asm.StoreMem(asm.R5, int16(i), asm.R4, asm.Byte),
Expand All @@ -128,13 +134,13 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
if sampleIdx&1 != 0 {
insns = append(insns,
asm.Mov.Reg(asm.R1, asm.R5),
asm.Mov.Imm(asm.R2, flags),
asm.Mov.Imm(asm.R2, sampleMessage.flags),
asm.FnRingbufDiscard.Call(),
)
} else {
insns = append(insns,
asm.Mov.Reg(asm.R1, asm.R5),
asm.Mov.Imm(asm.R2, flags),
asm.Mov.Imm(asm.R2, sampleMessage.flags),
asm.FnRingbufSubmit.Call(),
)
}
Expand All @@ -158,10 +164,10 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
return prog, events, nil
}

func mustOutputSamplesProg(tb testing.TB, flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map) {
func mustOutputSamplesProg(tb testing.TB, sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map) {
tb.Helper()

prog, events, err := outputSamplesProg(flags, sampleSizes...)
prog, events, err := outputSamplesProg(sampleMessages...)
if err != nil {
tb.Fatal(err)
}
Expand All @@ -177,7 +183,7 @@ func mustOutputSamplesProg(tb testing.TB, flags int32, sampleSizes ...int) (*ebp
func TestReaderBlocking(t *testing.T) {
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

prog, events := mustOutputSamplesProg(t, 0, 5)
prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5, flags: 0})
ret, _, err := prog.Test(internal.EmptyBPFContext)
testutils.SkipIfNotSupported(t, err)
if err != nil {
Expand Down Expand Up @@ -234,10 +240,54 @@ func TestReaderBlocking(t *testing.T) {
}
}

// TestReaderNoWakeup checks that samples submitted with BPF_RB_NO_WAKEUP can
// be read even though the reader's deadline has already expired.
//
// The program emits three samples; the one at the odd index is discarded, so
// the reader is expected to see the 5 byte and the 7 byte sample only.
func TestReaderNoWakeup(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	prog, events := mustOutputSamplesProg(t,
		sampleMessage{size: 5, flags: unix.BPF_RB_NO_WAKEUP}, // Read after timeout
		sampleMessage{size: 6, flags: unix.BPF_RB_NO_WAKEUP}, // Discard
		sampleMessage{size: 7, flags: unix.BPF_RB_NO_WAKEUP}) // Read won't block

	rd, err := NewReader(events)
	if err != nil {
		t.Fatal(err)
	}
	defer rd.Close()

	ret, _, err := prog.Test(internal.EmptyBPFContext)
	testutils.SkipIfNotSupported(t, err)
	if err != nil {
		t.Fatal(err)
	}

	if errno := syscall.Errno(-int32(ret)); errno != 0 {
		t.Fatal("Expected 0 as return value, got", errno)
	}

	// A deadline of "now" makes the poller time out immediately; the reader
	// must still hand out the pending sample instead of the timeout error.
	rd.SetDeadline(time.Now())
	record, err := rd.Read()

	if err != nil {
		t.Error("Expected no error from first Read, got:", err)
	}
	if len(record.RawSample) != 5 {
		t.Errorf("Expected to read 5 bytes but got %d", len(record.RawSample))
	}

	record, err = rd.Read()

	if err != nil {
		t.Error("Expected no error from second Read, got:", err)
	}
	if len(record.RawSample) != 7 {
		t.Errorf("Expected to read 7 bytes but got %d", len(record.RawSample))
	}
}

func TestReaderSetDeadline(t *testing.T) {
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

_, events := mustOutputSamplesProg(t, 0, 5)
_, events := mustOutputSamplesProg(t, sampleMessage{size: 5, flags: 0})
rd, err := NewReader(events)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -267,7 +317,7 @@ func BenchmarkReader(b *testing.B) {

for _, bm := range readerBenchmarks {
b.Run(bm.name, func(b *testing.B) {
prog, events := mustOutputSamplesProg(b, bm.flags, 80)
prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: bm.flags})

rd, err := NewReader(events)
if err != nil {
Expand Down Expand Up @@ -299,7 +349,7 @@ func BenchmarkReader(b *testing.B) {
func BenchmarkReadInto(b *testing.B) {
testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")

prog, events := mustOutputSamplesProg(b, 0, 80)
prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: 0})

rd, err := NewReader(events)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ func (rr *ringReader) skipRead(skipBytes uint64) {
rr.cons += clamp(rr.cons, atomic.LoadUint64(rr.prod_pos), skipBytes)
}

// isEmpty reports whether the consumer position has caught up with the
// producer position, i.e. both atomically loaded values are equal.
func (rr *ringReader) isEmpty() bool {
	// Operands are evaluated left to right, so the consumer position is
	// loaded before the producer position.
	return atomic.LoadUint64(rr.cons_pos) == atomic.LoadUint64(rr.prod_pos)
}

func (rr *ringReader) Read(p []byte) (int, error) {
prod := atomic.LoadUint64(rr.prod_pos)

Expand Down