Mercurial > browser-couch
changeset 50:e82074d95e06
The WebWorkerMapReducer now doles out jobs to individual threads; still need to implement the reduce operation though.
author | Atul Varma <varmaa@toolness.com> |
---|---|
date | Tue, 14 Apr 2009 15:54:56 -0700 |
parents | a5e2db06b58d |
children | e29662aba1b6 |
files | browser-couch.js |
diffstat | 1 files changed, 59 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/browser-couch.js Tue Apr 14 15:02:19 2009 -0700 +++ b/browser-couch.js Tue Apr 14 15:54:56 2009 -0700 @@ -99,7 +99,7 @@ var pool = []; - function MapWorker() { + function MapWorker(id) { var worker = new Worker('worker-map-reducer.js'); var onDone; @@ -107,6 +107,7 @@ onDone(event.data); }; + this.id = id; this.map = function MW_map(map, dict, cb) { onDone = cb; worker.postMessage({map: map.toString(), dict: dict}); @@ -114,31 +115,68 @@ } for (var i = 0; i < numWorkers; i++) - pool.push(new MapWorker()); + pool.push(new MapWorker(i)); this.map = function WWMR_map(map, dict, progress, chunkSize, finished) { - pool[0].map(map, - dict.pickle(), - function onDone(mapDict) { - var mapKeys = []; - for (name in mapDict) - mapKeys.push(name); - mapKeys.sort(); - finished({dict: mapDict, keys: mapKeys}); - }); + var keys = dict.getKeys(); + var size = keys.length; + var workersDone = 0; + var mapDict = {}; + + function getNextChunk() { + if (keys.length) { + var chunkKeys = keys.slice(0, chunkSize); + keys = keys.slice(chunkSize); + var chunk = {}; + for (var i = 0; i < chunkKeys.length; i++) + chunk[chunkKeys[i]] = dict.get(chunkKeys[i]); + return chunk; + } else + return null; + } - // TODO: - - // Break up the dict into multiple chunks. - - // Issue each worker a chunk. + function nextJob(mapWorker) { + var chunk = getNextChunk(); + if (chunk) { + mapWorker.map( + map, + chunk, + function jobDone(aMapDict) { + for (name in aMapDict) + if (name in mapDict) { + var item = mapDict[name]; + item.keys = item.keys.concat(aMapDict[name].keys); + item.values = item.values.concat(aMapDict[name].values); + } else + mapDict[name] = aMapDict[name]; - // When a worker is done with a chunk, pass it a new one and call - // the progress callback. + if (keys.length) + progress("map", + (size - keys.length) / size, + function() { nextJob(mapWorker); }); + else + workerDone(); + }); + } else + workerDone(); + } - // When there are no more chunks left to pass out, we're done; - // merge all the results into a single mapResult and pass it - // to the finished() callback. + function workerDone() { + workersDone += 1; + if (workersDone == numWorkers) + allWorkersDone(); + } + + function allWorkersDone() { + var mapKeys = []; + for (name in mapDict) + mapKeys.push(name); + mapKeys.sort(); + finished({dict: mapDict, keys: mapKeys}); + } + + for (var i = 0; i < numWorkers; i++) + nextJob(pool[i]); }; this.reduce = SingleThreadedMapReducer.reduce;