Experimental: "cacheless restore" using a seekable stream

Do two passes instead of just sequentially writing all chunks to the standard output.
On the first pass, all "chunk emit" instructions are remembered together with their output
positions indexed by bundle id, and all "byte emit" instructions are executed using seeks.
On the second pass, all remembered "chunk emit" instructions are executed in bundle
order. This makes zbackup decompress every used bundle only ONCE, instead of repeating
basically the same decompression work many times while reading different chunks.

This allows for bigger bundle sizes (I use 32M), which reduces the number of files
in the repository and makes it more cloud-storage-sync friendly, and further improves
the compression ratio.
master
Vitaliy Filippov 2015-05-21 01:15:10 +03:00
parent 4bd4fa8fa1
commit 9824cb2fb7
14 changed files with 193 additions and 37 deletions

View File

@ -12,6 +12,7 @@ Code contributions:
Benjamin Koch <bbbsnowball@gmail.com>
Gleb Golubitsky <sectoid@gnolltech.org>
Igor Katson <igor.katson@gmail.com>
Vitaliy Filippov <vitalif@yourcmc.ru>
Eugene Agafonov <e.a.agafonov@gmail.com>
Antonia Stevens <a@antevens.com>
Frank Groeneveld <frank@frankgroeneveld.nl>

View File

