#!/usr/bin/env python2.6
# -*- python -*-
#
# Farm reservation system.
# Dan Ports
#
# $Revision$ $Date$
#
# This script tracks in a central database users of the farm machines,
# either shared or exclusive, and notifies the world of reservations
# via a webpage, a MOTD, wall(1), and email.
#
# This script is not generally intended to be invoked directly but
# rather via a wrapper script that invokes it on the central server,
# providing the right username via the -u argument.
#

import errno
import fcntl
import optparse
import os
import subprocess
import sys
import time
from collections import defaultdict, namedtuple
from datetime import datetime

try:
    import cPickle
except ImportError:
    # Interpreters without the C accelerator module.
    import pickle as cPickle

try:
    readLine = raw_input
except NameError:
    readLine = input

# List of hosts managed by this script.  This is mostly just a sanity
# check to catch typos.
MANAGED_HOSTS = ["farm" + str(x) for x in range(1, 15)]

LOCKTABLE = "/u/farmres/farmres/farmres.table"
LOCKFILE = "/u/farmres/farmres/farmres.lock"
HTMLFILE = "/var/www/farmres.html"
URL = "http://flute.csail.mit.edu/farmres.html"
NOTIFICATION_EMAIL = "farmers@csail.mit.edu"


#
# Small helpers
#

def scriptVersion(keyword="$Revision$"):
    """Return the revision number embedded in a "$Revision: N $" keyword.

    Returns "unknown" when the keyword has not been expanded by the
    version control system (i.e. it is still the bare "$Revision$").
    """
    parts = keyword.split()
    if len(parts) > 1:
        return parts[1]
    return "unknown"


def escapeHTML(text):
    """Return text with the HTML metacharacters &, <, > and " escaped."""
    text = "%s" % (text,)
    # "&" must be replaced first so the entities added below survive.
    for raw, entity in (("&", "&amp;"), ("<", "&lt;"),
                        (">", "&gt;"), ('"', "&quot;")):
        text = text.replace(raw, entity)
    return text


def sortedLocks(locks):
    """Return a new list of locks with the exclusive ones first.

    The sort is stable, so locks of the same mode keep their order.
    """
    return sorted(locks, key=lambda lock: not lock.exclusive)


def uniqueHosts(hosts):
    """Return hosts as a list with duplicates dropped, first occurrence kept."""
    seen = set()
    result = []
    for host in hosts:
        if host not in seen:
            seen.add(host)
            result.append(host)
    return result


def pipeToCommand(argv, text):
    """Run argv (no shell), feed text to its stdin, and return its exit code."""
    # universal_newlines keeps stdin a text stream on Python 3; on
    # Python 2 it only affects stdout/stderr, which are not piped here.
    p = subprocess.Popen(argv, stdin=subprocess.PIPE, shell=False,
                         universal_newlines=True)
    p.communicate(text)
    return p.returncode


#
# Lock and lock table structures
#

class Lock(namedtuple("Lock",
                      "resource user description start until exclusive")):
    """One reservation: who holds which host, why, since when and until when.

    start is a time.time() timestamp; until is the free-form text the
    user typed at the prompt; exclusive is true for exclusive locks.
    """
    pass


