# HG changeset patch # User Atul Varma # Date 1239746539 25200 # Node ID a5e2db06b58d9d60e369e0080a68543e8fa10442 # Parent b0d5bc990a8043af506eeab4a610461534334769 Initial proof-of-concept of a WebWorkerMapReducer. Still needs lots of work. diff -r b0d5bc990a80 -r a5e2db06b58d browser-couch.js --- a/browser-couch.js Tue Apr 14 12:50:23 2009 -0700 +++ b/browser-couch.js Tue Apr 14 15:02:19 2009 -0700 @@ -93,6 +93,57 @@ } }; +function WebWorkerMapReducer(numWorkers, Worker) { + if (!Worker) + Worker = window.Worker; + + var pool = []; + + function MapWorker() { + var worker = new Worker('worker-map-reducer.js'); + var onDone; + + worker.onmessage = function(event) { + onDone(event.data); + }; + + this.map = function MW_map(map, dict, cb) { + onDone = cb; + worker.postMessage({map: map.toString(), dict: dict}); + }; + } + + for (var i = 0; i < numWorkers; i++) + pool.push(new MapWorker()); + + this.map = function WWMR_map(map, dict, progress, chunkSize, finished) { + pool[0].map(map, + dict.pickle(), + function onDone(mapDict) { + var mapKeys = []; + for (name in mapDict) + mapKeys.push(name); + mapKeys.sort(); + finished({dict: mapDict, keys: mapKeys}); + }); + + // TODO: + + // Break up the dict into multiple chunks. + + // Issue each worker a chunk. + + // When a worker is done with a chunk, pass it a new one and call + // the progress callback. + + // When there are no more chunks left to pass out, we're done; + // merge all the results into a single mapResult and pass it + // to the finished() callback. + }; + + this.reduce = SingleThreadedMapReducer.reduce; +}; + var SingleThreadedMapReducer = { map: function STMR_map(map, dict, progress, chunkSize, finished) { diff -r b0d5bc990a80 -r a5e2db06b58d worker-map-reducer.js --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/worker-map-reducer.js Tue Apr 14 15:02:19 2009 -0700 @@ -0,0 +1,24 @@ +function map(func, dict) { + var mapDict = {}; + var currDoc; + + function emit(key, value) { + var item = mapDict[key]; + if (!item) + item = mapDict[key] = {keys: [], values: []}; + item.keys.push(currDoc.id); + item.values.push(value); + } + + for (key in dict) { + currDoc = dict[key]; + func(currDoc, emit); + } + + return mapDict; +} + +onmessage = function(event) { + var mapFunc = eval("(" + event.data.map + ")"); + postMessage(map(mapFunc, event.data.dict)); +};