diff --git a/connection.go b/connection.go index 885ab49..7d08cc0 100644 --- a/connection.go +++ b/connection.go @@ -338,7 +338,7 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { // Loop past transient errors. for { - // Attempt a reaed. + // Attempt a read. err := m.Init(c.dev) // Special cases: @@ -405,7 +405,7 @@ func (c *Connection) ReadOp() (_ context.Context, op interface{}, _ error) { // Convert the message to an op. outMsg := c.getOutMessage() - op, err = convertInMessage(inMsg, outMsg, c.protocol) + op, err = convertInMessage(&c.cfg, inMsg, outMsg, c.protocol) if err != nil { c.putOutMessage(outMsg) return nil, nil, fmt.Errorf("convertInMessage: %v", err) @@ -510,10 +510,16 @@ func (c *Connection) Reply(ctx context.Context, opErr error) { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - err := c.writeMessage(outMsg.Bytes()) - if err != nil && c.errorLogger != nil { - c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes()) + var err error + if outMsg.Sglist != nil { + _, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + err = c.writeMessage(outMsg.OutHeaderBytes()) } + if err != nil && c.errorLogger != nil { + c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) + } + outMsg.Sglist = nil } } diff --git a/conversions.go b/conversions.go index 6e94f97..4416412 100644 --- a/conversions.go +++ b/conversions.go @@ -38,6 +38,7 @@ import ( // // The caller is responsible for arranging for the message to be destroyed. func convertInMessage( + config *MountConfig, inMsg *buffer.InMessage, outMsg *buffer.OutMessage, protocol fusekernel.Protocol) (o interface{}, err error) { @@ -288,20 +289,15 @@ func convertInMessage( Inode: fuseops.InodeID(inMsg.Header().Nodeid), Handle: fuseops.HandleID(in.Fh), Offset: int64(in.Offset), + Size: int64(in.Size), OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, } - o = to - - readSize := int(in.Size) - p := outMsg.GrowNoZero(readSize) - if p == nil { - return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) + if !config.UseVectoredRead { + // Use part of the incoming message storage as the read buffer + // For vectored zero-copy reads, don't allocate any buffers + to.Dst = inMsg.GetFree(int(in.Size)) } - - sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) - sh.Data = uintptr(p) - sh.Len = readSize - sh.Cap = readSize + o = to case fusekernel.OpReaddir: in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol))) @@ -318,7 +314,7 @@ func convertInMessage( o = to readSize := int(in.Size) - p := outMsg.GrowNoZero(readSize) + p := outMsg.Grow(readSize) if p == nil { return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) } @@ -489,15 +485,19 @@ func convertInMessage( o = to readSize := int(in.Size) - p := outMsg.GrowNoZero(readSize) - if p == nil { - return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) - } + if readSize > 0 { + p := outMsg.Grow(readSize) + if p == nil { + return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) + } - sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) - sh.Data = uintptr(p) - sh.Len = readSize - sh.Cap = readSize + sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) + sh.Data = uintptr(p) + sh.Len = readSize + sh.Cap = readSize + } else { + to.Dst = nil + } case fusekernel.OpListxattr: type input fusekernel.ListxattrIn @@ -514,7 +514,7 @@ func convertInMessage( readSize := int(in.Size) if readSize != 0 { - p := outMsg.GrowNoZero(readSize) + p := outMsg.Grow(readSize) if p == nil { return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) } @@ -718,9 +718,11 @@ func (c *Connection) kernelResponseForOp( } case *fuseops.ReadFileOp: - // convertInMessage already set up the destination buffer to be at the end - // of the out message. We need only shrink to the right size based on how - // much the user read. + if o.Dst != nil { + m.Append(o.Dst) + } else { + m.Append(o.Data...) + } m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead) case *fuseops.WriteFileOp: diff --git a/debug.go b/debug.go index 2a4dcc0..e6a3f8d 100644 --- a/debug.go +++ b/debug.go @@ -95,7 +95,7 @@ func describeRequest(op interface{}) (s string) { case *fuseops.ReadFileOp: addComponent("handle %d", typed.Handle) addComponent("offset %d", typed.Offset) - addComponent("%d bytes", len(typed.Dst)) + addComponent("%d bytes", typed.Size) case *fuseops.WriteFileOp: addComponent("handle %d", typed.Handle) diff --git a/fuseops/ops.go b/fuseops/ops.go index 353c70b..9a0b8c5 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -637,9 +637,17 @@ type ReadFileOp struct { // The offset within the file at which to read. Offset int64 + // The size of the read. + Size int64 + // The destination buffer, whose length gives the size of the read. + // For vectored reads, this field is always nil as the buffer is not provided. Dst []byte + // Set by the file system: + // A list of slices of data to send back to the client for vectored reads. + Data [][]byte + // Set by the file system: the number of bytes read. // // The FUSE documentation requires that exactly the requested number of bytes diff --git a/internal/buffer/in_message.go b/internal/buffer/in_message.go index 64df2c6..cb48816 100644 --- a/internal/buffer/in_message.go +++ b/internal/buffer/in_message.go @@ -42,6 +42,7 @@ func init() { type InMessage struct { remaining []byte storage []byte + size int } // NewInMessage creates a new InMessage with its storage initialized. @@ -66,6 +67,7 @@ func (m *InMessage) Init(r io.Reader) error { return fmt.Errorf("Unexpectedly read only %d bytes.", n) } + m.size = n m.remaining = m.storage[headerSize:n] // Check the header's length. @@ -114,3 +116,11 @@ func (m *InMessage) ConsumeBytes(n uintptr) []byte { return b } + +// Get the next n bytes after the message to use them as a temporary buffer +func (m *InMessage) GetFree(n int) []byte { + if n <= 0 || n > len(m.storage)-m.size { + return nil + } + return m.storage[m.size : m.size+n] +} diff --git a/internal/buffer/out_message.go b/internal/buffer/out_message.go index 7e89afb..c8036cb 100644 --- a/internal/buffer/out_message.go +++ b/internal/buffer/out_message.go @@ -16,7 +16,6 @@ package buffer import ( "fmt" - "log" "reflect" "unsafe" @@ -33,30 +32,15 @@ const OutMessageHeaderSize = int(unsafe.Sizeof(fusekernel.OutHeader{})) // // Must be initialized with Reset. type OutMessage struct { - // The offset into payload to which we're currently writing. - payloadOffset int - - header fusekernel.OutHeader - payload [MaxReadSize]byte -} - -// Make sure that the header and payload are contiguous. -func init() { - a := unsafe.Offsetof(OutMessage{}.header) + uintptr(OutMessageHeaderSize) - b := unsafe.Offsetof(OutMessage{}.payload) - - if a != b { - log.Panicf( - "header ends at offset %d, but payload starts at offset %d", - a, b) - } + header fusekernel.OutHeader + Sglist [][]byte } // Reset resets m so that it's ready to be used again. Afterward, the contents // are solely a zeroed fusekernel.OutHeader struct. func (m *OutMessage) Reset() { - m.payloadOffset = 0 m.header = fusekernel.OutHeader{} + m.Sglist = nil } // OutHeader returns a pointer to the header at the start of the message. @@ -64,30 +48,12 @@ func (m *OutMessage) OutHeader() *fusekernel.OutHeader { return &m.header } -// Grow grows m's buffer by the given number of bytes, returning a pointer to -// the start of the new segment, which is guaranteed to be zeroed. If there is -// insufficient space, it returns nil. +// Grow adds a new buffer of bytes to the message, returning a pointer to +// the start of the new segment, which is guaranteed to be zeroed. func (m *OutMessage) Grow(n int) unsafe.Pointer { - p := m.GrowNoZero(n) - if p != nil { - jacobsa_fuse_memclr(p, uintptr(n)) - } - - return p -} - -// GrowNoZero is equivalent to Grow, except the new segment is not zeroed. Use -// with caution! -func (m *OutMessage) GrowNoZero(n int) unsafe.Pointer { - // Will we overflow the buffer? - o := m.payloadOffset - if len(m.payload)-o < n { - return nil - } - - p := unsafe.Pointer(uintptr(unsafe.Pointer(&m.payload)) + uintptr(o)) - m.payloadOffset = o + n - + b := make([]byte, n) + m.Append(b) + p := unsafe.Pointer(&b[0]) return p } @@ -100,51 +66,62 @@ func (m *OutMessage) ShrinkTo(n int) { n, m.Len())) } - - m.payloadOffset = n - OutMessageHeaderSize + if n == OutMessageHeaderSize { + m.Sglist = nil + } else { + i := 1 + n -= OutMessageHeaderSize + for len(m.Sglist) > i && n >= len(m.Sglist[i]) { + n -= len(m.Sglist[i]) + i++ + } + if n > 0 { + m.Sglist[i] = m.Sglist[i][0:n] + i++ + } + m.Sglist = m.Sglist[0:i] + } } // Append is equivalent to growing by len(src), then copying src over the new // segment. Int panics if there is not enough room available. -func (m *OutMessage) Append(src []byte) { - p := m.GrowNoZero(len(src)) - if p == nil { - panic(fmt.Sprintf("Can't grow %d bytes", len(src))) +func (m *OutMessage) Append(src ...[]byte) { + if m.Sglist == nil { + // First element of Sglist is pre-filled with a pointer to the header + // to allow sending it with a single writev() call without copying the + // slice again + m.Sglist = append(m.Sglist, m.OutHeaderBytes()) } - - sh := (*reflect.SliceHeader)(unsafe.Pointer(&src)) - jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len)) - + m.Sglist = append(m.Sglist, src...) return } // AppendString is like Append, but accepts string input. func (m *OutMessage) AppendString(src string) { - p := m.GrowNoZero(len(src)) - if p == nil { - panic(fmt.Sprintf("Can't grow %d bytes", len(src))) - } - - sh := (*reflect.StringHeader)(unsafe.Pointer(&src)) - jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len)) - + m.Append([]byte(src)) return } // Len returns the current size of the message, including the leading header. func (m *OutMessage) Len() int { - return OutMessageHeaderSize + m.payloadOffset + if m.Sglist == nil { + return OutMessageHeaderSize + } + // First element of Sglist is the header, so we don't need to count it here + r := 0 + for _, b := range m.Sglist { + r += len(b) + } + return r } -// Bytes returns a reference to the current contents of the buffer, including -// the leading header. -func (m *OutMessage) Bytes() []byte { - l := m.Len() +// OutHeaderBytes returns a byte slice containing the current header. +func (m *OutMessage) OutHeaderBytes() []byte { + l := OutMessageHeaderSize sh := reflect.SliceHeader{ Data: uintptr(unsafe.Pointer(&m.header)), Len: l, Cap: l, } - return *(*[]byte)(unsafe.Pointer(&sh)) } diff --git a/internal/buffer/out_message_test.go b/internal/buffer/out_message_test.go index 6292794..1762b12 100644 --- a/internal/buffer/out_message_test.go +++ b/internal/buffer/out_message_test.go @@ -107,9 +107,12 @@ func TestOutMessageAppend(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { - t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) + t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want) } want := append( @@ -137,9 +140,12 @@ func TestOutMessageAppendString(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { - t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) + t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want) } want := append( @@ -168,9 +174,12 @@ func TestOutMessageShrinkTo(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { - t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) + t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want) } want := append( @@ -201,7 +210,7 @@ func TestOutMessageHeader(t *testing.T) { *h = want // Check that the result is as expected. - b := om.Bytes() + b := om.OutHeaderBytes() if len(b) != int(unsafe.Sizeof(want)) { t.Fatalf("unexpected length %d; want %d", len(b), unsafe.Sizeof(want)) } @@ -225,9 +234,7 @@ func TestOutMessageReset(t *testing.T) { } // Ensure a non-zero payload length. - if p := om.GrowNoZero(128); p == nil { - t.Fatal("GrowNoZero failed") - } + p := om.Grow(128) // Reset. om.Reset() @@ -259,10 +266,7 @@ func TestOutMessageGrow(t *testing.T) { // Set up garbage where the payload will soon be. const payloadSize = 1234 { - p := om.GrowNoZero(payloadSize) - if p == nil { - t.Fatal("GrowNoZero failed") - } + p := om.Grow(payloadSize) err := fillWithGarbage(p, payloadSize) if err != nil { @@ -283,7 +287,10 @@ func TestOutMessageGrow(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Len()) = %d, want %d", got, want) } @@ -304,7 +311,7 @@ func BenchmarkOutMessageReset(b *testing.B) { om.Reset() } - b.SetBytes(int64(unsafe.Offsetof(om.payload))) + b.SetBytes(int64(om.Len())) }) // Many megabytes worth of buffers, which should defeat the CPU cache. @@ -321,7 +328,7 @@ func BenchmarkOutMessageReset(b *testing.B) { oms[i%numMessages].Reset() } - b.SetBytes(int64(unsafe.Offsetof(oms[0].payload))) + b.SetBytes(int64(oms[0].Len())) }) } diff --git a/mount_config.go b/mount_config.go index d23224f..86be126 100644 --- a/mount_config.go +++ b/mount_config.go @@ -156,6 +156,13 @@ type MountConfig struct { // actually utilise any form of qualifiable UNIX permissions. DisableDefaultPermissions bool + // Use vectored reads. + // Vectored read allows file systems to avoid memory copying overhead if + // the data is already in memory when they return it to FUSE. + // When turned on, ReadFileOp.Dst is always nil and the FS must return data + // being read from the file as a list of slices in ReadFileOp.Data. + UseVectoredRead bool + // OS X only. // // The name of the mounted volume, as displayed in the Finder. If empty, a diff --git a/writev.go b/writev.go new file mode 100644 index 0000000..05c9358 --- /dev/null +++ b/writev.go @@ -0,0 +1,29 @@ +package fuse + +import ( + "syscall" + "unsafe" +) + +func writev(fd int, packet [][]byte) (n int, err error) { + iovecs := make([]syscall.Iovec, 0, len(packet)) + for _, v := range packet { + if len(v) == 0 { + continue + } + vec := syscall.Iovec{ + Base: &v[0], + } + vec.SetLen(len(v)) + iovecs = append(iovecs, vec) + } + n1, _, e1 := syscall.Syscall( + syscall.SYS_WRITEV, + uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)), + ) + n = int(n1) + if e1 != 0 { + err = syscall.Errno(e1) + } + return +}