From 7ca806661d4899ac8a9b168a28c597b81a42581c Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 11 Dec 2012 20:36:42 -0800 Subject: [PATCH] Connection throttling for dev-server. And on by default, to keep us honest. Change-Id: Ic4c1fdfd06b54ec9108bb13c05714e9a81b642cd --- dev-server | 20 +++++-- pkg/throttle/throttle.go | 119 +++++++++++++++++++++++++++++++++++++ pkg/webserver/webserver.go | 24 +++++++- 3 files changed, 156 insertions(+), 7 deletions(-) create mode 100644 pkg/throttle/throttle.go diff --git a/dev-server b/dev-server index b1be101c0..ce9e3725f 100755 --- a/dev-server +++ b/dev-server @@ -14,12 +14,16 @@ my $closure_svn = "http://closure-library.googlecode.com/svn/trunk/"; chdir $Bin or die; -my $opt_wipe; -my $opt_tls; -my $opt_nobuild; +my $opt_KBps = 150; # if non-zero, kBps to throttle connections +my $opt_latency_ms = 90; # added latency in millisecond +my $opt_fast; # shortcut to disable throttling + my $opt_all; # listen on all interfaces -my $opt_staticres; # use static resources, not those on disk +my $opt_nobuild; my $opt_offline; # don't use the network ("airplane mode") +my $opt_staticres; # use static resources, not those on disk +my $opt_tls; +my $opt_wipe; # keep indexes in memory only. often used with --wipe, but not # necessarily. if --wipe isn't used, all blobs are re-indexed @@ -38,6 +42,9 @@ GetOptions("wipe" => \$opt_wipe, "mysql" => \$opt_mysql, "staticres" => \$opt_staticres, "offline" => \$opt_offline, + "KBps=i" => \$opt_KBps, + "latency_ms=i" => \$opt_latency_ms, + "fast" => \$opt_fast, ) or usage(); @@ -51,6 +58,11 @@ unless ($ENV{GOPATH}) { $ENV{GOPATH} = "$Bin/gopath" } +unless ($opt_fast) { + $ENV{DEV_THROTTLE_KBPS} = $opt_KBps; + $ENV{DEV_THROTTLE_LATENCY_MS} = $opt_latency_ms; +} + my $camlistored; my $camdbinit; if ($opt_nobuild) { diff --git a/pkg/throttle/throttle.go b/pkg/throttle/throttle.go new file mode 100644 index 000000000..b7e0b2949 --- /dev/null +++ b/pkg/throttle/throttle.go @@ -0,0 +1,119 @@ +/* +Copyright 2012 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package throttle + +import ( + "fmt" + "net" + "time" +) + +const unitSize = 1400 // read/write chunk size. ~MTU size. + +type Rate struct { + KBps int // or 0, to not rate-limit bandwidth + Latency time.Duration +} + +// byteTime returns the time required to send n bytes. +func (r Rate) byteTime(n int) time.Duration { + if r.KBps == 0 { + return 0 + } + return time.Duration(float64(n) / 1024 / float64(r.KBps)) * time.Second +} + +type Listener struct { + net.Listener + Down Rate // server Writes to Client + Up Rate // server Reads from client +} + +func (ln *Listener) Accept() (net.Conn, error) { + c, err := ln.Listener.Accept() + time.Sleep(ln.Up.Latency) + if err != nil { + return nil, err + } + tc := &conn{Conn: c, Down: ln.Down, Up: ln.Up} + tc.start() + return tc, nil +} + +type nErr struct { + n int + err error +} + +type writeReq struct { + writeAt time.Time + p []byte + resc chan nErr +} + +type conn struct { + net.Conn + Down, Up Rate + + wchan chan writeReq +} + +func (c *conn) start() { + c.wchan = make(chan writeReq, 1024) + go c.writeLoop() +} + +func (c *conn) writeLoop() { + for req := range c.wchan { + time.Sleep(req.writeAt.Sub(time.Now())) + var res nErr + for len(req.p) > 0 && res.err == nil { + writep := req.p + if len(writep) > unitSize { + writep = writep[:unitSize] + } + n, err := c.Conn.Write(writep) + time.Sleep(c.Down.byteTime(len(writep))) + res.n += n + res.err = err + req.p = req.p[n:] + } + req.resc <- res + } +} + +func (c *conn) Close() error { + err := c.Conn.Close() + close(c.wchan) + return err +} + +func (c *conn) Write(p []byte) (n int, err error) { + defer func() { + if e := recover(); e != nil { + n = 0 + err = fmt.Errorf("%v", err) + return + } + }() + resc := make(chan nErr, 1) + c.wchan <- writeReq{time.Now().Add(c.Down.Latency), p, resc} + res := <-resc + return res.n, res.err +} + +// TODO: Read throttling diff --git a/pkg/webserver/webserver.go b/pkg/webserver/webserver.go index ebd5550b8..cf9bbc08f 100644 --- a/pkg/webserver/webserver.go +++ b/pkg/webserver/webserver.go @@ -26,9 +26,11 @@ import ( "net" "net/http" "os" + "strconv" "strings" "time" + "camlistore.org/pkg/throttle" "camlistore.org/third_party/github.com/bradfitz/runsit/listen" ) @@ -45,8 +47,7 @@ type Server struct { func New() *Server { return &Server{ - premux: make([]HandlerPicker, 0), - mux: http.NewServeMux(), + mux: http.NewServeMux(), } } @@ -140,12 +141,29 @@ func (s *Server) Listen(addr string) error { return nil } +func (s *Server) throttleListener() net.Listener { + kBps, _ := strconv.Atoi(os.Getenv("DEV_THROTTLE_KBPS")) + ms, _ := strconv.Atoi(os.Getenv("DEV_THROTTLE_LATENCY_MS")) + if kBps == 0 && ms == 0 { + return s.listener + } + rate := throttle.Rate{ + KBps: kBps, + Latency: time.Duration(ms) * time.Millisecond, + } + return &throttle.Listener{ + Listener: s.listener, + Down: rate, + Up: rate, // TODO: separate rates? + } +} + func (s *Server) Serve() { if err := s.Listen(""); err != nil { log.Fatalf("Listen error: %v", err) } go runTestHarnessIntegration(s.listener) - err := http.Serve(s.listener, s) + err := http.Serve(s.throttleListener(), s) if err != nil { log.Printf("Error in http server: %v\n", err) os.Exit(1)