class LockTable:
    """All current reservations, grouped by host name.

    Instances are pickled to disk as-is by store() and read back by load().
    """

    def __init__(self):
        self.locks = defaultdict(list)

    @classmethod
    def load(cls, filename, createIfNotFound=True):
        """Read a pickled table from filename.

        If the file does not exist and createIfNotFound is true, an
        empty table is returned.  Any other I/O error is re-raised:
        treating, say, a permission error as "no table yet" would let
        the next store() overwrite the real reservations.
        """
        # NOTE: unpickling executes code named in the file; the table
        # must only be writable by the account that runs this script.
        try:
            with open(filename, "rb") as f:
                return cPickle.load(f)
        except IOError as e:
            if e.errno != errno.ENOENT or not createIfNotFound:
                raise
            return cls()

    def store(self, filename):
        """Pickle this table to filename, replacing its contents."""
        with open(filename, "wb") as f:
            cPickle.dump(self, f)

    def add(self, lock):
        """Record lock under its host."""
        self.locks[lock.resource].append(lock)

    def remove(self, lock):
        """Forget lock; raises ValueError if it is not in the table."""
        self.locks[lock.resource].remove(lock)

    def locksByHost(self, resource):
        """Return the (live) list of locks held on host resource."""
        return self.locks[resource]

    def checkConflict(self, lock):
        """Return an existing lock that conflicts with lock, or False.

        Two locks on the same host conflict when either is exclusive.
        """
        for other in self.locks[lock.resource]:
            if other.exclusive or lock.exclusive:
                return other
        return False

    def dump(self, verbose):
        """Print a one-line status per managed host to stdout.

        With verbose, each lock's owner, mode, description and end
        time are printed below the host line.
        """
        for host in MANAGED_HOSTS:
            locks = sortedLocks(self.locks[host])
            if not locks:
                print("%s:  free" % host)
                continue
            # Exclusive locks sort first, so the first entry tells the mode.
            mode = "exclusive" if locks[0].exclusive else "shared"
            users = " ".join([x.user for x in locks])
            print("%s:  %s:  %s" % (host, mode, users))
            if verbose:
                for lock in locks:
                    print(" %s (%s): %s" % (
                        lock.user,
                        "exclusive" if lock.exclusive else "shared",
                        lock.description))
                    print(" until %s" % lock.until)

    def dumpHTML(self):
        """Return the status of every managed host as an HTML table.

        User-supplied fields (user, description, until) are escaped.
        """
        # NOTE(review): the original markup was lost from the copy of
        # this file under review (only the cell text and the format
        # arguments survived).  The tags below are a reconstruction;
        # confirm them against the deployed page.
        rows = ['<table border="1">',
                "<tr><th>Host</th><th>Status</th><th>User</th>"
                "<th>Job description</th><th>In use until</th></tr>"]
        for host in MANAGED_HOSTS:
            locks = sortedLocks(self.locks[host])
            if not locks:
                rows.append('<tr><td>%s</td>'
                            '<td bgcolor="green" colspan="4">Free</td></tr>'
                            % host)
                continue
            # The host cell spans all of this host's rows and is only
            # emitted on the first one.
            hostCell = '<td rowspan="%d">%s</td>' % (len(locks), host)
            for x in locks:
                mode = "Shared"
                modeColor = "yellow"
                if x.exclusive:
                    mode = "Exclusive"
                    modeColor = "red"
                rows.append('<tr>%s<td bgcolor="%s">%s</td>'
                            '<td>%s</td><td>%s</td><td>%s</td></tr>'
                            % (hostCell, modeColor, mode,
                               escapeHTML(x.user),
                               escapeHTML(x.description),
                               escapeHTML(x.until)))
                hostCell = ""
        rows.append("</table>")
        return "\n".join(rows) + "\n"


#
# Notifiers
#

def notifyHTML(locktable):
    """Rewrite the status web page (HTMLFILE) from locktable."""
    with open(HTMLFILE, "w") as htmlFile:
        htmlFile.write(locktable.dumpHTML())


def buildMOTD(hostLocks):
    """Return the MOTD text describing hostLocks (sorted, possibly empty)."""
    motd = []
    if hostLocks:
        if any(x.exclusive for x in hostLocks):
            motd.append("*" * 72 + "\n")
            motd.append("*" + "\n")
            motd.append("* THIS MACHINE IS EXCLUSIVELY LOCKED!\n")
            motd.append("*" + "\n")
            motd.append("* This probably means it is in use for "
                        "performance experiments,\n")
            motd.append("* and you are risking someone's wrath "
                        "by using it!\n")
            motd.append("*" + "\n")
            motd.append("*" * 72 + "\n")
        else:
            motd.append("* This machine is non-exclusively locked, "
                        "so should not be used for\n")
            motd.append("* performance-critical experiments.\n")
        motd.append("\n")
        motd.append("Current users: \n")
        for lock in hostLocks:
            motd.append(" %s: %s\n" % (lock.user, lock.description))
            motd.append(" in use since %s, reserved until %s\n" % (
                datetime.fromtimestamp(lock.start).strftime(
                    "%Y-%m-%d %H:%M:%S"),
                lock.until))
    else:
        motd.append("This machine is not currently in use.\n")
    motd.append("\n")
    motd.append("To see all farm reservation status, go to\n%s\n" % URL)
    motd.append("\n")
    return "".join(motd)


