From 42de6bbf644e4890f8a99a7925aa9f77d52ea747 Mon Sep 17 00:00:00 2001 From: Las Zenow Date: Tue, 18 Feb 2014 19:05:27 +0100 Subject: [PATCH] Convert stats mapreduce to aggregations --- database.go | 4 +- mapreduce.go | 223 +++++++++++++++++++++++---------------------------- stats.go | 12 +-- 3 files changed, 108 insertions(+), 131 deletions(-) diff --git a/database.go b/database.go index e49b644..29b4fbb 100644 --- a/database.go +++ b/database.go @@ -256,8 +256,8 @@ func (d *DB) UpdateTags() error { } type Visits struct { - Date int64 "_id" - Count int "value" + Date time.Time "date" + Count int "count" } func (d *DB) GetHourVisits() ([]Visits, error) { diff --git a/mapreduce.go b/mapreduce.go index cf69442..b8eb10f 100644 --- a/mapreduce.go +++ b/mapreduce.go @@ -94,151 +94,128 @@ func (m *MR) updateMostBooks(statsColl *mgo.Collection, section string, resColl } func (m *MR) UpdateHourVisits(statsColl *mgo.Collection) error { - const numDays = 2 - start := time.Now().UTC().Add(-numDays * 24 * time.Hour) - - const reduce = `function(date, vals) { - var count = 0; - vals.forEach(function(v) { count += v; }); - return count; - }` - var mr mgo.MapReduce - mr.Map = `function() { - var date = Date.UTC(this.date.getUTCFullYear(), - this.date.getUTCMonth(), - this.date.getUTCDate(), - this.date.getUTCHours()); - emit({date: date, session: this.session}, 1); - }` - mr.Reduce = reduce - err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, HOURLY_VISITS_COLL+"_raw") - if err != nil { - return err + f := func(t time.Time) time.Time { + const span = time.Hour + return t.Add(span).Truncate(span) } - var mr2 mgo.MapReduce - mr2.Map = `function() { - emit(this['_id']['date'], 1); - }` - mr2.Reduce = reduce - hourly_raw := m.database.C(HOURLY_VISITS_COLL + "_raw") - return m.update(&mr2, bson.M{}, hourly_raw, HOURLY_VISITS_COLL) + const numDays = 2 + spanStore := numDays * 24 * time.Hour + return m.updateVisits(f, spanStore, HOURLY_VISITS_COLL, true) } func (m *MR) UpdateDayVisits(statsColl *mgo.Collection) error { - const numDays = 30 - start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour) - - const reduce = `function(date, vals) { - var count = 0; - vals.forEach(function(v) { count += v; }); - return count; - }` - var mr mgo.MapReduce - mr.Map = `function() { - var date = Date.UTC(this.date.getUTCFullYear(), - this.date.getUTCMonth(), - this.date.getUTCDate()); - emit({date: date, session: this.session}, 1); - }` - mr.Reduce = reduce - err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, DAILY_VISITS_COLL+"_raw") - if err != nil { - return err + f := func(t time.Time) time.Time { + const span = 24 * time.Hour + return t.Add(span).Truncate(span) } - var mr2 mgo.MapReduce - mr2.Map = `function() { - emit(this['_id']['date'], 1); - }` - mr2.Reduce = reduce - daily_raw := m.database.C(DAILY_VISITS_COLL + "_raw") - return m.update(&mr2, bson.M{}, daily_raw, DAILY_VISITS_COLL) + const numDays = 30 + spanStore := numDays * 24 * time.Hour + return m.updateVisits(f, spanStore, DAILY_VISITS_COLL, true) } func (m *MR) UpdateMonthVisits(statsColl *mgo.Collection) error { - const numDays = 365 - - start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour) - - const reduce = `function(date, vals) { - var count = 0; - vals.forEach(function(v) { count += v; }); - return count; - }` - var mr mgo.MapReduce - mr.Map = `function() { - var date = Date.UTC(this.date.getUTCFullYear(), - this.date.getUTCMonth()); - emit({date: date, session: this.session}, 1); - }` - mr.Reduce = reduce - err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, MONTHLY_VISITS_COLL+"_raw") - if err != nil { - return err + f := func(t time.Time) time.Time { + const span = 24 * time.Hour + return t.AddDate(0, 1, 1-t.Day()).Truncate(span) } - var mr2 mgo.MapReduce - mr2.Map = `function() { - emit(this['_id']['date'], 1); - }` - mr2.Reduce = reduce - monthly_raw := m.database.C(MONTHLY_VISITS_COLL + "_raw") - return m.update(&mr2, bson.M{}, monthly_raw, MONTHLY_VISITS_COLL) + const numDays = 365 + spanStore := numDays * 24 * time.Hour + return m.updateVisits(f, spanStore, MONTHLY_VISITS_COLL, true) } func (m *MR) UpdateHourDownloads(statsColl *mgo.Collection) error { + f := func(t time.Time) time.Time { + const span = time.Hour + return t.Add(span).Truncate(span) + } const numDays = 2 - start := time.Now().UTC().Add(-numDays * 24 * time.Hour) - - var mr mgo.MapReduce - mr.Map = `function() { - var date = Date.UTC(this.date.getUTCFullYear(), - this.date.getUTCMonth(), - this.date.getUTCDate(), - this.date.getUTCHours()); - emit(date, 1); - }` - mr.Reduce = `function(date, vals) { - var count = 0; - vals.forEach(function(v) { count += v; }); - return count; - }` - return m.update(&mr, bson.M{"date": bson.M{"$gte": start}, "section": "download"}, statsColl, HOURLY_DOWNLOADS_COLL) + spanStore := numDays * 24 * time.Hour + return m.updateVisits(f, spanStore, HOURLY_DOWNLOADS_COLL, false) } func (m *MR) UpdateDayDownloads(statsColl *mgo.Collection) error { + f := func(t time.Time) time.Time { + const span = 24 * time.Hour + return t.Add(span).Truncate(span) + } const numDays = 30 - start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour) - - var mr mgo.MapReduce - mr.Map = `function() { - var date = Date.UTC(this.date.getUTCFullYear(), - this.date.getUTCMonth(), - this.date.getUTCDate()); - emit(date, 1); - }` - mr.Reduce = `function(date, vals) { - var count = 0; - vals.forEach(function(v) { count += v; }); - return count; - }` - return m.update(&mr, bson.M{"date": bson.M{"$gte": start}, "section": "download"}, statsColl, DAILY_DOWNLOADS_COLL) + spanStore := numDays * 24 * time.Hour + return m.updateVisits(f, spanStore, DAILY_DOWNLOADS_COLL, false) } func (m *MR) UpdateMonthDownloads(statsColl *mgo.Collection) error { + f := func(t time.Time) time.Time { + const span = 24 * time.Hour + return t.AddDate(0, 1, 1-t.Day()).Truncate(span) + } const numDays = 365 - start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour) + spanStore := numDays * 24 * time.Hour + return m.updateVisits(f, spanStore, MONTHLY_DOWNLOADS_COLL, false) +} - var mr mgo.MapReduce - mr.Map = `function() { - var date = Date.UTC(this.date.getUTCFullYear(), - this.date.getUTCMonth()); - emit(date, 1); - }` - mr.Reduce = `function(date, vals) { - var count = 0; - vals.forEach(function(v) { count += v; }); - return count; - }` - return m.update(&mr, bson.M{"date": bson.M{"$gte": start}, "section": "download"}, statsColl, MONTHLY_DOWNLOADS_COLL) +func (m *MR) updateVisits(incTime func(time.Time) time.Time, spanStore time.Duration, coll string, useSession bool) error { + storeColl := m.database.C(coll) + start := m.calculateStart(spanStore, storeColl) + for start.Before(time.Now().UTC()) { + stop := incTime(start) + + var count int + var err error + if useSession { + count = m.countVisits(start, stop) + } else { + count, err = m.countDownloads(start, stop) + } + if err != nil { + return err + } + + err = storeColl.Insert(bson.M{"date": start, "count": count}) + if err != nil { + return err + } + + start = stop + } + + _, err := storeColl.RemoveAll(bson.M{"date": bson.M{"$lt": time.Now().UTC().Add(-spanStore)}}) + return err +} + +func (m *MR) calculateStart(spanStore time.Duration, storeColl *mgo.Collection) time.Time { + var date struct { + Id bson.ObjectId `bson:"_id"` + Date time.Time `bson:"date"` + } + err := storeColl.Find(bson.M{}).Sort("-date").One(&date) + if err == nil { + storeColl.RemoveId(date.Id) + return date.Date + } + return time.Now().UTC().Add(-spanStore).Truncate(time.Hour) +} + +func (m *MR) countVisits(start time.Time, stop time.Time) int { + statsColl := m.database.C(STATS_COLL) + var result struct { + Count int "count" + } + err := statsColl.Pipe([]bson.M{ + {"$match": bson.M{"date": bson.M{"$gte": start, "$lt": stop}}}, + {"$group": bson.M{"_id": "$session"}}, + {"$group": bson.M{"_id": 1, "count": bson.M{"$sum": 1}}}, + }).One(&result) + if err != nil { + return 0 + } + + return result.Count +} + +func (m *MR) countDownloads(start time.Time, stop time.Time) (int, error) { + query := bson.M{"date": bson.M{"$gte": start, "$lt": stop}, "section": "download"} + statsColl := m.database.C(STATS_COLL) + return statsColl.Find(query).Count() } func (m *MR) update(mr *mgo.MapReduce, query bson.M, queryColl *mgo.Collection, storeColl string) error { diff --git a/stats.go b/stats.go index 4218ddf..626aafa 100644 --- a/stats.go +++ b/stats.go @@ -100,7 +100,7 @@ func getHourlyVisits(db *DB) []visitData { visit, _ := db.GetHourVisits() for _, v := range visit { var elem visitData - hour := time.Unix(v.Date/1000, 0).UTC().Hour() + hour := v.Date.UTC().Hour() elem.Label = strconv.Itoa(hour + 1) elem.Count = v.Count visits = append(visits, elem) @@ -115,7 +115,7 @@ func getDailyVisits(db *DB) []visitData { visit, _ := db.GetDayVisits() for _, v := range visit { var elem visitData - day := time.Unix(v.Date/1000, 0).UTC().Day() + day := v.Date.UTC().Day() elem.Label = strconv.Itoa(day) elem.Count = v.Count visits = append(visits, elem) @@ -130,7 +130,7 @@ func getMonthlyVisits(db *DB) []visitData { visit, _ := db.GetMonthVisits() for _, v := range visit { var elem visitData - month := time.Unix(v.Date/1000, 0).UTC().Month() + month := v.Date.UTC().Month() elem.Label = month.String() elem.Count = v.Count visits = append(visits, elem) @@ -145,7 +145,7 @@ func getHourlyDownloads(db *DB) []visitData { visit, _ := db.GetHourDownloads() for _, v := range visit { var elem visitData - hour := time.Unix(v.Date/1000, 0).UTC().Hour() + hour := v.Date.UTC().Hour() elem.Label = strconv.Itoa(hour + 1) elem.Count = v.Count visits = append(visits, elem) @@ -160,7 +160,7 @@ func getDailyDownloads(db *DB) []visitData { visit, _ := db.GetDayDownloads() for _, v := range visit { var elem visitData - day := time.Unix(v.Date/1000, 0).UTC().Day() + day := v.Date.UTC().Day() elem.Label = strconv.Itoa(day) elem.Count = v.Count visits = append(visits, elem) @@ -175,7 +175,7 @@ func getMonthlyDownloads(db *DB) []visitData { visit, _ := db.GetMonthDownloads() for _, v := range visit { var elem visitData - month := time.Unix(v.Date/1000, 0).UTC().Month() + month := v.Date.UTC().Month() elem.Label = month.String() elem.Count = v.Count visits = append(visits, elem)