#!/usr/bin/env python # TODO # * svn diff, svn status # * sar # * Fix the host/port mess in the directory # * Move most of this into a module so there can be other shells # * effective_cache_size, autovacuum, log_min_duration_statement? # * Put output files in their own directory and nuke it at the beginning # Results # * Resource graphs # * Expensive query analysis (pqa) # * Web server stats (average requests) from __future__ import with_statement from config import * from util import * import service import strategy import basic import apache import os import subprocess, time import re import logging from logging import debug, info, warning, error, critical, exception logging.getLogger().setLevel(logging.INFO) class SimpleService(service.Service): def __init__(self, name, host, waitRe, waitSecs = 10, cmd = None): service.Service.__init__(self, name, host) self.__basecmd = cmd self.__p = None self.__waitRe = waitRe self.__waitSecs = waitSecs def link(self, d, cmd = None): assert self.__basecmd or cmd assert not (self.__basecmd and cmd) if cmd: self.__basecmd = cmd self.__cmd = self.__basecmd.nonohup().ssh(self.host) self.__logpath = os.path.join(d[basic.LogSetting].logdir, self.name + ".log") def start(self): assert not self.__p with Progress("Starting %s" % self.name): stdout = file(self.__logpath, "w+") cmd = self.__cmd.redirect( stdin = subprocess.PIPE, stdout = stdout, stderr = subprocess.STDOUT) self.__p = cmd.start() self.__pcmd = cmd logfile = file(self.__logpath, "r") for i in range(0, 10 * self.__waitSecs): if self.__p.poll(): raise CmdError(cmd, self.__p.wait()) logfile.seek(0) data = logfile.read() if re.search(self.__waitRe, data): break time.sleep(0.1) else: warning("Timed out waiting for %s in %s", `self.__waitRe`, self.__logpath) logfile.close() def stop(self): if self.__p: with Progress("Stopping %s" % self.name): self.__p.stdin.close() self.__p.wait() self.__p = None def wait(self): assert self.__p with Progress("Waiting for %s" % self.name): code = self.__p.wait() if code: raise CmdError(self.__pcmd, code) class PostgresDBSetting(service.Setting): __config__ = ["dbname", "user"] def __init__(self, dbname, user): service.Setting.__init__(self) self.dbname, self.user = dbname, user def link(self, d): self.host = d[Postgres].host def conninfo(self, fromHost): ci = "dbname=%s user=%s" % (self.dbname, self.user) if self.host == fromHost: return ci + " host=/tmp" return ci + " host=%s" % self.host class Postgres(SimpleService): __config__ = ["pgpath", "options", "flushBC", "vacuum"] def __init__(self, host, pgpath, options = None, flushBC = False, vacuum = False): self.pgpath = pgpath if options == None: options = {} self.options = options self.flushBC = flushBC self.vacuum = vacuum name = "postgres.%s" % host self.__confPath = os.path.join(DESTDIR, "postgresql.conf") cmd = Cmd(os.path.join(pgpath, "bin", "postgres"), "-D", os.path.join(pgpath, "data"), "-c", "config_file=%s" % self.__confPath) SimpleService.__init__(self, name, host, "database system is ready", 60, cmd = cmd) def link(self, d): SimpleService.link(self, d) self.__dbs = d[PostgresDBSetting] self.__ls = d[basic.LogSetting] def setup(self): SimpleService.setup(self) # XXX This is a mess. Do it more like how I did it for # Apache. It would be nice if some of these could be recorded # as options even if we use the defaults (max_connections, # shared_buffers, fsync, checkpoint_segments, autovacuum, # stats_start_collector, stats_row_level) opts = self.options.copy() for dk, dv in [("listen_addresses", "*"), # XXX Check that this is >= Apache limit ("max_connections", 150), ("shared_buffers", (32, "MB")), ("log_line_prefix", "%t %c:%x: "), # Performance ("fsync", False), ("checkpoint_segments", 15), # Required by txcache ("enable_bitmapscan", False), ("enable_tidscan", False), ("default_transaction_isolation", "serializable"), # Minimize perturbation ("autovacuum", False), ("stats_start_collector", False), # XXX Necessary given the above? ("stats_row_level", True), # Locale settings ("datestyle", "iso, mdy"), ("lc_messages", "C"), ("lc_monetary", "C"), ("lc_numeric", "C"), ("lc_time", "C"), # TPC-W specific # XXX Put these somewhere else ("escape_string_warning", False), # A 100/1000 TPC-W DB takes 47 relations and # 3328 page slots. However, this is # proportional to the total of all databases, # and a relation only requires 70 bytes and a # page slot only 6, so there's no harm in using # fairly large numbers. ("max_fsm_relations", 1000), ("max_fsm_pages", 1024*1024/6)]: opts.setdefault(dk, dv) config = [] for k, v in opts.iteritems(): if isinstance(v, bool) or isinstance(v, int) or isinstance(v, float): vs = str(v) elif isinstance(v, str): vs = "'%s'" % v.replace("'", "''") elif (isinstance(v, tuple) and len(v) == 2 and isinstance(v[0], int) and isinstance(v[1], str)): vs = "%d%s" % v else: raise ValueError("Setting %s value %s has unknown %s", (k, v, type(v))) config.append("%s = %s" % (k, vs)) self.host.writeFile(self.__confPath, "\n".join(config)) def start(self): if self.flushBC: self.host.flushBC() SimpleService.start(self) if self.vacuum: with Progress("Vacuuming database"): # We run without a user name because only the database # owner can vacuum. cmd = (Cmd(os.path.join(self.pgpath, "bin", "psql"), "-v", "ON_ERROR_STOP=1", self.__dbs.dbname) .ssh(self.host).redirect(stdin = subprocess.PIPE)) p = cmd.start() print >> p.stdin, "VACUUM FULL;" print >> p.stdin, "REINDEX DATABASE %s;" % self.__dbs.dbname print >> p.stdin, "CLUSTER;" print >> p.stdin, "ANALYZE;" p.stdin.close() code = p.wait() if code: raise CmdError(cmd, code) def stop(self): SimpleService.stop(self) # Save configuration self.host.getFile(self.__confPath, os.path.join(self.__ls.logdir, "%s.conf" % self.name)) class Pincushion(SimpleService): __config__ = ["maxFreshness", "port"] def __init__(self, host, maxFreshness = 120, port = 16000): self.maxFreshness = maxFreshness self.port = port name = "pincushion.%s.%d" % (host, port) SimpleService.__init__(self, name, host, "Starting pincushion") def link(self, d): pg = d[PostgresDBSetting] cmd = Cmd(os.path.join(DESTDIR, "pincushion"), "-h", pg.host, "-u", pg.user, "-d", pg.dbname, "-s", self.maxFreshness, "-p", self.port) SimpleService.link(self, d, cmd) def setup(self): self.host.sendSrcFiles("src/pincushion/pincushion") SimpleService.setup(self) def nodeLine(self): return "%s:%d" % (self.host, self.port) class CacheServer(SimpleService): __config__ = ["size", "port"] def __init__(self, host, size = "128M", port = 16001): self.size = size self.port = port cmd = Cmd(os.path.join(DESTDIR, "server"), "-s", size, "-p", port) name = "server.%s.%d" % (host, port) SimpleService.__init__(self, name, host, "Created .* cache", cmd = cmd) def link(self, d): SimpleService.link(self, d) self.__ls = d[basic.LogSetting] def setup(self): self.host.sendSrcFiles("src/server/server") SimpleService.setup(self) def nodeLine(self): return "%s:%d" % (self.host, self.port) def stop(self): with Progress("Retrieving cache stats from %s" % self.name): Cmd(os.path.join(SRCDIR, "src", "support", "serverstats"), "%s:%d" % (self.host, self.port), ">", os.path.join(self.__ls.logdir, "cachestats.%s.%d" % (self.host, self.port))).sh().attempt() SimpleService.stop(self) class PHPClient(service.Service): __config__ = ["pinPolicy", "bypass", "freshness"] def __init__(self, host, pinPolicy = "latest bounded pin with variety (5 secs)", bypass = False, freshness = 30): self.pinPolicy = pinPolicy self.bypass = bypass self.freshness = freshness service.Service.__init__(self, "phpclient.%s" % host, host) def link(self, d): self.__pincushion = d[Pincushion] self.__cacheServers = d.getService(CacheServer) self.__ls = d[basic.LogSetting] def setup(self): self.host.sendSrcFiles("src/phpclient/txcache.so", "src/phpclient/txcache.ini") with Progress("Installing PHP txcache on %s" % self.host): edir = (Cmd("php-config", "--extension-dir") .ssh(self.host).runRead().strip()) Cmd("ln", "-sf", os.path.join(DESTDIR, "txcache.so"), os.path.join(edir, "txcache.so")).sudo().ssh(self.host).run() Cmd("ln", "-sf", os.path.join(DESTDIR, "txcache.ini"), "/etc/php5/conf.d/txcache.ini").sudo().ssh(self.host).run() def setupNodesTxt(self, path): data = [self.pinPolicy, self.__pincushion.nodeLine()] data.extend(s.nodeLine() for s in self.__cacheServers.values()) self.host.writeFile(path, "\n".join(data)) def start(self): # Clear clogs and stats. Really we should do this in setup, # but we must do this after Apache has been cleaned up. with Progress("Deleting old clogs on %s" % self.host): if self.host.pathExists("/tmp/clog"): Cmd("find", "/tmp/clog", "-delete").sudo().ssh(self.host).run() if self.host.pathExists("/tmp/stats"): Cmd("find", "/tmp/stats", "-delete").sudo().ssh(self.host).run() def stop(self): # Gather clogs and stats if self.host.pathExists("/tmp/clog"): self.host.getFile("/tmp/clog/", os.path.join(self.__ls.logdir, "clog.%s" % self.host)) if self.host.pathExists("/tmp/stats"): self.host.getFile("/tmp/stats/", os.path.join(self.__ls.logdir, "stats.%s" % self.host)) class TPCWDBSetting(PostgresDBSetting): __config__ = ["popEBs", "popItems"] def __init__(self, user, popEBs, popItems): PostgresDBSetting.__init__(self, "tpcw_%d_%d" % (popEBs, popItems), user) self.popEBs = popEBs self.popItems = popItems # 4.3 if popItems not in [1000, 10000, 100000, 1000000, 10000000]: warning("Number of items %d does not conform to spec" % popItems) class TPCWServer(service.Service): __config__ = ["txcache"] def __init__(self, host, txcache): self.txcache = txcache service.Service.__init__(self, "tpcwserver.%s" % host, host) def link(self, d): d.get(apache.Apache, self.host).addPart("tpcw") self.__dbs = d[TPCWDBSetting] if self.txcache: self.__phpClient = d.get(PHPClient, self.host) def __disable(self): Cmd("rm", "-f", os.path.join(DESTDIR, "tpcw-run")).ssh(self.host).run() def setup(self): # Send TPCW server code self.__disable() self.host.sendSrcFiles("src/tpc-w-php/html/tpcw") # Generate TPCW server configuration configinc = ["") self.host.writeFile(os.path.join(DESTDIR, "tpcw", "config.inc"), "\n".join(configinc)) # Generate SSL key keyPath = os.path.join(DESTDIR, "tpcw.pem") if not self.host.pathExists(keyPath): Cmd("openssl", "req", "-new", "-newkey", "rsa:1024", "-days", "365", "-nodes", "-x509", "-keyout", keyPath, "-out", keyPath , "-subj", "/CN=%s" % self.host).ssh(self.host).run() # Generate nodes.txt if self.txcache: self.__phpClient.setupNodesTxt(os.path.join(DESTDIR, "tpcw", "nodes.txt")) def start(self): # Expose the TPC-W code to Apache Cmd("ln", "-sf", os.path.join(DESTDIR, "tpcw"), os.path.join(DESTDIR, "tpcw-run")).ssh(self.host).run() def stop(self): self.__disable() class TPCWRBE(SimpleService): MIX_BROWSING = 1 MIX_SHOPPING = 2 MIX_ORDERING = 3 __config__ = ["mix", "runEBs", "upSecs", "runSecs", "downSecs"] def __init__(self, host, mix, runEBs, upSecs, runSecs, downSecs): if mix not in [self.MIX_BROWSING, self.MIX_SHOPPING, self.MIX_ORDERING]: raise ValueError("Unknown mix %d" % mix) # 5.5.1.1 and 5.5.3.1 for name, low, val in [("Up-ramp", 10*60, upSecs), ("Measurement interval", 30*60, runSecs), ("Down-ramp", 5*60, downSecs)]: if val < low: warning("%s of %d < required minimum of %d" % (name, val, low)) self.mix = mix self.runEBs = runEBs self.upSecs, self.runSecs, self.downSecs = upSecs, runSecs, downSecs name = "tpcwrbe.%s" % host self.outFile = os.path.join(DESTDIR, name + ".out") self.wips = None SimpleService.__init__(self, name, host, "All of the EBs are alive!") def link(self, d): self.__ls = d[basic.LogSetting] dbs = d[TPCWDBSetting] if dbs.popEBs != self.runEBs: msg = "Database populated for %d EB's, running with %d EB's" % \ (dbs.popEBs, self.runEBs) if dbs.popEBs > self.runEBs: warning(msg) else: critical(msg) raise ValueError("runEBs %d > dbs.popEBs %d" % (self.runEBs, dbs.popEBs)) # Based on RBE/run_RBE_100EB_1000ITEM.sh servers = d.getService(TPCWServer).values() cmd = Cmd("java", "-cp", os.path.join(DESTDIR, "rbe.jar"), "rbe.RBE", "-EB", "rbe.EBTPCW%dFactory" % self.mix, self.runEBs, "IAABBBCHNOOPSSS", "-HOSTS", ",".join(str(s.host) for s in servers), "-OUT", self.outFile, "-DEBUG", 0, "-ITEM", dbs.popItems, "-CUST", dbs.popEBs * 2880, "-RU", self.upSecs, "-MI", self.runSecs, "-RD", self.downSecs) SimpleService.link(self, d, cmd) def setup(self): SimpleService.setup(self) self.host.sendSrcFiles("src/tpc-w-php/RBE/rbe.jar") Cmd("rm", "-f", self.outFile).ssh(self.host).run() def stop(self): SimpleService.stop(self) if self.host.pathExists(self.outFile): data = self.host.readFile(self.outFile) # It creates a zero-sized file at startup. Ignore it if # there's no data. if len(data): path = os.path.join(self.__ls.logdir, self.name + ".out") f = file(path, "w") f.write(data) f.close() self.wips = RBEParser(file(path)).getWIPS() class RBEParser(object): def __init__(self, fileobj): self.args = {} self.dat = {} self.__read(fileobj) while self.__peek() != "": self.__doStatement() del self.__toks, self.__pos def getWIPS(self): ru, mi = self.args["RU"], self.args["MI"] values = [x for x, in self.dat["wips"][ru:ru+mi]] return float(sum(values))/len(values) def __read(self, f): self.__toks = [] self.__pos = 0 while True: l = f.readline() if len(l) == 0: break if "%" in l: l, _, c = l.partition("%") self.__doComment(c) for sym in [";", "[", "]", "="]: if sym in l: l = l.replace(sym, " " + sym + " ") self.__toks.extend(l.strip().split()) self.__toks.append("\n") self.__toks.append("") def __peekPos(self, nl): pos = self.__pos + 1 if pos == len(self.__toks): pos -= 1 if nl: return pos while self.__toks[pos] == "\n": pos += 1 return pos def __next(self, nl = False): self.__pos = self.__peekPos(nl) return self.__toks[self.__pos] def __peek(self, nl = False): return self.__toks[self.__peekPos(nl)] def __expect(self, tok): got = self.__next() if tok != got: raise RuntimeError("Expected %s, got %s" % (tok, got)) def __doComment(self, c): # Some of the arguments are actually relevant toks = c.split() if not toks[0].startswith("-"): return k = toks[0][1:] if toks[-1] == "(default)": toks = toks[:-1] if toks[-1].isdigit(): v = int(toks[-1]) elif toks[-1].replace(".", "", 1).isdigit(): v = float(toks[-1]) elif toks[-1] == "true": v = True elif toks[-1] == "false": v = False else: v = None if v != None: self.args[k] = v def __doStatement(self): t = self.__next() if t == "function": while self.__next(True) not in ["\n", ""]: pass return self.__expect("=") dct = self.dat assert t.startswith("dat.") name = t[4:] while "." in name: n, _, name = name.partition(".") dct = dct.setdefault(n, {}) dct[name] = self.__doExpr() self.__expect(";") def __doExpr(self): if self.__peek() != "[": return self.__doValue() self.__expect("[") table = [] while self.__peek() not in ["]", ""]: row = [] table.append(row) while self.__peek(True) == "\n": self.__next(True) while self.__peek(True) not in ["\n", "]", ""]: row.append(self.__doValue()) self.__expect("]") return table def __doValue(self): return float(self.__next()) class WaitForEnter(service.Service): def __init__(self): # XXX Still need to pass host service.Service.__init__(self, "WaitForEnter", None) def wait(self): raw_input("Press enter to continue...") class EZTPCW(service.Directory): def __init__(self, suiteName): service.Directory.__init__(self) self.__suiteName = suiteName def run(self): self += self.__createLogSetting() # Push nonohup to all hosts # XXX Dumb for host in set(s.host for s in self.allServices()): host.sendSrcFiles("src/bench/nonohup") # XXX Record the running status # Set up self.link() self[basic.LogSetting].saveConfig() self.setup() # Run! seq = [Postgres, Pincushion, CacheServer, apache.Apache, PHPClient, TPCWServer, TPCWRBE] success = False try: for s in seq: if s in self: self.startAll(s) if seq[-1] in self: self[seq[-1]].wait() else: raw_input("Press enter to continue...") success = True except CmdError, e: e.detailError() raw_input("Press enter to continue...") raise finally: for s in reversed(seq): try: self.stopAll(s) except Exception, e: if success: raise else: # If we already failed, sweep this exception # under the rug import traceback traceback.print_exc() if TPCWRBE in self: info("WIPS: %s", self[TPCWRBE].wips) def getResult(self): return self[TPCWRBE].wips def __createLogSetting(self): res = os.path.join(RESULTDIR, self.__suiteName) tpcwdb = self[TPCWDBSetting] res += "-tpcw-%deb-%ditem" % (tpcwdb.popEBs, tpcwdb.popItems) tpcw = self[TPCWServer] pg = self[Postgres] if not tpcw.txcache: res += "-notxcache" addpg = True else: client = self[PHPClient] if client.bypass: addpg = True else: assert pg.pgpath == TXPG addpg = False res += "-txcache-%dfreshness" % client.freshness if addpg: if pg.pgpath == TXPG: res += "-modifiedpg" else: res += "-stockpg" rbe = self[TPCWRBE] res = os.path.join(res, "%d" % rbe.runEBs) assert not os.path.exists(res) return basic.LogSetting(res) def EZTPCWSuite(): suiteName = time.strftime("%Y%m%d-%H%M%S") def mkResult(): return EZTPCW(suiteName) return mkResult def test(): #localhost = Host("localhost") localhost = Host(None) for ebs, d in strategy.sweep(EZTPCWSuite(), 200, 10, maxInput = 100): d += Postgres(localhost, TXPG) d += TPCWDBSetting("apache", 200, 1000) d += Pincushion(localhost) d += CacheServer(localhost) d += apache.Apache(localhost) d += apache.ApachePreforkSettting() d += PHPClient(localhost) d += TPCWServer(localhost, True) d += TPCWRBE(localhost, TPCWRBE.MIX_SHOPPING, ebs, 10*60, 30*60, 5*60) d.run() if __name__ == "__main__": test()