def notifyMOTD(locktable, locks):
    """Rewrite /etc/motd.farmres on every host named in locks.

    Raises IOError if the ssh command for a host exits non-zero.
    """
    for host in [x.resource for x in locks]:
        print("Updating motd on %s" % host)
        motd = buildMOTD(sortedLocks(locktable.locksByHost(host)))
        # ssh joins its arguments into one remote command line, so the
        # ">" redirection is performed by the remote shell.
        returncode = pipeToCommand(
            ["/usr/bin/ssh", host, "cat", ">", "/etc/motd.farmres"], motd)
        if returncode != 0:
            raise IOError("Got returncode %d updating motd on %s" %
                          (returncode, host))


def notifyWall(locktable, locks, acquired):
    """Broadcast each lock change with wall(1) on the affected host.

    acquired is true for newly taken locks and false for released
    ones.  Raises IOError if the ssh command for a host exits non-zero.
    """
    for lock in locks:
        host = lock.resource
        print("Sending wall to %s" % host)
        message = "Farm reservation: \n"
        if acquired:
            message += " %s has acquired a%s lock on this host\n" % (
                lock.user, "n EXCLUSIVE" if lock.exclusive else " shared")
            message += " Job description: %s\n" % lock.description
            message += " In use until: %s\n" % lock.until
        else:
            message += " %s has released lock on this host\n" % (lock.user)
        message += "\nSee %s for more information\n" % URL
        returncode = pipeToCommand(["/usr/bin/ssh", host, "wall"], message)
        if returncode != 0:
            raise IOError("Got returncode %d sending wall on %s" %
                          (returncode, host))


def notifyEmail(locktable, locks, acquired, unparsedHosts):
    """Mail a summary of the lock change to NOTIFICATION_EMAIL.

    Does nothing when locks is empty.  unparsedHosts is the host
    specification as the user typed it; it is used in the subject.
    Raises IOError if the mail command exits non-zero.
    """
    if not locks:
        return
    # All locks of one invocation share user, mode and description.
    first = locks[0]
    if not acquired:
        op = "released"
    elif first.exclusive:
        op = "acquired exclusive"
    else:
        op = "acquired shared"
    subject = "%s %s lock on %s" % (first.user, op, unparsedHosts)
    message = "Farm reservation:\n\n"
    message += "%s has %s lock(s)\n" % (first.user, op)
    message += " Job description: %s\n" % first.description
    if acquired:
        message += " In use until: %s\n" % first.until
    message += "\nHosts affected:\n"
    for lock in locks:
        message += " %s\n" % lock.resource
    message += ("\n\nSee %s for full farm status\nand more information\n"
                % URL)
    returncode = pipeToCommand(
        ["/usr/bin/mail", "-s", subject, NOTIFICATION_EMAIL], message)
    if returncode != 0:
        raise IOError("Got returncode %d sending mail" % (returncode))


def notify(locktable, locks, acquired, unparsedHosts, quiet):
    """Run all notifiers; wall and email are skipped when quiet is true."""
    notifyHTML(locktable)
    notifyMOTD(locktable, locks)
    if not quiet:
        notifyWall(locktable, locks, acquired)
        notifyEmail(locktable, locks, acquired, unparsedHosts)


#
# Operations (acquire and release)
#

def acquire(locktable, hosts, user, exclusive, force, desc, until):
    """Take a lock on every host in hosts and save the table.

    All hosts are checked for conflicts before any lock is added, so
    either every lock is taken or none is.  A conflict raises
    Exception("Lock conflict") unless force is true.  Returns the new
    locks.
    """
    locks = []
    start = time.time()
    for host in hosts:
        lock = Lock(host, user, desc, start, until, exclusive)
        conflict = locktable.checkConflict(lock)
        if conflict:
            print("%s already in use by %s" % (host, conflict.user))
            if not force:
                raise Exception("Lock conflict")
            print("...ignoring conflict because --force used")
        locks.append(lock)
    for lock in locks:
        locktable.add(lock)
    locktable.store(LOCKTABLE)
    return locks


def release(locktable, hosts, user):
    """Drop every lock user holds on the given hosts and save the table.

    Hosts on which user holds no lock only produce a warning.
    Returns the removed locks.
    """
    locks = []
    for host in hosts:
        held = [lock for lock in locktable.locksByHost(host)
                if lock.user == user]
        for lock in held:
            print("Releasing lock on %s" % host)
        if not held:
            print("Warning: no lock held for %s" % host)
        locks.extend(held)
    # Removal is deferred so the per-host lists are not modified while
    # they are being scanned above.
    for x in locks:
        locktable.remove(x)
    locktable.store(LOCKTABLE)
    return locks


#
# Option parsing and main function
#

