Compare commits

...

6 Commits

Author SHA1 Message Date
Vitaliy Filippov e8004f04a5 Allow to spawn more than 1 FUSE file descriptor reader goroutine 2022-02-01 21:15:00 +03:00
Vitaliy Filippov 775aacf12c Allow to use "zero-copy" writes 2022-02-01 21:15:00 +03:00
Vitaliy Filippov 2be4ecc37d Fix examples to use vectored read 2022-02-01 21:15:00 +03:00
Vitaliy Filippov ac82ada21e Add vectored read to readbenchfs
You can now run `./readbenchfs --mount_point dir --vectored` and then
`dd if=dir/test of=/dev/null iflag=direct bs=1M status=progress` to test
vectored read speed.

My results with GOMAXPROCS=1:
- Before vectored read patch: 390 MB/s read
- Non-vectored read after vectored read patch: 830 MB/s read
- Vectored read: 1200 MB/s read
2022-02-01 21:14:59 +03:00
Vitaliy Filippov 694c1bf9db Introduce VectoredReadOp
Read requests can now take vectored responses from the filesystem
implementation and send them to FUSE device via the writev() system call.

This allows file systems to send data without copying it into the
library-provided buffer if the data is already in memory.

The change also speeds up normal ReadFileOps as a side effect because
it removes extra memory allocations.
2022-02-01 21:14:40 +03:00
Vitaliy Filippov b5cbfcd8b6 Add ReadBenchFS to test linear read speed 2022-02-01 21:14:38 +03:00
23 changed files with 509 additions and 135 deletions

View File

@ -338,7 +338,7 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) {
// Loop past transient errors.
for {
// Attempt a reaed.
// Attempt a read.
err := m.Init(c.dev)
// Special cases:
@ -405,7 +405,7 @@ func (c *Connection) ReadOp() (_ context.Context, op interface{}, _ error) {
// Convert the message to an op.
outMsg := c.getOutMessage()
op, err = convertInMessage(inMsg, outMsg, c.protocol)
op, err = convertInMessage(&c.cfg, inMsg, outMsg, c.protocol)
if err != nil {
c.putOutMessage(outMsg)
return nil, nil, fmt.Errorf("convertInMessage: %v", err)
@ -485,8 +485,15 @@ func (c *Connection) Reply(ctx context.Context, opErr error) {
outMsg := state.outMsg
fuseID := inMsg.Header().Unique
suppressReuse := false
if wr, ok := op.(*fuseops.WriteFileOp); ok {
suppressReuse = wr.SuppressReuse
}
// Make sure we destroy the messages when we're done.
defer c.putInMessage(inMsg)
if !suppressReuse {
defer c.putInMessage(inMsg)
}
defer c.putOutMessage(outMsg)
// Clean up state for this op.
@ -510,10 +517,16 @@ func (c *Connection) Reply(ctx context.Context, opErr error) {
noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr)
if !noResponse {
err := c.writeMessage(outMsg.Bytes())
if err != nil && c.errorLogger != nil {
c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes())
var err error
if outMsg.Sglist != nil {
_, err = writev(int(c.dev.Fd()), outMsg.Sglist)
} else {
err = c.writeMessage(outMsg.OutHeaderBytes())
}
if err != nil && c.errorLogger != nil {
c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
}
outMsg.Sglist = nil
}
}

View File

@ -38,6 +38,7 @@ import (
//
// The caller is responsible for arranging for the message to be destroyed.
func convertInMessage(
config *MountConfig,
inMsg *buffer.InMessage,
outMsg *buffer.OutMessage,
protocol fusekernel.Protocol) (o interface{}, err error) {
@ -284,24 +285,28 @@ func convertInMessage(
return nil, errors.New("Corrupt OpRead")
}
to := &fuseops.ReadFileOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh),
Offset: int64(in.Offset),
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
if !config.UseVectoredRead {
// Use part of the incoming message storage as the read buffer
buf := inMsg.GetFree(int(in.Size))
to := &fuseops.ReadFileOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh),
Offset: int64(in.Offset),
Dst: buf,
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
}
o = to
} else {
// Don't allocate any buffers when zero-copy is used
to := &fuseops.VectoredReadOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh),
Offset: int64(in.Offset),
Size: int64(in.Size),
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
}
o = to
}
o = to
readSize := int(in.Size)
p := outMsg.GrowNoZero(readSize)
if p == nil {
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
}
sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst))
sh.Data = uintptr(p)
sh.Len = readSize
sh.Cap = readSize
case fusekernel.OpReaddir:
in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol)))
@ -318,7 +323,7 @@ func convertInMessage(
o = to
readSize := int(in.Size)
p := outMsg.GrowNoZero(readSize)
p := outMsg.Grow(readSize)
if p == nil {
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
}
@ -489,15 +494,19 @@ func convertInMessage(
o = to
readSize := int(in.Size)
p := outMsg.GrowNoZero(readSize)
if p == nil {
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
}
if readSize > 0 {
p := outMsg.Grow(readSize)
if p == nil {
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
}
sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst))
sh.Data = uintptr(p)
sh.Len = readSize
sh.Cap = readSize
sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst))
sh.Data = uintptr(p)
sh.Len = readSize
sh.Cap = readSize
} else {
to.Dst = nil
}
case fusekernel.OpListxattr:
type input fusekernel.ListxattrIn
@ -514,7 +523,7 @@ func convertInMessage(
readSize := int(in.Size)
if readSize != 0 {
p := outMsg.GrowNoZero(readSize)
p := outMsg.Grow(readSize)
if p == nil {
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
}
@ -718,9 +727,11 @@ func (c *Connection) kernelResponseForOp(
}
case *fuseops.ReadFileOp:
// convertInMessage already set up the destination buffer to be at the end
// of the out message. We need only shrink to the right size based on how
// much the user read.
m.Append(o.Dst)
m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead)
case *fuseops.VectoredReadOp:
m.Append(o.Data...)
m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead)
case *fuseops.WriteFileOp:

