snowflake/common/ipsetsink/sinkcluster/reader.go
David Fifield 8e5ea82611 Add a scanner error check to ClusterCounter.Count.
It was silently exiting at the "recordingStart":"2022-09-23T17:06:59.680537075Z"
line, the first line whose length (66873) exceeds
bufio.MaxScanTokenSize. Now distinctcounter exits with an error status
instead of reporting partial results.

$ ./distinctcounter -from 2023-01-01T00:00:00Z -to 2023-01-10T00:00:00Z -in metrics-ip-salted.jsonl
2023/04/20 13:54:11 unable to count:bufio.Scanner: token too long
2023-04-20 11:28:58 -04:00
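
bufio.Scanner's default per-line limit is bufio.MaxScanTokenSize (64 KiB), which the 66873-byte line exceeds. This commit only surfaces the error; for contrast, here is a minimal sketch of the alternative of raising the limit with Scanner.Buffer. The scanLongLines and handle names and the 1 MiB cap are illustrative, not part of the snowflake tree:

package sinkcluster

import (
	"bufio"
	"io"
)

// scanLongLines is a sketch of how the scanner limit could be raised
// instead: Buffer sets a larger maximum token size, so a long metrics
// line no longer trips bufio.ErrTooLong. The 1 MiB cap is illustrative.
func scanLongLines(reader io.Reader, handle func([]byte) error) error {
	inputScanner := bufio.NewScanner(reader)
	inputScanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for inputScanner.Scan() {
		if err := handle(inputScanner.Bytes()); err != nil {
			return err
		}
	}
	// Scan reports false at EOF and on error alike; only Err tells
	// them apart, which is the check this commit adds to Count.
	return inputScanner.Err()
}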


package sinkcluster

import (
	"bufio"
	"encoding/json"
	"io"
	"time"

	"github.com/clarkduvall/hyperloglog"
)

// NewClusterCounter returns a ClusterCounter that estimates the number
// of distinct IP addresses recorded between from and to.
func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter {
	return &ClusterCounter{from: from, to: to}
}

type ClusterCounter struct {
	from time.Time
	to   time.Time
}

// ClusterCountResult holds the estimated number of distinct IP
// addresses (Sum) and the number of chunks whose recording interval
// fell within the requested window (ChunkIncluded).
type ClusterCountResult struct {
	Sum           uint64
	ChunkIncluded int64
}

// Count reads newline-delimited JSON SinkEntry records from reader and
// merges the HyperLogLog counters of the entries whose recording
// interval lies within [from, to].
func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) {
	result := ClusterCountResult{}
	counter, err := hyperloglog.NewPlus(18)
	if err != nil {
		return nil, err
	}
	inputScanner := bufio.NewScanner(reader)
	for inputScanner.Scan() {
		inputLine := inputScanner.Bytes()
		sinkInfo := SinkEntry{}
		if err := json.Unmarshal(inputLine, &sinkInfo); err != nil {
			return nil, err
		}

		// Skip chunks whose recording interval extends outside
		// the [from, to] window.
		if (sinkInfo.RecordingStart.Before(c.from) && !sinkInfo.RecordingStart.Equal(c.from)) ||
			sinkInfo.RecordingEnd.After(c.to) {
			continue
		}

		// Restore the chunk's serialized HyperLogLog counter and
		// merge it into the running total.
		restoredCounter, err := hyperloglog.NewPlus(18)
		if err != nil {
			return nil, err
		}
		err = restoredCounter.GobDecode(sinkInfo.Recorded)
		if err != nil {
			return nil, err
		}
		result.ChunkIncluded++
		err = counter.Merge(restoredCounter)
		if err != nil {
			return nil, err
		}
	}
	// Scan returns false on error as well as at EOF; without this
	// check, an over-long line (bufio.ErrTooLong) would silently
	// truncate the count.
	err = inputScanner.Err()
	if err != nil {
		return nil, err
	}
	result.Sum = counter.Count()
	return &result, nil
}
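
For context, a minimal sketch of how a caller like distinctcounter might drive this API, mirroring the transcript above; the module import path, file name, and error handling are assumptions, and SinkEntry itself is defined elsewhere in this package:

package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster"
)

func main() {
	from, err := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z")
	if err != nil {
		log.Fatal(err)
	}
	to, err := time.Parse(time.RFC3339, "2023-01-10T00:00:00Z")
	if err != nil {
		log.Fatal(err)
	}

	f, err := os.Open("metrics-ip-salted.jsonl")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	result, err := sinkcluster.NewClusterCounter(from, to).Count(f)
	if err != nil {
		log.Fatalf("unable to count:%v", err)
	}
	fmt.Printf("distinct: %d (from %d chunks)\n", result.Sum, result.ChunkIncluded)
}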
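
The reason Count can simply merge per-chunk counters: merging two HyperLogLogs estimates the cardinality of the union of their inputs, so an address that appears in several chunks is still counted once. A standalone sketch with the same library; the fnv hash is only a stand-in for whatever keyed hash the sink applies:

package main

import (
	"fmt"
	"hash/fnv"
	"log"

	"github.com/clarkduvall/hyperloglog"
)

// add hashes an item and feeds it to the counter; Add accepts anything
// with a Sum64() uint64 method.
func add(h *hyperloglog.HyperLogLogPlus, item string) {
	f := fnv.New64a()
	f.Write([]byte(item))
	h.Add(f)
}

func main() {
	chunk1, err := hyperloglog.NewPlus(18)
	if err != nil {
		log.Fatal(err)
	}
	chunk2, err := hyperloglog.NewPlus(18)
	if err != nil {
		log.Fatal(err)
	}

	add(chunk1, "198.51.100.7")
	add(chunk1, "203.0.113.2")
	add(chunk2, "203.0.113.2") // seen in both chunks
	add(chunk2, "192.0.2.33")

	if err := chunk1.Merge(chunk2); err != nil {
		log.Fatal(err)
	}
	// The duplicate is not double-counted: the estimate is ~3, not 4.
	fmt.Println(chunk1.Count())
}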