@ -38,7 +38,7 @@ void BundleCollector::startBundle( Bundle::Id const & bundleId )
usedChunks = 0;
}
void BundleCollector::processChunk( ChunkId const & chunkId )
void BundleCollector::processChunk( ChunkId const & chunkId, uint32_t size )
{
if ( gcDeep )
{

View File

@ -40,7 +40,7 @@ public:
void startBundle( Bundle::Id const & bundleId );
void processChunk( ChunkId const & chunkId );
void processChunk( ChunkId const & chunkId, uint32_t size );
void finishBundle( Bundle::Id const & bundleId, BundleInfo const & info );

View File

@ -15,9 +15,29 @@ namespace BackupRestorer {
using std::vector;
using google::protobuf::io::CodedInputStream;
/// Emits every chunk recorded in chunkMap to the seekable output. The map
/// groups (chunk id, output position) pairs by bundle id; each pair is read
/// from the chunk store and written at its recorded position. When output is
/// NULL nothing is read or written
void restoreMap( ChunkStorage::Reader & chunkStorageReader,
                 ChunkMap const * chunkMap, SeekableSink *output )
{
  // No sink means no chunk would ever be fetched, so there is nothing to do
  if ( !output )
    return;

  string chunkData;
  size_t chunkLength;

  ChunkMap::const_iterator const bundlesEnd = chunkMap->end();
  for ( ChunkMap::const_iterator bundle = chunkMap->begin();
        bundle != bundlesEnd; ++bundle )
  {
    // All placements of chunks that live in this bundle
    ChunkPosition const & placements = bundle->second;

    for ( ChunkPosition::const_iterator placement = placements.begin();
          placement != placements.end(); ++placement )
    {
      // Need to emit a chunk, reading it from the store
      chunkStorageReader.get( placement->first, chunkData, chunkLength );
      output->saveData( placement->second, chunkData.data(), chunkLength );
    }
  }
}
void restore( ChunkStorage::Reader & chunkStorageReader,
std::string const & backupData,
DataSink * output, ChunkSet * chunkSet )
DataSink * output, ChunkSet * chunkSet,
ChunkMap * chunkMap, SeekableSink * seekOut )
{
google::protobuf::io::ArrayInputStream is( backupData.data(),
backupData.size() );
@ -33,6 +53,7 @@ void restore( ChunkStorage::Reader & chunkStorageReader,
string chunk;
BackupInstruction instr;
int64_t position = 0;
while ( cis.BytesUntilLimit() > 0 )
{
Message::parse( instr, cis );
@ -40,24 +61,44 @@ void restore( ChunkStorage::Reader & chunkStorageReader,
if ( instr.has_chunk_to_emit() )
{
ChunkId id( instr.chunk_to_emit() );
size_t chunkSize;
if ( output )
{
// Need to emit a chunk, reading it from the store
size_t chunkSize;
chunkStorageReader.get( id, chunk, chunkSize );
output->saveData( chunk.data(), chunkSize );
}
if ( chunkMap )
{
Bundle::Id const *bundleId = chunkStorageReader.getBundleId( id, chunkSize );
ChunkMap::iterator it = chunkMap->find( *bundleId );
if ( it == chunkMap->end() )
{
ChunkPosition v;
std::pair< ChunkMap::iterator, bool > r = chunkMap->insert( std::make_pair( *bundleId, v ) );
it = r.first;
}
(*it).second.push_back( std::make_pair( id, position ) );
position += chunkSize;
}
if ( chunkSet )
{
chunkSet->insert( id );
}
}
if ( output && instr.has_bytes_to_emit() )
if ( ( output || chunkMap ) && instr.has_bytes_to_emit() )
{
// Need to emit the bytes directly
string const & bytes = instr.bytes_to_emit();
output->saveData( bytes.data(), bytes.size() );
if ( output )
output->saveData( bytes.data(), bytes.size() );
if ( chunkMap )
{
if ( seekOut )
seekOut->saveData( position, bytes.data(), bytes.size() );
position += bytes.size();
}
}
}
@ -84,7 +125,7 @@ void restoreIterations( ChunkStorage::Reader & chunkStorageReader,
}
} stringWriter;
restore( chunkStorageReader, backupData, &stringWriter, chunkSet );
restore( chunkStorageReader, backupData, &stringWriter, chunkSet, NULL, NULL );
backupInfo.mutable_backup_data()->swap( stringWriter.result );
backupInfo.set_iterations( backupInfo.iterations() - 1 );
}

View File

@ -9,6 +9,9 @@
#include <string>
#include <set>
#undef __DEPRECATED
#include <ext/hash_map>
#include "chunk_storage.hh"
#include "ex.hh"
@ -20,17 +23,41 @@ public:
virtual ~DataSink() {}
};
/// Generic interface to seekable data output. Unlike DataSink, every call
/// names the output position the data belongs at, so data may arrive in any
/// order
class SeekableSink
{
public:
  /// Stores 'size' bytes starting at 'data' at the given output position
  virtual void saveData( int64_t position, void const * data, size_t size )=0;

  /// Virtual, as in DataSink, so that an implementation deleted through a
  /// SeekableSink pointer is destroyed properly
  virtual ~SeekableSink() {}
};
namespace __gnu_cxx
{
template<>
struct hash< Bundle::Id >
{
size_t operator()( Bundle::Id v ) const
{ return *((size_t*)(v.blob)); }
};
}
/// Restores the backup
namespace BackupRestorer {
DEF_EX( Ex, "Backup restorer exception", std::exception )
DEF_EX( exTooManyBytesToEmit, "A backup record asks to emit too many bytes", Ex )
DEF_EX( exBytesToMap, "Can't restore bytes to ChunkMap", Ex )
typedef std::set< ChunkId > ChunkSet;
typedef std::vector< std::pair < ChunkId, int64_t > > ChunkPosition;
typedef __gnu_cxx::hash_map< Bundle::Id, ChunkPosition > ChunkMap;
/// Restores the given backup
void restore( ChunkStorage::Reader &, std::string const & backupData,
DataSink *, ChunkSet * );
DataSink *, ChunkSet *, ChunkMap *, SeekableSink * );
/// Restores ChunkMap using seekable output
void restoreMap( ChunkStorage::Reader & chunkStorageReader,
ChunkMap const * chunkMap, SeekableSink *output );
/// Performs restore iterations on backupData
void restoreIterations( ChunkStorage::Reader &, BackupInfo &, std::string &, ChunkSet * );

View File

@ -12,8 +12,8 @@
#include "index_file.hh"
#include "zbackup.pb.h"
ChunkIndex::Chain::Chain( ChunkId const & id, Bundle::Id const * bundleId ):
next( 0 ), bundleId( bundleId )
ChunkIndex::Chain::Chain( ChunkId const & id, uint32_t size, Bundle::Id const * bundleId ):
next( 0 ), size( size ), bundleId( bundleId )
{
memcpy( cryptoHash, id.cryptoHash, sizeof( cryptoHash ) );
}
@ -60,7 +60,7 @@ void ChunkIndex::loadIndex( IndexProcessor & ip )
throw exIncorrectChunkIdSize();
id.setFromBlob( record.id().data() );
ip.processChunk( id );
ip.processChunk( id, record.size() );
}
ip.finishBundle( *savedId, info );
@ -87,9 +87,9 @@ void ChunkIndex::startBundle( Bundle::Id const & bundleId )
lastBundleId = &bundleId;
}
void ChunkIndex::processChunk( ChunkId const & chunkId )
void ChunkIndex::processChunk( ChunkId const & chunkId, uint32_t size )
{
registerNewChunkId( chunkId, lastBundleId );
registerNewChunkId( chunkId, size, lastBundleId );
}
void ChunkIndex::finishBundle( Bundle::Id const &, BundleInfo const & )
@ -112,7 +112,7 @@ ChunkIndex::ChunkIndex( EncryptionKey const & key, TmpMgr & tmpMgr,
}
Bundle::Id const * ChunkIndex::findChunk( ChunkId::RollingHashPart rollingHash,
ChunkInfoInterface & chunkInfo )
ChunkInfoInterface & chunkInfo, uint32_t *size )
{
HashTable::iterator i = hashTable.find( rollingHash );
@ -124,8 +124,14 @@ Bundle::Id const * ChunkIndex::findChunk( ChunkId::RollingHashPart rollingHash,
id = &chunkInfo.getChunkId();
// Check the chains
for ( Chain * chain = i->second; chain; chain = chain->next )
{
if ( chain->equalsTo( *id ) )
{
if ( size )
*size = chain->size;
return chain->bundleId;
}
}
}
return NULL;
@ -143,13 +149,13 @@ struct ChunkInfoImmediate: public ChunkIndex::ChunkInfoInterface
};
}
Bundle::Id const * ChunkIndex::findChunk( ChunkId const & chunkId )
Bundle::Id const * ChunkIndex::findChunk( ChunkId const & chunkId, uint32_t *size )
{
ChunkInfoImmediate chunkInfo( chunkId );
return findChunk( chunkId.rollingHash, chunkInfo );
return findChunk( chunkId.rollingHash, chunkInfo, size );
}
ChunkIndex::Chain * ChunkIndex::registerNewChunkId( ChunkId const & id,
ChunkIndex::Chain * ChunkIndex::registerNewChunkId( ChunkId const & id, uint32_t size,
Bundle::Id const * bundleId )
{
HashTable::iterator i =
@ -165,15 +171,15 @@ ChunkIndex::Chain * ChunkIndex::registerNewChunkId( ChunkId const & id,
}
// Create a new chain
*chain = new ( storage.allocateObjects< Chain >( 1 ) ) Chain( id, bundleId );
*chain = new ( storage.allocateObjects< Chain >( 1 ) ) Chain( id, size, bundleId );
return *chain;
}
bool ChunkIndex::addChunk( ChunkId const & id, Bundle::Id const & bundleId )
bool ChunkIndex::addChunk( ChunkId const & id, uint32_t size, Bundle::Id const & bundleId )
{
if ( Chain * chain = registerNewChunkId( id, NULL ) )
if ( Chain * chain = registerNewChunkId( id, size, NULL ) )
{
// Allocate or re-use bundle id
if ( !lastBundleId || *lastBundleId != bundleId )

View File

@ -49,7 +49,7 @@ class IndexProcessor
public:
virtual void startIndex( string const & ) = 0;
virtual void startBundle( Bundle::Id const & ) = 0;
virtual void processChunk( ChunkId const & ) = 0;
virtual void processChunk( ChunkId const &, uint32_t ) = 0;
virtual void finishBundle( Bundle::Id const &, BundleInfo const & ) = 0;
virtual void finishIndex( string const & ) = 0;
};
@ -61,10 +61,11 @@ class ChunkIndex: NoCopy, IndexProcessor
struct Chain
{
ChunkId::CryptoHashPart cryptoHash;
uint32_t size;
Chain * next;
Bundle::Id const * bundleId;
Chain( ChunkId const &, Bundle::Id const * bundleId );
Chain( ChunkId const &, uint32_t, Bundle::Id const * bundleId );
bool equalsTo( ChunkId const & id );
};
@ -100,18 +101,18 @@ public:
/// If the given chunk exists, its bundle id is returned, otherwise NULL
Bundle::Id const * findChunk( ChunkId::RollingHashPart,
ChunkInfoInterface & );
ChunkInfoInterface &, uint32_t *size = NULL );
/// If the given chunk exists, its bundle id is returned, otherwise NULL
Bundle::Id const * findChunk( ChunkId const & );
Bundle::Id const * findChunk( ChunkId const &, uint32_t *size = NULL );
/// Adds a new chunk to the index if it did not exist already. Returns true
/// if added, false if existed already
bool addChunk( ChunkId const &, Bundle::Id const & );
bool addChunk( ChunkId const &, uint32_t, Bundle::Id const & );
void startIndex( string const & );
void startBundle( Bundle::Id const & );
void processChunk( ChunkId const & );
void processChunk( ChunkId const &, uint32_t );
void finishBundle( Bundle::Id const &, BundleInfo const & );
void finishIndex( string const & );
@ -120,7 +121,7 @@ public:
private:
/// Inserts new chunk id into the in-memory hash table. Returns the created
/// Chain if it was inserted, NULL if it existed before
Chain * registerNewChunkId( ChunkId const & id, Bundle::Id const * );
Chain * registerNewChunkId( ChunkId const & id, uint32_t, Bundle::Id const * );
};
#endif