View File

@ -97,6 +97,11 @@ func describeRequest(op interface{}) (s string) {
addComponent("offset %d", typed.Offset)
addComponent("%d bytes", len(typed.Dst))
case *fuseops.VectoredReadOp:
addComponent("handle %d", typed.Handle)
addComponent("offset %d", typed.Offset)
addComponent("%d bytes", typed.Size)
case *fuseops.WriteFileOp:
addComponent("handle %d", typed.Handle)
addComponent("offset %d", typed.Offset)

View File

@ -654,6 +654,37 @@ type ReadFileOp struct {
OpContext OpContext
}
// Vectored read - same as ReadFileOp, but the buffer isn't provided by the library.
// The file system returns a list of slices instead.
type VectoredReadOp struct {
// The file inode that we are reading, and the handle previously returned by
// CreateFile or OpenFile when opening that inode.
Inode InodeID
Handle HandleID
// The offset within the file at which to read.
Offset int64
// The size of the read, i.e. the number of bytes requested by the kernel.
Size int64
// Set by the file system: data to send back to the client.
//
// The slices are appended to the outgoing message as-is rather than being
// copied into a library-owned buffer, and the message is then truncated to
// BytesRead bytes of payload. NOTE(review): this presumably means the file
// system must not modify these slices until the reply has been written;
// confirm against Connection.Reply.
Data [][]byte
// Set by the file system: the number of bytes read.
//
// The FUSE documentation requires that exactly the requested number of bytes
// be returned, except in the case of EOF or error (http://goo.gl/ZgfBkF).
// This appears to be because it uses file mmapping machinery
// (http://goo.gl/SGxnaN) to read a page at a time. It appears to understand
// where EOF is by checking the inode size (http://goo.gl/0BkqKD), returned
// by a previous call to LookUpInode, GetInodeAttributes, etc.
//
// If direct IO is enabled, semantics should match those of read(2).
BytesRead int
OpContext OpContext
}
// Write data to a file previously opened with CreateFile or OpenFile.
//
// When the user writes data using write(2), the write goes into the page
@ -710,7 +741,18 @@ type WriteFileOp struct {
// be written, except on error (http://goo.gl/KUpwwn). This appears to be
// because it uses file mmapping machinery (http://goo.gl/SGxnaN) to write a
// page at a time.
Data []byte
Data []byte
// Set by the file system: "no reuse" flag.
//
// By default, the Data buffer is reused by the library, so the file system
// must copy the data if it wants to use it later.
//
// However, if the file system sets this flag to true, the library doesn't
// reuse this buffer, so the file system can safely store and use Data slice
// without copying memory.
SuppressReuse bool
OpContext OpContext
}

View File

@ -52,6 +52,7 @@ type FileSystem interface {
ReleaseDirHandle(context.Context, *fuseops.ReleaseDirHandleOp) error
OpenFile(context.Context, *fuseops.OpenFileOp) error
ReadFile(context.Context, *fuseops.ReadFileOp) error
VectoredRead(context.Context, *fuseops.VectoredReadOp) error
WriteFile(context.Context, *fuseops.WriteFileOp) error
SyncFile(context.Context, *fuseops.SyncFileOp) error
FlushFile(context.Context, *fuseops.FlushFileOp) error
@ -190,6 +191,9 @@ func (s *fileSystemServer) handleOp(
case *fuseops.ReadFileOp:
err = s.fs.ReadFile(ctx, typed)
case *fuseops.VectoredReadOp:
err = s.fs.VectoredRead(ctx, typed)
case *fuseops.WriteFileOp:
err = s.fs.WriteFile(ctx, typed)

View File

@ -138,6 +138,12 @@ func (fs *NotImplementedFileSystem) ReadFile(
return fuse.ENOSYS
}
// VectoredRead is the default handler for vectored reads: it performs no work
// and reports fuse.ENOSYS.
func (fs *NotImplementedFileSystem) VectoredRead(
ctx context.Context,
op *fuseops.VectoredReadOp) error {
return fuse.ENOSYS
}
func (fs *NotImplementedFileSystem) WriteFile(
ctx context.Context,
op *fuseops.WriteFileOp) error {

View File

@ -42,6 +42,7 @@ func init() {
type InMessage struct {
remaining []byte
storage []byte
size int
}
// NewInMessage creates a new InMessage with its storage initialized.
@ -66,6 +67,7 @@ func (m *InMessage) Init(r io.Reader) error {
return fmt.Errorf("Unexpectedly read only %d bytes.", n)
}
m.size = n
m.remaining = m.storage[headerSize:n]
// Check the header's length.
@ -114,3 +116,11 @@ func (m *InMessage) ConsumeBytes(n uintptr) []byte {
return b
}
// Get the next n bytes after the message to use them as a temporary buffer
// GetFree returns the n bytes of storage that immediately follow the message
// that was read, for use as scratch space. It returns nil when n is not
// positive or when fewer than n bytes remain after the message.
func (m *InMessage) GetFree(n int) []byte {
	avail := len(m.storage) - m.size
	if n > 0 && n <= avail {
		start := m.size
		return m.storage[start : start+n]
	}
	return nil
}

View File

@ -17,4 +17,4 @@ package buffer
// The maximum fuse write request size that InMessage can accommodate.
//
// As of kernel 4.20 Linux accepts writes up to 256 pages or 1MiB
const MaxWriteSize = 1 << 20
const MaxWriteSize = 1 << 17

View File

@ -16,7 +16,6 @@ package buffer
import (
"fmt"
"log"
"reflect"
"unsafe"
@ -33,30 +32,15 @@ const OutMessageHeaderSize = int(unsafe.Sizeof(fusekernel.OutHeader{}))
//
// Must be initialized with Reset.
type OutMessage struct {
// The offset into payload to which we're currently writing.
payloadOffset int
header fusekernel.OutHeader
payload [MaxReadSize]byte
}
// Make sure that the header and payload are contiguous.
func init() {
a := unsafe.Offsetof(OutMessage{}.header) + uintptr(OutMessageHeaderSize)
b := unsafe.Offsetof(OutMessage{}.payload)
if a != b {
log.Panicf(
"header ends at offset %d, but payload starts at offset %d",
a, b)
}
header fusekernel.OutHeader
Sglist [][]byte
}
// Reset resets m so that it's ready to be used again. Afterward, the contents
// are solely a zeroed fusekernel.OutHeader struct.
func (m *OutMessage) Reset() {
m.payloadOffset = 0
m.header = fusekernel.OutHeader{}
m.Sglist = nil
}
// OutHeader returns a pointer to the header at the start of the message.
@ -64,30 +48,12 @@ func (m *OutMessage) OutHeader() *fusekernel.OutHeader {
return &m.header
}
// Grow grows m's buffer by the given number of bytes, returning a pointer to
// the start of the new segment, which is guaranteed to be zeroed. If there is
// insufficient space, it returns nil.
// Grow adds a new buffer of <n> bytes to the message, returning a pointer to
// the start of the new segment, which is guaranteed to be zeroed.
func (m *OutMessage) Grow(n int) unsafe.Pointer {
p := m.GrowNoZero(n)
if p != nil {
jacobsa_fuse_memclr(p, uintptr(n))
}
return p
}
// GrowNoZero is equivalent to Grow, except the new segment is not zeroed. Use
// with caution!
func (m *OutMessage) GrowNoZero(n int) unsafe.Pointer {
// Will we overflow the buffer?
o := m.payloadOffset
if len(m.payload)-o < n {
return nil
}
p := unsafe.Pointer(uintptr(unsafe.Pointer(&m.payload)) + uintptr(o))
m.payloadOffset = o + n
b := make([]byte, n)
m.Append(b)
p := unsafe.Pointer(&b[0])
return p
}
@ -100,51 +66,62 @@ func (m *OutMessage) ShrinkTo(n int) {
n,
m.Len()))
}
m.payloadOffset = n - OutMessageHeaderSize
if n == OutMessageHeaderSize {
m.Sglist = nil
} else {
i := 1
n -= OutMessageHeaderSize
for len(m.Sglist) > i && n >= len(m.Sglist[i]) {
n -= len(m.Sglist[i])
i++
}
if n > 0 {
m.Sglist[i] = m.Sglist[i][0:n]
i++
}
m.Sglist = m.Sglist[0:i]
}
}
// Append is equivalent to growing by len(src), then copying src over the new
// segment. Int panics if there is not enough room available.
func (m *OutMessage) Append(src []byte) {
p := m.GrowNoZero(len(src))
if p == nil {
panic(fmt.Sprintf("Can't grow %d bytes", len(src)))
func (m *OutMessage) Append(src ...[]byte) {
if m.Sglist == nil {
// First element of Sglist is pre-filled with a pointer to the header
// to allow sending it with a single writev() call without copying the
// slice again
m.Sglist = append(m.Sglist, m.OutHeaderBytes())
}
sh := (*reflect.SliceHeader)(unsafe.Pointer(&src))
jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len))
m.Sglist = append(m.Sglist, src...)
return
}
// AppendString is like Append, but accepts string input.
func (m *OutMessage) AppendString(src string) {
p := m.GrowNoZero(len(src))
if p == nil {
panic(fmt.Sprintf("Can't grow %d bytes", len(src)))
}
sh := (*reflect.StringHeader)(unsafe.Pointer(&src))
jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len))
m.Append([]byte(src))
return
}
// Len returns the current size of the message, including the leading header.
func (m *OutMessage) Len() int {
return OutMessageHeaderSize + m.payloadOffset
if m.Sglist == nil {
return OutMessageHeaderSize
}
// First element of Sglist is the header, so we don't need to count it here
r := 0
for _, b := range m.Sglist {
r += len(b)
}
return r
}
// Bytes returns a reference to the current contents of the buffer, including
// the leading header.
func (m *OutMessage) Bytes() []byte {
l := m.Len()
// OutHeaderBytes returns a byte slice containing the current header.
func (m *OutMessage) OutHeaderBytes() []byte {
l := OutMessageHeaderSize
sh := reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(&m.header)),
Len: l,
Cap: l,
}
return *(*[]byte)(unsafe.Pointer(&sh))
}

