From effb902e2093258cc94987ae36785b5d2e782ff9 Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Thu, 1 Feb 2018 16:26:27 +0200 Subject: [PATCH] Make DNS queue a bit more buffered. Add metrics --- pupy/proxy/dns.go | 99 ++++++++++++++++++++++++++++++++++++--------- pupy/proxy/types.go | 5 +++ 2 files changed, 84 insertions(+), 20 deletions(-) diff --git a/pupy/proxy/dns.go b/pupy/proxy/dns.go index 32fb938d..9004280d 100644 --- a/pupy/proxy/dns.go +++ b/pupy/proxy/dns.go @@ -7,6 +7,7 @@ import ( "time" dns "github.com/miekg/dns" + rc "github.com/paulbellamy/ratecounter" log "github.com/sirupsen/logrus" ) @@ -58,11 +59,47 @@ func (p *DNSListener) messageReader(cherr chan error, chmsg chan []string) { } } + close(chmsg) log.Debug("[3.] REMOTE READER CLOSED") } -func (p *DNSListener) messageProcessor( - recvStrings chan []string, interrupt <-chan bool, closeNotify chan<- bool, decoderr chan<- error) { +func (p *DNSListener) responseProcessor(queue chan chan []string, recvStrings chan []string) { + for { + response := <-recvStrings + + if response == nil { + break + } + + rchan := <-queue + if rchan == nil { + break + } + + rchan <- response + } + +waitLoop: + for { + select { + case ignore := <-queue: + if ignore == nil { + break waitLoop + } + + ignore <- []string{} + default: + break waitLoop + } + } + + log.Debug("[5.] RESPONSE PROCESSOR CLOSED") +} + +func (p *DNSListener) queryProcessor( + queue chan chan []string, + interrupt <-chan bool, closeNotify chan<- bool, decoderr chan<- error) { + ignore := false notifySent := false @@ -83,7 +120,7 @@ func (p *DNSListener) messageProcessor( interrupted = true } - log.Debug("DNS. Wait done", r, ignore) + log.Debug("DNS. Wait done: ", r, ignore) if r == nil || interrupted { if !notifySent { @@ -112,19 +149,12 @@ func (p *DNSListener) messageProcessor( log.Debug("Ignore 2") ignore = true continue + } else { + queue <- r.IPs } - - log.Debug("DNS. Wait for response or for interrupt") - select { - case ips := <-recvStrings: - r.IPs <- ips - case _ = <-interrupt: - r.IPs <- []string{} - ignore = true - } - log.Debug("DNS. Wait for response or for interrupt completed") } +waitLoop: for { select { case r := <-p.DNSRequests: @@ -132,27 +162,40 @@ func (p *DNSListener) messageProcessor( r.IPs <- []string{} } default: - break + break waitLoop } } log.Debug("[4.] Message processor closed") } +func warnSlow(message string, now time.Time, max time.Duration) { + current := time.Now() + barrier := now.Add(max) + diff := current.Sub(now).Seconds() + + if barrier.Before(current) { + log.Warning(fmt.Sprintf("%s: %.2fs", message, diff)) + } +} + func (p *DNSListener) ServeDNS(w dns.ResponseWriter, r *dns.Msg) { m := new(dns.Msg) m.SetReply(r) m.Compress = true + m.Authoritative = true processed := true now := time.Now() + result := make(chan []string) - log.Debug("START PROCESSING REQUEST") - defer log.Debug("END PROCESSING REQUEST") + p.dnsRequestsCounter.Incr(1) + defer p.dnsProcessedRequestsCounter.Incr(1) p.processedRequests.Add(1) defer p.processedRequests.Done() + defer close(result) p.cacheLock.Lock() for k, v := range p.DNSCache { @@ -183,17 +226,27 @@ func (p *DNSListener) ServeDNS(w dns.ResponseWriter, r *dns.Msg) { if strings.HasSuffix(question, p.Domain) { if p.active { + p.dnsRemoteRequestsCounter.Incr(1) + question = question[:len(question)-len(p.Domain)-1] - result := make(chan []string) + + now2 := time.Now() p.DNSRequests <- &DNSRequest{ Name: question, IPs: result, } + log.Debug("DNS: Send request: ", q.Name) responses = <-result log.Info("DNS: Response: ", q.Name, ": ", responses) - close(result) + + warnSlow(fmt.Sprintf( + "DNS: Slow RR communication: (Rates: Remote=%dps Total=%dps Processed=%dps)", + p.dnsRemoteRequestsCounter.Rate()/10, + p.dnsRequestsCounter.Rate()/10, + p.dnsProcessedRequestsCounter.Rate()/10, + ), now2, 1*time.Second) } if len(responses) > 0 { @@ -256,6 +309,10 @@ func NewDNSListener(conn net.Conn, domain string) *DNSListener { }, DNSRequests: make(chan *DNSRequest), + dnsRequestsCounter: rc.NewRateCounter(10 * time.Second), + dnsRemoteRequestsCounter: rc.NewRateCounter(10 * time.Second), + dnsProcessedRequestsCounter: rc.NewRateCounter(10 * time.Second), + active: true, } @@ -275,18 +332,20 @@ func (p *DNSListener) Serve() error { recvErrors := make(chan error) closeNotify := make(chan bool) interruptNotify := make(chan bool) + responsesQueue := make(chan chan []string, 512) defer close(tcperr) defer close(udperr) defer close(decoderr) - defer close(recvStrings) defer close(recvErrors) defer close(closeNotify) + defer close(responsesQueue) go p.listenAndServeTCP(tcperr) go p.listenAndServeUDP(udperr) go p.messageReader(recvErrors, recvStrings) - go p.messageProcessor(recvStrings, interruptNotify, closeNotify, decoderr) + go p.queryProcessor(responsesQueue, interruptNotify, closeNotify, decoderr) + go p.responseProcessor(responsesQueue, recvStrings) var err error diff --git a/pupy/proxy/types.go b/pupy/proxy/types.go index 3967a56a..c0c09add 100644 --- a/pupy/proxy/types.go +++ b/pupy/proxy/types.go @@ -6,6 +6,7 @@ import ( "time" dns "github.com/miekg/dns" + rc "github.com/paulbellamy/ratecounter" ) type ( @@ -60,6 +61,10 @@ type ( processedRequests sync.WaitGroup + dnsRequestsCounter *rc.RateCounter + dnsRemoteRequestsCounter *rc.RateCounter + dnsProcessedRequestsCounter *rc.RateCounter + cacheLock sync.Mutex activeLock sync.Mutex