View File

@ -30,7 +30,7 @@ Writer::~Writer()
bool Writer::add( ChunkId const & id, void const * data, size_t size )
{
if ( index.addChunk( id, getCurrentBundleId() ) )
if ( index.addChunk( id, size, getCurrentBundleId() ) )
{
// Added to the index? Emit to the bundle then
if ( getCurrentBundle().getPayloadSize() + size >
@ -211,6 +211,22 @@ Reader::Reader( Config const & configIn,
maxCacheSizeBytes / 1048576 );
}
/// Looks the chunk up in the index. On success returns the id of the bundle
/// holding it and stores the indexed chunk size in 'size'. Throws
/// exNoSuchChunk, carrying the hex form of the chunk id, when the index has no
/// such chunk
Bundle::Id const * Reader::getBundleId( ChunkId const & chunkId, size_t & size )
{
  uint32_t indexedSize;
  Bundle::Id const * const owner = index.findChunk( chunkId, &indexedSize );

  if ( !owner )
  {
    string blob = chunkId.toBlob();
    throw exNoSuchChunk( toHex( ( unsigned char const * ) blob.data(),
                                blob.size() ) );
  }

  // The index keeps sizes as uint32_t; widen for the caller
  size = indexedSize;
  return owner;
}
void Reader::get( ChunkId const & chunkId, string & data, size_t & size )
{
if ( Bundle::Id const * bundleId = index.findChunk( chunkId ) )

View File

@ -124,6 +124,8 @@ public:
Reader( Config const &, EncryptionKey const &, ChunkIndex & index,
string const & bundlesDir, size_t maxCacheSizeBytes );
Bundle::Id const * getBundleId( ChunkId const &, size_t & size );
/// Loads the given chunk from the store into the given buffer. May throw file
/// and decompression exceptions. 'data' may be enlarged but won't be shrunk.
/// The size of the actual chunk would be stored in 'size'

View File

@ -22,8 +22,8 @@ UnbufferedFile::UnbufferedFile( char const * fileName, Mode mode )
throw( exCantOpen )
{
int flags = ( mode == WriteOnly ? ( O_WRONLY | O_CREAT | O_TRUNC ) :
O_RDONLY );
int flags = ( mode == ReadWrite ? ( O_RDWR | O_CREAT ) :
( mode == WriteOnly ? ( O_WRONLY | O_CREAT | O_TRUNC ) : O_RDONLY ) );
#if !defined( __APPLE__ ) && !defined( __OpenBSD__ ) && !defined(__FreeBSD__) && !defined(__CYGWIN__)
flags |= O_LARGEFILE;
#endif
@ -100,6 +100,12 @@ void UnbufferedFile::seekCur( Offset offset ) throw( exSeekError )
throw exSeekError();
}
/// Moves the file offset to 'offset', counted from the beginning of the file.
/// Throws exSeekError if lseek64() reports a failure
void UnbufferedFile::seek( Offset offset ) throw( exSeekError )
{
  bool const failed = lseek64( fd, offset, SEEK_SET ) < 0;

  if ( failed )
    throw exSeekError();
}
UnbufferedFile::~UnbufferedFile() throw()
{
close( fd );

View File

@ -31,7 +31,8 @@ public:
enum Mode
{
ReadOnly,
WriteOnly
WriteOnly,
ReadWrite
};
typedef int64_t Offset;
@ -53,6 +54,9 @@ public:
/// Seeks to the given offset, relative to the current file offset
void seekCur( Offset ) throw( exSeekError );
/// Seeks to the given offset, relative to the beginning
void seek( Offset ) throw( exSeekError );
~UnbufferedFile() throw();
private:

View File

@ -166,6 +166,8 @@ invalid_option:
" init <storage path> - initializes new storage\n"
" backup <backup file name> - performs a backup from stdin\n"
" restore <backup file name> - restores a backup to stdout\n"
" restore <backup file name> <output file name> -\n"
" restores a backup to file using two-pass \"cacheless\" process\n"
" export <source storage path> <destination storage path> -\n"
" performs export from source to destination storage\n"
" import <source storage path> <destination storage path> -\n"
@ -229,15 +231,18 @@ invalid_option:
if ( strcmp( args[ 0 ], "restore" ) == 0 )
{
// Perform the restore
if ( args.size() != 2 )
if ( args.size() != 2 && args.size() != 3 )
{
fprintf( stderr, "Usage: %s %s <backup file name>\n",
fprintf( stderr, "Usage: %s %s <backup file name> [output file name]\n",
*argv , args[ 0 ] );
return EXIT_FAILURE;
}
ZRestore zr( ZRestore::deriveStorageDirFromBackupsFile( args[ 1 ] ),
passwords[ 0 ], config );
zr.restoreToStdin( args[ 1 ] );
if ( args.size() == 3 )
zr.restoreToFile( args[ 1 ], args[ 2 ] );
else
zr.restoreToStdin( args[ 1 ] );
}
else
if ( strcmp( args[ 0 ], "export" ) == 0 || strcmp( args[ 0 ], "import" ) == 0 )

View File

@ -125,6 +125,50 @@ ZRestore::ZRestore( string const & storageDir, string const & password,
{
}
/// Restores the backup described by inputFileName into the file
/// outputFileName using two passes over the backup instructions: the first
/// writes the literal bytes at their positions and records where each chunk
/// goes, the second emits the recorded chunks grouped by bundle. The finished
/// file is then read back and its SHA-256 compared with the one stored in the
/// backup info; exChecksumError is thrown on mismatch
void ZRestore::restoreToFile( string const & inputFileName, string const & outputFileName )
{
  BackupInfo backupInfo;

  BackupFile::load( inputFileName, encryptionkey, backupInfo );

  string backupData;

  // Perform the iterations needed to get to the actual user backup data
  BackupRestorer::restoreIterations( chunkStorageReader, backupInfo, backupData, NULL );

  // c_str() is used because the constructor takes a C string and data() is
  // only guaranteed to be NUL-terminated since C++11.
  // NOTE(review): ReadWrite opens with O_RDWR | O_CREAT and no O_TRUNC, so a
  // pre-existing longer output file presumably keeps its old tail and then
  // fails the checksum below - confirm whether truncation is wanted
  UnbufferedFile f( outputFileName.c_str(), UnbufferedFile::ReadWrite );

  /// Writes each piece of data at the requested offset of the output file
  struct FileWriter: public SeekableSink
  {
    UnbufferedFile *f;

    FileWriter( UnbufferedFile *f ):
      f( f )
    {
    }

    virtual void saveData( int64_t position, void const * data, size_t size )
    {
      f->seek( position );
      f->write( data, size );
    }
  } seekWriter( &f );

  BackupRestorer::ChunkMap map;

  // First pass: no sequential sink, so chunks are only recorded in the map
  // while literal bytes go straight to the file through seekWriter
  BackupRestorer::restore( chunkStorageReader, backupData, NULL, NULL, &map, &seekWriter );

  // Second pass: emit the recorded chunks bundle by bundle
  BackupRestorer::restoreMap( chunkStorageReader, &map, &seekWriter );

  // Verify the result by hashing the file from its beginning in 1 MiB pieces
  Sha256 sha256;
  string buf;
  buf.resize( 0x100000 );
  size_t r;

  f.seek( 0 );

  // &buf[ 0 ] yields a writable pointer without casting away constness
  while ( ( r = f.read( &buf[ 0 ], buf.size() ) ) > 0 )
    sha256.add( buf.data(), r );

  if ( sha256.finish() != backupInfo.sha256() )
    throw exChecksumError();
}
void ZRestore::restoreToStdin( string const & inputFileName )
{
if ( isatty( fileno( stdout ) ) )
@ -151,7 +195,7 @@ void ZRestore::restoreToStdin( string const & inputFileName )
}
} stdoutWriter;
BackupRestorer::restore( chunkStorageReader, backupData, &stdoutWriter, NULL );
BackupRestorer::restore( chunkStorageReader, backupData, &stdoutWriter, NULL, NULL, NULL );
if ( stdoutWriter.sha256.finish() != backupInfo.sha256() )
throw exChecksumError();
@ -342,7 +386,7 @@ void ZCollector::gc( bool gcDeep )
BackupRestorer::restoreIterations( chunkStorageReader, backupInfo, backupData, &collector.usedChunkSet );
BackupRestorer::restore( chunkStorageReader, backupData, NULL, &collector.usedChunkSet );
BackupRestorer::restore( chunkStorageReader, backupData, NULL, &collector.usedChunkSet, NULL, NULL );
}
verbosePrintf( "Checking bundles...\n" );

View File

@ -27,7 +27,10 @@ public:
ZRestore( string const & storageDir, string const & password,
Config & configIn );
/// Restores the data to stdin
/// Restores the data to file
void restoreToFile( string const & inputFileName, string const & outputFileName );
/// Restores the data to stdout
void restoreToStdin( string const & inputFileName );
};