This repository has been archived on 2025-03-01. You can view files and clone it, but cannot push or open issues or pull requests.
trantor/mapreduce.go

199 lines
5.6 KiB
Go
Raw Normal View History

2013-06-01 02:34:11 +02:00
package main
import (
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
"time"
)
type MR struct {
2013-06-01 04:17:15 +02:00
meta *mgo.Collection
tags *mgo.Collection
hourly_raw *mgo.Collection
daily_raw *mgo.Collection
monthly_raw *mgo.Collection
hourly *mgo.Collection
daily *mgo.Collection
monthly *mgo.Collection
2013-06-01 02:34:11 +02:00
}
func NewMR(database *mgo.Database) *MR {
m := new(MR)
m.meta = database.C(META_COLL)
m.tags = database.C(TAGS_COLL)
2013-06-01 04:17:15 +02:00
m.hourly_raw = database.C(HOURLY_VISITS_COLL + "_raw")
m.daily_raw = database.C(DAILY_VISITS_COLL + "_raw")
m.monthly_raw = database.C(MONTHLY_VISITS_COLL + "_raw")
2013-06-01 03:40:06 +02:00
m.hourly = database.C(HOURLY_VISITS_COLL)
2013-06-01 02:34:11 +02:00
m.daily = database.C(DAILY_VISITS_COLL)
2013-06-01 03:40:06 +02:00
m.monthly = database.C(MONTHLY_VISITS_COLL)
2013-06-01 02:34:11 +02:00
return m
}
// GetTags returns at most numTags subjects, ordered from most to least used.
//
// The counts live in the tags collection. When the marker for TAGS_COLL is
// older than MINUTES_UPDATE_TAGS minutes (or missing), the collection is
// first rebuilt with a map/reduce over the documents of booksColl that have
// "active" set to true: every entry of a document's "subject" array emits 1.
//
// It returns a nil slice and the error if the rebuild or the query fails.
func (m *MR) GetTags(numTags int, booksColl *mgo.Collection) ([]string, error) {
	if m.isOutdated(TAGS_COLL, MINUTES_UPDATE_TAGS) {
		var mr mgo.MapReduce
		mr.Map = `function() {
if (this.subject) {
this.subject.forEach(function(s) { emit(s, 1); });
}
}`
		// The reducer must add the values up rather than count them: MongoDB
		// may call reduce again on the output of earlier reduce calls for the
		// same key, so vals can contain partial sums, not only 1s.
		mr.Reduce = `function(tag, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
		if err := m.update(&mr, bson.M{"active": true}, booksColl, TAGS_COLL); err != nil {
			return nil, err
		}
	}

	// Only the key of each map/reduce output document (the tag) is needed.
	var result []struct {
		Tag string `bson:"_id"`
	}
	err := m.tags.Find(nil).Sort("-value").Limit(numTags).All(&result)
	if err != nil {
		return nil, err
	}

	tags := make([]string, len(result))
	for i, r := range result {
		tags[i] = r.Tag
	}
	return tags, nil
}
2013-06-01 03:40:06 +02:00
func (m *MR) GetHourVisits(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
if m.isOutdated(HOURLY_VISITS_COLL, MINUTES_UPDATE_HOURLY) {
2013-06-01 04:17:15 +02:00
const reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
2013-06-01 03:40:06 +02:00
var mr mgo.MapReduce
mr.Map = `function() {
2013-06-01 04:17:15 +02:00
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth(),
this.date.getUTCDate(),
this.date.getUTCHours());
emit({date: date, session: this.session}, 1);
2013-06-01 03:40:06 +02:00
}`
2013-06-01 04:17:15 +02:00
mr.Reduce = reduce
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, HOURLY_VISITS_COLL+"_raw")
if err != nil {
return nil, err
}
var mr2 mgo.MapReduce
mr2.Map = `function() {
emit(this['_id']['date'], 1);
}`
mr2.Reduce = reduce
err = m.update(&mr2, bson.M{}, m.hourly_raw, HOURLY_VISITS_COLL)
2013-06-01 03:40:06 +02:00
if err != nil {
return nil, err
}
}
var result []Visits
err := m.hourly.Find(nil).All(&result)
return result, err
}
2013-06-01 02:34:11 +02:00
func (m *MR) GetDayVisits(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
if m.isOutdated(DAILY_VISITS_COLL, MINUTES_UPDATE_DAILY) {
2013-06-01 04:17:15 +02:00
const reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
2013-06-01 02:34:11 +02:00
var mr mgo.MapReduce
mr.Map = `function() {
2013-06-01 04:17:15 +02:00
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth(),
this.date.getUTCDate());
emit({date: date, session: this.session}, 1);
2013-06-01 03:40:06 +02:00
}`
2013-06-01 04:17:15 +02:00
mr.Reduce = reduce
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, DAILY_VISITS_COLL+"_raw")
if err != nil {
return nil, err
}
var mr2 mgo.MapReduce
mr2.Map = `function() {
emit(this['_id']['date'], 1);
}`
mr2.Reduce = reduce
err = m.update(&mr2, bson.M{}, m.daily_raw, DAILY_VISITS_COLL)
2013-06-01 02:34:11 +02:00
if err != nil {
return nil, err
}
}
var result []Visits
err := m.daily.Find(nil).All(&result)
return result, err
}
2013-06-01 03:40:06 +02:00
func (m *MR) GetMonthVisits(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
if m.isOutdated(MONTHLY_VISITS_COLL, MINUTES_UPDATE_MONTHLY) {
2013-06-01 04:17:15 +02:00
const reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
2013-06-01 03:40:06 +02:00
var mr mgo.MapReduce
mr.Map = `function() {
2013-06-01 04:17:15 +02:00
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth());
emit({date: date, session: this.session}, 1);
2013-06-01 03:40:06 +02:00
}`
2013-06-01 04:17:15 +02:00
mr.Reduce = reduce
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, MONTHLY_VISITS_COLL+"_raw")
if err != nil {
return nil, err
}
var mr2 mgo.MapReduce
mr2.Map = `function() {
emit(this['_id']['date'], 1);
}`
mr2.Reduce = reduce
err = m.update(&mr2, bson.M{}, m.monthly_raw, MONTHLY_VISITS_COLL)
2013-06-01 03:40:06 +02:00
if err != nil {
return nil, err
}
}
var result []Visits
err := m.monthly.Find(nil).All(&result)
return result, err
}
2013-06-01 02:34:11 +02:00
// update runs the map/reduce job mr over the documents of queryColl matching
// query and stores the output in the collection named storeColl, replacing
// its previous contents.
//
// The meta marker for storeColl is removed before the job and inserted again
// only after it succeeds, so a failed run leaves no marker behind and
// isOutdated keeps reporting the collection as stale. Note that mr.Out is
// overwritten.
func (m *MR) update(mr *mgo.MapReduce, query bson.M, queryColl *mgo.Collection, storeColl string) error {
	if _, err := m.meta.RemoveAll(bson.M{"type": storeColl}); err != nil {
		return err
	}

	mr.Out = bson.M{"replace": storeColl}
	if _, err := queryColl.Find(query).MapReduce(mr, nil); err != nil {
		return err
	}

	return m.meta.Insert(bson.M{"type": storeColl})
}
// isOutdated reports whether the generated collection named coll needs to be
// rebuilt: either its marker cannot be read from the meta collection (any
// lookup error counts as outdated), or the marker is more than minutes
// minutes old. The age is derived from the timestamp embedded in the
// marker's ObjectId.
func (m *MR) isOutdated(coll string, minutes float64) bool {
	var marker struct {
		Id bson.ObjectId `bson:"_id"`
	}
	if err := m.meta.Find(bson.M{"type": coll}).One(&marker); err != nil {
		return true
	}

	age := time.Since(marker.Id.Time())
	return age.Minutes() > minutes
}