Mercurial > my-enso-commands
diff my-enso-commands.py @ 0:429e04abae50
Origination.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Thu, 08 May 2008 11:34:42 -0700 |
parents | |
children | 3924ba5f3621 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/my-enso-commands.py Thu May 08 11:34:42 2008 -0700 @@ -0,0 +1,831 @@ +# -*-Mode:python-*- + +import os +import tempfile +import sys +import subprocess +import webbrowser +import urllib +import threading +from StringIO import StringIO + +def getFuelApplication(): + from xpcom import components + fuel_cls = components.classes["@mozilla.org/fuel/application;1"] + fuel_abs = fuel_cls.createInstance() + fuel_if = components.interfaces.fuelIApplication + return fuel_abs.queryInterface(fuel_if) + +def getCommandDispatcher(): + from xpcom import components + + ci = components.interfaces + cc = components.classes + + Application = getFuelApplication() + document = Application.activeWindow.activeTab.document + + mediator = cc["@mozilla.org/appshell/window-mediator;1"].getService(ci.nsIWindowMediator) + window = mediator.getMostRecentWindow("navigator:browser") + return window.document.commandDispatcher + +def cmd_info(ensoapi): + thing = getCommandDispatcher() + + thing._build_all_supported_interfaces_() + lines = [] + for iname in thing._interface_names_: + methods = [] + properties = [] + for mname in thing._interface_names_[iname]._method_infos_: + if mname != "QueryInterface": + methods.append( "%s()" % mname ) + for pname in thing._interface_names_[iname]._property_getters_: + properties.append( "%s" % pname ) + lines.append("%s:" % iname) + if methods: + lines.extend([" METHODS: %s" % ", ".join(methods)]) + if properties: + lines.extend([" PROPERTIES: %s" % ", ".join(properties)]) + + ensoapi.set_selection( {"text" : "\n\n".join(lines)} ) + +def cmd_highlight(ensoapi): + """ + Highlights your current selection. + """ + + window = getCommandDispatcher().focusedWindow + document = window.document + sel = window.getSelection() + if sel.rangeCount >= 1: + range = sel.getRangeAt(0) + newNode = document.createElement("span") + newNode.style.background = "yellow" + range.surroundContents(newNode) + +def cmd_count_lines(ensoapi): + seldict = ensoapi.get_selection() + ensoapi.display_message( "Selection is %s lines." % + len(seldict.get("text","").splitlines()) ) + +def _get_filelike_timestamp(): + import time + + return time.strftime("%Y%m%d%H%M%S") + +def _get_clipboard_img_datafile(): + import AppKit + + pb = AppKit.NSPasteboard.generalPasteboard() + if pb.availableTypeFromArray_([AppKit.NSTIFFPboardType]): + bytes = pb.dataForType_(AppKit.NSTIFFPboardType).bytes() + datafile = StringIO(str(bytes)) + return datafile + else: + return None + +def image_command(func): + def wrapper(ensoapi): + import Image + + datafile = _get_clipboard_img_datafile() + if datafile: + img = Image.open(datafile) + return func(ensoapi, img) + else: + files = ensoapi.get_selection().get("files", []) + if files: + if len(files) == 1: + filename = files[0] + img = None + try: + img = Image.open(filename) + except IOError, e: + ensoapi.display_message("An error occurred: %s." % e) + if img: + return func(ensoapi, img) + else: + ensoapi.display_message("More than one file selected.") + else: + ensoapi.display_message("No image in clipboard, and no " + "file selected.") + return wrapper + +@image_command +def cmd_get_image_size(ensoapi, img): + outputfile = StringIO() + img.save(outputfile, "PNG") + png_size = outputfile.tell() + outputfile = StringIO() + img.save(outputfile, "JPEG", quality=90) + jpg_size = outputfile.tell() + ensoapi.display_message("png size: %d bytes. jpg-90 size: %d bytes." % + (png_size, jpg_size)) + +def cmd_unupload_image(ensoapi): + url = ensoapi.get_selection().get("text", "") + if url: + if url.startswith(REMOTE_UPLOAD_URL): + filename = url[len(REMOTE_UPLOAD_URL):] + localfile = os.path.join(LOCAL_UPLOAD_DIR, filename) + # It's just easier to upload a truncated file than it is + # to remove the file remotely. + open(localfile, "w").close() + popen = subprocess.Popen( + ["scp", + localfile, + "%s%s" % (REMOTE_UPLOAD_DIR, filename)] + ) + while popen.poll() is None: + yield + if popen.returncode == 0: + os.remove(localfile) + ensoapi.display_message("Image removed.") + else: + ensoapi.display_message("An error occurred.") + else: + ensoapi.display_message("URL is not an upload URL.") + else: + ensoapi.display_message("No selection!") + +LOCAL_UPLOAD_DIR = "/Users/varmaa/Archive/toolness-images/" +REMOTE_UPLOAD_DIR = "toolness.com:/home/varmaa/toolness.com/images/" +REMOTE_UPLOAD_URL = "http://www.toolness.com/images/" + +@image_command +def cmd_upload_image(ensoapi, img): + filename = "%s.jpg" % _get_filelike_timestamp() + localfile = os.path.join(LOCAL_UPLOAD_DIR, filename) + img.save(localfile, quality=90) + + ensoapi.display_message("Uploading image...") + + popen = subprocess.Popen( + ["scp", + localfile, + "%s%s" % (REMOTE_UPLOAD_DIR, filename)] + ) + while popen.poll() is None: + yield + if popen.returncode == 0: + webbrowser.open("%s%s" % (REMOTE_UPLOAD_URL, filename)) + else: + ensoapi.display_message("An error occurred.") + +class ThreadedFunc(threading.Thread): + def __init__(self, target): + self.__target = target + threading.Thread.__init__(self) + self.__success = False + self.__retval = None + self.start() + + def run(self): + self.__retval = self.__target() + self.__success = True + + def wasSuccessful(self): + if self.isAlive(): + raise Exception( "Thread not finished" ) + return self.__success + + def getRetval(self): + if self.isAlive(): + raise Exception( "Thread not finished" ) + return self.__retval + +def htmlifier(func): + def wrapper(ensoapi): + seldict = ensoapi.get_selection() + text = seldict.get("text", "") + html = seldict.get("html", text) + if not text: + ensoapi.display_message("No selection!") + else: + result = func(ensoapi, html) + ensoapi.set_selection( + {"html":result, + "text":result} + ) + + return wrapper + +@htmlifier +def cmd_bold(ensoapi, text): + return "<b>%s</b>" % text + +@htmlifier +def cmd_italics(ensoapi, text): + return "<i>%s</i>" % text + +@htmlifier +def cmd_monospace(ensoapi, text): + return "<pre>%s</pre>" % text + +def cmd_normalize(ensoapi): + normal_template = "<span style=\"font-weight: normal;\">%s</span>" + seldict = ensoapi.get_selection() + text = seldict.get("text", "") + if not text: + ensoapi.display_message("No selection!") + else: + ensoapi.set_selection( + {"html":normal_template % text, + "text":text} ) + +def cmd_paste_html(ensoapi): + ensoapi.set_selection( + {"text":"u do not support html.", + "html":"<p>hai2u, <i>u support html</i>!</p>"} + ) + +def cmd_get_pb_info(ensoapi): + import AppKit + + pb = AppKit.NSPasteboard.generalPasteboard() + + pbtypes = pb.types() + for string in pbtypes: + print "type: %s" % string + +def make_get_url_func(url): + def get_url(): + import urllib2 + + f = urllib2.urlopen(url) + return f.read() + return get_url + +def get_weather(xml_data): + """ + Shows the weather for the given place. + """ + + import elementtree.ElementTree as ET + + page = ET.fromstring(xml_data) + + weather = page.find( "weather/current_conditions" ) + + return { + 'f' : weather.find( "temp_f" ).get( "data" ), + 'c' : weather.find( "temp_c" ).get( "data" ), + 'humidity' : weather.find( "humidity" ).get( "data" ), + 'wind' : weather.find( "wind_condition" ).get( "data" ) + } + +def test_get_weather(): + xml_data = """<?xml version="1.0"?><xml_api_reply version="1"><weather module_id="0" tab_id="0"><forecast_information><city data="Chicago, IL"/><postal_code data="60657"/><latitude_e6 data=""/><longitude_e6 data=""/><forecast_date data="2008-04-07"/><current_date_time data="2008-04-07 06:38:00 +0000"/><unit_system data="US"/></forecast_information><current_conditions><condition data="Cloudy"/><temp_f data="57"/><temp_c data="14"/><humidity data="Humidity: 47%"/><icon data="/images/weather/cloudy.gif"/><wind_condition data="Wind: N at 0 mph"/></current_conditions><forecast_conditions><day_of_week data="Today"/><low data="40"/><high data="56"/><icon data="/images/weather/cloudy.gif"/><condition data="Cloudy"/></forecast_conditions><forecast_conditions><day_of_week data="Tue"/><low data="45"/><high data="54"/><icon data="/images/weather/thunderstorm.gif"/><condition data="Thunderstorm"/></forecast_conditions><forecast_conditions><day_of_week data="Wed"/><low data="40"/><high data="54"/><icon data="/images/weather/mostly_sunny.gif"/><condition data="Partly Sunny"/></forecast_conditions><forecast_conditions><day_of_week data="Thu"/><low data="38"/><high data="49"/><icon data="/images/weather/chance_of_rain.gif"/><condition data="Chance of Showers"/></forecast_conditions></weather></xml_api_reply>""" + + assert get_weather(xml_data) == {'c': '14', 'humidity': 'Humidity: 47%', 'wind': 'Wind: N at 0 mph', 'f': '57'} + +def cmd_weather(ensoapi, place="san francisco"): + zipcode = cmd_weather.places[place] + url = "http://www.google.com/ig/api?weather=%s" % zipcode + thread = ThreadedFunc(make_get_url_func(url)) + while thread.isAlive(): + yield + weather_xml = thread.getRetval() + if not weather_xml: + ensoapi.display_message("An error occurred when getting the weather.") + else: + wdict = get_weather(weather_xml) + wdict["place"] = place + ensoapi.display_message(u"In %(place)s it is " + u"%(f)s\u00b0F/%(c)s\u00b0C, " + u"%(humidity)s, %(wind)s." % wdict) + +cmd_weather.places = { "san francisco" : "94115", + "chicago" : "60640", + "mountain view" : "94043" } + +cmd_weather.valid_args = cmd_weather.places.keys() + +def cmd_what_is_my_ip(ensoapi): + thread = ThreadedFunc(make_get_url_func("http://www.toolness.com/ip/")) + while thread.isAlive(): + yield + ensoapi.display_message("%s." % thread.getRetval()) + +def cmd_character_count(ensoapi): + text = ensoapi.get_selection().get("text", "").strip() + if not text: + ensoapi.display_message( "No selection." ) + ensoapi.display_message( "%d characters." % len(text) ) + +def cmd_email_to_me(ensoapi): + text = ensoapi.get_selection().get("text", "").strip() + if not text: + ensoapi.display_message( "No selection." ) + else: + server = "mail.toolness.com" + port = 587 + username = "no-reply@toolness.com" + password = "4p5o3j45ot%$" + + mailfrom = "varmaa@gmail.com" + rcpttos = ["varmaa@gmail.com"] + + text = text.encode("ascii", "xmlcharrefreplace") + subject = text.splitlines()[0] + text = ("This message was sent by Enso's " + "'email to' command.\n\n" + text) + data = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s" % \ + (mailfrom, ", ".join(rcpttos), subject, text) + + def sendmail(): + import smtplib + + try: + svr = smtplib.SMTP( server, port ) + svr.login( username, password ) + svr.sendmail( mailfrom, rcpttos, data ) + svr.quit() + except: + success[0] = False + raise + + thread = ThreadedFunc(sendmail) + ensoapi.display_message( "Sending message..." ) + while thread.isAlive(): + yield + if thread.wasSuccessful(): + ensoapi.display_message( "Message sent." ) + else: + ensoapi.display_message( + "An error occurred when sending the message." + ) + +def cmd_date(ensoapi): + import time + + ensoapi.display_message( "%s" % time.asctime() ) + +def cmd_insert_date(ensoapi): + import time + + ensoapi.set_selection( time.asctime() ) + +class WebSearchCmd(object): + def __init__(self, url_template): + self._url_template = url_template + + def __call__(self, ensoapi, query=None): + if not query: + query = ensoapi.get_selection().get("text", u"") + query = query.strip() + + if not query: + ensoapi.display_message( "No query." ) + return + + query = urllib.quote( query.encode("utf-8") ) + + webbrowser.open( self._url_template % {"query" : query} ) + +cmd_wiki = WebSearchCmd("http://en.wikipedia.org/wiki/%(query)s") +cmd_wowhead = WebSearchCmd("http://www.wowhead.com/?search=%(query)s") +cmd_amaz = WebSearchCmd("http://www.amazon.com/exec/obidos/search-handle-url/index%%3Dblended%%26field-keywords%%3D%(query)s%%26store-name%%3Dall-product-search") +cmd_yp = WebSearchCmd("http://local.google.com/local?sc=1&q=%(query)s&near=2418+washington+94115&btnG=Google+Search") +cmd_imdb = WebSearchCmd("http://www.imdb.com/find?s=all&q=%(query)s&x=0&y=0") +cmd_mdc = WebSearchCmd("http://www.google.com/search?hl=en&q=%(query)s+site%%3Adeveloper.mozilla.org&btnG=Google+Search") +cmd_mxr = WebSearchCmd("http://mxr.mozilla.org/mozilla/search?string=%(query)s") +cmd_phonebook = WebSearchCmd("https://ldap.mozilla.org/phonebook/search.php?search=%(query)s") + +# For help on this one see http://www.squarefree.com/bugzilla/quicksearch-help.html +cmd_bugzilla = WebSearchCmd("https://bugzilla.mozilla.org/buglist.cgi?quicksearch=%(query)s") + +def cmd_farewell(ensoapi): + import AppKit + + app = AppKit.NSApplication.sharedApplication() + app.terminate_(None) + +class Launcher(object): + def __init__(self): + import Foundation + import AppKit + + self._targets = {} + + query = Foundation.NSMetadataQuery.alloc().init() + queryString = ( "((kMDItemKind == \"Application\") " + " and (kMDItemSupportFileType != \"MDSystemFile\"))" ) + queryString += " or (kMDItemKind == \"Mac OS X Preference Pane\")" + predicate = Foundation.NSPredicate.predicateWithFormat_( + queryString + ) + query.setPredicate_( predicate ) + if query.startQuery() != True: + raise Exception( "startQuery() failed." ) + self._query = query + self._workspace = AppKit.NSWorkspace.sharedWorkspace() + self._targets = {} + + def get_namespace(self): + return self._targets.keys() + + def get_target(self, name): + return self._targets[name] + + def update(self): + query = self._query + + while query.isGathering(): + yield + + # TODO: Modify this so we just get notified whenever the query + # results change instead of constantly "polling" every time + # the quasimode starts. + + resultList = [] + targets = {} + + query.disableUpdates() + numresults = query.resultCount() + + BATCH_SIZE = 10 + + for i in range( numresults ): + result = query.resultAtIndex_( i ) + fsname = result.valueForAttribute_("kMDItemFSName") + name = result.valueForAttribute_("kMDItemDisplayName") + kind = result.valueForAttribute_("kMDItemKind") + if name: + name = name.lower() + itempath = result.valueForAttribute_("kMDItemPath") + if kind == "Mac OS X Preference Pane": + name += " preferences" + target = ShellOpenLaunchableTarget(itempath) + else: + target = AppLaunchableTarget(self._workspace, itempath) + resultList.append(name) + targets[name] = target + if i / BATCH_SIZE == i / float(BATCH_SIZE): + yield + #print "total results: %s" % numresults + query.enableUpdates() + targets["computer"] = ShellOpenLaunchableTarget("/") + self._targets = targets + +class AppLaunchableTarget(object): + def __init__(self, workspace, path): + self._workspace = workspace + self._path = path + + def launch(self, with_files=None): + if with_files: + for filename in with_files: + self._workspace.openFile_withApplication_( + filename, + self._path + ) + else: + self._workspace.launchApplication_( self._path ) + + def can_launch_with(self): + return True + +class ShellOpenLaunchableTarget(object): + def __init__(self, path): + self._path = path + + def launch(self): + subprocess.Popen( ["open", self._path] ) + + def can_launch_with(self): + return False + +class OpenCommand(object): + """ + Opens an application, folder, or URL. + """ + + def __init__(self, launcher): + self.launcher = launcher + self._isFetchingArgs = False + + def on_quasimode_start(self): + if self._isFetchingArgs: + return + + self._isFetchingArgs = True + + for _ in self.launcher.update(): + yield + + self.valid_args = self.launcher.get_namespace() + + self._isFetchingArgs = False + + valid_args = [] + + def __call__(self, ensoapi, target=None): + if not target: + seldict = ensoapi.get_selection() + if seldict.get("files"): + for file in seldict["files"]: + subprocess.Popen( ["open", file] ) + elif seldict.get("text"): + filename = seldict["text"].strip() + if os.path.isabs(filename): + subprocess.Popen( ["open", filename] ) + else: + webbrowser.open(filename) + else: + self.launcher.get_target(target).launch() + +class OpenWithCommand(object): + """ + Opens the selected file(s) with a particular application. + """ + + def __init__(self, launcher): + self.launcher = launcher + + def _get_valid_args(self): + return self.launcher.get_namespace() + + valid_args = property(_get_valid_args) + + def __call__(self, ensoapi, target): + files = ensoapi.get_selection().get("files", []) + targ = self.launcher.get_target(target) + if not files: + ensoapi.display_message("No files selected!") + elif not targ.can_launch_with(): + ensoapi.display_message("Can't open files with %s." % target) + else: + targ.launch(files) + +cmd_go = OpenCommand(Launcher()) +cmd_open = cmd_go +cmd_bust_with = OpenWithCommand(cmd_open.launcher) + +def cmd_run_python(ensoapi): + """ + runs the selected text as python code. + """ + + text = ensoapi.get_selection().get("text","") + + if not text: + ensoapi.display_message("No text selection!") + return + + output_fd, output_name = tempfile.mkstemp() + + # TODO: This is for OSX only; will have to figure out what to do + # for other platforms. + cmd = text.replace( "\r", "\n" ) + + try: + compile( cmd, "<selected text>", "eval" ) + is_eval = True + except: + is_eval = False + + if is_eval: + cmd = "print %s," % cmd + + cmd = ("try:\n" + " from autoimp import *\n" + "except ImportError:\n" + " pass\n\n" + cmd ) + + popen = subprocess.Popen( + [ sys.executable, + "-c", cmd ], + stdout = output_fd, + stderr = subprocess.STDOUT, + ) + + while popen.poll() is None: + yield + + os.close(output_fd) + + output_text = open(output_name, "r").read().strip() + if output_text: + seldict = {"text" : "\n".join([text, output_text])} + ensoapi.set_selection(seldict) + else: + if popen.returncode == 0: + ensoapi.display_message("Code run successfully.") + else: + ensoapi.display_message("An error occurred.") + + os.remove(output_name) + +def cmd_doctest(ensoapi): + """ + python doctests the selected section of plain text. + """ + + text = ensoapi.get_selection().get("text","") + + if not text: + ensoapi.display_message("No text selection!") + return + + # TODO: This is for OSX only; will have to figure out what to do + # for other platforms. + new_text = text.replace( "\r", "\n" ) + + fd, source_name = tempfile.mkstemp() + os.close(fd) + open(source_name, "wb").write(new_text) + + output_fd, output_name = tempfile.mkstemp() + + cmd = ("import doctest; doctest.testfile(%s, " + "module_relative=False)" % repr(source_name)) + + popen = subprocess.Popen( + [ sys.executable, + "-c", cmd ], + stdout = output_fd, + stderr = subprocess.STDOUT, + ) + + while popen.poll() is None: + yield + + os.close(output_fd) + + if popen.returncode == 0: + output_text = open(output_name, "r").read() + if output_text: + seldict = {"text":text + output_text} + ensoapi.set_selection(seldict) + else: + ensoapi.display_message("All doctests successful!") + else: + ensoapi.display_message("An error occurred.") + + os.remove(source_name) + os.remove(output_name) + +def cmd_nosetest(ensoapi): + """ + runs nosetests on the selected region of python code. + """ + + text = ensoapi.get_selection().get("text","") + + if not text: + ensoapi.display_message("No text selection!") + return + + # TODO: This is for OSX only; will have to figure out what to do + # for other platforms. + new_text = text.replace( "\r", "\n" ) + + fd, source_name = tempfile.mkstemp(suffix=".py") + os.close(fd) + open(source_name, "wb").write(new_text) + + output_fd, output_name = tempfile.mkstemp() + + popen = subprocess.Popen( + [ "nosetests", + source_name, + "--with-doctest" ], + stdout = output_fd, + stderr = subprocess.STDOUT, + ) + + while popen.poll() is None: + yield + + os.close(output_fd) + + output_text = open(output_name, "r").read() + if output_text: + if popen.returncode == 0: + lines = output_text.splitlines() + ensoapi.display_message(lines[2]) + else: + seldict = {"text":text + output_text} + ensoapi.set_selection(seldict) + else: + raise AssertionError( "nosetests didn't return anything!" ) + + os.remove(source_name) + os.remove(output_name) + +def cmd_insert_html(ensoapi): + ensoapi.set_selection( {"html":"<b>hi</b> <p>there</p>"} ) + +def _runAppleScript( script ): + params = [ "osascript", "-e", script ] + popen = subprocess.Popen( params ) + +def cmd_screen_saver(ensoapi): + """ + Starts your screen saver. + """ + + _runAppleScript( + "tell application id \"com.apple.ScreenSaver.Engine\" " + "to launch" + ) + +def cmd_sleep(ensoapi): + """ + Puts your computer to sleep. + """ + + _runAppleScript( + "tell application \"Finder\" to sleep" + ) + +def cmd_play(ensoapi): + """ + Plays the current iTunes song. + """ + + _runAppleScript( + "tell application \"iTunes\" to play" + ) + +def cmd_pause(ensoapi): + """ + Pauses the current iTunes song. + """ + + _runAppleScript( + "tell application \"iTunes\" to pause" + ) + + +def cmd_next_track(ensoapi): + """ + Goes to the next track on iTunes. + """ + + _runAppleScript( + "tell application \"iTunes\" to next track" + ) + +def cmd_previous_track(ensoapi): + """ + Goes to the previous track on iTunes. + """ + + _runAppleScript( + "tell application \"iTunes\" to back track" + ) + +KIRITSU_DIR = "/Users/varmaa/Documents/kiritsu" +KIRITSU_MAKE_SCRIPT = "%s/MakeEverything.py" % KIRITSU_DIR +KIRITSU_VIEW_TEMPLATE = "file://%s/%s.html" +KIRITSU_VIEWS = ["work", "play", "chores"] + +def cmd_refresh_kiritsu(ensoapi): + """ + Refreshes all kiritsu feeds. + """ + + popen = subprocess.Popen( + [sys.executable, + KIRITSU_MAKE_SCRIPT], + cwd = KIRITSU_DIR + ) + + ensoapi.display_message( "Please wait while kiritsu retrieves " + "new feeds." ) + + while popen.poll() is None: + yield + + if popen.returncode != 0: + ensoapi.display_message( "An error occurred in Kiritsu." ) + else: + ensoapi.display_message( "Kiritsu is done." ) + +def cmd_view( ensoapi, viewname ): + """ + Opens the given kiritsu view. + """ + + webbrowser.open( KIRITSU_VIEW_TEMPLATE % (KIRITSU_DIR, viewname) ) + +cmd_view.valid_args = KIRITSU_VIEWS + +def cmd_rest(ensoapi): + import sys + + ensoapi.display_message( "please wait..." ) + popen = subprocess.Popen( + [ sys.executable, + "-c", "import time;time.sleep(2.0)" ] + ) + while popen.poll() is None: + yield + if popen.returncode == 0: + msg = "done!" + else: + msg = "an error occurred." + ensoapi.display_message( msg )