View File

@ -107,9 +107,12 @@ func TestOutMessageAppend(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want)
}
b := om.Bytes()
b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want)
}
want := append(
@ -137,9 +140,12 @@ func TestOutMessageAppendString(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want)
}
b := om.Bytes()
b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want)
}
want := append(
@ -168,9 +174,12 @@ func TestOutMessageShrinkTo(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want)
}
b := om.Bytes()
b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want)
}
want := append(
@ -201,7 +210,7 @@ func TestOutMessageHeader(t *testing.T) {
*h = want
// Check that the result is as expected.
b := om.Bytes()
b := om.OutHeaderBytes()
if len(b) != int(unsafe.Sizeof(want)) {
t.Fatalf("unexpected length %d; want %d", len(b), unsafe.Sizeof(want))
}
@ -225,9 +234,7 @@ func TestOutMessageReset(t *testing.T) {
}
// Ensure a non-zero payload length.
if p := om.GrowNoZero(128); p == nil {
t.Fatal("GrowNoZero failed")
}
p := om.Grow(128)
// Reset.
om.Reset()
@ -259,10 +266,7 @@ func TestOutMessageGrow(t *testing.T) {
// Set up garbage where the payload will soon be.
const payloadSize = 1234
{
p := om.GrowNoZero(payloadSize)
if p == nil {
t.Fatal("GrowNoZero failed")
}
p := om.Grow(payloadSize)
err := fillWithGarbage(p, payloadSize)
if err != nil {
@ -283,7 +287,10 @@ func TestOutMessageGrow(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want)
}
b := om.Bytes()
b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Len()) = %d, want %d", got, want)
}
@ -304,7 +311,7 @@ func BenchmarkOutMessageReset(b *testing.B) {
om.Reset()
}
b.SetBytes(int64(unsafe.Offsetof(om.payload)))
b.SetBytes(int64(om.Len()))
})
// Many megabytes worth of buffers, which should defeat the CPU cache.
@ -321,7 +328,7 @@ func BenchmarkOutMessageReset(b *testing.B) {
oms[i%numMessages].Reset()
}
b.SetBytes(int64(unsafe.Offsetof(oms[0].payload)))
b.SetBytes(int64(oms[0].Len()))
})
}

