changeset 50:e82074d95e06

The WebWorkerMapReducer now doles out jobs to individual threads; still need to implement the reduce operation though.
author Atul Varma <varmaa@toolness.com>
date Tue, 14 Apr 2009 15:54:56 -0700
parents a5e2db06b58d
children e29662aba1b6
files browser-couch.js
diffstat 1 files changed, 59 insertions(+), 21 deletions(-)
--- a/browser-couch.js	Tue Apr 14 15:02:19 2009 -0700
+++ b/browser-couch.js	Tue Apr 14 15:54:56 2009 -0700
@@ -99,7 +99,7 @@
 
   var pool = [];
 
-  function MapWorker() {
+  function MapWorker(id) {
     var worker = new Worker('worker-map-reducer.js');
     var onDone;
 
@@ -107,6 +107,7 @@
       onDone(event.data);
     };
 
+    this.id = id;
     this.map = function MW_map(map, dict, cb) {
       onDone = cb;
       worker.postMessage({map: map.toString(), dict: dict});
@@ -114,31 +115,68 @@
   }
 
   for (var i = 0; i < numWorkers; i++)
-    pool.push(new MapWorker());
+    pool.push(new MapWorker(i));
 
   this.map = function WWMR_map(map, dict, progress, chunkSize, finished) {
-    pool[0].map(map,
-                dict.pickle(),
-                function onDone(mapDict) {
-                  var mapKeys = [];
-                  for (name in mapDict)
-                    mapKeys.push(name);
-                  mapKeys.sort();
-                  finished({dict: mapDict, keys: mapKeys});
-                });
+    var keys = dict.getKeys();
+    var size = keys.length;
+    var workersDone = 0;
+    var mapDict = {};
+
+    function getNextChunk() {
+      if (keys.length) {
+        var chunkKeys = keys.slice(0, chunkSize);
+        keys = keys.slice(chunkSize);
+        var chunk = {};
+        for (var i = 0; i < chunkKeys.length; i++)
+          chunk[chunkKeys[i]] = dict.get(chunkKeys[i]);
+        return chunk;
+      } else
+        return null;
+    }
 
-    // TODO:
-
-    // Break up the dict into multiple chunks.
-
-    // Issue each worker a chunk.
+    function nextJob(mapWorker) {
+      var chunk = getNextChunk();
+      if (chunk) {
+        mapWorker.map(
+          map,
+          chunk,
+          function jobDone(aMapDict) {
+            for (name in aMapDict)
+              if (name in mapDict) {
+                var item = mapDict[name];
+                item.keys = item.keys.concat(aMapDict[name].keys);
+                item.values = item.values.concat(aMapDict[name].values);
+              } else
+                mapDict[name] = aMapDict[name];
 
-    // When a worker is done with a chunk, pass it a new one and call
-    // the progress callback.
+            if (keys.length)
+              progress("map",
+                       (size - keys.length) / size,
+                       function() { nextJob(mapWorker); });
+            else
+              workerDone();
+          });
+      } else
+        workerDone();
+    }
 
-    // When there are no more chunks left to pass out, we're done;
-    // merge all the results into a single mapResult and pass it
-    // to the finished() callback.
+    function workerDone() {
+      workersDone += 1;
+      if (workersDone == numWorkers)
+        allWorkersDone();
+    }
+
+    function allWorkersDone() {
+      var mapKeys = [];
+      for (name in mapDict)
+        mapKeys.push(name);
+      mapKeys.sort();
+      finished({dict: mapDict, keys: mapKeys});
+    }
+
+    for (var i = 0; i < numWorkers; i++)
+      nextJob(pool[i]);
   };
 
   this.reduce = SingleThreadedMapReducer.reduce;
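
For reference, below is a minimal, self-contained sketch of the chunk-and-merge pattern that the new WWMR_map above implements. It is not browser-couch code: the real Web Workers (and worker-map-reducer.js) are replaced by a synchronous FakeMapWorker so the sketch runs anywhere, the progress callback is omitted for brevity, and the names runMap, FakeMapWorker, and emit are illustrative assumptions, not part of the library.

// Sketch only: a stand-in for MapWorker that applies the map function
// synchronously instead of posting it to a Web Worker.
function FakeMapWorker(id) {
  this.id = id;
  this.map = function(mapFn, chunk, cb) {
    var partial = {};
    function emit(key, value) {
      if (!(key in partial))
        partial[key] = {keys: [], values: []};
      partial[key].keys.push(key);
      partial[key].values.push(value);
    }
    for (var docId in chunk)
      mapFn(chunk[docId], emit);
    cb(partial);
  };
}

function runMap(mapFn, docs, chunkSize, numWorkers, finished) {
  var keys = Object.keys(docs);
  var workersDone = 0;
  var mapDict = {};

  // Carve the remaining keys into an {id: doc} chunk, or return null
  // when there is no work left.
  function getNextChunk() {
    if (!keys.length)
      return null;
    var chunkKeys = keys.slice(0, chunkSize);
    keys = keys.slice(chunkSize);
    var chunk = {};
    for (var i = 0; i < chunkKeys.length; i++)
      chunk[chunkKeys[i]] = docs[chunkKeys[i]];
    return chunk;
  }

  // Count finished workers; once all have drained the queue, sort the
  // emitted keys and hand the merged result to finished().
  function workerDone() {
    workersDone += 1;
    if (workersDone == numWorkers) {
      var mapKeys = Object.keys(mapDict).sort();
      finished({dict: mapDict, keys: mapKeys});
    }
  }

  // Keep feeding a worker chunks until the key list is exhausted,
  // merging each partial result into the shared mapDict.
  function nextJob(worker) {
    var chunk = getNextChunk();
    if (!chunk)
      return workerDone();
    worker.map(mapFn, chunk, function jobDone(partial) {
      for (var name in partial) {
        if (name in mapDict) {
          mapDict[name].keys = mapDict[name].keys.concat(partial[name].keys);
          mapDict[name].values = mapDict[name].values.concat(partial[name].values);
        } else
          mapDict[name] = partial[name];
      }
      if (keys.length)
        nextJob(worker);
      else
        workerDone();
    });
  }

  for (var i = 0; i < numWorkers; i++)
    nextJob(new FakeMapWorker(i));
}

// Example: group documents by author.
runMap(
  function(doc, emit) { emit(doc.author, 1); },
  {a: {author: "atul"}, b: {author: "brian"}, c: {author: "atul"}},
  2, 2,
  function(result) { console.log(result.keys, result.dict); });

The design point the sketch mirrors is that workers pull chunks rather than being assigned a fixed share up front, so a fast worker simply takes the next chunk and the merge step concatenates per-key results regardless of which worker produced them.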