Connection throttling for dev-server.

And on by default, to keep us honest.

Change-Id: Ic4c1fdfd06b54ec9108bb13c05714e9a81b642cd
This commit is contained in:
Brad Fitzpatrick 2012-12-11 20:36:42 -08:00
parent 1118e8a52c
commit 7ca806661d
3 changed files with 156 additions and 7 deletions

View File

@ -14,12 +14,16 @@ my $closure_svn = "http://closure-library.googlecode.com/svn/trunk/";
chdir $Bin or die;
my $opt_wipe;
my $opt_tls;
my $opt_nobuild;
my $opt_KBps = 150; # if non-zero, kBps to throttle connections
my $opt_latency_ms = 90; # added latency in millisecond
my $opt_fast; # shortcut to disable throttling
my $opt_all; # listen on all interfaces
my $opt_staticres; # use static resources, not those on disk
my $opt_nobuild;
my $opt_offline; # don't use the network ("airplane mode")
my $opt_staticres; # use static resources, not those on disk
my $opt_tls;
my $opt_wipe;
# keep indexes in memory only. often used with --wipe, but not
# necessarily. if --wipe isn't used, all blobs are re-indexed
@ -38,6 +42,9 @@ GetOptions("wipe" => \$opt_wipe,
"mysql" => \$opt_mysql,
"staticres" => \$opt_staticres,
"offline" => \$opt_offline,
"KBps=i" => \$opt_KBps,
"latency_ms=i" => \$opt_latency_ms,
"fast" => \$opt_fast,
)
or usage();
@ -51,6 +58,11 @@ unless ($ENV{GOPATH}) {
$ENV{GOPATH} = "$Bin/gopath"
}
unless ($opt_fast) {
$ENV{DEV_THROTTLE_KBPS} = $opt_KBps;
$ENV{DEV_THROTTLE_LATENCY_MS} = $opt_latency_ms;
}
my $camlistored;
my $camdbinit;
if ($opt_nobuild) {

119
pkg/throttle/throttle.go Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package throttle
import (
"fmt"
"net"
"time"
)
const unitSize = 1400 // read/write chunk size. ~MTU size.
type Rate struct {
KBps int // or 0, to not rate-limit bandwidth
Latency time.Duration
}
// byteTime returns the time required to send n bytes.
func (r Rate) byteTime(n int) time.Duration {
if r.KBps == 0 {
return 0
}
return time.Duration(float64(n) / 1024 / float64(r.KBps)) * time.Second
}
type Listener struct {
net.Listener
Down Rate // server Writes to Client
Up Rate // server Reads from client
}
func (ln *Listener) Accept() (net.Conn, error) {
c, err := ln.Listener.Accept()
time.Sleep(ln.Up.Latency)
if err != nil {
return nil, err
}
tc := &conn{Conn: c, Down: ln.Down, Up: ln.Up}
tc.start()
return tc, nil
}
type nErr struct {
n int
err error
}
type writeReq struct {
writeAt time.Time
p []byte
resc chan nErr
}
type conn struct {
net.Conn
Down, Up Rate
wchan chan writeReq
}
func (c *conn) start() {
c.wchan = make(chan writeReq, 1024)
go c.writeLoop()
}
func (c *conn) writeLoop() {
for req := range c.wchan {
time.Sleep(req.writeAt.Sub(time.Now()))
var res nErr
for len(req.p) > 0 && res.err == nil {
writep := req.p
if len(writep) > unitSize {
writep = writep[:unitSize]
}
n, err := c.Conn.Write(writep)
time.Sleep(c.Down.byteTime(len(writep)))
res.n += n
res.err = err
req.p = req.p[n:]
}
req.resc <- res
}
}
func (c *conn) Close() error {
err := c.Conn.Close()
close(c.wchan)
return err
}
func (c *conn) Write(p []byte) (n int, err error) {
defer func() {
if e := recover(); e != nil {
n = 0
err = fmt.Errorf("%v", err)
return
}
}()
resc := make(chan nErr, 1)
c.wchan <- writeReq{time.Now().Add(c.Down.Latency), p, resc}
res := <-resc
return res.n, res.err
}
// TODO: Read throttling

View File

@ -26,9 +26,11 @@ import (
"net"
"net/http"
"os"
"strconv"
"strings"
"time"
"camlistore.org/pkg/throttle"
"camlistore.org/third_party/github.com/bradfitz/runsit/listen"
)
@ -45,8 +47,7 @@ type Server struct {
func New() *Server {
return &Server{
premux: make([]HandlerPicker, 0),
mux: http.NewServeMux(),
mux: http.NewServeMux(),
}
}
@ -140,12 +141,29 @@ func (s *Server) Listen(addr string) error {
return nil
}
func (s *Server) throttleListener() net.Listener {
kBps, _ := strconv.Atoi(os.Getenv("DEV_THROTTLE_KBPS"))
ms, _ := strconv.Atoi(os.Getenv("DEV_THROTTLE_LATENCY_MS"))
if kBps == 0 && ms == 0 {
return s.listener
}
rate := throttle.Rate{
KBps: kBps,
Latency: time.Duration(ms) * time.Millisecond,
}
return &throttle.Listener{
Listener: s.listener,
Down: rate,
Up: rate, // TODO: separate rates?
}
}
func (s *Server) Serve() {
if err := s.Listen(""); err != nil {
log.Fatalf("Listen error: %v", err)
}
go runTestHarnessIntegration(s.listener)
err := http.Serve(s.listener, s)
err := http.Serve(s.throttleListener(), s)
if err != nil {
log.Printf("Error in http server: %v\n", err)
os.Exit(1)