View File

@ -20,6 +20,7 @@ import (
"net"
"os"
"os/exec"
"sync/atomic"
"syscall"
)
@ -71,6 +72,9 @@ func Mount(
if cfgCopy.OpContext == nil {
cfgCopy.OpContext = context.Background()
}
if cfgCopy.ReaderThreads < 1 {
cfgCopy.ReaderThreads = 1
}
// Create a Connection object wrapping the device.
connection, err := newConnection(
@ -83,11 +87,16 @@ func Mount(
}
// Serve the connection in the background. When done, set the join status.
go func() {
server.ServeOps(connection)
mfs.joinStatus = connection.close()
close(mfs.joinStatusAvailable)
}()
atomic.AddInt64(&mfs.joinRemaining, int64(cfgCopy.ReaderThreads))
for i := 0; i < cfgCopy.ReaderThreads; i++ {
go func() {
server.ServeOps(connection)
if atomic.AddInt64(&mfs.joinRemaining, -1) == 0 {
mfs.joinStatus = connection.close()
close(mfs.joinStatusAvailable)
}
}()
}
// Wait for the mount process to complete.
if err := <-ready; err != nil {

View File

@ -156,6 +156,16 @@ type MountConfig struct {
// actually utilise any form of qualifiable UNIX permissions.
DisableDefaultPermissions bool
// Use VectoredReadOp instead of ReadFileOp.
// Vectored read allows file systems to reduce memory copying overhead if
// the data is already in memory when they return it to FUSE.
UseVectoredRead bool
// Number of goroutines (and hopefully threads) to use for reading from
// the FUSE file descriptor. You can try to use more than 1 if memory
// copying during write operations is a bottleneck for you
ReaderThreads int
// OS X only.
//
// The name of the mounted volume, as displayed in the Finder. If empty, a

View File

@ -23,6 +23,7 @@ type MountedFileSystem struct {
// The result to return from Join. Not valid until the channel is closed.
joinStatus error
joinRemaining int64
joinStatusAvailable chan struct{}
}

View File

@ -368,6 +368,8 @@ func (fs *cachingFS) ReadFile(
ctx context.Context,
op *fuseops.ReadFileOp) error {
var err error
op.BytesRead, err = io.ReadFull(rand.Reader, op.Dst)
dst := make([]byte, op.Size)
op.BytesRead, err = io.ReadFull(rand.Reader, dst)
op.Data = [][]byte{dst}
return err
}

View File

@ -246,7 +246,8 @@ func (fs *dynamicFS) ReadFile(
}
reader := strings.NewReader(contents)
var err error
op.BytesRead, err = reader.ReadAt(op.Dst, op.Offset)
op.Data = [][]byte{ make([]byte, op.Size) }
op.BytesRead, err = reader.ReadAt(op.Data[0], op.Offset)
if err == io.EOF {
return nil
}

View File

@ -26,7 +26,7 @@ import (
"github.com/jacobsa/fuse/fuseutil"
)
const FooContents = "xxxx"
var FooContents = []byte("xxxx")
const fooInodeID = fuseops.RootInodeID + 1
@ -171,7 +171,8 @@ func (fs *errorFS) ReadFile(
return fmt.Errorf("Unexpected request: %#v", op)
}
op.BytesRead = copy(op.Dst, FooContents)
op.Data = [][]byte{FooContents}
op.BytesRead = len(FooContents)
return nil
}

View File

@ -196,7 +196,12 @@ func (fs *flushFS) ReadFile(
}
// Read what we can.
op.BytesRead = copy(op.Dst, fs.fooContents[op.Offset:])
end := op.Offset+op.Size
if end > int64(len(fs.fooContents)) {
end = int64(len(fs.fooContents))
}
op.Data = [][]byte{ fs.fooContents[op.Offset : end] }
op.BytesRead = int(end-op.Offset)
return nil
}

View File

@ -250,7 +250,8 @@ func (fs *helloFS) ReadFile(
reader := strings.NewReader("Hello, world!")
var err error
op.BytesRead, err = reader.ReadAt(op.Dst, op.Offset)
op.Data = [][]byte{ make([]byte, op.Size) }
op.BytesRead, err = reader.ReadAt(op.Data[0], op.Offset)
// Special case: FUSE doesn't expect us to return io.EOF.
if err == io.EOF {

View File

@ -692,7 +692,8 @@ func (fs *memFS) ReadFile(
// Serve the request.
var err error
op.BytesRead, err = inode.ReadAt(op.Dst, op.Offset)
op.Data = [][]byte{ make([]byte, op.Size) }
op.BytesRead, err = inode.ReadAt(op.Data[0], op.Offset)
// Don't return EOF errors; we just indicate EOF to fuse using a short read.
if err == io.EOF {

View File

@ -0,0 +1,48 @@
package main
import (
"context"
"flag"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/samples/readbenchfs"
"log"
"os"
)
// Command-line flags.

// fMountPoint is the directory to mount on; main refuses to run when it is empty.
var fMountPoint = flag.String("mount_point", "", "Path to mount point.")
// fReadOnly is passed through to MountConfig.ReadOnly.
var fReadOnly = flag.Bool("read_only", false, "Mount in read-only mode.")
// fVectored is passed through to MountConfig.UseVectoredRead.
var fVectored = flag.Bool("vectored", false, "Use vectored read.")
// fDebug turns on a stderr debug logger for the mount.
var fDebug = flag.Bool("debug", false, "Enable debug logging.")
func main() {
flag.Parse()
server, err := readbenchfs.NewReadBenchServer()
if err != nil {
log.Fatalf("makeFS: %v", err)
}
// Mount the file system.
if *fMountPoint == "" {
log.Fatalf("You must set --mount_point.")
}
cfg := &fuse.MountConfig{
ReadOnly: *fReadOnly,
UseVectoredRead: *fVectored,
}
if *fDebug {
cfg.DebugLogger = log.New(os.Stderr, "fuse: ", 0)
}
mfs, err := fuse.Mount(*fMountPoint, server, cfg)
if err != nil {
log.Fatalf("Mount: %v", err)
}
// Wait for it to be unmounted.
if err = mfs.Join(context.Background()); err != nil {
log.Fatalf("Join: %v", err)
}
}

View File

@ -0,0 +1,186 @@
// Copyright 2021 Vitaliy Filippov
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package readbenchfs
import (
"golang.org/x/net/context"
"io"
"math/rand"
"os"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
)
// readBenchFS is a file system for measuring read throughput. It serves a
// single file whose contents come from buf, repeated as often as needed.
// Operations it does not define fall through to the embedded
// NotImplementedFileSystem.
type readBenchFS struct {
fuseutil.NotImplementedFileSystem
// buf holds the data served for reads; file offsets are mapped into it
// modulo its length.
buf []byte
}
// fileSize is the size reported for the benchmark file and the point at which
// reads are cut off: 1 TiB (1024^4 bytes).
const fileSize = 1024 * 1024 * 1024 * 1024
// Compile-time check that *readBenchFS implements fuseutil.FileSystem.
var _ fuseutil.FileSystem = &readBenchFS{}
func NewReadBenchServer() (server fuse.Server, err error) {
// 1 GB of random data to exceed CPU cache
buf := make([]byte, 1024*1024*1024)
rand.Read(buf)
server = fuseutil.NewFileSystemServer(&readBenchFS{
buf: buf,
})
return
}
// StatFS reports success without filling in any statistics on op.
func (fs *readBenchFS) StatFS(ctx context.Context, op *fuseops.StatFSOp) error {
return nil
}
// LookUpInode resolves the only name this file system knows, "test", to inode
// 2: a read-only file of fileSize bytes. Every other name yields fuse.ENOENT.
// The parent inode is not inspected.
func (fs *readBenchFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error {
	if op.Name != "test" {
		return fuse.ENOENT
	}

	attrs := fuseops.InodeAttributes{
		Size:  fileSize,
		Nlink: 1,
		Mode:  0444,
	}
	op.Entry = fuseops.ChildInodeEntry{
		Child:      2,
		Attributes: attrs,
	}
	return nil
}
// GetInodeAttributes describes the two inodes that exist: inode 1 is a
// directory with mode 0755, inode 2 is the read-only benchmark file of
// fileSize bytes. Any other inode yields fuse.ENOENT.
func (fs *readBenchFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error {
	switch op.Inode {
	case 1:
		op.Attributes = fuseops.InodeAttributes{
			Nlink: 1,
			Mode:  0755 | os.ModeDir,
		}
	case 2:
		op.Attributes = fuseops.InodeAttributes{
			Size:  fileSize,
			Nlink: 1,
			Mode:  0444,
		}
	default:
		return fuse.ENOENT
	}
	return nil
}
// OpenDir always succeeds; the inode being opened is not checked and no
// handle state is recorded.
func (fs *readBenchFS) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error {
// Allow opening any directory.
return nil
}
// ReadDir lists inode 1, which contains the single entry "test" (inode 2).
// A request for any other inode yields fuse.ENOENT. A request with a non-zero
// offset returns no entries, which signals the end of the listing.
func (fs *readBenchFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error {
	switch {
	case op.Inode != 1:
		return fuse.ENOENT
	case op.Offset > 0:
		return nil
	}

	listing := []fuseutil.Dirent{
		{
			Offset: 1,
			Inode:  2,
			Name:   "test",
			Type:   fuseutil.DT_File,
		},
	}

	// Serialize entries until the destination buffer cannot take another one
	// (WriteDirent reports that by writing zero bytes).
	for _, entry := range listing[op.Offset:] {
		written := fuseutil.WriteDirent(op.Dst[op.BytesRead:], entry)
		if written == 0 {
			break
		}
		op.BytesRead += written
	}
	return nil
}
// OpenFile always succeeds; the inode being opened is not checked and no
// handle state is recorded.
func (fs *readBenchFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error {
// Allow opening any file.
return nil
}
// ReadFile fills op.Dst with the file contents starting at op.Offset and sets
// op.BytesRead to the number of bytes produced. The file is fs.buf repeated
// end to end up to fileSize, so a file offset maps to index offset%len(fs.buf).
// The read is truncated at fileSize; an offset beyond fileSize yields io.EOF.
func (fs *readBenchFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error {
	if op.Offset > fileSize {
		return io.EOF
	}

	end := op.Offset + int64(len(op.Dst))
	if end > fileSize {
		end = fileSize
	}

	buflen := int64(len(fs.buf))
	for pos := op.Offset; pos < end; {
		// Copy from the current position in fs.buf up to either the end of the
		// request or the end of the buffer, whichever comes first.
		s := pos % buflen
		n := copy(op.Dst[pos-op.Offset:end-op.Offset], fs.buf[s:])

		// Advance by what was actually copied. The cursor is a file offset, so
		// it must not be recomputed from a buffer index: doing so skipped the
		// remainder of any read that wrapped around the end of fs.buf while
		// still reporting the full length in BytesRead.
		pos += int64(n)
	}

	op.BytesRead = int(end - op.Offset)
	return nil
}
// VectoredRead serves the same data as ReadFile without copying it: it appends
// sub-slices of fs.buf covering [op.Offset, op.Offset+op.Size) to op.Data and
// sets op.BytesRead to their total length. The file is fs.buf repeated end to
// end up to fileSize, so a request that crosses the end of fs.buf produces more
// than one slice. The read is truncated at fileSize; an offset beyond fileSize
// yields io.EOF.
func (fs *readBenchFS) VectoredRead(ctx context.Context, op *fuseops.VectoredReadOp) error {
	if op.Offset > fileSize {
		return io.EOF
	}

	end := op.Offset + op.Size
	if end > fileSize {
		end = fileSize
	}

	buflen := int64(len(fs.buf))
	for pos := op.Offset; pos < end; {
		// [s, e) is the window of fs.buf serving this step: from the current
		// position up to the end of the request or of the buffer.
		s := pos % buflen
		e := buflen
		if e-s > end-pos {
			e = s + end - pos
		}
		op.Data = append(op.Data, fs.buf[s:e])

		// Advance by the length of the window. The cursor is a file offset, so
		// it must not be set from the buffer index e: doing so dropped the
		// second segment of any read that wrapped around the end of fs.buf,
		// leaving op.Data shorter than BytesRead.
		pos += e - s
	}

	op.BytesRead = int(end - op.Offset)
	return nil
}
// ReleaseDirHandle is a no-op that reports success.
func (fs *readBenchFS) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) error {
return nil
}
// GetXattr reports success without writing anything to op.
func (fs *readBenchFS) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error {
return nil
}
// ListXattr reports success without writing anything to op.
func (fs *readBenchFS) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) error {
return nil
}
// ForgetInode is a no-op that reports success.
func (fs *readBenchFS) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) error {
return nil
}
// ReleaseFileHandle is a no-op that reports success.
func (fs *readBenchFS) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) error {
return nil
}
// FlushFile is a no-op that reports success.
func (fs *readBenchFS) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) error {
return nil
}

View File

@ -160,8 +160,13 @@ func (fs *readonlyLoopbackFs) ReadFile(
return fuse.EIO
}
contents = contents[op.Offset:]
op.BytesRead = copy(op.Dst, contents)
end := op.Offset+op.Size
if end > int64(len(contents)) {
end = int64(len(contents))
}
op.Data = [][]byte{ contents[op.Offset : end] }
op.BytesRead = int(end-op.Offset)
return nil
}

29
writev.go Normal file
View File

@ -0,0 +1,29 @@
package fuse
import (
"syscall"
"unsafe"
)
// writev writes the buffers in packet to fd, in order, with a single
// writev(2) system call, so the caller does not have to concatenate them
// first. Empty buffers are skipped.
//
// It returns the value reported by the system call (the number of bytes
// written on success) and, on failure, the syscall.Errno as err. If packet
// contains no data at all, no system call is made and (0, nil) is returned.
func writev(fd int, packet [][]byte) (n int, err error) {
	iovecs := make([]syscall.Iovec, 0, len(packet))
	for _, v := range packet {
		if len(v) == 0 {
			// An empty slice has no first element to take the address of.
			continue
		}
		vec := syscall.Iovec{
			Base: &v[0],
		}
		vec.SetLen(len(v))
		iovecs = append(iovecs, vec)
	}

	// Nothing to send. Without this guard, taking &iovecs[0] below would panic
	// with an index-out-of-range error.
	if len(iovecs) == 0 {
		return 0, nil
	}

	// The unsafe.Pointer -> uintptr conversion must stay inside the call
	// expression so that the runtime keeps iovecs alive for the system call.
	n1, _, e1 := syscall.Syscall(
		syscall.SYS_WRITEV,
		uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)),
	)
	n = int(n1)
	if e1 != 0 {
		err = syscall.Errno(e1)
	}
	return
}