Convert stats mapreduce to aggregations

This commit is contained in:
Las Zenow 2014-02-18 19:05:27 +01:00
parent f2c728191c
commit 42de6bbf64
3 changed files with 108 additions and 131 deletions

View file

@ -256,8 +256,8 @@ func (d *DB) UpdateTags() error {
}
type Visits struct {
Date int64 "_id"
Count int "value"
Date time.Time "date"
Count int "count"
}
func (d *DB) GetHourVisits() ([]Visits, error) {

View file

@ -94,151 +94,128 @@ func (m *MR) updateMostBooks(statsColl *mgo.Collection, section string, resColl
}
func (m *MR) UpdateHourVisits(statsColl *mgo.Collection) error {
const numDays = 2
start := time.Now().UTC().Add(-numDays * 24 * time.Hour)
const reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
var mr mgo.MapReduce
mr.Map = `function() {
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth(),
this.date.getUTCDate(),
this.date.getUTCHours());
emit({date: date, session: this.session}, 1);
}`
mr.Reduce = reduce
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, HOURLY_VISITS_COLL+"_raw")
if err != nil {
return err
f := func(t time.Time) time.Time {
const span = time.Hour
return t.Add(span).Truncate(span)
}
var mr2 mgo.MapReduce
mr2.Map = `function() {
emit(this['_id']['date'], 1);
}`
mr2.Reduce = reduce
hourly_raw := m.database.C(HOURLY_VISITS_COLL + "_raw")
return m.update(&mr2, bson.M{}, hourly_raw, HOURLY_VISITS_COLL)
const numDays = 2
spanStore := numDays * 24 * time.Hour
return m.updateVisits(f, spanStore, HOURLY_VISITS_COLL, true)
}
func (m *MR) UpdateDayVisits(statsColl *mgo.Collection) error {
const numDays = 30
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
const reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
var mr mgo.MapReduce
mr.Map = `function() {
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth(),
this.date.getUTCDate());
emit({date: date, session: this.session}, 1);
}`
mr.Reduce = reduce
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, DAILY_VISITS_COLL+"_raw")
if err != nil {
return err
f := func(t time.Time) time.Time {
const span = 24 * time.Hour
return t.Add(span).Truncate(span)
}
var mr2 mgo.MapReduce
mr2.Map = `function() {
emit(this['_id']['date'], 1);
}`
mr2.Reduce = reduce
daily_raw := m.database.C(DAILY_VISITS_COLL + "_raw")
return m.update(&mr2, bson.M{}, daily_raw, DAILY_VISITS_COLL)
const numDays = 30
spanStore := numDays * 24 * time.Hour
return m.updateVisits(f, spanStore, DAILY_VISITS_COLL, true)
}
func (m *MR) UpdateMonthVisits(statsColl *mgo.Collection) error {
const numDays = 365
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
const reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
var mr mgo.MapReduce
mr.Map = `function() {
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth());
emit({date: date, session: this.session}, 1);
}`
mr.Reduce = reduce
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, MONTHLY_VISITS_COLL+"_raw")
if err != nil {
return err
f := func(t time.Time) time.Time {
const span = 24 * time.Hour
return t.AddDate(0, 1, 1-t.Day()).Truncate(span)
}
var mr2 mgo.MapReduce
mr2.Map = `function() {
emit(this['_id']['date'], 1);
}`
mr2.Reduce = reduce
monthly_raw := m.database.C(MONTHLY_VISITS_COLL + "_raw")
return m.update(&mr2, bson.M{}, monthly_raw, MONTHLY_VISITS_COLL)
const numDays = 365
spanStore := numDays * 24 * time.Hour
return m.updateVisits(f, spanStore, MONTHLY_VISITS_COLL, true)
}
func (m *MR) UpdateHourDownloads(statsColl *mgo.Collection) error {
f := func(t time.Time) time.Time {
const span = time.Hour
return t.Add(span).Truncate(span)
}
const numDays = 2
start := time.Now().UTC().Add(-numDays * 24 * time.Hour)
var mr mgo.MapReduce
mr.Map = `function() {
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth(),
this.date.getUTCDate(),
this.date.getUTCHours());
emit(date, 1);
}`
mr.Reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
return m.update(&mr, bson.M{"date": bson.M{"$gte": start}, "section": "download"}, statsColl, HOURLY_DOWNLOADS_COLL)
spanStore := numDays * 24 * time.Hour
return m.updateVisits(f, spanStore, HOURLY_DOWNLOADS_COLL, false)
}
func (m *MR) UpdateDayDownloads(statsColl *mgo.Collection) error {
f := func(t time.Time) time.Time {
const span = 24 * time.Hour
return t.Add(span).Truncate(span)
}
const numDays = 30
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
var mr mgo.MapReduce
mr.Map = `function() {
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth(),
this.date.getUTCDate());
emit(date, 1);
}`
mr.Reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
return m.update(&mr, bson.M{"date": bson.M{"$gte": start}, "section": "download"}, statsColl, DAILY_DOWNLOADS_COLL)
spanStore := numDays * 24 * time.Hour
return m.updateVisits(f, spanStore, DAILY_DOWNLOADS_COLL, false)
}
func (m *MR) UpdateMonthDownloads(statsColl *mgo.Collection) error {
f := func(t time.Time) time.Time {
const span = 24 * time.Hour
return t.AddDate(0, 1, 1-t.Day()).Truncate(span)
}
const numDays = 365
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
spanStore := numDays * 24 * time.Hour
return m.updateVisits(f, spanStore, MONTHLY_DOWNLOADS_COLL, false)
}
var mr mgo.MapReduce
mr.Map = `function() {
var date = Date.UTC(this.date.getUTCFullYear(),
this.date.getUTCMonth());
emit(date, 1);
}`
mr.Reduce = `function(date, vals) {
var count = 0;
vals.forEach(function(v) { count += v; });
return count;
}`
return m.update(&mr, bson.M{"date": bson.M{"$gte": start}, "section": "download"}, statsColl, MONTHLY_DOWNLOADS_COLL)
func (m *MR) updateVisits(incTime func(time.Time) time.Time, spanStore time.Duration, coll string, useSession bool) error {
storeColl := m.database.C(coll)
start := m.calculateStart(spanStore, storeColl)
for start.Before(time.Now().UTC()) {
stop := incTime(start)
var count int
var err error
if useSession {
count = m.countVisits(start, stop)
} else {
count, err = m.countDownloads(start, stop)
}
if err != nil {
return err
}
err = storeColl.Insert(bson.M{"date": start, "count": count})
if err != nil {
return err
}
start = stop
}
_, err := storeColl.RemoveAll(bson.M{"date": bson.M{"$lt": time.Now().UTC().Add(-spanStore)}})
return err
}
func (m *MR) calculateStart(spanStore time.Duration, storeColl *mgo.Collection) time.Time {
var date struct {
Id bson.ObjectId `bson:"_id"`
Date time.Time `bson:"date"`
}
err := storeColl.Find(bson.M{}).Sort("-date").One(&date)
if err == nil {
storeColl.RemoveId(date.Id)
return date.Date
}
return time.Now().UTC().Add(-spanStore).Truncate(time.Hour)
}
func (m *MR) countVisits(start time.Time, stop time.Time) int {
statsColl := m.database.C(STATS_COLL)
var result struct {
Count int "count"
}
err := statsColl.Pipe([]bson.M{
{"$match": bson.M{"date": bson.M{"$gte": start, "$lt": stop}}},
{"$group": bson.M{"_id": "$session"}},
{"$group": bson.M{"_id": 1, "count": bson.M{"$sum": 1}}},
}).One(&result)
if err != nil {
return 0
}
return result.Count
}
func (m *MR) countDownloads(start time.Time, stop time.Time) (int, error) {
query := bson.M{"date": bson.M{"$gte": start, "$lt": stop}, "section": "download"}
statsColl := m.database.C(STATS_COLL)
return statsColl.Find(query).Count()
}
func (m *MR) update(mr *mgo.MapReduce, query bson.M, queryColl *mgo.Collection, storeColl string) error {

View file

@ -100,7 +100,7 @@ func getHourlyVisits(db *DB) []visitData {
visit, _ := db.GetHourVisits()
for _, v := range visit {
var elem visitData
hour := time.Unix(v.Date/1000, 0).UTC().Hour()
hour := v.Date.UTC().Hour()
elem.Label = strconv.Itoa(hour + 1)
elem.Count = v.Count
visits = append(visits, elem)
@ -115,7 +115,7 @@ func getDailyVisits(db *DB) []visitData {
visit, _ := db.GetDayVisits()
for _, v := range visit {
var elem visitData
day := time.Unix(v.Date/1000, 0).UTC().Day()
day := v.Date.UTC().Day()
elem.Label = strconv.Itoa(day)
elem.Count = v.Count
visits = append(visits, elem)
@ -130,7 +130,7 @@ func getMonthlyVisits(db *DB) []visitData {
visit, _ := db.GetMonthVisits()
for _, v := range visit {
var elem visitData
month := time.Unix(v.Date/1000, 0).UTC().Month()
month := v.Date.UTC().Month()
elem.Label = month.String()
elem.Count = v.Count
visits = append(visits, elem)
@ -145,7 +145,7 @@ func getHourlyDownloads(db *DB) []visitData {
visit, _ := db.GetHourDownloads()
for _, v := range visit {
var elem visitData
hour := time.Unix(v.Date/1000, 0).UTC().Hour()
hour := v.Date.UTC().Hour()
elem.Label = strconv.Itoa(hour + 1)
elem.Count = v.Count
visits = append(visits, elem)
@ -160,7 +160,7 @@ func getDailyDownloads(db *DB) []visitData {
visit, _ := db.GetDayDownloads()
for _, v := range visit {
var elem visitData
day := time.Unix(v.Date/1000, 0).UTC().Day()
day := v.Date.UTC().Day()
elem.Label = strconv.Itoa(day)
elem.Count = v.Count
visits = append(visits, elem)
@ -175,7 +175,7 @@ func getMonthlyDownloads(db *DB) []visitData {
visit, _ := db.GetMonthDownloads()
for _, v := range visit {
var elem visitData
month := time.Unix(v.Date/1000, 0).UTC().Month()
month := v.Date.UTC().Month()
elem.Label = month.String()
elem.Count = v.Count
visits = append(visits, elem)