def expandHosts(hosts):
    """Yield individual host names from specs such as "farm1,10-14".

    Only names starting with "farm" that contain "," or "-" are
    expanded; everything else is yielded unchanged.  A range that is
    not numeric or runs backwards is yielded unexpanded (as "farm" +
    the range text) so that the caller's host validation reports it.
    """
    for host in hosts:
        if not (host.startswith("farm") and ("-" in host or "," in host)):
            yield host
            continue
        for spec in host[4:].split(","):
            if "-" not in spec:
                yield "farm" + spec
                continue
            first, _, last = spec.partition("-")
            try:
                low, high = int(first), int(last)
            except ValueError:
                low, high = 1, 0    # force the "bad range" path below
            if low > high:
                yield "farm" + spec
                continue
            for i in range(low, high + 1):
                yield "farm" + str(i)


def main():
    """Parse the command line and perform the requested action."""
    parser = optparse.OptionParser(
        usage="%prog [options] hosts",
        version=scriptVersion(),
        description="""Acquire or release locks on hosts. These locks
        are not enforced by the system, but other users are notified
        about active locks via a webpage showing the system status, a
        MOTD message on login, a wall(1) message to terminals of
        logged-in users, and email. "hosts" is a list of hosts to
        acquire or release locks on. It supports ranges for farm
        hosts: for example, farm1,10-14 is a valid specification.
        """)

    actions = optparse.OptionGroup(parser, "Actions",
                                   "Action to take. Must choose exactly one.")
    actions.add_option("-a", "--acquire", action="store_true")
    actions.add_option("-r", "--release", action="store_true")
    actions.add_option("-l", "--list", action="store_true")
    parser.add_option_group(actions)

    modes = optparse.OptionGroup(parser, "Lock mode",
                                 "Lock mode (required for acquire).")
    modes.add_option("-s", "--shared", action="store_true")
    modes.add_option("-x", "--exclusive", action="store_true")
    parser.add_option_group(modes)

    parser.add_option("-f", "--force", action="store_true",
                      help="ignore conflicting locks on acquire")
    parser.add_option("-q", "--quiet", action="store_true",
                      help="don't send notifications via email and wall")
    parser.add_option("-u", "--username", action="store", type="string",
                      help=optparse.SUPPRESS_HELP)
    parser.add_option("-v", "--verbose", action="store_true",
                      help="print detailed lock information")

    (options, args) = parser.parse_args()

    if (len(args) == 0) and not options.list:
        # No hosts specified. This is an error, but print the verbose
        # help message instead of the simplified usage information
        # that parser.error generates
        parser.print_help()
        sys.exit(1)

    chosen = [x for x in [options.acquire, options.release, options.list]
              if x]
    if len(chosen) != 1:
        parser.error(
            "must specify exactly one of --acquire, --release, --list")

    if options.acquire and (options.shared == options.exclusive):
        parser.error("must specify exactly one of --shared or --exclusive")

    if not options.username:
        parser.error("--username is required")

    # A host named twice (e.g. "farm1 farm1-2") must be locked or
    # released only once: acquire() checks conflicts before adding
    # anything, so duplicates would otherwise yield duplicate locks.
    hosts = uniqueHosts(expandHosts(args))
    for x in hosts:
        if x not in MANAGED_HOSTS:
            parser.error("unknown host: " + x)

    if options.acquire:
        # Prompt for more info.  This happens before the database lock
        # is taken so that it is not held while the user is typing.
        options.desc = readLine("Description of your job: ")
        options.until = readLine("In use until: ")

    sys.stdout.write("Acquiring lock on lock database...  ")
    sys.stdout.flush()
    lockfile = open(LOCKFILE, "w")
    try:
        fcntl.lockf(lockfile, fcntl.LOCK_EX)
        print("acquired.")
        try:
            locktable = LockTable.load(LOCKTABLE)
            if options.acquire:
                locks = acquire(locktable, hosts, options.username,
                                options.exclusive, options.force,
                                options.desc, options.until)
                notify(locktable, locks, True, " ".join(args),
                       options.quiet)
            elif options.release:
                locks = release(locktable, hosts, options.username)
                notify(locktable, locks, False, " ".join(args),
                       options.quiet)
            elif options.list:
                locktable.dump(options.verbose)
        finally:
            print("Releasing lock database.")
            fcntl.lockf(lockfile, fcntl.LOCK_UN)
    finally:
        lockfile.close()


if __name__ == "__main__":
    main()