mirror of
https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake.git
synced 2025-10-14 14:11:23 -04:00
distinctcounter was silently exiting at the "recordingStart":"2022-09-23T17:06:59.680537075Z" line, the first line whose length (66873) exceeds bufio.MaxScanTokenSize. Now distinctcounter exits with an error status instead of reporting partial results.

    $ ./distinctcounter -from 2023-01-01T00:00:00Z -to 2023-01-10T00:00:00Z -in metrics-ip-salted.jsonl
    2023/04/20 13:54:11 unable to count:bufio.Scanner: token too long
64 lines
1.3 KiB
Go
64 lines
1.3 KiB
Go
package sinkcluster
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"github.com/clarkduvall/hyperloglog"
|
|
"io"
|
|
"time"
|
|
)
|
|
|
|
func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter {
|
|
return &ClusterCounter{from: from, to: to}
|
|
}
|
|
|
|
// ClusterCounter holds the time window used to select which recorded entries
// take part in a count.
type ClusterCounter struct {
	// from and to are the lower and upper bounds of the window.
	from, to time.Time
}
|
|
|
|
// ClusterCountResult is the outcome of a ClusterCounter.Count run.
type ClusterCountResult struct {
	// Sum is the count reported by the merged counter.
	Sum uint64

	// ChunkIncluded is the number of input entries that fell inside the
	// counting window and were merged.
	ChunkIncluded int64
}
|
|
|
|
func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) {
|
|
result := ClusterCountResult{}
|
|
counter, err := hyperloglog.NewPlus(18)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
inputScanner := bufio.NewScanner(reader)
|
|
for inputScanner.Scan() {
|
|
inputLine := inputScanner.Bytes()
|
|
sinkInfo := SinkEntry{}
|
|
if err := json.Unmarshal(inputLine, &sinkInfo); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if (sinkInfo.RecordingStart.Before(c.from) && !sinkInfo.RecordingStart.Equal(c.from)) ||
|
|
sinkInfo.RecordingEnd.After(c.to) {
|
|
continue
|
|
}
|
|
|
|
restoredCounter, err := hyperloglog.NewPlus(18)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = restoredCounter.GobDecode(sinkInfo.Recorded)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result.ChunkIncluded++
|
|
err = counter.Merge(restoredCounter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
err = inputScanner.Err()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result.Sum = counter.Count()
|
|
return &result, nil
|
|
}
|