MongoDB MapReduce

MapReduce

db.collection.mapReduce()的用法:

db.collection.mapReduce(
                     <mapfunction>,
                     <reducefunction>,
                     {
                       out: <collection>,
                       query: <document>,
                       sort: <document>,
                       limit: <number>,
                       finalize: <function>,
                       scope: <document>,
                       jsMode: <boolean>,
                       verbose: <boolean>
                     }
                   )

  1. MapReduce 在map funtion 中 emit 相应的key 只有一个文档命中的时候,不会执行reduce方法,需要在finailize 方法中进行处理。
  2. reduce 方法针对相同的key 有可能多次执行,reduce 计算的结果只和当前传入的values 有关系。

现有有个表topic_photos结构如下:

{
  "_id" : ObjectId("50b223832115fb21eca4a485"),
  "created_at" : ISODate("2012-11-25T13:56:19.548Z"),
  "last_liked" : ISODate("2012-12-10T08:44:49.276Z"),
  "likes" : 6,
  "photo_id" : NumberLong("39967821345988608"),
  "topic_id" : NumberLong("38362556125028352"),
  "user_id" : NumberLong(12449)
}

现在我们要选出被likes最多的用户,以及该用户likes最多那张照片及该照片的likes 数。

public void groupPhotosByUser(long topicId, long startTime, long endTime){

    String map = " function(){ "
            + "        var key = this.user_id; " // 以用户的user_id 为 key
            + "        value={photo_id:this.photo_id,likes:this.likes,photo_count:1,created_at:ISODate()};" // value 包含photo_id,likes,photo_count 初始为1。
            + "        emit(key,value); "
            + "    };";

    String reduce = "  function (user_id, values){"
                            // map reduce 是并行执行的,所以每次执行的时候都要初始化result
                    + "        var result = {user_id:user_id,photo_id:0,photo_count:0,likes:0,created_at:Date()}; "
                    + "     for(var i=0; i<values.length; i++){ "
                    + "         value = values[i];              "
                    + "         if(value > result.likes){ " 
                        // 选中values中likes最多的对象,并记录likes数与 photo_id
                    + "             result.likes = value.likes; "
                    + "             result.photo_id = value.photo_id;  "
                    + "         } "
                    + "         result.photo_count += value.photo_count; "
                    // 在循环中累计photo_count, 这里不是取values.length!!! 因为values个数多的时候,是分多次map_reduce执行的。
                    // 例如有20个values,可能分别reduce 10个文档,再将两次reduce的结果再次reduce,这时如果直接获取 values.length 结果就为2了。
                    + "     }; "
                    + "      return result; "
                    + "}";

    String finalize = " function (user_id, result) { "
            + "        if(typeof(result.created_at) == 'undefined' ){ " 
                    // 用户只有一张topic_photo的情况,在这里进行初始化。
            + "            result = {user_id:user_id,photo_count:1,photo_id:result.photo_id,likes:result.likes,created_at:ISODate()}; "
            + "        } "
            + "      return result; "
            + "}";

    DBObject query = QueryBuilder.start("topic_id").is(topicId)
                    .put("created_at").greaterThanEquals(new Date(startTime))
                    .lessThanEquals(new Date(endTime))
                    .get();
    String outputCollection = String.format("topic_%s_users", topicId);

    DBCollection inputCollection = getMongoDB().getCollection("topic_photos");
    MapReduceCommand mapReduceCommand = new MapReduceCommand(inputCollection, map, reduce, outputCollection, OutputType.REPLACE, query);
    mapReduceCommand.setFinalize(finalize);
    inputCollection.mapReduce(mapReduceCommand);

    //TODO creat mongodb index
}

MongoDB的调试

Java程序中

static{

    // Enable MongoDB logging in general
    System.setProperty("DEBUG.MONGO", "true");

    // Enable DB operation tracing
    System.setProperty("DB.TRACE", "true");
}    

利用printjson()函数,在MapReduce 的执行过程打印出执行日志

var map = function(){
    var key = this.user_id;
    var value = {photo_id:this.photo_id, likes:this.likes, photo_count:1, created_at:ISODate()};
    emit(key,value);
}

var reduce = function (user_id, values){
    if (user_id == 350278) {
        printjson(values);
    }

    var result = {user_id:user_id,photo_id:0,photo_count:0,likes:0,created_at:Date()};
    for(var i=0; i<values.length; i++){
        var value = values[i];
        if(value > result.likes){
            result.likes = value.likes;
            result.photo_id = value.photo_id;
        }
        result.photo_count += value.photo_count;
    };
    return result;
}

var finalize = function (user_id, result) {
    if(typeof(result.created_at) == 'undefined' ){
        result = {user_id:user_id,photo_count:1,photo_id:result.photo_id,likes:result.likes,created_at:ISODate()};
    }
    return result;
}

var query = {"topic_id" : 38362556125028352}

db.runCommand({"mapreduce" : "topic_photos" ,
          "map" : map, 
          "reduce" : reduce,
          "finalize": finalize,
          "query" : query,           
          "out" : { "replace" : "topic_38362556125028352_users"}, 
          "verbose" : true
        })

MongoDB控制台输出

从上图可以看到,最后一次reduce 时,是将前两次reduce 的结果再执行 reduce 操作,所以reduce 中的 photo_count 计数,不能依赖于 values.length, 而应该从传入的参数中获取。

利用Underscore.js 框架调试MapReduce方法

参见Debugging MapReduce in MongoDB

Tips

  • MongoDB 最分将不同的业务进行垂直切分,存储到不到的db中,这样分别在找出慢查询及定位问题的时候更清晰。
  • Pretty print in MongoDB shell: db.collection.find().pretty()

参考资料