diff options
Diffstat (limited to 'lurker/libesort/Master.cpp')
-rw-r--r-- | lurker/libesort/Master.cpp | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/lurker/libesort/Master.cpp b/lurker/libesort/Master.cpp new file mode 100644 index 0000000..b1b3327 --- /dev/null +++ b/lurker/libesort/Master.cpp @@ -0,0 +1,226 @@ +/* $Id: Master.cpp 1649 2009-10-19 14:35:01Z terpstra $ + * + * Master.cpp - Coordinate commit+read interface + * + * Copyright (C) 2002 - Wesley W. Terpstra + * + * License: GPL + * + * Authors: 'Wesley W. Terpstra' <wesley@terpstra.ca> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#define _FILE_OFFSET_BITS 64 + +#include "io.h" + +#include "Master.h" +#include "Source.h" +#include "Failer.h" +#include "Transaction.h" + +#include <list> +#include <iostream> +#include <cerrno> +#include <cassert> + +namespace ESort +{ + +Master::Master(const Parameters& p) + : view(p), memory(), man() +{ +} + +Master::~Master() +{ +} + +int Master::init(const string& db, int mode) +{ + if (man.dbopen(view, db, mode) != 0) return -1; + + return 0; +} + +struct CleanupHelper +{ + public: + DbMan* man; + int fd; + string id; + + CleanupHelper(DbMan* man_) : man(man_), fd(-1) { } + ~CleanupHelper() + { + if (fd != -1) + { + close(fd); + man->killSub(id); + } + } +}; + +int Master::commit() +{ + if (memory.empty()) return 0; + + CleanupHelper scopedData(&man); + scopedData.fd = man.openNew(scopedData.id); + if (scopedData.fd == -1) return -1; + + Transaction tran(scopedData.fd, &view.params); + Merger merge(view.params.unique(), true); // forward + auto_ptr<Source> m(memory.openMemory("", true)); // forward + int ok = m->advance(); + assert (ok != -1); // memory was already checked -> not empty! + merge.merge(m.get()); + m.release(); // seperate in case of exception above + + // What category is our RAM? + int category = memory.category(view.params); +// std::cout << "Category: " << category << std::endl; + + View::Files::iterator kill; + for (kill = view.files.begin(); kill != view.files.end(); ++kill) + { +// std::cout << "Merging: " << kill->category << std::endl; + if (kill->category > category) + break; // keep this one + + // If we have at least one of the same size, roll up as in + // binary addition + if (kill->category == category) + category++; + + auto_ptr<Source> f( + const_cast<File*>(&*kill)->openBlock(0, true)); + if (!f.get()) return -1; // something broke? + if (f->advance() == -1) return -1; // always has keys?! + merge.merge(f.get()); + f.release(); // above might throw + } + + // The list of ids which we commit + std::set<string> keepIds; + keepIds.insert(scopedData.id); // the new id + + // keep all of these ids + for (View::Files::iterator keep = kill; keep != view.files.end(); ++keep) + keepIds.insert(keep->id); + + if (merge.skiptill("", true) != 0) + return -1; // must work?! ram has entries + + int dup; + while ((dup = merge.advance()) != -1) + { // there is stuff to merge + if (tran.write(merge.key.length(), dup, merge.key.c_str()) != 0) + return -1; + } + if (tran.finish() != 0) + return -1; + + // Commit the summary information + if (man.commit(view.params, keepIds) != 0) + return -1; + + // done with the buffer + memory.flush(); + + // Queue useless files for delete + std::set<string> killIds; + for (View::Files::iterator zap = view.files.begin(); zap != kill; ++zap) + killIds.insert(zap->id); + + // Purge useless files from the View (closes them) + view.files.erase(view.files.begin(), kill); + + // Now that they are closed, delete them + for (std::set<string>::iterator del = killIds.begin(); del != killIds.end(); ++del) + man.killSub(*del); + + // Move the transaction to a File; thus becoming a part of the view + int fd = scopedData.fd; + scopedData.fd = -1; + view.files.insert(File(scopedData.id, fd, &view.params)); + + return 0; +} + +int Master::insert(const string& k) +{ + assert (k.length() < view.params.keySize()); + + // always succeeds + memory.insert(k); + return 0; +} + +auto_ptr<Walker> Master::seek(const string& k, Direction dir) +{ + assert (dir == Forward || dir == Backward); + + auto_ptr<Merger> out(new Merger(view.params.unique(), dir == Forward)); + + if (view.rawseek(out.get(), k, dir == Forward) != 0) + return auto_ptr<Walker>(new Failer(errno)); + + auto_ptr<Source> s = memory.openMemory(k, dir == Forward); + assert (s.get()); // always works + + // only possible error is eof + if (s->advance() != -1) out->merge(s.release()); + // else kill it on scope out + + if (out->skiptill(k, dir == Forward) == -1) + return auto_ptr<Walker>(new Failer(errno)); + + return auto_ptr<Walker>(out); +} + +auto_ptr<Walker> Master::seek(const string& pfx, const string& k, Direction dir) +{ + assert (dir == Forward || dir == Backward); + + auto_ptr<PrefixMerger> out(new PrefixMerger(view.params.unique(), dir == Forward)); + + if (view.rawseek(out.get(), pfx + k, dir == Forward) != 0) + return auto_ptr<Walker>(new Failer(errno)); + + auto_ptr<Source> s = memory.openMemory(k, dir == Forward); + assert (s.get()); // always works + + // only possible error is eof + if (s->advance() != -1) out->merge(s.release()); + // else kill it on scope out + + if (out->skiptill(pfx, k, dir == Forward) == -1) + return auto_ptr<Walker>(new Failer(errno)); + + return auto_ptr<Walker>(out); +} + +auto_ptr<Writer> Writer::opendb(const string& db, const Parameters& p, int mode) +{ + auto_ptr<Master> m(new Master(p)); + + if (m->init(db, mode) != 0) + return auto_ptr<Writer>(0); + + return auto_ptr<Writer>(m); +} + +} |