mirror of https://github.com/yandex/odyssey.git
341 lines
7.6 KiB
Go
341 lines
7.6 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/csv"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
_ "github.com/lib/pq"
|
|
"golang.org/x/term"
|
|
)
|
|
|
|
type IntArrayFlags []int
|
|
|
|
func newIntArrayFlags(defaults []int) *IntArrayFlags {
|
|
i := IntArrayFlags(defaults)
|
|
return &i
|
|
}
|
|
|
|
func (i *IntArrayFlags) String() string {
|
|
return fmt.Sprintf("%v", *i)
|
|
}
|
|
|
|
func (i *IntArrayFlags) Set(value string) error {
|
|
*i = nil
|
|
|
|
for _, n := range strings.FieldsFunc(value, func(r rune) bool { return r == ',' || r == ' ' }) {
|
|
v, err := strconv.Atoi(n)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*i = append(*i, v)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type DurationArrayFlags []time.Duration
|
|
|
|
func newDurationArrayFlags(defaults []time.Duration) *DurationArrayFlags {
|
|
d := DurationArrayFlags(defaults)
|
|
return &d
|
|
}
|
|
|
|
func (d *DurationArrayFlags) String() string {
|
|
return fmt.Sprintf("%v", *d)
|
|
}
|
|
|
|
func (d *DurationArrayFlags) Set(value string) error {
|
|
*d = nil
|
|
|
|
for _, n := range strings.FieldsFunc(value, func(r rune) bool { return r == ',' || r == ' ' }) {
|
|
v, err := time.ParseDuration(n)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*d = append(*d, v)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type Config struct {
|
|
ConnectionString string
|
|
BenchmarkDuration time.Duration
|
|
ClientParallels []int
|
|
ConnectTimeouts []time.Duration
|
|
SelectTimeouts []time.Duration
|
|
PauseDuration time.Duration
|
|
OutputFile string
|
|
}
|
|
|
|
type Result struct {
|
|
Connections int64
|
|
Selects int64
|
|
Errors int64
|
|
Clients int
|
|
TotalTime time.Duration
|
|
ConnectTimeout time.Duration
|
|
SelectTimeout time.Duration
|
|
}
|
|
|
|
var (
|
|
connectionsCounter int64 = 0
|
|
selectCounter int64 = 0
|
|
errorsCounter int64 = 0
|
|
)
|
|
|
|
func doConnect(cfg *Config, connectTimeout time.Duration) {
|
|
db, err := sql.Open("postgres", cfg.ConnectionString)
|
|
if err != nil {
|
|
fmt.Printf("connect error: %v\n", err)
|
|
atomic.AddInt64(&errorsCounter, 1)
|
|
return
|
|
}
|
|
|
|
defer db.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
|
|
defer cancel()
|
|
|
|
err = db.PingContext(ctx)
|
|
if err != nil {
|
|
fmt.Printf("ping error: %v\n", err)
|
|
atomic.AddInt64(&errorsCounter, 1)
|
|
return
|
|
}
|
|
|
|
atomic.AddInt64(&connectionsCounter, 1)
|
|
}
|
|
|
|
func doConnectInf(ctx context.Context, cfg *Config, connectTimeout time.Duration, wg *sync.WaitGroup, syncStart *sync.WaitGroup) {
|
|
syncStart.Wait()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
wg.Done()
|
|
return
|
|
default:
|
|
doConnect(cfg, connectTimeout)
|
|
}
|
|
}
|
|
}
|
|
|
|
func doSelect(db *sql.DB, timeout time.Duration) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
var val int
|
|
err := db.QueryRowContext(ctx, "select 1 + 2").Scan(&val)
|
|
if err != nil {
|
|
fmt.Printf("select error: %v\n", err)
|
|
atomic.AddInt64(&errorsCounter, 1)
|
|
return
|
|
}
|
|
|
|
if val != 3 {
|
|
panic("wrong value")
|
|
}
|
|
|
|
atomic.AddInt64(&selectCounter, 1)
|
|
}
|
|
|
|
func doSelectInf(ctx context.Context, cfg *Config, selectTimeout time.Duration, wg *sync.WaitGroup) {
|
|
db, err := sql.Open("postgres", cfg.ConnectionString)
|
|
noerror(err)
|
|
|
|
defer db.Close()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
wg.Done()
|
|
return
|
|
default:
|
|
doSelect(db, selectTimeout)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func doMeasure(cfg *Config, nparallel int, connectTimeout time.Duration, selectTimeout time.Duration) *Result {
|
|
atomic.StoreInt64(&connectionsCounter, 0)
|
|
atomic.StoreInt64(&selectCounter, 0)
|
|
atomic.StoreInt64(&errorsCounter, 0)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), cfg.BenchmarkDuration)
|
|
defer cancel()
|
|
|
|
wg := &sync.WaitGroup{}
|
|
syncStart := &sync.WaitGroup{}
|
|
|
|
syncStart.Add(1)
|
|
|
|
for i := 0; i < nparallel; i++ {
|
|
wg.Add(1)
|
|
go doSelectInf(ctx, cfg, selectTimeout, wg)
|
|
}
|
|
|
|
for i := 0; i < nparallel; i++ {
|
|
wg.Add(1)
|
|
go doConnectInf(ctx, cfg, connectTimeout, wg, syncStart)
|
|
}
|
|
|
|
startTime := time.Now()
|
|
syncStart.Done()
|
|
wg.Wait()
|
|
totalTime := time.Since(startTime)
|
|
|
|
return &Result{
|
|
Connections: atomic.LoadInt64(&connectionsCounter),
|
|
Selects: atomic.LoadInt64(&selectCounter),
|
|
Errors: atomic.LoadInt64(&errorsCounter),
|
|
TotalTime: totalTime,
|
|
Clients: nparallel,
|
|
ConnectTimeout: connectTimeout,
|
|
SelectTimeout: selectTimeout,
|
|
}
|
|
}
|
|
|
|
func printResultHeaderLine() {
|
|
fmt.Printf("clients\ttime\tconn to\tc/s\ts/s\te/s\n")
|
|
}
|
|
|
|
func printResultLine(r *Result) {
|
|
fmt.Printf("%d\t%v\t%v\t%v\t%f\t%f\t%f\n",
|
|
r.Clients,
|
|
r.TotalTime,
|
|
r.ConnectTimeout,
|
|
r.SelectTimeout,
|
|
float64(r.Connections)/r.TotalTime.Seconds(),
|
|
float64(r.Selects)/r.TotalTime.Seconds(),
|
|
float64(r.Errors)/r.TotalTime.Seconds(),
|
|
)
|
|
}
|
|
|
|
func printResultConsole(results []*Result) {
|
|
printResultHeaderLine()
|
|
|
|
for _, r := range results {
|
|
printResultLine(r)
|
|
}
|
|
}
|
|
|
|
func printResultCsv(filename string, results []*Result) {
|
|
f, err := os.Create(filename)
|
|
noerror(err)
|
|
defer f.Close()
|
|
|
|
w := csv.NewWriter(f)
|
|
defer w.Flush()
|
|
|
|
w.Write([]string{"#", "clients", "time", "conn to", "select to", "c/s", "s/s", "e/s"})
|
|
for i, r := range results {
|
|
err := w.Write([]string{
|
|
strconv.Itoa(i + 1),
|
|
strconv.Itoa(r.Clients),
|
|
r.TotalTime.String(),
|
|
r.ConnectTimeout.String(),
|
|
r.SelectTimeout.String(),
|
|
fmt.Sprintf("%f", float64(r.Connections)/r.TotalTime.Seconds()),
|
|
fmt.Sprintf("%f", float64(r.Selects)/r.TotalTime.Seconds()),
|
|
fmt.Sprintf("%f", float64(r.Errors)/r.TotalTime.Seconds()),
|
|
})
|
|
noerror(err)
|
|
}
|
|
|
|
fmt.Printf("Results saved to %s\n", filename)
|
|
}
|
|
|
|
func printResults(cfg *Config, results []*Result) {
|
|
printResultConsole(results)
|
|
printResultCsv(cfg.OutputFile, results)
|
|
}
|
|
|
|
func runBenchmarks(cfg *Config) []*Result {
|
|
results := make([]*Result, 0, len(cfg.ClientParallels))
|
|
|
|
for _, np := range cfg.ClientParallels {
|
|
for _, ct := range cfg.ConnectTimeouts {
|
|
for _, st := range cfg.SelectTimeouts {
|
|
r := doMeasure(cfg, np, ct, st)
|
|
printResultLine(r)
|
|
results = append(results, r)
|
|
|
|
time.Sleep(cfg.PauseDuration)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
return results
|
|
}
|
|
|
|
func readConfig() *Config {
|
|
host := flag.String("host", "", "odyssey host")
|
|
port := flag.Int("port", 6432, "odyssey port")
|
|
user := flag.String("user", "user1", "user to connect")
|
|
dbName := flag.String("db", "db1", "db to connect")
|
|
sslRoot := flag.String("sslroot", "./root.crt", "ssl root.crt file path")
|
|
benchDuration := flag.Duration("bench-duration", time.Second*5, "one benchmark run duration")
|
|
pauseDuration := flag.Duration("pause-duration", time.Second*3, "pause between runs")
|
|
outputFile := flag.String("output-file", "./result.csv", "result filename")
|
|
|
|
clients := newIntArrayFlags([]int{10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
|
|
flag.Var(clients, "clients", "numbers of parallel connecting clients")
|
|
|
|
connectTimeouts := newDurationArrayFlags([]time.Duration{time.Second})
|
|
flag.Var(connectTimeouts, "connect-timeouts", "connect timeouts")
|
|
|
|
selectTimeouts := newDurationArrayFlags([]time.Duration{time.Second})
|
|
flag.Var(selectTimeouts, "select-timeouts", "select timeouts")
|
|
|
|
flag.Parse()
|
|
|
|
if len(*host) == 0 {
|
|
fmt.Printf("Error: the host parameter is empty\n")
|
|
flag.Usage()
|
|
return nil
|
|
}
|
|
|
|
fmt.Printf("%s's password (no echo):", *user)
|
|
password, err := term.ReadPassword(0)
|
|
noerror(err)
|
|
|
|
return &Config{
|
|
ConnectionString: fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=verify-full sslrootcert=%s", *host, *port, *user, string(password), *dbName, *sslRoot),
|
|
BenchmarkDuration: *benchDuration,
|
|
ClientParallels: *clients,
|
|
PauseDuration: *pauseDuration,
|
|
OutputFile: *outputFile,
|
|
ConnectTimeouts: *connectTimeouts,
|
|
SelectTimeouts: *selectTimeouts,
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
cfg := readConfig()
|
|
if cfg == nil {
|
|
return
|
|
}
|
|
|
|
results := runBenchmarks(cfg)
|
|
|
|
printResults(cfg, results)
|
|
}
|
|
|
|
func noerror(err error) {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|