mirror of
https://github.com/42wim/matterbridge.git
synced 2025-01-25 19:44:21 +01:00
665 lines
18 KiB
Go
665 lines
18 KiB
Go
|
package logr
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"os"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/wiggin77/cfg"
|
||
|
"github.com/wiggin77/merror"
|
||
|
)
|
||
|
|
||
|
// Logr maintains a list of log targets and accepts incoming
|
||
|
// log records.
|
||
|
type Logr struct {
|
||
|
tmux sync.RWMutex // target mutex
|
||
|
targets []Target
|
||
|
|
||
|
mux sync.RWMutex
|
||
|
maxQueueSizeActual int
|
||
|
in chan *LogRec
|
||
|
done chan struct{}
|
||
|
once sync.Once
|
||
|
shutdown bool
|
||
|
lvlCache levelCache
|
||
|
|
||
|
metricsInitOnce sync.Once
|
||
|
metricsCloseOnce sync.Once
|
||
|
metricsDone chan struct{}
|
||
|
metrics MetricsCollector
|
||
|
queueSizeGauge Gauge
|
||
|
loggedCounter Counter
|
||
|
errorCounter Counter
|
||
|
|
||
|
bufferPool sync.Pool
|
||
|
|
||
|
// MaxQueueSize is the maximum number of log records that can be queued.
|
||
|
// If exceeded, `OnQueueFull` is called which determines if the log
|
||
|
// record will be dropped or block until add is successful.
|
||
|
// If this is modified, it must be done before `Configure` or
|
||
|
// `AddTarget`. Defaults to DefaultMaxQueueSize.
|
||
|
MaxQueueSize int
|
||
|
|
||
|
// OnLoggerError, when not nil, is called any time an internal
|
||
|
// logging error occurs. For example, this can happen when a
|
||
|
// target cannot connect to its data sink.
|
||
|
OnLoggerError func(error)
|
||
|
|
||
|
// OnQueueFull, when not nil, is called on an attempt to add
|
||
|
// a log record to a full Logr queue.
|
||
|
// `MaxQueueSize` can be used to modify the maximum queue size.
|
||
|
// This function should return quickly, with a bool indicating whether
|
||
|
// the log record should be dropped (true) or block until the log record
|
||
|
// is successfully added (false). If nil then blocking (false) is assumed.
|
||
|
OnQueueFull func(rec *LogRec, maxQueueSize int) bool
|
||
|
|
||
|
// OnTargetQueueFull, when not nil, is called on an attempt to add
|
||
|
// a log record to a full target queue provided the target supports reporting
|
||
|
// this condition.
|
||
|
// This function should return quickly, with a bool indicating whether
|
||
|
// the log record should be dropped (true) or block until the log record
|
||
|
// is successfully added (false). If nil then blocking (false) is assumed.
|
||
|
OnTargetQueueFull func(target Target, rec *LogRec, maxQueueSize int) bool
|
||
|
|
||
|
// OnExit, when not nil, is called when a FatalXXX style log API is called.
|
||
|
// When nil, then the default behavior is to cleanly shut down this Logr and
|
||
|
// call `os.Exit(code)`.
|
||
|
OnExit func(code int)
|
||
|
|
||
|
// OnPanic, when not nil, is called when a PanicXXX style log API is called.
|
||
|
// When nil, then the default behavior is to cleanly shut down this Logr and
|
||
|
// call `panic(err)`.
|
||
|
OnPanic func(err interface{})
|
||
|
|
||
|
// EnqueueTimeout is the amount of time a log record can take to be queued.
|
||
|
// This only applies to blocking enqueue which happen after `logr.OnQueueFull`
|
||
|
// is called and returns false.
|
||
|
EnqueueTimeout time.Duration
|
||
|
|
||
|
// ShutdownTimeout is the amount of time `logr.Shutdown` can execute before
|
||
|
// timing out.
|
||
|
ShutdownTimeout time.Duration
|
||
|
|
||
|
// FlushTimeout is the amount of time `logr.Flush` can execute before
|
||
|
// timing out.
|
||
|
FlushTimeout time.Duration
|
||
|
|
||
|
// UseSyncMapLevelCache can be set to true before the first target is added
|
||
|
// when high concurrency (e.g. >32 cores) is expected. This may improve
|
||
|
// performance with large numbers of cores - benchmark for your use case.
|
||
|
UseSyncMapLevelCache bool
|
||
|
|
||
|
// MaxPooledFormatBuffer determines the maximum size of a buffer that can be
|
||
|
// pooled. To reduce allocations, the buffers needed during formatting (etc)
|
||
|
// are pooled. A very large log item will grow a buffer that could stay in
|
||
|
// memory indefinitely. This settings lets you control how big a pooled buffer
|
||
|
// can be - anything larger will be garbage collected after use.
|
||
|
// Defaults to 1MB.
|
||
|
MaxPooledBuffer int
|
||
|
|
||
|
// DisableBufferPool when true disables the buffer pool. See MaxPooledBuffer.
|
||
|
DisableBufferPool bool
|
||
|
|
||
|
// MetricsUpdateFreqMillis determines how often polled metrics are updated
|
||
|
// when metrics are enabled.
|
||
|
MetricsUpdateFreqMillis int64
|
||
|
}
|
||
|
|
||
|
// Configure adds/removes targets via the supplied `Config`.
|
||
|
func (logr *Logr) Configure(config *cfg.Config) error {
|
||
|
// TODO
|
||
|
return fmt.Errorf("not implemented yet")
|
||
|
}
|
||
|
|
||
|
func (logr *Logr) ensureInit() {
|
||
|
logr.once.Do(func() {
|
||
|
defer func() {
|
||
|
go logr.start()
|
||
|
}()
|
||
|
|
||
|
logr.mux.Lock()
|
||
|
defer logr.mux.Unlock()
|
||
|
|
||
|
logr.maxQueueSizeActual = logr.MaxQueueSize
|
||
|
if logr.maxQueueSizeActual == 0 {
|
||
|
logr.maxQueueSizeActual = DefaultMaxQueueSize
|
||
|
}
|
||
|
|
||
|
if logr.maxQueueSizeActual < 0 {
|
||
|
logr.maxQueueSizeActual = 0
|
||
|
}
|
||
|
|
||
|
logr.in = make(chan *LogRec, logr.maxQueueSizeActual)
|
||
|
logr.done = make(chan struct{})
|
||
|
|
||
|
if logr.UseSyncMapLevelCache {
|
||
|
logr.lvlCache = &syncMapLevelCache{}
|
||
|
} else {
|
||
|
logr.lvlCache = &arrayLevelCache{}
|
||
|
}
|
||
|
|
||
|
if logr.MaxPooledBuffer == 0 {
|
||
|
logr.MaxPooledBuffer = DefaultMaxPooledBuffer
|
||
|
}
|
||
|
logr.bufferPool = sync.Pool{
|
||
|
New: func() interface{} {
|
||
|
return new(bytes.Buffer)
|
||
|
},
|
||
|
}
|
||
|
|
||
|
logr.lvlCache.setup()
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// AddTarget adds one or more targets to the logger which will receive
|
||
|
// log records for outputting.
|
||
|
func (logr *Logr) AddTarget(targets ...Target) error {
|
||
|
if logr.IsShutdown() {
|
||
|
return fmt.Errorf("AddTarget called after Logr shut down")
|
||
|
}
|
||
|
|
||
|
logr.ensureInit()
|
||
|
metrics := logr.getMetricsCollector()
|
||
|
defer logr.ResetLevelCache() // call this after tmux is released
|
||
|
|
||
|
logr.tmux.Lock()
|
||
|
defer logr.tmux.Unlock()
|
||
|
|
||
|
errs := merror.New()
|
||
|
for _, t := range targets {
|
||
|
if t == nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
logr.targets = append(logr.targets, t)
|
||
|
if metrics != nil {
|
||
|
if tm, ok := t.(TargetWithMetrics); ok {
|
||
|
if err := tm.EnableMetrics(metrics, logr.MetricsUpdateFreqMillis); err != nil {
|
||
|
errs.Append(err)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return errs.ErrorOrNil()
|
||
|
}
|
||
|
|
||
|
// NewLogger creates a Logger using defaults. A `Logger` is light-weight
|
||
|
// enough to create on-demand, but typically one or more Loggers are
|
||
|
// created and re-used.
|
||
|
func (logr *Logr) NewLogger() Logger {
|
||
|
logger := Logger{logr: logr}
|
||
|
return logger
|
||
|
}
|
||
|
|
||
|
var levelStatusDisabled = LevelStatus{}
|
||
|
|
||
|
// IsLevelEnabled returns true if at least one target has the specified
|
||
|
// level enabled. The result is cached so that subsequent checks are fast.
|
||
|
func (logr *Logr) IsLevelEnabled(lvl Level) LevelStatus {
|
||
|
status, ok := logr.isLevelEnabledFromCache(lvl)
|
||
|
if ok {
|
||
|
return status
|
||
|
}
|
||
|
|
||
|
// Check each target.
|
||
|
logr.tmux.RLock()
|
||
|
for _, t := range logr.targets {
|
||
|
e, s := t.IsLevelEnabled(lvl)
|
||
|
if e {
|
||
|
status.Enabled = true
|
||
|
if s {
|
||
|
status.Stacktrace = true
|
||
|
break // if both enabled then no sense checking more targets
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
logr.tmux.RUnlock()
|
||
|
|
||
|
// Cache and return the result.
|
||
|
if err := logr.updateLevelCache(lvl.ID, status); err != nil {
|
||
|
logr.ReportError(err)
|
||
|
return LevelStatus{}
|
||
|
}
|
||
|
return status
|
||
|
}
|
||
|
|
||
|
func (logr *Logr) isLevelEnabledFromCache(lvl Level) (LevelStatus, bool) {
|
||
|
logr.mux.RLock()
|
||
|
defer logr.mux.RUnlock()
|
||
|
|
||
|
// Don't accept new log records after shutdown.
|
||
|
if logr.shutdown {
|
||
|
return levelStatusDisabled, true
|
||
|
}
|
||
|
|
||
|
// Check cache. lvlCache may still be nil if no targets added.
|
||
|
if logr.lvlCache == nil {
|
||
|
return levelStatusDisabled, true
|
||
|
}
|
||
|
status, ok := logr.lvlCache.get(lvl.ID)
|
||
|
if ok {
|
||
|
return status, true
|
||
|
}
|
||
|
return LevelStatus{}, false
|
||
|
}
|
||
|
|
||
|
func (logr *Logr) updateLevelCache(id LevelID, status LevelStatus) error {
|
||
|
logr.mux.RLock()
|
||
|
defer logr.mux.RUnlock()
|
||
|
if logr.lvlCache != nil {
|
||
|
return logr.lvlCache.put(id, status)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// HasTargets returns true only if at least one target exists within the Logr.
|
||
|
func (logr *Logr) HasTargets() bool {
|
||
|
logr.tmux.RLock()
|
||
|
defer logr.tmux.RUnlock()
|
||
|
return len(logr.targets) > 0
|
||
|
}
|
||
|
|
||
|
// TargetInfo provides name and type for a Target.
|
||
|
type TargetInfo struct {
|
||
|
Name string
|
||
|
Type string
|
||
|
}
|
||
|
|
||
|
// TargetInfos enumerates all the targets added to this Logr.
|
||
|
// The resulting slice represents a snapshot at time of calling.
|
||
|
func (logr *Logr) TargetInfos() []TargetInfo {
|
||
|
logr.tmux.RLock()
|
||
|
defer logr.tmux.RUnlock()
|
||
|
|
||
|
infos := make([]TargetInfo, 0)
|
||
|
|
||
|
for _, t := range logr.targets {
|
||
|
inf := TargetInfo{
|
||
|
Name: fmt.Sprintf("%v", t),
|
||
|
Type: fmt.Sprintf("%T", t),
|
||
|
}
|
||
|
infos = append(infos, inf)
|
||
|
}
|
||
|
return infos
|
||
|
}
|
||
|
|
||
|
// RemoveTargets safely removes one or more targets based on the filtering method.
|
||
|
// f should return true to delete the target, false to keep it.
|
||
|
// When removing a target, best effort is made to write any queued log records before
|
||
|
// closing, with cxt determining how much time can be spent in total.
|
||
|
// Note, keep the timeout short since this method blocks certain logging operations.
|
||
|
func (logr *Logr) RemoveTargets(cxt context.Context, f func(ti TargetInfo) bool) error {
|
||
|
var removed bool
|
||
|
defer func() {
|
||
|
if removed {
|
||
|
// call this after tmux is released since
|
||
|
// it will lock mux and we don't want to
|
||
|
// introduce possible deadlock.
|
||
|
logr.ResetLevelCache()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
errs := merror.New()
|
||
|
|
||
|
logr.tmux.Lock()
|
||
|
defer logr.tmux.Unlock()
|
||
|
|
||
|
cp := make([]Target, 0)
|
||
|
|
||
|
for _, t := range logr.targets {
|
||
|
inf := TargetInfo{
|
||
|
Name: fmt.Sprintf("%v", t),
|
||
|
Type: fmt.Sprintf("%T", t),
|
||
|
}
|
||
|
if f(inf) {
|
||
|
if err := t.Shutdown(cxt); err != nil {
|
||
|
errs.Append(err)
|
||
|
}
|
||
|
removed = true
|
||
|
} else {
|
||
|
cp = append(cp, t)
|
||
|
}
|
||
|
}
|
||
|
logr.targets = cp
|
||
|
return errs.ErrorOrNil()
|
||
|
}
|
||
|
|
||
|
// ResetLevelCache resets the cached results of `IsLevelEnabled`. This is
|
||
|
// called any time a Target is added or a target's level is changed.
|
||
|
func (logr *Logr) ResetLevelCache() {
|
||
|
// Write lock so that new cache entries cannot be stored while we
|
||
|
// clear the cache.
|
||
|
logr.mux.Lock()
|
||
|
defer logr.mux.Unlock()
|
||
|
logr.resetLevelCache()
|
||
|
}
|
||
|
|
||
|
// resetLevelCache empties the level cache without locking.
|
||
|
// mux.Lock must be held before calling this function.
|
||
|
func (logr *Logr) resetLevelCache() {
|
||
|
// lvlCache may still be nil if no targets added.
|
||
|
if logr.lvlCache != nil {
|
||
|
logr.lvlCache.clear()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// enqueue adds a log record to the logr queue. If the queue is full then
|
||
|
// this function either blocks or the log record is dropped, depending on
|
||
|
// the result of calling `OnQueueFull`.
|
||
|
func (logr *Logr) enqueue(rec *LogRec) {
|
||
|
if logr.in == nil {
|
||
|
logr.ReportError(fmt.Errorf("AddTarget or Configure must be called before enqueue"))
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case logr.in <- rec:
|
||
|
default:
|
||
|
if logr.OnQueueFull != nil && logr.OnQueueFull(rec, logr.maxQueueSizeActual) {
|
||
|
return // drop the record
|
||
|
}
|
||
|
select {
|
||
|
case <-time.After(logr.enqueueTimeout()):
|
||
|
logr.ReportError(fmt.Errorf("enqueue timed out for log rec [%v]", rec))
|
||
|
case logr.in <- rec: // block until success or timeout
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// exit is called by one of the FatalXXX style APIS. If `logr.OnExit` is not nil
|
||
|
// then that method is called, otherwise the default behavior is to shut down this
|
||
|
// Logr cleanly then call `os.Exit(code)`.
|
||
|
func (logr *Logr) exit(code int) {
|
||
|
if logr.OnExit != nil {
|
||
|
logr.OnExit(code)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if err := logr.Shutdown(); err != nil {
|
||
|
logr.ReportError(err)
|
||
|
}
|
||
|
os.Exit(code)
|
||
|
}
|
||
|
|
||
|
// panic is called by one of the PanicXXX style APIS. If `logr.OnPanic` is not nil
|
||
|
// then that method is called, otherwise the default behavior is to shut down this
|
||
|
// Logr cleanly then call `panic(err)`.
|
||
|
func (logr *Logr) panic(err interface{}) {
|
||
|
if logr.OnPanic != nil {
|
||
|
logr.OnPanic(err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if err := logr.Shutdown(); err != nil {
|
||
|
logr.ReportError(err)
|
||
|
}
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
// Flush blocks while flushing the logr queue and all target queues, by
|
||
|
// writing existing log records to valid targets.
|
||
|
// Any attempts to add new log records will block until flush is complete.
|
||
|
// `logr.FlushTimeout` determines how long flush can execute before
|
||
|
// timing out. Use `IsTimeoutError` to determine if the returned error is
|
||
|
// due to a timeout.
|
||
|
func (logr *Logr) Flush() error {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), logr.flushTimeout())
|
||
|
defer cancel()
|
||
|
return logr.FlushWithTimeout(ctx)
|
||
|
}
|
||
|
|
||
|
// Flush blocks while flushing the logr queue and all target queues, by
|
||
|
// writing existing log records to valid targets.
|
||
|
// Any attempts to add new log records will block until flush is complete.
|
||
|
// Use `IsTimeoutError` to determine if the returned error is
|
||
|
// due to a timeout.
|
||
|
func (logr *Logr) FlushWithTimeout(ctx context.Context) error {
|
||
|
if !logr.HasTargets() {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
if logr.IsShutdown() {
|
||
|
return errors.New("Flush called on shut down Logr")
|
||
|
}
|
||
|
|
||
|
rec := newFlushLogRec(logr.NewLogger())
|
||
|
logr.enqueue(rec)
|
||
|
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return newTimeoutError("logr queue shutdown timeout")
|
||
|
case <-rec.flush:
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// IsShutdown returns true if this Logr instance has been shut down.
|
||
|
// No further log records can be enqueued and no targets added after
|
||
|
// shutdown.
|
||
|
func (logr *Logr) IsShutdown() bool {
|
||
|
logr.mux.Lock()
|
||
|
defer logr.mux.Unlock()
|
||
|
return logr.shutdown
|
||
|
}
|
||
|
|
||
|
// Shutdown cleanly stops the logging engine after making best efforts
|
||
|
// to flush all targets. Call this function right before application
|
||
|
// exit - logr cannot be restarted once shut down.
|
||
|
// `logr.ShutdownTimeout` determines how long shutdown can execute before
|
||
|
// timing out. Use `IsTimeoutError` to determine if the returned error is
|
||
|
// due to a timeout.
|
||
|
func (logr *Logr) Shutdown() error {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), logr.shutdownTimeout())
|
||
|
defer cancel()
|
||
|
return logr.ShutdownWithTimeout(ctx)
|
||
|
}
|
||
|
|
||
|
// Shutdown cleanly stops the logging engine after making best efforts
|
||
|
// to flush all targets. Call this function right before application
|
||
|
// exit - logr cannot be restarted once shut down.
|
||
|
// Use `IsTimeoutError` to determine if the returned error is due to a
|
||
|
// timeout.
|
||
|
func (logr *Logr) ShutdownWithTimeout(ctx context.Context) error {
|
||
|
logr.mux.Lock()
|
||
|
if logr.shutdown {
|
||
|
logr.mux.Unlock()
|
||
|
return errors.New("Shutdown called again after shut down")
|
||
|
}
|
||
|
logr.shutdown = true
|
||
|
logr.resetLevelCache()
|
||
|
logr.mux.Unlock()
|
||
|
|
||
|
logr.metricsCloseOnce.Do(func() {
|
||
|
if logr.metricsDone != nil {
|
||
|
close(logr.metricsDone)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
errs := merror.New()
|
||
|
|
||
|
// close the incoming channel and wait for read loop to exit.
|
||
|
if logr.in != nil {
|
||
|
close(logr.in)
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
errs.Append(newTimeoutError("logr queue shutdown timeout"))
|
||
|
case <-logr.done:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// logr.in channel should now be drained to targets and no more log records
|
||
|
// can be added.
|
||
|
logr.tmux.RLock()
|
||
|
defer logr.tmux.RUnlock()
|
||
|
for _, t := range logr.targets {
|
||
|
err := t.Shutdown(ctx)
|
||
|
if err != nil {
|
||
|
errs.Append(err)
|
||
|
}
|
||
|
}
|
||
|
return errs.ErrorOrNil()
|
||
|
}
|
||
|
|
||
|
// ReportError is used to notify the host application of any internal logging errors.
|
||
|
// If `OnLoggerError` is not nil, it is called with the error, otherwise the error is
|
||
|
// output to `os.Stderr`.
|
||
|
func (logr *Logr) ReportError(err interface{}) {
|
||
|
logr.incErrorCounter()
|
||
|
|
||
|
if logr.OnLoggerError == nil {
|
||
|
fmt.Fprintln(os.Stderr, err)
|
||
|
return
|
||
|
}
|
||
|
logr.OnLoggerError(fmt.Errorf("%v", err))
|
||
|
}
|
||
|
|
||
|
// BorrowBuffer borrows a buffer from the pool. Release the buffer to reduce garbage collection.
|
||
|
func (logr *Logr) BorrowBuffer() *bytes.Buffer {
|
||
|
if logr.DisableBufferPool {
|
||
|
return &bytes.Buffer{}
|
||
|
}
|
||
|
return logr.bufferPool.Get().(*bytes.Buffer)
|
||
|
}
|
||
|
|
||
|
// ReleaseBuffer returns a buffer to the pool to reduce garbage collection. The buffer is only
|
||
|
// retained if less than MaxPooledBuffer.
|
||
|
func (logr *Logr) ReleaseBuffer(buf *bytes.Buffer) {
|
||
|
if !logr.DisableBufferPool && buf.Cap() < logr.MaxPooledBuffer {
|
||
|
buf.Reset()
|
||
|
logr.bufferPool.Put(buf)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// enqueueTimeout returns amount of time a log record can take to be queued.
|
||
|
// This only applies to blocking enqueue which happen after `logr.OnQueueFull` is called
|
||
|
// and returns false.
|
||
|
func (logr *Logr) enqueueTimeout() time.Duration {
|
||
|
if logr.EnqueueTimeout == 0 {
|
||
|
return DefaultEnqueueTimeout
|
||
|
}
|
||
|
return logr.EnqueueTimeout
|
||
|
}
|
||
|
|
||
|
// shutdownTimeout returns the timeout duration for `logr.Shutdown`.
|
||
|
func (logr *Logr) shutdownTimeout() time.Duration {
|
||
|
if logr.ShutdownTimeout == 0 {
|
||
|
return DefaultShutdownTimeout
|
||
|
}
|
||
|
return logr.ShutdownTimeout
|
||
|
}
|
||
|
|
||
|
// flushTimeout returns the timeout duration for `logr.Flush`.
|
||
|
func (logr *Logr) flushTimeout() time.Duration {
|
||
|
if logr.FlushTimeout == 0 {
|
||
|
return DefaultFlushTimeout
|
||
|
}
|
||
|
return logr.FlushTimeout
|
||
|
}
|
||
|
|
||
|
// start selects on incoming log records until done channel signals.
|
||
|
// Incoming log records are fanned out to all log targets.
|
||
|
func (logr *Logr) start() {
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
logr.ReportError(r)
|
||
|
go logr.start()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
for rec := range logr.in {
|
||
|
if rec.flush != nil {
|
||
|
logr.flush(rec.flush)
|
||
|
} else {
|
||
|
rec.prep()
|
||
|
logr.fanout(rec)
|
||
|
}
|
||
|
}
|
||
|
close(logr.done)
|
||
|
}
|
||
|
|
||
|
// startMetricsUpdater updates the metrics for any polled values every `MetricsUpdateFreqSecs` seconds until
|
||
|
// logr is closed.
|
||
|
func (logr *Logr) startMetricsUpdater() {
|
||
|
for {
|
||
|
updateFreq := logr.getMetricsUpdateFreqMillis()
|
||
|
if updateFreq == 0 {
|
||
|
updateFreq = DefMetricsUpdateFreqMillis
|
||
|
}
|
||
|
if updateFreq < 250 {
|
||
|
updateFreq = 250 // don't peg the CPU
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-logr.metricsDone:
|
||
|
return
|
||
|
case <-time.After(time.Duration(updateFreq) * time.Millisecond):
|
||
|
logr.setQueueSizeGauge(float64(len(logr.in)))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (logr *Logr) getMetricsUpdateFreqMillis() int64 {
|
||
|
logr.mux.RLock()
|
||
|
defer logr.mux.RUnlock()
|
||
|
return logr.MetricsUpdateFreqMillis
|
||
|
}
|
||
|
|
||
|
// fanout pushes a LogRec to all targets.
|
||
|
func (logr *Logr) fanout(rec *LogRec) {
|
||
|
var target Target
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
logr.ReportError(fmt.Errorf("fanout failed for target %s, %v", target, r))
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
var logged bool
|
||
|
defer func() {
|
||
|
if logged {
|
||
|
logr.incLoggedCounter() // call this after tmux is released
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
logr.tmux.RLock()
|
||
|
defer logr.tmux.RUnlock()
|
||
|
for _, target = range logr.targets {
|
||
|
if enabled, _ := target.IsLevelEnabled(rec.Level()); enabled {
|
||
|
target.Log(rec)
|
||
|
logged = true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// flush drains the queue and notifies when done.
|
||
|
func (logr *Logr) flush(done chan<- struct{}) {
|
||
|
// first drain the logr queue.
|
||
|
loop:
|
||
|
for {
|
||
|
var rec *LogRec
|
||
|
select {
|
||
|
case rec = <-logr.in:
|
||
|
if rec.flush == nil {
|
||
|
rec.prep()
|
||
|
logr.fanout(rec)
|
||
|
}
|
||
|
default:
|
||
|
break loop
|
||
|
}
|
||
|
}
|
||
|
|
||
|
logger := logr.NewLogger()
|
||
|
|
||
|
// drain all the targets; block until finished.
|
||
|
logr.tmux.RLock()
|
||
|
defer logr.tmux.RUnlock()
|
||
|
for _, target := range logr.targets {
|
||
|
rec := newFlushLogRec(logger)
|
||
|
target.Log(rec)
|
||
|
<-rec.flush
|
||
|
}
|
||
|
done <- struct{}{}
|
||
|
}
|