Update the mapreduce collections in background
This commit is contained in:
parent
9af72ea1b4
commit
4130e43689
5 changed files with 369 additions and 304 deletions
95
database.go
95
database.go
|
@ -173,9 +173,8 @@ func (d *DB) GetBooks(query bson.M, r ...int) (books []Book, num int, err error)
|
|||
/* Get the most visited books
|
||||
*/
|
||||
func (d *DB) GetVisitedBooks(num int) (books []Book, err error) {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
bookId, err := mr.GetMostVisited(num, statsColl)
|
||||
visitedColl := d.session.DB(DB_NAME).C(VISITED_COLL)
|
||||
bookId, err := GetBooksVisited(num, visitedColl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -189,12 +188,17 @@ func (d *DB) GetVisitedBooks(num int) (books []Book, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (d *DB) UpdateMostVisited() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.UpdateMostVisited(statsColl)
|
||||
}
|
||||
|
||||
/* Get the most downloaded books
|
||||
*/
|
||||
func (d *DB) GetDownloadedBooks(num int) (books []Book, err error) {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
bookId, err := mr.GetMostDownloaded(num, statsColl)
|
||||
downloadedColl := d.session.DB(DB_NAME).C(DOWNLOADED_COLL)
|
||||
bookId, err := GetBooksVisited(num, downloadedColl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -208,6 +212,12 @@ func (d *DB) GetDownloadedBooks(num int) (books []Book, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (d *DB) UpdateDownloadedBooks() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.UpdateMostDownloaded(statsColl)
|
||||
}
|
||||
|
||||
/* optional parameters: length and start index
|
||||
*
|
||||
* Returns: list of books, number found and err
|
||||
|
@ -231,9 +241,14 @@ func (d *DB) GetFS(prefix string) *mgo.GridFS {
|
|||
}
|
||||
|
||||
func (d *DB) GetTags(numTags int) ([]string, error) {
|
||||
tagsColl := d.session.DB(DB_NAME).C(TAGS_COLL)
|
||||
return GetTags(numTags, tagsColl)
|
||||
}
|
||||
|
||||
func (d *DB) UpdateTags() error {
|
||||
booksColl := d.session.DB(DB_NAME).C(BOOKS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.GetTags(numTags, booksColl)
|
||||
return mr.UpdateTags(booksColl)
|
||||
}
|
||||
|
||||
type Visits struct {
|
||||
|
@ -241,38 +256,68 @@ type Visits struct {
|
|||
Count int "value"
|
||||
}
|
||||
|
||||
func (d *DB) GetHourVisits(start time.Time) ([]Visits, error) {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.GetHourVisits(start, statsColl)
|
||||
func (d *DB) GetHourVisits() ([]Visits, error) {
|
||||
hourlyColl := d.session.DB(DB_NAME).C(HOURLY_VISITS_COLL)
|
||||
return GetVisits(hourlyColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetDayVisits(start time.Time) ([]Visits, error) {
|
||||
func (d *DB) UpdateHourVisits() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.GetDayVisits(start, statsColl)
|
||||
return mr.UpdateHourVisits(statsColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetMonthVisits(start time.Time) ([]Visits, error) {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.GetMonthVisits(start, statsColl)
|
||||
func (d *DB) GetDayVisits() ([]Visits, error) {
|
||||
dailyColl := d.session.DB(DB_NAME).C(DAILY_VISITS_COLL)
|
||||
return GetVisits(dailyColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetHourDownloads(start time.Time) ([]Visits, error) {
|
||||
func (d *DB) UpdateDayVisits() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.GetHourDownloads(start, statsColl)
|
||||
return mr.UpdateDayVisits(statsColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetDayDownloads(start time.Time) ([]Visits, error) {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.GetDayDowloads(start, statsColl)
|
||||
func (d *DB) GetMonthVisits() ([]Visits, error) {
|
||||
monthlyColl := d.session.DB(DB_NAME).C(MONTHLY_VISITS_COLL)
|
||||
return GetVisits(monthlyColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetMonthDownloads(start time.Time) ([]Visits, error) {
|
||||
func (d *DB) UpdateMonthVisits() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.GetMonthDowloads(start, statsColl)
|
||||
return mr.UpdateMonthVisits(statsColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetHourDownloads() ([]Visits, error) {
|
||||
hourlyColl := d.session.DB(DB_NAME).C(HOURLY_DOWNLOADS_COLL)
|
||||
return GetVisits(hourlyColl)
|
||||
}
|
||||
|
||||
func (d *DB) UpdateHourDownloads() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.UpdateHourDownloads(statsColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetDayDownloads() ([]Visits, error) {
|
||||
dailyColl := d.session.DB(DB_NAME).C(DAILY_DOWNLOADS_COLL)
|
||||
return GetVisits(dailyColl)
|
||||
}
|
||||
|
||||
func (d *DB) UpdateDayDownloads() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.UpdateDayDownloads(statsColl)
|
||||
}
|
||||
|
||||
func (d *DB) GetMonthDownloads() ([]Visits, error) {
|
||||
monthlyColl := d.session.DB(DB_NAME).C(MONTHLY_DOWNLOADS_COLL)
|
||||
return GetVisits(monthlyColl)
|
||||
}
|
||||
|
||||
func (d *DB) UpdateMonthDownloads() error {
|
||||
statsColl := d.session.DB(DB_NAME).C(STATS_COLL)
|
||||
mr := NewMR(d.session.DB(DB_NAME))
|
||||
return mr.UpdateMonthDownloads(statsColl)
|
||||
}
|
||||
|
|
462
mapreduce.go
462
mapreduce.go
|
@ -6,39 +6,10 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
type MR struct {
|
||||
database *mgo.Database
|
||||
}
|
||||
|
||||
func NewMR(database *mgo.Database) *MR {
|
||||
m := new(MR)
|
||||
m.database = database
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *MR) GetTags(numTags int, booksColl *mgo.Collection) ([]string, error) {
|
||||
if m.isOutdated(TAGS_COLL, MINUTES_UPDATE_TAGS) {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.subject) {
|
||||
this.subject.forEach(function(s) { emit(s, 1); });
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(tag, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function() { count += 1; });
|
||||
return count;
|
||||
}`
|
||||
err := m.update(&mr, bson.M{"active": true}, booksColl, TAGS_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func GetTags(numTags int, tagsColl *mgo.Collection) ([]string, error) {
|
||||
var result []struct {
|
||||
Tag string "_id"
|
||||
}
|
||||
tagsColl := m.database.C(TAGS_COLL)
|
||||
err := tagsColl.Find(nil).Sort("-value").Limit(numTags).All(&result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -51,27 +22,10 @@ func (m *MR) GetTags(numTags int, booksColl *mgo.Collection) ([]string, error) {
|
|||
return tags, nil
|
||||
}
|
||||
|
||||
func (m *MR) GetMostVisited(num int, statsColl *mgo.Collection) ([]bson.ObjectId, error) {
|
||||
if m.isOutdated(VISITED_COLL, MINUTES_UPDATE_VISITED) {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
emit(this.id, 1);
|
||||
}`
|
||||
mr.Reduce = `function(tag, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function() { count += 1; });
|
||||
return count;
|
||||
}`
|
||||
err := m.update(&mr, bson.M{"section": "book"}, statsColl, VISITED_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func GetBooksVisited(num int, visitedColl *mgo.Collection) ([]bson.ObjectId, error) {
|
||||
var result []struct {
|
||||
Book bson.ObjectId "_id"
|
||||
}
|
||||
visitedColl := m.database.C(VISITED_COLL)
|
||||
err := visitedColl.Find(nil).Sort("-value").Limit(num).All(&result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -84,232 +38,218 @@ func (m *MR) GetMostVisited(num int, statsColl *mgo.Collection) ([]bson.ObjectId
|
|||
return books, nil
|
||||
}
|
||||
|
||||
func (m *MR) GetMostDownloaded(num int, statsColl *mgo.Collection) ([]bson.ObjectId, error) {
|
||||
if m.isOutdated(DOWNLOADED_COLL, MINUTES_UPDATE_DOWNLOADED) {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
emit(this.id, 1);
|
||||
}`
|
||||
mr.Reduce = `function(tag, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function() { count += 1; });
|
||||
return count;
|
||||
}`
|
||||
err := m.update(&mr, bson.M{"section": "download"}, statsColl, DOWNLOADED_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
func GetVisits(visitsColl *mgo.Collection) ([]Visits, error) {
|
||||
var result []Visits
|
||||
err := visitsColl.Find(nil).All(&result)
|
||||
return result, err
|
||||
}
|
||||
|
||||
var result []struct {
|
||||
Book bson.ObjectId "_id"
|
||||
}
|
||||
downloadedColl := m.database.C(DOWNLOADED_COLL)
|
||||
err := downloadedColl.Find(nil).Sort("-value").Limit(num).All(&result)
|
||||
type MR struct {
|
||||
database *mgo.Database
|
||||
}
|
||||
|
||||
func NewMR(database *mgo.Database) *MR {
|
||||
m := new(MR)
|
||||
m.database = database
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *MR) UpdateTags(booksColl *mgo.Collection) error {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.subject) {
|
||||
this.subject.forEach(function(s) { emit(s, 1); });
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(tag, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function() { count += 1; });
|
||||
return count;
|
||||
}`
|
||||
return m.update(&mr, bson.M{"active": true}, booksColl, TAGS_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) UpdateMostVisited(statsColl *mgo.Collection) error {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.id) {
|
||||
emit(this.id, 1);
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(tag, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function() { count += 1; });
|
||||
return count;
|
||||
}`
|
||||
return m.update(&mr, bson.M{"section": "book"}, statsColl, VISITED_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) UpdateMostDownloaded(statsColl *mgo.Collection) error {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
emit(this.id, 1);
|
||||
}`
|
||||
mr.Reduce = `function(tag, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function() { count += 1; });
|
||||
return count;
|
||||
}`
|
||||
return m.update(&mr, bson.M{"section": "download"}, statsColl, DOWNLOADED_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) UpdateHourVisits(statsColl *mgo.Collection) error {
|
||||
const numDays = 2
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour)
|
||||
|
||||
const reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate(),
|
||||
this.date.getUTCHours());
|
||||
emit({date: date, session: this.session}, 1);
|
||||
}`
|
||||
mr.Reduce = reduce
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, HOURLY_VISITS_COLL+"_raw")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
books := make([]bson.ObjectId, len(result))
|
||||
for i, r := range result {
|
||||
books[i] = r.Book
|
||||
}
|
||||
return books, nil
|
||||
var mr2 mgo.MapReduce
|
||||
mr2.Map = `function() {
|
||||
emit(this['_id']['date'], 1);
|
||||
}`
|
||||
mr2.Reduce = reduce
|
||||
hourly_raw := m.database.C(HOURLY_VISITS_COLL + "_raw")
|
||||
return m.update(&mr2, bson.M{}, hourly_raw, HOURLY_VISITS_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) GetHourVisits(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
|
||||
if m.isOutdated(HOURLY_VISITS_COLL, MINUTES_UPDATE_HOURLY) {
|
||||
const reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate(),
|
||||
func (m *MR) UpdateDayVisits(statsColl *mgo.Collection) error {
|
||||
const numDays = 30
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
|
||||
const reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate());
|
||||
emit({date: date, session: this.session}, 1);
|
||||
}`
|
||||
mr.Reduce = reduce
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, DAILY_VISITS_COLL+"_raw")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var mr2 mgo.MapReduce
|
||||
mr2.Map = `function() {
|
||||
emit(this['_id']['date'], 1);
|
||||
}`
|
||||
mr2.Reduce = reduce
|
||||
daily_raw := m.database.C(DAILY_VISITS_COLL + "_raw")
|
||||
return m.update(&mr2, bson.M{}, daily_raw, DAILY_VISITS_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) UpdateMonthVisits(statsColl *mgo.Collection) error {
|
||||
const numDays = 365
|
||||
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
|
||||
const reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth());
|
||||
emit({date: date, session: this.session}, 1);
|
||||
}`
|
||||
mr.Reduce = reduce
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, MONTHLY_VISITS_COLL+"_raw")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var mr2 mgo.MapReduce
|
||||
mr2.Map = `function() {
|
||||
emit(this['_id']['date'], 1);
|
||||
}`
|
||||
mr2.Reduce = reduce
|
||||
monthly_raw := m.database.C(MONTHLY_VISITS_COLL + "_raw")
|
||||
return m.update(&mr2, bson.M{}, monthly_raw, MONTHLY_VISITS_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) UpdateHourDownloads(statsColl *mgo.Collection) error {
|
||||
const numDays = 2
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour)
|
||||
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.section == "download") {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate(),
|
||||
this.date.getUTCHours());
|
||||
emit({date: date, session: this.session}, 1);
|
||||
}`
|
||||
mr.Reduce = reduce
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, HOURLY_VISITS_COLL+"_raw")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var mr2 mgo.MapReduce
|
||||
mr2.Map = `function() {
|
||||
emit(this['_id']['date'], 1);
|
||||
}`
|
||||
mr2.Reduce = reduce
|
||||
hourly_raw := m.database.C(HOURLY_VISITS_COLL + "_raw")
|
||||
err = m.update(&mr2, bson.M{}, hourly_raw, HOURLY_VISITS_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var result []Visits
|
||||
hourlyColl := m.database.C(HOURLY_VISITS_COLL)
|
||||
err := hourlyColl.Find(nil).All(&result)
|
||||
return result, err
|
||||
emit(date, 1);
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
return m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, HOURLY_DOWNLOADS_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) GetDayVisits(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
|
||||
if m.isOutdated(DAILY_VISITS_COLL, MINUTES_UPDATE_DAILY) {
|
||||
const reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate());
|
||||
emit({date: date, session: this.session}, 1);
|
||||
}`
|
||||
mr.Reduce = reduce
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, DAILY_VISITS_COLL+"_raw")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var mr2 mgo.MapReduce
|
||||
mr2.Map = `function() {
|
||||
emit(this['_id']['date'], 1);
|
||||
}`
|
||||
mr2.Reduce = reduce
|
||||
daily_raw := m.database.C(DAILY_VISITS_COLL + "_raw")
|
||||
err = m.update(&mr2, bson.M{}, daily_raw, DAILY_VISITS_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
func (m *MR) UpdateDayDownloads(statsColl *mgo.Collection) error {
|
||||
const numDays = 30
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
|
||||
var result []Visits
|
||||
dailyColl := m.database.C(DAILY_VISITS_COLL)
|
||||
err := dailyColl.Find(nil).All(&result)
|
||||
return result, err
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.section == "download") {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate());
|
||||
emit(date, 1);
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
return m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, DAILY_DOWNLOADS_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) GetMonthVisits(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
|
||||
if m.isOutdated(MONTHLY_VISITS_COLL, MINUTES_UPDATE_MONTHLY) {
|
||||
const reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth());
|
||||
emit({date: date, session: this.session}, 1);
|
||||
}`
|
||||
mr.Reduce = reduce
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, MONTHLY_VISITS_COLL+"_raw")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var mr2 mgo.MapReduce
|
||||
mr2.Map = `function() {
|
||||
emit(this['_id']['date'], 1);
|
||||
}`
|
||||
mr2.Reduce = reduce
|
||||
monthly_raw := m.database.C(MONTHLY_VISITS_COLL + "_raw")
|
||||
err = m.update(&mr2, bson.M{}, monthly_raw, MONTHLY_VISITS_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
func (m *MR) UpdateMonthDownloads(statsColl *mgo.Collection) error {
|
||||
const numDays = 365
|
||||
|
||||
var result []Visits
|
||||
monthlyColl := m.database.C(MONTHLY_VISITS_COLL)
|
||||
err := monthlyColl.Find(nil).All(&result)
|
||||
return result, err
|
||||
}
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
|
||||
func (m *MR) GetHourDownloads(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
|
||||
if m.isOutdated(HOURLY_DOWNLOADS_COLL, MINUTES_UPDATE_HOURLY) {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.section == "download") {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate(),
|
||||
this.date.getUTCHours());
|
||||
emit(date, 1);
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, HOURLY_DOWNLOADS_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var result []Visits
|
||||
hourlyColl := m.database.C(HOURLY_DOWNLOADS_COLL)
|
||||
err := hourlyColl.Find(nil).All(&result)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (m *MR) GetDayDowloads(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
|
||||
if m.isOutdated(DAILY_DOWNLOADS_COLL, MINUTES_UPDATE_DAILY) {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.section == "download") {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth(),
|
||||
this.date.getUTCDate());
|
||||
emit(date, 1);
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, DAILY_DOWNLOADS_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var result []Visits
|
||||
dailyColl := m.database.C(DAILY_DOWNLOADS_COLL)
|
||||
err := dailyColl.Find(nil).All(&result)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (m *MR) GetMonthDowloads(start time.Time, statsColl *mgo.Collection) ([]Visits, error) {
|
||||
if m.isOutdated(MONTHLY_DOWNLOADS_COLL, MINUTES_UPDATE_MONTHLY) {
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.section == "download") {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth());
|
||||
emit(date, 1);
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
err := m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, MONTHLY_DOWNLOADS_COLL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var result []Visits
|
||||
monthlyColl := m.database.C(MONTHLY_DOWNLOADS_COLL)
|
||||
err := monthlyColl.Find(nil).All(&result)
|
||||
return result, err
|
||||
var mr mgo.MapReduce
|
||||
mr.Map = `function() {
|
||||
if (this.section == "download") {
|
||||
var date = Date.UTC(this.date.getUTCFullYear(),
|
||||
this.date.getUTCMonth());
|
||||
emit(date, 1);
|
||||
}
|
||||
}`
|
||||
mr.Reduce = `function(date, vals) {
|
||||
var count = 0;
|
||||
vals.forEach(function(v) { count += v; });
|
||||
return count;
|
||||
}`
|
||||
return m.update(&mr, bson.M{"date": bson.M{"$gte": start}}, statsColl, MONTHLY_DOWNLOADS_COLL)
|
||||
}
|
||||
|
||||
func (m *MR) update(mr *mgo.MapReduce, query bson.M, queryColl *mgo.Collection, storeColl string) error {
|
||||
|
|
24
stats.go
24
stats.go
|
@ -91,11 +91,9 @@ type visitData struct {
|
|||
}
|
||||
|
||||
func getHourlyVisits(db *DB) []visitData {
|
||||
const numDays = 2
|
||||
var visits []visitData
|
||||
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour)
|
||||
visit, _ := db.GetHourVisits(start)
|
||||
visit, _ := db.GetHourVisits()
|
||||
for _, v := range visit {
|
||||
var elem visitData
|
||||
hour := time.Unix(v.Date/1000, 0).UTC().Hour()
|
||||
|
@ -108,11 +106,9 @@ func getHourlyVisits(db *DB) []visitData {
|
|||
}
|
||||
|
||||
func getDailyVisits(db *DB) []visitData {
|
||||
const numDays = 30
|
||||
var visits []visitData
|
||||
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
visit, _ := db.GetDayVisits(start)
|
||||
visit, _ := db.GetDayVisits()
|
||||
for _, v := range visit {
|
||||
var elem visitData
|
||||
day := time.Unix(v.Date/1000, 0).UTC().Day()
|
||||
|
@ -125,11 +121,9 @@ func getDailyVisits(db *DB) []visitData {
|
|||
}
|
||||
|
||||
func getMonthlyVisits(db *DB) []visitData {
|
||||
const numDays = 365
|
||||
var visits []visitData
|
||||
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
visit, _ := db.GetMonthVisits(start)
|
||||
visit, _ := db.GetMonthVisits()
|
||||
for _, v := range visit {
|
||||
var elem visitData
|
||||
month := time.Unix(v.Date/1000, 0).UTC().Month()
|
||||
|
@ -142,11 +136,9 @@ func getMonthlyVisits(db *DB) []visitData {
|
|||
}
|
||||
|
||||
func getHourlyDownloads(db *DB) []visitData {
|
||||
const numDays = 2
|
||||
var visits []visitData
|
||||
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour)
|
||||
visit, _ := db.GetHourDownloads(start)
|
||||
visit, _ := db.GetHourDownloads()
|
||||
for _, v := range visit {
|
||||
var elem visitData
|
||||
hour := time.Unix(v.Date/1000, 0).UTC().Hour()
|
||||
|
@ -159,11 +151,9 @@ func getHourlyDownloads(db *DB) []visitData {
|
|||
}
|
||||
|
||||
func getDailyDownloads(db *DB) []visitData {
|
||||
const numDays = 30
|
||||
var visits []visitData
|
||||
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
visit, _ := db.GetDayDownloads(start)
|
||||
visit, _ := db.GetDayDownloads()
|
||||
for _, v := range visit {
|
||||
var elem visitData
|
||||
day := time.Unix(v.Date/1000, 0).UTC().Day()
|
||||
|
@ -176,11 +166,9 @@ func getDailyDownloads(db *DB) []visitData {
|
|||
}
|
||||
|
||||
func getMonthlyDownloads(db *DB) []visitData {
|
||||
const numDays = 365
|
||||
var visits []visitData
|
||||
|
||||
start := time.Now().UTC().Add(-numDays * 24 * time.Hour).Truncate(24 * time.Hour)
|
||||
visit, _ := db.GetMonthDownloads(start)
|
||||
visit, _ := db.GetMonthDownloads()
|
||||
for _, v := range visit {
|
||||
var elem visitData
|
||||
month := time.Unix(v.Date/1000, 0).UTC().Month()
|
||||
|
|
91
tasker.go
Normal file
91
tasker.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func InitTasks(db *DB) {
|
||||
initTagsTask(db)
|
||||
initVisitedTask(db)
|
||||
initDownloadedTask(db)
|
||||
initHourVisitsTask(db)
|
||||
initDayVisitsTask(db)
|
||||
initMonthVisitsTask(db)
|
||||
initHourDownloadsTask(db)
|
||||
initDayDownloadsTask(db)
|
||||
initMonthDownloadsTask(db)
|
||||
}
|
||||
|
||||
func initTagsTask(db *DB) {
|
||||
updateTags := func() {
|
||||
db.UpdateTags()
|
||||
}
|
||||
periodicTask(updateTags, MINUTES_UPDATE_TAGS*time.Minute)
|
||||
}
|
||||
|
||||
func initVisitedTask(db *DB) {
|
||||
updateVisited := func() {
|
||||
db.UpdateMostVisited()
|
||||
}
|
||||
periodicTask(updateVisited, MINUTES_UPDATE_VISITED*time.Minute)
|
||||
}
|
||||
|
||||
func initDownloadedTask(db *DB) {
|
||||
updateDownloaded := func() {
|
||||
db.UpdateDownloadedBooks()
|
||||
}
|
||||
periodicTask(updateDownloaded, MINUTES_UPDATE_DOWNLOADED*time.Minute)
|
||||
}
|
||||
|
||||
func initHourVisitsTask(db *DB) {
|
||||
updateHourVisits := func() {
|
||||
db.UpdateHourVisits()
|
||||
}
|
||||
periodicTask(updateHourVisits, MINUTES_UPDATE_HOURLY*time.Minute)
|
||||
}
|
||||
|
||||
func initDayVisitsTask(db *DB) {
|
||||
updateDayVisits := func() {
|
||||
db.UpdateDayVisits()
|
||||
}
|
||||
periodicTask(updateDayVisits, MINUTES_UPDATE_HOURLY*time.Minute)
|
||||
}
|
||||
|
||||
func initMonthVisitsTask(db *DB) {
|
||||
updateMonthVisits := func() {
|
||||
db.UpdateMonthVisits()
|
||||
}
|
||||
periodicTask(updateMonthVisits, MINUTES_UPDATE_HOURLY*time.Minute)
|
||||
}
|
||||
|
||||
func initHourDownloadsTask(db *DB) {
|
||||
updateHourDownloads := func() {
|
||||
db.UpdateHourDownloads()
|
||||
}
|
||||
periodicTask(updateHourDownloads, MINUTES_UPDATE_HOURLY*time.Minute)
|
||||
}
|
||||
|
||||
func initDayDownloadsTask(db *DB) {
|
||||
updateDayDownloads := func() {
|
||||
db.UpdateDayDownloads()
|
||||
}
|
||||
periodicTask(updateDayDownloads, MINUTES_UPDATE_HOURLY*time.Minute)
|
||||
}
|
||||
|
||||
func initMonthDownloadsTask(db *DB) {
|
||||
updateMonthDownloads := func() {
|
||||
db.UpdateMonthDownloads()
|
||||
}
|
||||
periodicTask(updateMonthDownloads, MINUTES_UPDATE_HOURLY*time.Minute)
|
||||
}
|
||||
|
||||
func periodicTask(task func(), periodicity time.Duration) {
|
||||
go tasker(task, periodicity)
|
||||
}
|
||||
|
||||
func tasker(task func(), periodicity time.Duration) {
|
||||
for true {
|
||||
time.Sleep(periodicity)
|
||||
task()
|
||||
}
|
||||
}
|
|
@ -148,6 +148,7 @@ func main() {
|
|||
db := initDB()
|
||||
defer db.Close()
|
||||
|
||||
InitTasks(db)
|
||||
InitStats(db)
|
||||
InitUpload(db)
|
||||
|
||||
|
|
Reference in a new issue