diff options
author | Pablo Carranza <pcarranza@gmail.com> | 2016-12-16 20:21:44 +0300 |
---|---|---|
committer | Pablo Carranza <pcarranza@gmail.com> | 2016-12-16 20:21:44 +0300 |
commit | 318b174c3acb67fdeedc4b31dd7bc7736c092f7e (patch) | |
tree | cac8070cd2ed24d14cb4bd724c66339a992f878d | |
parent | 29682e9670345e6b6a96e162f8ed10891ddf69ef (diff) | |
parent | 4e0be501d15e79fcef798ea1e384c743bd54fa14 (diff) |
Merge branch 'feature/restructure-server-client-around-protobuf' into 'initial-server'
Restructure client/server communication around Protobuf-serialized messages
Part of #7
See merge request !10
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Makefile | 15 | ||||
-rw-r--r-- | client/client.go | 87 | ||||
-rw-r--r-- | client/client_test.go | 70 | ||||
-rw-r--r-- | cmd/client/main.go | 10 | ||||
-rw-r--r-- | cmd/server/main.go | 8 | ||||
-rw-r--r-- | messaging/conn.go | 67 | ||||
-rw-r--r-- | messaging/messages.pb.go | 336 | ||||
-rw-r--r-- | messaging/messages.proto | 31 | ||||
-rw-r--r-- | messaging/messaging.go | 67 | ||||
-rw-r--r-- | server/command_executor.go | 133 | ||||
-rw-r--r-- | server/command_executor_test.go | 118 | ||||
-rw-r--r-- | server/server.go | 53 | ||||
-rw-r--r-- | server/server_test.go | 25 | ||||
-rw-r--r-- | server/service.go | 20 | ||||
-rw-r--r-- | vendor/vendor.json | 13 |
16 files changed, 818 insertions, 236 deletions
diff --git a/.gitignore b/.gitignore index 88e3b36c4..c1c3d9944 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ client/testdata/data _build +vendor/*/ git-daemon-client git-daemon-server @@ -4,21 +4,26 @@ CLIENT_BIN=git-daemon-client SERVER_BIN=git-daemon-server export GOPATH=${BUILD_DIR}/_build +export PATH:=${GOPATH}/bin:$(PATH) .PHONY: ${BUILD_DIR}/_build all: test build -build: - go build -o ${SERVER_BIN} cmd/server/main.go - go build -o ${CLIENT_BIN} cmd/client/main.go - ${BUILD_DIR}/_build: mkdir -p $@/src/${PKG} tar -cf - --exclude _build --exclude .git . | (cd $@/src/${PKG} && tar -xf -) touch $@ -test: ${BUILD_DIR}/_build +deps: ${BUILD_DIR}/_build + (which govendor) || go get -u github.com/kardianos/govendor + cd ${BUILD_DIR}/_build/src/${PKG} && govendor fetch +out + +build: deps + go build -o ${SERVER_BIN} cmd/server/main.go + go build -o ${CLIENT_BIN} cmd/client/main.go + +test: ${BUILD_DIR}/_build deps cd ${BUILD_DIR}/_build/src/${PKG}/server && go test -v cd ${BUILD_DIR}/_build/src/${PKG}/client && go test -v diff --git a/client/client.go b/client/client.go index fc943240f..246771573 100644 --- a/client/client.go +++ b/client/client.go @@ -2,65 +2,90 @@ package client import ( "bufio" - "encoding/json" + "bytes" + "io" "log" "net" + "os" - "gitlab.com/gitlab-org/git-access-daemon/server" + "gitlab.com/gitlab-org/git-access-daemon/messaging" ) type Client struct { - conn net.Conn + messagesConn *messaging.MessagesConn } -func NewClient(serviceAddress string) *Client { - conn, err := net.Dial("tcp", serviceAddress) +func NewClient(serverAddress string) *Client { + conn, err := net.Dial("tcp", serverAddress) if err != nil { log.Fatalln(err) } - return &Client{conn} + messagesConn := messaging.NewMessagesConn(conn) + return &Client{messagesConn} } -func (client *Client) Request(cmd []string) server.CmdResponse { - _, err := client.conn.Write(append(makeRequest(cmd), "\n"...)) +func (client *Client) Run(cmd []string) int { + rawMsg := messaging.NewCommandMessage(os.Environ(), cmd[0], cmd[1:]...) + + _, err := client.messagesConn.Write(rawMsg) if err != nil { log.Fatalln(err) } - reader := bufio.NewReader(client.conn) - buffer, err := reader.ReadBytes('\n') - if err != nil { - log.Fatalln(err) + go streamStdinToServer(client) + + for { + rawMsg, err := client.messagesConn.Read() + + if err != nil { + break + } + + msg, err := messaging.ParseMessage(rawMsg) + if err != nil { + break + } + + switch msg.Type { + case "stdout": + os.Stdout.Write(msg.GetOutput().Output) + case "stderr": + os.Stderr.Write(msg.GetOutput().Output) + case "exit": + return int(msg.GetExit().ExitStatus) + } } - return parseResponse(buffer) + return 255 } func (client *Client) Close() { - client.conn.Close() + client.messagesConn.Close() } -func makeRequest(cmd []string) []byte { - req := server.CmdRequest{ - Cmd: cmd, - } +func streamStdinToServer(client *Client) { + finished := false + reader := bufio.NewReader(os.Stdin) - buf, err := json.Marshal(&req) - if err != nil { - log.Fatalln("Failed marshalling a JSON request") - } + for { + buffer := make([]byte, bytes.MinRead) - return buf -} + n, err := reader.Read(buffer) -func parseResponse(rawResponse []byte) server.CmdResponse { - res := server.CmdResponse{} + if err == io.EOF { + finished = true + } - err := json.Unmarshal(rawResponse, &res) - if err != nil { - log.Fatalln("Failed parsing response") - } + if n < bytes.MinRead { + buffer = buffer[:n] + } + + rawMsg := messaging.NewInputMessage(buffer) + client.messagesConn.Write(rawMsg) - return res + if finished { + return + } + } } diff --git a/client/client_test.go b/client/client_test.go index 6bba82478..3b446a926 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,21 +1,24 @@ package client import ( + "bytes" "log" "os" "os/exec" "path" - "strings" "testing" "time" - "gitlab.com/gitlab-org/git-access-daemon/server" + serv "gitlab.com/gitlab-org/git-access-daemon/server" ) -const serviceAddress = "127.0.0.1:6667" +const serverAddress = "127.0.0.1:6667" const testRepo = "group/test.git" const testRepoRoot = "testdata/data" +var origStdout = os.Stdout +var origStderr = os.Stderr + func TestMain(m *testing.M) { source := "https://gitlab.com/gitlab-org/gitlab-test.git" clonePath := path.Join(testRepoRoot, testRepo) @@ -28,20 +31,21 @@ func TestMain(m *testing.M) { os.Exit(-1) } } - service := server.NewService() + server := serv.NewServer() - go service.Serve(serviceAddress, server.CommandExecutorCallback) - defer service.Stop() + go server.Serve(serverAddress, serv.CommandExecutor) + defer server.Stop() time.Sleep(10 * time.Millisecond) os.Exit(m.Run()) } func TestRunningGitCommandSuccessfully(t *testing.T) { - client := NewClient(serviceAddress) + client := NewClient(serverAddress) defer client.Close() - res := client.Request([]string{ + stdout, _ := redirectOutputStreams() + exitStatus := client.Run([]string{ "git", "--git-dir", path.Join(testRepoRoot, testRepo), @@ -49,23 +53,27 @@ func TestRunningGitCommandSuccessfully(t *testing.T) { "--count", "b83d6e391c", }) + restoreOutputStreams() - exit_status := 0 - if res.ExitStatus != exit_status { - t.Fatalf("Expected response exit status to equal %d, got %d", exit_status, res.ExitStatus) + expectedExitStatus := 0 + if exitStatus != expectedExitStatus { + t.Fatalf("Expected response exit status to equal %d, got %d", expectedExitStatus, exitStatus) } - msg := "37\n" - if res.Message != msg { - t.Fatalf("Expected response stdout to be \"%s\", got \"%s\"", msg, res.Message) + expectedStdout := []byte("37\n") + gotStdout := make([]byte, len(expectedStdout)) + stdout.Read(gotStdout) + if !bytes.Equal(gotStdout, expectedStdout) { + t.Fatalf("Expected response stdout to be \"%s\", got \"%s\"", expectedStdout, gotStdout) } } func TestRunningGitCommandUnsuccessfully(t *testing.T) { - client := NewClient(serviceAddress) + client := NewClient(serverAddress) defer client.Close() - res := client.Request([]string{ + _, stderr := redirectOutputStreams() + exitStatus := client.Run([]string{ "git", "--git-dir", path.Join(testRepoRoot, testRepo), @@ -73,14 +81,32 @@ func TestRunningGitCommandUnsuccessfully(t *testing.T) { "--count", "babecafe", }) + restoreOutputStreams() - exit_status := 128 - if res.ExitStatus != exit_status { - t.Fatalf("Expected response exit status to equal %d, got %d", exit_status, res.ExitStatus) + expectedExitStatus := 128 + if exitStatus != expectedExitStatus { + t.Fatalf("Expected response exit status to equal %d, got %d", expectedExitStatus, exitStatus) } - msg := "fatal: ambiguous argument 'babecafe': unknown revision or path not in the working tree." - if !strings.Contains(res.Message, msg) { - t.Fatalf("Expected stderr to contain \"%s\", found none in \"%s\"", msg, res.Message) + expectedStderr := []byte("fatal: ambiguous argument 'babecafe': unknown revision or path not in the working tree.") + gotStderr := make([]byte, len(expectedStderr)) + stderr.Read(gotStderr) + if !bytes.Contains(gotStderr, expectedStderr) { + t.Fatalf("Expected stderr to contain \"%s\", found none in \"%s\"", expectedStderr, gotStderr) } } + +func redirectOutputStreams() (*os.File, *os.File) { + stdoutReader, stdoutWriter, _ := os.Pipe() + stderrReader, stderrWriter, _ := os.Pipe() + + os.Stdout = stdoutWriter + os.Stderr = stderrWriter + + return stdoutReader, stderrReader +} + +func restoreOutputStreams() { + os.Stdout = origStdout + os.Stderr = origStderr +} diff --git a/cmd/client/main.go b/cmd/client/main.go index a0d645c6a..b00413fe6 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -11,13 +11,7 @@ func main() { defer client.Close() os.Args[0] = "git" - res := client.Request(os.Args) - if res.ExitStatus == 0 { - os.Stdout.Write([]byte(res.Message)) - } else { - os.Stderr.Write([]byte(res.Message)) - } - - os.Exit(res.ExitStatus) + exitStatus := client.Run(os.Args) + os.Exit(exitStatus) } diff --git a/cmd/server/main.go b/cmd/server/main.go index 83aa5b558..e37665464 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -5,20 +5,20 @@ import ( "os/signal" "syscall" - "gitlab.com/gitlab-org/git-access-daemon/server" + serv "gitlab.com/gitlab-org/git-access-daemon/server" ) func main() { ch := make(chan os.Signal) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - service := server.NewService() + server := serv.NewServer() go func() { - service.Serve("0.0.0.0:6666", server.CommandExecutorCallback) + server.Serve("0.0.0.0:6666", serv.CommandExecutor) }() select { case <-ch: - service.Stop() + server.Stop() } } diff --git a/messaging/conn.go b/messaging/conn.go new file mode 100644 index 000000000..16415a075 --- /dev/null +++ b/messaging/conn.go @@ -0,0 +1,67 @@ +package messaging + +import ( + "bufio" + "net" + "strconv" + "strings" +) + +type MessagesConn struct { + Conn net.Conn + readWriter *bufio.ReadWriter +} + +func NewMessagesConn(conn net.Conn) *MessagesConn { + reader := bufio.NewReader(conn) + writer := bufio.NewWriter(conn) + readWriter := bufio.NewReadWriter(reader, writer) + messagesConn := &MessagesConn{conn, readWriter} + + return messagesConn +} + +func (conn *MessagesConn) Read() ([]byte, error) { + lenStr, err := conn.readWriter.ReadString('$') + if err != nil { + return nil, err + } + + lenStr = strings.TrimRight(lenStr, "$") + payloadLen, err := strconv.Atoi(lenStr) + if err != nil { + return nil, err + } + + payload := make([]byte, 0) + for { + buffer := make([]byte, payloadLen-len(payload)) + n, err := conn.readWriter.Read(buffer) + if err != nil { + return nil, err + } + + payload = append(payload, buffer[:n]...) + + if len(payload) == payloadLen { + break + } + } + + return payload, nil +} + +func (conn *MessagesConn) Write(buffer []byte) (int, error) { + newBuffer := make([]byte, 0) + newBuffer = strconv.AppendInt(newBuffer, int64(len(buffer)), 10) + newBuffer = append(newBuffer, "$"...) + newBuffer = append(newBuffer, buffer...) + + n, err := conn.readWriter.Write(newBuffer) + conn.readWriter.Flush() + return n, err +} + +func (conn *MessagesConn) Close() { + conn.Conn.Close() +} diff --git a/messaging/messages.pb.go b/messaging/messages.pb.go new file mode 100644 index 000000000..ac5f24a39 --- /dev/null +++ b/messaging/messages.pb.go @@ -0,0 +1,336 @@ +// Code generated by protoc-gen-go. +// source: messaging/messages.proto +// DO NOT EDIT! + +/* +Package messaging is a generated protocol buffer package. + +It is generated from these files: + messaging/messages.proto + +It has these top-level messages: + Message + Command + Output + Input + Exit +*/ +package messaging + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Message struct { + Type string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"` + // Types that are valid to be assigned to Payload: + // *Message_Command + // *Message_Input + // *Message_Output + // *Message_Exit + Payload isMessage_Payload `protobuf_oneof:"payload"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type isMessage_Payload interface { + isMessage_Payload() +} + +type Message_Command struct { + Command *Command `protobuf:"bytes,2,opt,name=command,oneof"` +} +type Message_Input struct { + Input *Input `protobuf:"bytes,3,opt,name=input,oneof"` +} +type Message_Output struct { + Output *Output `protobuf:"bytes,4,opt,name=output,oneof"` +} +type Message_Exit struct { + Exit *Exit `protobuf:"bytes,5,opt,name=exit,oneof"` +} + +func (*Message_Command) isMessage_Payload() {} +func (*Message_Input) isMessage_Payload() {} +func (*Message_Output) isMessage_Payload() {} +func (*Message_Exit) isMessage_Payload() {} + +func (m *Message) GetPayload() isMessage_Payload { + if m != nil { + return m.Payload + } + return nil +} + +func (m *Message) GetType() string { + if m != nil { + return m.Type + } + return "" +} + +func (m *Message) GetCommand() *Command { + if x, ok := m.GetPayload().(*Message_Command); ok { + return x.Command + } + return nil +} + +func (m *Message) GetInput() *Input { + if x, ok := m.GetPayload().(*Message_Input); ok { + return x.Input + } + return nil +} + +func (m *Message) GetOutput() *Output { + if x, ok := m.GetPayload().(*Message_Output); ok { + return x.Output + } + return nil +} + +func (m *Message) GetExit() *Exit { + if x, ok := m.GetPayload().(*Message_Exit); ok { + return x.Exit + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*Message) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _Message_OneofMarshaler, _Message_OneofUnmarshaler, _Message_OneofSizer, []interface{}{ + (*Message_Command)(nil), + (*Message_Input)(nil), + (*Message_Output)(nil), + (*Message_Exit)(nil), + } +} + +func _Message_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Message) + // payload + switch x := m.Payload.(type) { + case *Message_Command: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Command); err != nil { + return err + } + case *Message_Input: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Input); err != nil { + return err + } + case *Message_Output: + b.EncodeVarint(4<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Output); err != nil { + return err + } + case *Message_Exit: + b.EncodeVarint(5<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Exit); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Message.Payload has unexpected type %T", x) + } + return nil +} + +func _Message_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Message) + switch tag { + case 2: // payload.command + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Command) + err := b.DecodeMessage(msg) + m.Payload = &Message_Command{msg} + return true, err + case 3: // payload.input + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Input) + err := b.DecodeMessage(msg) + m.Payload = &Message_Input{msg} + return true, err + case 4: // payload.output + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Output) + err := b.DecodeMessage(msg) + m.Payload = &Message_Output{msg} + return true, err + case 5: // payload.exit + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Exit) + err := b.DecodeMessage(msg) + m.Payload = &Message_Exit{msg} + return true, err + default: + return false, nil + } +} + +func _Message_OneofSizer(msg proto.Message) (n int) { + m := msg.(*Message) + // payload + switch x := m.Payload.(type) { + case *Message_Command: + s := proto.Size(x.Command) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Message_Input: + s := proto.Size(x.Input) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Message_Output: + s := proto.Size(x.Output) + n += proto.SizeVarint(4<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Message_Exit: + s := proto.Size(x.Exit) + n += proto.SizeVarint(5<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type Command struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + Args []string `protobuf:"bytes,2,rep,name=args" json:"args,omitempty"` + Environ []string `protobuf:"bytes,3,rep,name=environ" json:"environ,omitempty"` +} + +func (m *Command) Reset() { *m = Command{} } +func (m *Command) String() string { return proto.CompactTextString(m) } +func (*Command) ProtoMessage() {} +func (*Command) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *Command) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Command) GetArgs() []string { + if m != nil { + return m.Args + } + return nil +} + +func (m *Command) GetEnviron() []string { + if m != nil { + return m.Environ + } + return nil +} + +type Output struct { + Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` +} + +func (m *Output) Reset() { *m = Output{} } +func (m *Output) String() string { return proto.CompactTextString(m) } +func (*Output) ProtoMessage() {} +func (*Output) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *Output) GetOutput() []byte { + if m != nil { + return m.Output + } + return nil +} + +type Input struct { + Stdin []byte `protobuf:"bytes,1,opt,name=stdin,proto3" json:"stdin,omitempty"` +} + +func (m *Input) Reset() { *m = Input{} } +func (m *Input) String() string { return proto.CompactTextString(m) } +func (*Input) ProtoMessage() {} +func (*Input) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *Input) GetStdin() []byte { + if m != nil { + return m.Stdin + } + return nil +} + +type Exit struct { + ExitStatus int32 `protobuf:"varint,1,opt,name=exit_status,json=exitStatus" json:"exit_status,omitempty"` +} + +func (m *Exit) Reset() { *m = Exit{} } +func (m *Exit) String() string { return proto.CompactTextString(m) } +func (*Exit) ProtoMessage() {} +func (*Exit) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *Exit) GetExitStatus() int32 { + if m != nil { + return m.ExitStatus + } + return 0 +} + +func init() { + proto.RegisterType((*Message)(nil), "messaging.Message") + proto.RegisterType((*Command)(nil), "messaging.Command") + proto.RegisterType((*Output)(nil), "messaging.Output") + proto.RegisterType((*Input)(nil), "messaging.Input") + proto.RegisterType((*Exit)(nil), "messaging.Exit") +} + +func init() { proto.RegisterFile("messaging/messages.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 280 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x4c, 0x91, 0xdf, 0x4a, 0xb5, 0x40, + 0x14, 0xc5, 0xf5, 0xf8, 0x0f, 0xf7, 0xf9, 0xe0, 0xab, 0x4d, 0xc4, 0xdc, 0x44, 0x22, 0x44, 0x42, + 0x60, 0x50, 0x6f, 0x50, 0x04, 0x46, 0x44, 0x60, 0x0f, 0x10, 0x53, 0x8a, 0x0c, 0xe4, 0x8c, 0x38, + 0x63, 0x9c, 0xf3, 0xa2, 0x3d, 0x4f, 0xec, 0x19, 0x15, 0xef, 0xd6, 0x5e, 0xeb, 0xa7, 0xac, 0xc5, + 0x00, 0xeb, 0x5b, 0xad, 0x79, 0x27, 0x64, 0x77, 0xeb, 0x54, 0xab, 0xcb, 0x61, 0x54, 0x46, 0x61, + 0xba, 0x26, 0xf9, 0xaf, 0x0f, 0xc9, 0xab, 0x4b, 0x11, 0x21, 0x34, 0xc7, 0xa1, 0x65, 0x7e, 0xe6, + 0x17, 0x69, 0x6d, 0x35, 0x96, 0x90, 0x7c, 0xa9, 0xbe, 0xe7, 0xb2, 0x61, 0xbb, 0xcc, 0x2f, 0xf6, + 0x77, 0x58, 0xae, 0x1f, 0x97, 0x8f, 0x2e, 0xa9, 0xbc, 0x7a, 0x81, 0xb0, 0x80, 0x48, 0xc8, 0x61, + 0x32, 0x2c, 0xb0, 0xf4, 0xc9, 0x86, 0x7e, 0x26, 0xbf, 0xf2, 0x6a, 0x07, 0xe0, 0x0d, 0xc4, 0x6a, + 0x32, 0x84, 0x86, 0x16, 0x3d, 0xdd, 0xa0, 0x6f, 0x36, 0xa8, 0xbc, 0x7a, 0x46, 0xf0, 0x0a, 0xc2, + 0xf6, 0x20, 0x0c, 0x8b, 0x2c, 0xfa, 0x7f, 0x83, 0x3e, 0x1d, 0x04, 0x81, 0x36, 0x7e, 0x48, 0x21, + 0x19, 0xf8, 0xf1, 0x5b, 0xf1, 0x26, 0x7f, 0x81, 0x64, 0xae, 0x47, 0xbb, 0x24, 0xef, 0xd7, 0x5d, + 0xa4, 0xc9, 0xe3, 0x63, 0xa7, 0xd9, 0x2e, 0x0b, 0xc8, 0x23, 0x8d, 0x0c, 0x92, 0x56, 0xfe, 0x88, + 0x51, 0x49, 0x16, 0x58, 0x7b, 0x39, 0xf3, 0x0c, 0x62, 0x57, 0x09, 0xcf, 0xd7, 0xd6, 0xf4, 0xb7, + 0x7f, 0x4b, 0xc1, 0xfc, 0x02, 0x22, 0xbb, 0x0f, 0xcf, 0x20, 0xd2, 0xa6, 0x11, 0x72, 0xce, 0xdd, + 0x91, 0x5f, 0x43, 0x48, 0x45, 0xf1, 0x12, 0xf6, 0x54, 0xf4, 0x43, 0x1b, 0x6e, 0x26, 0x6d, 0x99, + 0xa8, 0x06, 0xb2, 0xde, 0xad, 0xf3, 0x19, 0xdb, 0x17, 0xba, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, + 0x10, 0x99, 0x01, 0x7a, 0xbd, 0x01, 0x00, 0x00, +} diff --git a/messaging/messages.proto b/messaging/messages.proto new file mode 100644 index 000000000..fea82625d --- /dev/null +++ b/messaging/messages.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package messaging; + +message Message { + string type = 1; + oneof payload { + Command command = 2; + Input input = 3; + Output output = 4; + Exit exit = 5; + } +} + +message Command { + string name = 1; + repeated string args = 2; + repeated string environ = 3; +} + +message Output { + bytes output = 1; +} + +message Input { + bytes stdin = 1; +} + +message Exit { + int32 exit_status = 1; +} diff --git a/messaging/messaging.go b/messaging/messaging.go new file mode 100644 index 000000000..c40521232 --- /dev/null +++ b/messaging/messaging.go @@ -0,0 +1,67 @@ +package messaging + +import ( + "log" + + proto "github.com/golang/protobuf/proto" +) + +func ParseMessage(rawMsg []byte) (*Message, error) { + msg := &Message{} + + err := proto.Unmarshal(rawMsg, msg) + return msg, err +} + +func NewCommandMessage(environ []string, name string, args ...string) []byte { + msg := &Message{ + Type: "command", + Payload: &Message_Command{ + &Command{name, args, environ}, + }, + } + + return marshal(msg) +} + +func NewInputMessage(input []byte) []byte { + msg := &Message{ + Type: "stdin", + Payload: &Message_Input{ + &Input{input}, + }, + } + + return marshal(msg) +} + +func NewOutputMessage(outputType string, output []byte) []byte { + msg := &Message{ + Type: outputType, + Payload: &Message_Output{ + &Output{output}, + }, + } + + return marshal(msg) +} + +func NewExitMessage(exitStatus int32) []byte { + msg := &Message{ + Type: "exit", + Payload: &Message_Exit{ + &Exit{exitStatus}, + }, + } + + return marshal(msg) +} + +func marshal(msg *Message) []byte { + buffer, err := proto.Marshal(msg) + if err != nil { + log.Fatalln("Failed marshalling a Protobuf message") + } + + return buffer +} diff --git a/server/command_executor.go b/server/command_executor.go index 49d118025..284c74644 100644 --- a/server/command_executor.go +++ b/server/command_executor.go @@ -2,68 +2,63 @@ package server import ( "bytes" - "encoding/json" + "io" "log" "os/exec" "syscall" -) - -type CmdRequest struct { - Cmd []string `json:"cmd"` -} -type CmdResponse struct { - Status string `json:"status"` - Message string `json:"message"` - ExitStatus int `json:"exit_status"` -} - -func CommandExecutorCallback(input []byte) []byte { - req := CmdRequest{} + "gitlab.com/gitlab-org/git-access-daemon/messaging" +) - err := json.Unmarshal(input, &req) - if err != nil { - return errorResponse("Error parsing JSON request", 255) +func CommandExecutor(chans *commChans) { + rawMsg, ok := <-chans.inChan + if !ok { + return } - output, err := runCommand(req.Cmd[0], req.Cmd[1:]...) - + msg, err := messaging.ParseMessage(rawMsg) if err != nil { - return errorResponse( - string(output.Bytes()), - extractExitStatusFromError(err.(*exec.ExitError)), - ) + return + } + if msg.Type != "command" { + return } - return successResponse(string(output.Bytes())) + runCommand(chans, msg.GetCommand()) } -func runCommand(name string, args ...string) (bytes.Buffer, error) { - var stdoutBuf bytes.Buffer - var stderrBuf bytes.Buffer +func runCommand(chans *commChans, commandMsg *messaging.Command) { + name := commandMsg.Name + args := commandMsg.Args log.Println("Executing command:", name, "with args", args) - cmd := makeCommand(name, args...) - cmd.Stdout = &stdoutBuf - cmd.Stderr = &stderrBuf + stdinReader, stdinWriter := io.Pipe() + stdoutReader, stdoutWriter := io.Pipe() + stderrReader, stderrWriter := io.Pipe() - err := cmd.Run() - if err != nil { - return stderrBuf, err - } - - return stdoutBuf, nil -} + go streamOut("stdout", stdoutReader, chans) + go streamOut("stderr", stderrReader, chans) + go streamIn(stdinWriter, chans) -// Based on git.gitCommand from gitlab-workhorse -func makeCommand(name string, args ...string) *exec.Cmd { cmd := exec.Command(name, args...) // Start the command in its own process group (nice for signalling) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Env = commandMsg.Environ + cmd.Stdin = stdinReader + cmd.Stdout = stdoutWriter + cmd.Stderr = stderrWriter - return cmd + err := cmd.Run() + + if err != nil { + exitStatus := int32(extractExitStatusFromError(err.(*exec.ExitError))) + chans.outChan <- messaging.NewExitMessage(exitStatus) + return + } + + chans.outChan <- messaging.NewExitMessage(0) } func extractExitStatusFromError(err *exec.ExitError) int { @@ -77,24 +72,52 @@ func extractExitStatusFromError(err *exec.ExitError) int { return 255 } -func errorResponse(message string, exit_status int) []byte { - return makeResponse("error", message, exit_status) -} +func streamOut(streamName string, streamPipe io.Reader, chans *commChans) { + // TODO: Move buffer out of the loop and use defer instead of finished + finished := false -func successResponse(message string) []byte { - return makeResponse("success", message, 0) -} + for { + buffer := make([]byte, bytes.MinRead) -func makeResponse(status string, message string, exit_status int) []byte { - res := CmdResponse{status, message, exit_status} - tempBuf, err := json.Marshal(res) + n, err := streamPipe.Read(buffer) + if err == io.EOF { + finished = true + } - if err != nil { - log.Fatalln("Failed marshalling a JSON response") - } + if n < bytes.MinRead { + buffer = buffer[:n] + } + + chans.outChan <- messaging.NewOutputMessage(streamName, buffer) - buf := bytes.NewBuffer(tempBuf) - buf.WriteString("\n") + if finished { + return + } + } +} - return buf.Bytes() +func streamIn(streamPipe *io.PipeWriter, chans *commChans) { + defer streamPipe.Close() + + for { + rawMsg, ok := <-chans.inChan + if !ok { + return + } + + msg, err := messaging.ParseMessage(rawMsg) + if msg.Type != "stdin" { + continue + } + + stdin := msg.GetInput().Stdin + if len(stdin) == 0 { + return + } + + _, err = streamPipe.Write(stdin) + if err != nil { + return + } + } } diff --git a/server/command_executor_test.go b/server/command_executor_test.go index f983d9bd8..4fb1fd0cf 100644 --- a/server/command_executor_test.go +++ b/server/command_executor_test.go @@ -1,103 +1,55 @@ package server import ( - "bufio" - "bytes" - "encoding/json" - "net" - "os" - "strings" "testing" - "time" -) - -const serviceAddress = "127.0.0.1:6667" - -func TestMain(m *testing.M) { - service := NewService() - - go service.Serve(serviceAddress, CommandExecutorCallback) - defer service.Stop() - time.Sleep(10 * time.Millisecond) - os.Exit(m.Run()) -} + "gitlab.com/gitlab-org/git-access-daemon/messaging" +) func TestRunningCommandSuccessfully(t *testing.T) { - res := responseFromCommand(`{"cmd":["ls", "-hal"]}`, t) - - if res.ExitStatus != 0 { - t.Error("Expected exit status of 0, found %v", res.ExitStatus) - } - - if res.Status != "success" { // We should have these statuses as constants in the package - t.Error("Expected status of success, found %v", res.Status) - } -} - -func TestRunningCommandUnsuccessfully(t *testing.T) { - res := responseFromCommand(`{"cmd":["ls", "/file-that-does-not-exist"]}`, t) - - if res.ExitStatus == 0 { - t.Error("Expected a failure exit status, got 0") - } + chans := newCommChans() + defer chans.Close() - if res.Status != "error" { - t.Error("Expected error status, got %v", res.Status) - } + go CommandExecutor(chans) - if !(strings.Contains(res.Message, "cannot access") || - strings.Contains(res.Message, "No such file or directory")) { - t.Error("Expected ls error message, got %v", res.Message) - } -} - -func TestMalformedCommand(t *testing.T) { - res := responseFromCommand(`{"cmd":["ls", "/file-that-does-not-exist"}`, t) - - if res.Status != "error" { - t.Error("Expected error status, got %s", res.Status) - } + chans.inChan <- messaging.NewCommandMessage([]string{}, "ls", "-hal") + chans.inChan <- messaging.NewInputMessage([]byte{}) - if res.Message != "Error parsing JSON request" { - t.Error("Expected parsing json error message, got %s", res.Message) - } + for { + rawMsg := <-chans.outChan + msg, _ := messaging.ParseMessage(rawMsg) - if res.ExitStatus != 255 { - t.Error("Expected exit status 255, got %v", res.ExitStatus) + switch msg.Type { + case "exit": + exit_status := msg.GetExit().ExitStatus + if exit_status != 0 { + t.Error("Expected exit status of 0, found %v", exit_status) + } + return + } } } -// These 2 functions could be interesting to reuse in the client -// For this to happen we should remove the testing dependency and -// then move them to an accessible place. -func responseFromCommand(cmd string, t *testing.T) CmdResponse { - var response CmdResponse - buffer := bytesFromCommand(cmd, t) - err := json.Unmarshal(buffer, &response) - if err != nil { - t.Error(err) - } - return response -} +func TestRunningCommandUnsuccessfully(t *testing.T) { + chans := newCommChans() + defer chans.Close() -func bytesFromCommand(cmd string, t *testing.T) []byte { - conn, err := net.Dial("tcp", serviceAddress) - if err != nil { - t.Fatal(err) - } + go CommandExecutor(chans) - defer conn.Close() + chans.inChan <- messaging.NewCommandMessage([]string{}, "ls", "/file-that-does-not-exist") + chans.inChan <- messaging.NewInputMessage([]byte{}) - if _, err := conn.Write([]byte(cmd + "\n")); err != nil { - t.Error(err) - } + for { + rawMsg := <-chans.outChan + msg, _ := messaging.ParseMessage(rawMsg) - reader := bufio.NewReader(conn) - buffer, err := reader.ReadBytes('\n') - if err != nil { - t.Error(err) + switch msg.Type { + case "exit": + exit_status := msg.GetExit().ExitStatus + if exit_status == 0 { + t.Error("Expected a failure exit status, got 0") + } + return + } } - - return bytes.TrimSpace(buffer) } diff --git a/server/server.go b/server/server.go index 72a06c3e5..83b88e67f 100644 --- a/server/server.go +++ b/server/server.go @@ -1,31 +1,30 @@ package server import ( - "bufio" "io" "log" "net" "sync" "time" + + "gitlab.com/gitlab-org/git-access-daemon/messaging" ) -type Service struct { +type Server struct { ch chan bool waitGroup *sync.WaitGroup } -type Callback func([]byte) []byte - -func NewService() *Service { - service := &Service{ +func NewServer() *Server { + server := &Server{ ch: make(chan bool), waitGroup: &sync.WaitGroup{}, } - service.waitGroup.Add(1) - return service + server.waitGroup.Add(1) + return server } -func (s *Service) Serve(address string, cb Callback) { +func (s *Server) Serve(address string, service Service) { listener, err := newListener(address) if err != nil { log.Fatalln(err) @@ -50,7 +49,10 @@ func (s *Service) Serve(address string, cb Callback) { } log.Println("Client connected from ", conn.RemoteAddr()) s.waitGroup.Add(1) - go s.serve(conn, cb) + + chans := newCommChans() + go service(chans) + go s.serve(conn, chans) } } @@ -62,15 +64,31 @@ func newListener(address string) (*net.TCPListener, error) { return net.ListenTCP("tcp", tcpAddress) } -func (s *Service) Stop() { +func (s *Server) Stop() { close(s.ch) s.waitGroup.Wait() } -func (s *Service) serve(conn *net.TCPConn, cb Callback) { +func (s *Server) serve(conn *net.TCPConn, chans *commChans) { defer conn.Close() defer s.waitGroup.Done() + messagesConn := messaging.NewMessagesConn(conn) + + go func() { + for { + ret, ok := <-chans.outChan + if !ok { + return + } + + if _, err := messagesConn.Write(ret); nil != err { + log.Println(err) + return + } + } + }() + for { select { case <-s.ch: @@ -81,8 +99,7 @@ func (s *Service) serve(conn *net.TCPConn, cb Callback) { conn.SetDeadline(time.Now().Add(1e9)) - reader := bufio.NewReader(conn) - buffer, err := reader.ReadBytes('\n') + buffer, err := messagesConn.Read() if err != nil { if err == io.EOF { log.Println("Client", conn.RemoteAddr(), "closed the connection") @@ -94,10 +111,8 @@ func (s *Service) serve(conn *net.TCPConn, cb Callback) { log.Println(err) } - ret := cb(buffer) - if _, err := conn.Write(ret); nil != err { - log.Println(err) - return - } + chans.inChan <- buffer } + + chans.Close() } diff --git a/server/server_test.go b/server/server_test.go index 83f2f5453..81bb318e8 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1,35 +1,42 @@ package server import ( - "bufio" "fmt" "net" "testing" "time" + + "gitlab.com/gitlab-org/git-access-daemon/messaging" ) func TestServerStandingUp(t *testing.T) { - service := NewService() + server := NewServer() address := "127.0.0.1:6666" - go service.Serve(address, func(input []byte) []byte { return input }) - defer service.Stop() + go server.Serve(address, func(chans *commChans) { + a := (<-chans.inChan) + chans.outChan <- a + }) + defer server.Stop() - // Give service a little time to start listening for connections + // Give server a little time to start listening for connections time.Sleep(10 * time.Millisecond) conn, err := net.Dial("tcp", address) if err != nil { t.Fatal(err) } - if _, err := conn.Write([]byte("hola hola!\n")); err != nil { + messagesConn := messaging.NewMessagesConn(conn) + + if _, err := messagesConn.Write([]byte("hola hola!")); err != nil { t.Error(err) } - reader := bufio.NewReader(conn) - buffer, err := reader.ReadBytes('\n') + + buffer, err := messagesConn.Read() if err != nil { t.Error(err) } - conn.Close() + messagesConn.Close() + fmt.Println("Received from server:", string(buffer)) } diff --git a/server/service.go b/server/service.go new file mode 100644 index 000000000..1dea68435 --- /dev/null +++ b/server/service.go @@ -0,0 +1,20 @@ +package server + +type commChans struct { + inChan chan []byte + outChan chan []byte +} + +type Service func(*commChans) + +func newCommChans() *commChans { + return &commChans{ + inChan: make(chan []byte), + outChan: make(chan []byte), + } +} + +func (chans *commChans) Close() { + close(chans.inChan) + close(chans.outChan) +} diff --git a/vendor/vendor.json b/vendor/vendor.json new file mode 100644 index 000000000..19ecc88d2 --- /dev/null +++ b/vendor/vendor.json @@ -0,0 +1,13 @@ +{ + "comment": "", + "ignore": "test", + "package": [ + { + "checksumSHA1": "kBeNcaKk56FguvPSUCEaH6AxpRc=", + "path": "github.com/golang/protobuf/proto", + "revision": "8ee79997227bf9b34611aee7946ae64735e6fd93", + "revisionTime": "2016-11-17T03:31:26Z" + } + ], + "rootPath": "gitlab.com/gitlab-org/git-access-daemon" +} |