# HG changeset patch
# User Atul Varma
# Date 1239749696 25200
# Node ID e82074d95e06eb050cecf72f10c10a506d64f77c
# Parent a5e2db06b58d9d60e369e0080a68543e8fa10442
The WebWorkerMapReducer now doles out jobs to individual threads; still need to implement the reduce operation though.

diff -r a5e2db06b58d -r e82074d95e06 browser-couch.js
--- a/browser-couch.js Tue Apr 14 15:02:19 2009 -0700
+++ b/browser-couch.js Tue Apr 14 15:54:56 2009 -0700
@@ -99,7 +99,7 @@
 
   var pool = [];
 
-  function MapWorker() {
+  function MapWorker(id) {
     var worker = new Worker('worker-map-reducer.js');
     var onDone;
 
@@ -107,6 +107,7 @@
       onDone(event.data);
     };
 
+    this.id = id;
     this.map = function MW_map(map, dict, cb) {
       onDone = cb;
       worker.postMessage({map: map.toString(), dict: dict});
@@ -114,31 +115,68 @@
   }
 
   for (var i = 0; i < numWorkers; i++)
-    pool.push(new MapWorker());
+    pool.push(new MapWorker(i));
 
   this.map = function WWMR_map(map, dict, progress, chunkSize, finished) {
-    pool[0].map(map,
-                dict.pickle(),
-                function onDone(mapDict) {
-                  var mapKeys = [];
-                  for (name in mapDict)
-                    mapKeys.push(name);
-                  mapKeys.sort();
-                  finished({dict: mapDict, keys: mapKeys});
-                });
+    var keys = dict.getKeys();
+    var size = keys.length;
+    var workersDone = 0;
+    var mapDict = {};
+
+    function getNextChunk() {
+      if (keys.length) {
+        var chunkKeys = keys.slice(0, chunkSize);
+        keys = keys.slice(chunkSize);
+        var chunk = {};
+        for (var i = 0; i < chunkKeys.length; i++)
+          chunk[chunkKeys[i]] = dict.get(chunkKeys[i]);
+        return chunk;
+      } else
+        return null;
+    }
 
-    // TODO:
-
-    // Break up the dict into multiple chunks.
-
-    // Issue each worker a chunk.
+    function nextJob(mapWorker) {
+      var chunk = getNextChunk();
+      if (chunk) {
+        mapWorker.map(
+          map,
+          chunk,
+          function jobDone(aMapDict) {
+            for (name in aMapDict)
+              if (name in mapDict) {
+                var item = mapDict[name];
+                item.keys = item.keys.concat(aMapDict[name].keys);
+                item.values = item.values.concat(aMapDict[name].values);
+              } else
+                mapDict[name] = aMapDict[name];
 
-    // When a worker is done with a chunk, pass it a new one and call
-    // the progress callback.
+            if (keys.length)
+              progress("map",
+                       (size - keys.length) / size,
+                       function() { nextJob(mapWorker); });
+            else
+              workerDone();
+          });
+      } else
+        workerDone();
+    }
 
-    // When there are no more chunks left to pass out, we're done;
-    // merge all the results into a single mapResult and pass it
-    // to the finished() callback.
+    function workerDone() {
+      workersDone += 1;
+      if (workersDone == numWorkers)
+        allWorkersDone();
+    }
+
+    function allWorkersDone() {
+      var mapKeys = [];
+      for (name in mapDict)
+        mapKeys.push(name);
+      mapKeys.sort();
+      finished({dict: mapDict, keys: mapKeys});
+    }
+
+    for (var i = 0; i < numWorkers; i++)
+      nextJob(pool[i]);
   };
 
   this.reduce = SingleThreadedMapReducer.reduce;
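
Note (not part of the patch): the following is a minimal, hypothetical sketch of the chunk-dispatch pattern the reworked WWMR_map uses above, where each worker asks for the next chunk of keys as soon as it finishes one and the merged result is reported once every worker has run out of work. The names runDemo and fakeWorker, the setTimeout-based asynchrony standing in for real Web Workers, and the simplified per-key merge (the patch concatenates keys/values per emitted key) are illustrative assumptions, not browser-couch code.

// Sketch of chunked work-queue dispatch across a fixed pool of workers.
function runDemo(dict, numWorkers, chunkSize, finished) {
  var keys = [];
  for (var k in dict)
    keys.push(k);
  var workersDone = 0;
  var results = {};

  // Hand out the next slice of keys, or null when the queue is empty.
  function getNextChunk() {
    if (!keys.length)
      return null;
    var chunkKeys = keys.slice(0, chunkSize);
    keys = keys.slice(chunkSize);
    return chunkKeys;
  }

  // Stand-in for MapWorker.map(): "maps" a chunk asynchronously.
  function fakeWorker(chunkKeys, cb) {
    setTimeout(function() {
      var out = {};
      for (var i = 0; i < chunkKeys.length; i++)
        out[chunkKeys[i]] = dict[chunkKeys[i]].length;
      cb(out);
    }, 0);
  }

  // Give one worker a chunk; when it reports back, merge and ask again.
  function nextJob() {
    var chunk = getNextChunk();
    if (!chunk)
      return workerDone();
    fakeWorker(chunk, function(chunkResult) {
      for (var name in chunkResult)
        results[name] = chunkResult[name];
      nextJob();  // this worker immediately asks for more work
    });
  }

  function workerDone() {
    workersDone += 1;
    if (workersDone == numWorkers)
      finished(results);
  }

  // Seed every worker with its first job, as the patch does with pool[i].
  for (var i = 0; i < numWorkers; i++)
    nextJob();
}

runDemo({a: 'xx', b: 'yyy', c: 'z', d: 'wwww'}, 2, 2, function(results) {
  console.log(results);  // {a: 2, b: 3, c: 1, d: 4}
});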