Make DNS queue a bit more buffered. Add metrics

This commit is contained in:
Oleksii Shevchuk 2018-02-01 16:26:27 +02:00
parent a05b912d0c
commit effb902e20
2 changed files with 84 additions and 20 deletions

View File

@ -7,6 +7,7 @@ import (
"time"
dns "github.com/miekg/dns"
rc "github.com/paulbellamy/ratecounter"
log "github.com/sirupsen/logrus"
)
@ -58,11 +59,47 @@ func (p *DNSListener) messageReader(cherr chan error, chmsg chan []string) {
}
}
close(chmsg)
log.Debug("[3.] REMOTE READER CLOSED")
}
func (p *DNSListener) messageProcessor(
recvStrings chan []string, interrupt <-chan bool, closeNotify chan<- bool, decoderr chan<- error) {
func (p *DNSListener) responseProcessor(queue chan chan []string, recvStrings chan []string) {
for {
response := <-recvStrings
if response == nil {
break
}
rchan := <-queue
if rchan == nil {
break
}
rchan <- response
}
waitLoop:
for {
select {
case ignore := <-queue:
if ignore == nil {
break waitLoop
}
ignore <- []string{}
default:
break waitLoop
}
}
log.Debug("[5.] RESPONSE PROCESSOR CLOSED")
}
func (p *DNSListener) queryProcessor(
queue chan chan []string,
interrupt <-chan bool, closeNotify chan<- bool, decoderr chan<- error) {
ignore := false
notifySent := false
@ -83,7 +120,7 @@ func (p *DNSListener) messageProcessor(
interrupted = true
}
log.Debug("DNS. Wait done", r, ignore)
log.Debug("DNS. Wait done: ", r, ignore)
if r == nil || interrupted {
if !notifySent {
@ -112,19 +149,12 @@ func (p *DNSListener) messageProcessor(
log.Debug("Ignore 2")
ignore = true
continue
} else {
queue <- r.IPs
}
log.Debug("DNS. Wait for response or for interrupt")
select {
case ips := <-recvStrings:
r.IPs <- ips
case _ = <-interrupt:
r.IPs <- []string{}
ignore = true
}
log.Debug("DNS. Wait for response or for interrupt completed")
}
waitLoop:
for {
select {
case r := <-p.DNSRequests:
@ -132,27 +162,40 @@ func (p *DNSListener) messageProcessor(
r.IPs <- []string{}
}
default:
break
break waitLoop
}
}
log.Debug("[4.] Message processor closed")
}
func warnSlow(message string, now time.Time, max time.Duration) {
current := time.Now()
barrier := now.Add(max)
diff := current.Sub(now).Seconds()
if barrier.Before(current) {
log.Warning(fmt.Sprintf("%s: %.2fs", message, diff))
}
}
func (p *DNSListener) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
m := new(dns.Msg)
m.SetReply(r)
m.Compress = true
m.Authoritative = true
processed := true
now := time.Now()
result := make(chan []string)
log.Debug("START PROCESSING REQUEST")
defer log.Debug("END PROCESSING REQUEST")
p.dnsRequestsCounter.Incr(1)
defer p.dnsProcessedRequestsCounter.Incr(1)
p.processedRequests.Add(1)
defer p.processedRequests.Done()
defer close(result)
p.cacheLock.Lock()
for k, v := range p.DNSCache {
@ -183,17 +226,27 @@ func (p *DNSListener) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
if strings.HasSuffix(question, p.Domain) {
if p.active {
p.dnsRemoteRequestsCounter.Incr(1)
question = question[:len(question)-len(p.Domain)-1]
result := make(chan []string)
now2 := time.Now()
p.DNSRequests <- &DNSRequest{
Name: question,
IPs: result,
}
log.Debug("DNS: Send request: ", q.Name)
responses = <-result
log.Info("DNS: Response: ", q.Name, ": ", responses)
close(result)
warnSlow(fmt.Sprintf(
"DNS: Slow RR communication: (Rates: Remote=%dps Total=%dps Processed=%dps)",
p.dnsRemoteRequestsCounter.Rate()/10,
p.dnsRequestsCounter.Rate()/10,
p.dnsProcessedRequestsCounter.Rate()/10,
), now2, 1*time.Second)
}
if len(responses) > 0 {
@ -256,6 +309,10 @@ func NewDNSListener(conn net.Conn, domain string) *DNSListener {
},
DNSRequests: make(chan *DNSRequest),
dnsRequestsCounter: rc.NewRateCounter(10 * time.Second),
dnsRemoteRequestsCounter: rc.NewRateCounter(10 * time.Second),
dnsProcessedRequestsCounter: rc.NewRateCounter(10 * time.Second),
active: true,
}
@ -275,18 +332,20 @@ func (p *DNSListener) Serve() error {
recvErrors := make(chan error)
closeNotify := make(chan bool)
interruptNotify := make(chan bool)
responsesQueue := make(chan chan []string, 512)
defer close(tcperr)
defer close(udperr)
defer close(decoderr)
defer close(recvStrings)
defer close(recvErrors)
defer close(closeNotify)
defer close(responsesQueue)
go p.listenAndServeTCP(tcperr)
go p.listenAndServeUDP(udperr)
go p.messageReader(recvErrors, recvStrings)
go p.messageProcessor(recvStrings, interruptNotify, closeNotify, decoderr)
go p.queryProcessor(responsesQueue, interruptNotify, closeNotify, decoderr)
go p.responseProcessor(responsesQueue, recvStrings)
var err error

View File

@ -6,6 +6,7 @@ import (
"time"
dns "github.com/miekg/dns"
rc "github.com/paulbellamy/ratecounter"
)
type (
@ -60,6 +61,10 @@ type (
processedRequests sync.WaitGroup
dnsRequestsCounter *rc.RateCounter
dnsRemoteRequestsCounter *rc.RateCounter
dnsProcessedRequestsCounter *rc.RateCounter
cacheLock sync.Mutex
activeLock sync.Mutex