diff options
author | Ahmad Sherif <ahmad.m.sherif@gmail.com> | 2016-12-19 17:48:17 +0300 |
---|---|---|
committer | Ahmad Sherif <ahmad.m.sherif@gmail.com> | 2016-12-19 17:48:17 +0300 |
commit | b549c388d3f72439f4f3653f2cd14f54a2c31eae (patch) | |
tree | cac8070cd2ed24d14cb4bd724c66339a992f878d | |
parent | 0313772717a6567f4bd0ba30a5cfeed2769a44f8 (diff) | |
parent | 318b174c3acb67fdeedc4b31dd7bc7736c092f7e (diff) |
Merge branch 'initial-server' into 'master'
Merge initial-server branch into master
See merge request !12
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | .gitlab-ci.yml | 5 | ||||
-rw-r--r-- | Makefile | 34 | ||||
-rw-r--r-- | client/client.go | 91 | ||||
-rw-r--r-- | client/client_test.go | 112 | ||||
-rw-r--r-- | cmd/client/main.go | 17 | ||||
-rw-r--r-- | cmd/server/main.go | 24 | ||||
-rw-r--r-- | messaging/conn.go | 67 | ||||
-rw-r--r-- | messaging/messages.pb.go | 336 | ||||
-rw-r--r-- | messaging/messages.proto | 31 | ||||
-rw-r--r-- | messaging/messaging.go | 67 | ||||
-rw-r--r-- | server/command_executor.go | 123 | ||||
-rw-r--r-- | server/command_executor_test.go | 55 | ||||
-rw-r--r-- | server/server.go | 118 | ||||
-rw-r--r-- | server/server_test.go | 42 | ||||
-rw-r--r-- | server/service.go | 20 | ||||
-rw-r--r-- | vendor/vendor.json | 13 |
17 files changed, 1160 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..c1c3d9944 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +client/testdata/data +_build +vendor/*/ +git-daemon-client +git-daemon-server diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 000000000..151aaea21 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,5 @@ +image: golang:1.7 + +test: + script: + - make test diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..bf61ec5da --- /dev/null +++ b/Makefile @@ -0,0 +1,34 @@ +PKG=gitlab.com/gitlab-org/git-access-daemon +BUILD_DIR=$(shell pwd) +CLIENT_BIN=git-daemon-client +SERVER_BIN=git-daemon-server + +export GOPATH=${BUILD_DIR}/_build +export PATH:=${GOPATH}/bin:$(PATH) + +.PHONY: ${BUILD_DIR}/_build + +all: test build + +${BUILD_DIR}/_build: + mkdir -p $@/src/${PKG} + tar -cf - --exclude _build --exclude .git . | (cd $@/src/${PKG} && tar -xf -) + touch $@ + +deps: ${BUILD_DIR}/_build + (which govendor) || go get -u github.com/kardianos/govendor + cd ${BUILD_DIR}/_build/src/${PKG} && govendor fetch +out + +build: deps + go build -o ${SERVER_BIN} cmd/server/main.go + go build -o ${CLIENT_BIN} cmd/client/main.go + +test: ${BUILD_DIR}/_build deps + cd ${BUILD_DIR}/_build/src/${PKG}/server && go test -v + cd ${BUILD_DIR}/_build/src/${PKG}/client && go test -v + +clean: + rm -rf ${BUILD_DIR}/_build + rm -rf client/testdata + [ -f ${CLIENT_BIN} ] && rm ${CLIENT_BIN} + [ -f ${SERVER_BIN} ] && rm ${SERVER_BIN} diff --git a/client/client.go b/client/client.go new file mode 100644 index 000000000..246771573 --- /dev/null +++ b/client/client.go @@ -0,0 +1,91 @@ +package client + +import ( + "bufio" + "bytes" + "io" + "log" + "net" + "os" + + "gitlab.com/gitlab-org/git-access-daemon/messaging" +) + +type Client struct { + messagesConn *messaging.MessagesConn +} + +func NewClient(serverAddress string) *Client { + conn, err := net.Dial("tcp", serverAddress) + if err != nil { + log.Fatalln(err) + } + + messagesConn := messaging.NewMessagesConn(conn) + return &Client{messagesConn} +} + +func (client *Client) Run(cmd []string) int { + rawMsg := messaging.NewCommandMessage(os.Environ(), cmd[0], cmd[1:]...) + + _, err := client.messagesConn.Write(rawMsg) + if err != nil { + log.Fatalln(err) + } + + go streamStdinToServer(client) + + for { + rawMsg, err := client.messagesConn.Read() + + if err != nil { + break + } + + msg, err := messaging.ParseMessage(rawMsg) + if err != nil { + break + } + + switch msg.Type { + case "stdout": + os.Stdout.Write(msg.GetOutput().Output) + case "stderr": + os.Stderr.Write(msg.GetOutput().Output) + case "exit": + return int(msg.GetExit().ExitStatus) + } + } + + return 255 +} + +func (client *Client) Close() { + client.messagesConn.Close() +} + +func streamStdinToServer(client *Client) { + finished := false + reader := bufio.NewReader(os.Stdin) + + for { + buffer := make([]byte, bytes.MinRead) + + n, err := reader.Read(buffer) + + if err == io.EOF { + finished = true + } + + if n < bytes.MinRead { + buffer = buffer[:n] + } + + rawMsg := messaging.NewInputMessage(buffer) + client.messagesConn.Write(rawMsg) + + if finished { + return + } + } +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 000000000..3b446a926 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,112 @@ +package client + +import ( + "bytes" + "log" + "os" + "os/exec" + "path" + "testing" + "time" + + serv "gitlab.com/gitlab-org/git-access-daemon/server" +) + +const serverAddress = "127.0.0.1:6667" +const testRepo = "group/test.git" +const testRepoRoot = "testdata/data" + +var origStdout = os.Stdout +var origStderr = os.Stderr + +func TestMain(m *testing.M) { + source := "https://gitlab.com/gitlab-org/gitlab-test.git" + clonePath := path.Join(testRepoRoot, testRepo) + if _, err := os.Stat(clonePath); err != nil { + testCmd := exec.Command("git", "clone", "--bare", source, clonePath) + testCmd.Stdout = os.Stdout + testCmd.Stderr = os.Stderr + if err := testCmd.Run(); err != nil { + log.Printf("Test setup: failed to run %v", testCmd) + os.Exit(-1) + } + } + server := serv.NewServer() + + go server.Serve(serverAddress, serv.CommandExecutor) + defer server.Stop() + + time.Sleep(10 * time.Millisecond) + os.Exit(m.Run()) +} + +func TestRunningGitCommandSuccessfully(t *testing.T) { + client := NewClient(serverAddress) + defer client.Close() + + stdout, _ := redirectOutputStreams() + exitStatus := client.Run([]string{ + "git", + "--git-dir", + path.Join(testRepoRoot, testRepo), + "rev-list", + "--count", + "b83d6e391c", + }) + restoreOutputStreams() + + expectedExitStatus := 0 + if exitStatus != expectedExitStatus { + t.Fatalf("Expected response exit status to equal %d, got %d", expectedExitStatus, exitStatus) + } + + expectedStdout := []byte("37\n") + gotStdout := make([]byte, len(expectedStdout)) + stdout.Read(gotStdout) + if !bytes.Equal(gotStdout, expectedStdout) { + t.Fatalf("Expected response stdout to be \"%s\", got \"%s\"", expectedStdout, gotStdout) + } +} + +func TestRunningGitCommandUnsuccessfully(t *testing.T) { + client := NewClient(serverAddress) + defer client.Close() + + _, stderr := redirectOutputStreams() + exitStatus := client.Run([]string{ + "git", + "--git-dir", + path.Join(testRepoRoot, testRepo), + "rev-list", + "--count", + "babecafe", + }) + restoreOutputStreams() + + expectedExitStatus := 128 + if exitStatus != expectedExitStatus { + t.Fatalf("Expected response exit status to equal %d, got %d", expectedExitStatus, exitStatus) + } + + expectedStderr := []byte("fatal: ambiguous argument 'babecafe': unknown revision or path not in the working tree.") + gotStderr := make([]byte, len(expectedStderr)) + stderr.Read(gotStderr) + if !bytes.Contains(gotStderr, expectedStderr) { + t.Fatalf("Expected stderr to contain \"%s\", found none in \"%s\"", expectedStderr, gotStderr) + } +} + +func redirectOutputStreams() (*os.File, *os.File) { + stdoutReader, stdoutWriter, _ := os.Pipe() + stderrReader, stderrWriter, _ := os.Pipe() + + os.Stdout = stdoutWriter + os.Stderr = stderrWriter + + return stdoutReader, stderrReader +} + +func restoreOutputStreams() { + os.Stdout = origStdout + os.Stderr = origStderr +} diff --git a/cmd/client/main.go b/cmd/client/main.go new file mode 100644 index 000000000..b00413fe6 --- /dev/null +++ b/cmd/client/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "os" + + "gitlab.com/gitlab-org/git-access-daemon/client" +) + +func main() { + client := client.NewClient("127.0.0.1:6666") + defer client.Close() + + os.Args[0] = "git" + + exitStatus := client.Run(os.Args) + os.Exit(exitStatus) +} diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 000000000..e37665464 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "os" + "os/signal" + "syscall" + + serv "gitlab.com/gitlab-org/git-access-daemon/server" +) + +func main() { + ch := make(chan os.Signal) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + + server := serv.NewServer() + go func() { + server.Serve("0.0.0.0:6666", serv.CommandExecutor) + }() + + select { + case <-ch: + server.Stop() + } +} diff --git a/messaging/conn.go b/messaging/conn.go new file mode 100644 index 000000000..16415a075 --- /dev/null +++ b/messaging/conn.go @@ -0,0 +1,67 @@ +package messaging + +import ( + "bufio" + "net" + "strconv" + "strings" +) + +type MessagesConn struct { + Conn net.Conn + readWriter *bufio.ReadWriter +} + +func NewMessagesConn(conn net.Conn) *MessagesConn { + reader := bufio.NewReader(conn) + writer := bufio.NewWriter(conn) + readWriter := bufio.NewReadWriter(reader, writer) + messagesConn := &MessagesConn{conn, readWriter} + + return messagesConn +} + +func (conn *MessagesConn) Read() ([]byte, error) { + lenStr, err := conn.readWriter.ReadString('$') + if err != nil { + return nil, err + } + + lenStr = strings.TrimRight(lenStr, "$") + payloadLen, err := strconv.Atoi(lenStr) + if err != nil { + return nil, err + } + + payload := make([]byte, 0) + for { + buffer := make([]byte, payloadLen-len(payload)) + n, err := conn.readWriter.Read(buffer) + if err != nil { + return nil, err + } + + payload = append(payload, buffer[:n]...) + + if len(payload) == payloadLen { + break + } + } + + return payload, nil +} + +func (conn *MessagesConn) Write(buffer []byte) (int, error) { + newBuffer := make([]byte, 0) + newBuffer = strconv.AppendInt(newBuffer, int64(len(buffer)), 10) + newBuffer = append(newBuffer, "$"...) + newBuffer = append(newBuffer, buffer...) + + n, err := conn.readWriter.Write(newBuffer) + conn.readWriter.Flush() + return n, err +} + +func (conn *MessagesConn) Close() { + conn.Conn.Close() +} diff --git a/messaging/messages.pb.go b/messaging/messages.pb.go new file mode 100644 index 000000000..ac5f24a39 --- /dev/null +++ b/messaging/messages.pb.go @@ -0,0 +1,336 @@ +// Code generated by protoc-gen-go. +// source: messaging/messages.proto +// DO NOT EDIT! + +/* +Package messaging is a generated protocol buffer package. + +It is generated from these files: + messaging/messages.proto + +It has these top-level messages: + Message + Command + Output + Input + Exit +*/ +package messaging + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Message struct { + Type string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"` + // Types that are valid to be assigned to Payload: + // *Message_Command + // *Message_Input + // *Message_Output + // *Message_Exit + Payload isMessage_Payload `protobuf_oneof:"payload"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type isMessage_Payload interface { + isMessage_Payload() +} + +type Message_Command struct { + Command *Command `protobuf:"bytes,2,opt,name=command,oneof"` +} +type Message_Input struct { + Input *Input `protobuf:"bytes,3,opt,name=input,oneof"` +} +type Message_Output struct { + Output *Output `protobuf:"bytes,4,opt,name=output,oneof"` +} +type Message_Exit struct { + Exit *Exit `protobuf:"bytes,5,opt,name=exit,oneof"` +} + +func (*Message_Command) isMessage_Payload() {} +func (*Message_Input) isMessage_Payload() {} +func (*Message_Output) isMessage_Payload() {} +func (*Message_Exit) isMessage_Payload() {} + +func (m *Message) GetPayload() isMessage_Payload { + if m != nil { + return m.Payload + } + return nil +} + +func (m *Message) GetType() string { + if m != nil { + return m.Type + } + return "" +} + +func (m *Message) GetCommand() *Command { + if x, ok := m.GetPayload().(*Message_Command); ok { + return x.Command + } + return nil +} + +func (m *Message) GetInput() *Input { + if x, ok := m.GetPayload().(*Message_Input); ok { + return x.Input + } + return nil +} + +func (m *Message) GetOutput() *Output { + if x, ok := m.GetPayload().(*Message_Output); ok { + return x.Output + } + return nil +} + +func (m *Message) GetExit() *Exit { + if x, ok := m.GetPayload().(*Message_Exit); ok { + return x.Exit + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*Message) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _Message_OneofMarshaler, _Message_OneofUnmarshaler, _Message_OneofSizer, []interface{}{ + (*Message_Command)(nil), + (*Message_Input)(nil), + (*Message_Output)(nil), + (*Message_Exit)(nil), + } +} + +func _Message_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Message) + // payload + switch x := m.Payload.(type) { + case *Message_Command: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Command); err != nil { + return err + } + case *Message_Input: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Input); err != nil { + return err + } + case *Message_Output: + b.EncodeVarint(4<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Output); err != nil { + return err + } + case *Message_Exit: + b.EncodeVarint(5<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Exit); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Message.Payload has unexpected type %T", x) + } + return nil +} + +func _Message_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Message) + switch tag { + case 2: // payload.command + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Command) + err := b.DecodeMessage(msg) + m.Payload = &Message_Command{msg} + return true, err + case 3: // payload.input + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Input) + err := b.DecodeMessage(msg) + m.Payload = &Message_Input{msg} + return true, err + case 4: // payload.output + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Output) + err := b.DecodeMessage(msg) + m.Payload = &Message_Output{msg} + return true, err + case 5: // payload.exit + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Exit) + err := b.DecodeMessage(msg) + m.Payload = &Message_Exit{msg} + return true, err + default: + return false, nil + } +} + +func _Message_OneofSizer(msg proto.Message) (n int) { + m := msg.(*Message) + // payload + switch x := m.Payload.(type) { + case *Message_Command: + s := proto.Size(x.Command) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Message_Input: + s := proto.Size(x.Input) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Message_Output: + s := proto.Size(x.Output) + n += proto.SizeVarint(4<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Message_Exit: + s := proto.Size(x.Exit) + n += proto.SizeVarint(5<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type Command struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + Args []string `protobuf:"bytes,2,rep,name=args" json:"args,omitempty"` + Environ []string `protobuf:"bytes,3,rep,name=environ" json:"environ,omitempty"` +} + +func (m *Command) Reset() { *m = Command{} } +func (m *Command) String() string { return proto.CompactTextString(m) } +func (*Command) ProtoMessage() {} +func (*Command) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *Command) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Command) GetArgs() []string { + if m != nil { + return m.Args + } + return nil +} + +func (m *Command) GetEnviron() []string { + if m != nil { + return m.Environ + } + return nil +} + +type Output struct { + Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` +} + +func (m *Output) Reset() { *m = Output{} } +func (m *Output) String() string { return proto.CompactTextString(m) } +func (*Output) ProtoMessage() {} +func (*Output) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *Output) GetOutput() []byte { + if m != nil { + return m.Output + } + return nil +} + +type Input struct { + Stdin []byte `protobuf:"bytes,1,opt,name=stdin,proto3" json:"stdin,omitempty"` +} + +func (m *Input) Reset() { *m = Input{} } +func (m *Input) String() string { return proto.CompactTextString(m) } +func (*Input) ProtoMessage() {} +func (*Input) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *Input) GetStdin() []byte { + if m != nil { + return m.Stdin + } + return nil +} + +type Exit struct { + ExitStatus int32 `protobuf:"varint,1,opt,name=exit_status,json=exitStatus" json:"exit_status,omitempty"` +} + +func (m *Exit) Reset() { *m = Exit{} } +func (m *Exit) String() string { return proto.CompactTextString(m) } +func (*Exit) ProtoMessage() {} +func (*Exit) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *Exit) GetExitStatus() int32 { + if m != nil { + return m.ExitStatus + } + return 0 +} + +func init() { + proto.RegisterType((*Message)(nil), "messaging.Message") + proto.RegisterType((*Command)(nil), "messaging.Command") + proto.RegisterType((*Output)(nil), "messaging.Output") + proto.RegisterType((*Input)(nil), "messaging.Input") + proto.RegisterType((*Exit)(nil), "messaging.Exit") +} + +func init() { proto.RegisterFile("messaging/messages.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 280 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x4c, 0x91, 0xdf, 0x4a, 0xb5, 0x40, + 0x14, 0xc5, 0xf5, 0xf8, 0x0f, 0xf7, 0xf9, 0xe0, 0xab, 0x4d, 0xc4, 0xdc, 0x44, 0x22, 0x44, 0x42, + 0x60, 0x50, 0x6f, 0x50, 0x04, 0x46, 0x44, 0x60, 0x0f, 0x10, 0x53, 0x8a, 0x0c, 0xe4, 0x8c, 0x38, + 0x63, 0x9c, 0xf3, 0xa2, 0x3d, 0x4f, 0xec, 0x19, 0x15, 0xef, 0xd6, 0x5e, 0xeb, 0xa7, 0xac, 0xc5, + 0x00, 0xeb, 0x5b, 0xad, 0x79, 0x27, 0x64, 0x77, 0xeb, 0x54, 0xab, 0xcb, 0x61, 0x54, 0x46, 0x61, + 0xba, 0x26, 0xf9, 0xaf, 0x0f, 0xc9, 0xab, 0x4b, 0x11, 0x21, 0x34, 0xc7, 0xa1, 0x65, 0x7e, 0xe6, + 0x17, 0x69, 0x6d, 0x35, 0x96, 0x90, 0x7c, 0xa9, 0xbe, 0xe7, 0xb2, 0x61, 0xbb, 0xcc, 0x2f, 0xf6, + 0x77, 0x58, 0xae, 0x1f, 0x97, 0x8f, 0x2e, 0xa9, 0xbc, 0x7a, 0x81, 0xb0, 0x80, 0x48, 0xc8, 0x61, + 0x32, 0x2c, 0xb0, 0xf4, 0xc9, 0x86, 0x7e, 0x26, 0xbf, 0xf2, 0x6a, 0x07, 0xe0, 0x0d, 0xc4, 0x6a, + 0x32, 0x84, 0x86, 0x16, 0x3d, 0xdd, 0xa0, 0x6f, 0x36, 0xa8, 0xbc, 0x7a, 0x46, 0xf0, 0x0a, 0xc2, + 0xf6, 0x20, 0x0c, 0x8b, 0x2c, 0xfa, 0x7f, 0x83, 0x3e, 0x1d, 0x04, 0x81, 0x36, 0x7e, 0x48, 0x21, + 0x19, 0xf8, 0xf1, 0x5b, 0xf1, 0x26, 0x7f, 0x81, 0x64, 0xae, 0x47, 0xbb, 0x24, 0xef, 0xd7, 0x5d, + 0xa4, 0xc9, 0xe3, 0x63, 0xa7, 0xd9, 0x2e, 0x0b, 0xc8, 0x23, 0x8d, 0x0c, 0x92, 0x56, 0xfe, 0x88, + 0x51, 0x49, 0x16, 0x58, 0x7b, 0x39, 0xf3, 0x0c, 0x62, 0x57, 0x09, 0xcf, 0xd7, 0xd6, 0xf4, 0xb7, + 0x7f, 0x4b, 0xc1, 0xfc, 0x02, 0x22, 0xbb, 0x0f, 0xcf, 0x20, 0xd2, 0xa6, 0x11, 0x72, 0xce, 0xdd, + 0x91, 0x5f, 0x43, 0x48, 0x45, 0xf1, 0x12, 0xf6, 0x54, 0xf4, 0x43, 0x1b, 0x6e, 0x26, 0x6d, 0x99, + 0xa8, 0x06, 0xb2, 0xde, 0xad, 0xf3, 0x19, 0xdb, 0x17, 0xba, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, + 0x10, 0x99, 0x01, 0x7a, 0xbd, 0x01, 0x00, 0x00, +} diff --git a/messaging/messages.proto b/messaging/messages.proto new file mode 100644 index 000000000..fea82625d --- /dev/null +++ b/messaging/messages.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package messaging; + +message Message { + string type = 1; + oneof payload { + Command command = 2; + Input input = 3; + Output output = 4; + Exit exit = 5; + } +} + +message Command { + string name = 1; + repeated string args = 2; + repeated string environ = 3; +} + +message Output { + bytes output = 1; +} + +message Input { + bytes stdin = 1; +} + +message Exit { + int32 exit_status = 1; +} diff --git a/messaging/messaging.go b/messaging/messaging.go new file mode 100644 index 000000000..c40521232 --- /dev/null +++ b/messaging/messaging.go @@ -0,0 +1,67 @@ +package messaging + +import ( + "log" + + proto "github.com/golang/protobuf/proto" +) + +func ParseMessage(rawMsg []byte) (*Message, error) { + msg := &Message{} + + err := proto.Unmarshal(rawMsg, msg) + return msg, err +} + +func NewCommandMessage(environ []string, name string, args ...string) []byte { + msg := &Message{ + Type: "command", + Payload: &Message_Command{ + &Command{name, args, environ}, + }, + } + + return marshal(msg) +} + +func NewInputMessage(input []byte) []byte { + msg := &Message{ + Type: "stdin", + Payload: &Message_Input{ + &Input{input}, + }, + } + + return marshal(msg) +} + +func NewOutputMessage(outputType string, output []byte) []byte { + msg := &Message{ + Type: outputType, + Payload: &Message_Output{ + &Output{output}, + }, + } + + return marshal(msg) +} + +func NewExitMessage(exitStatus int32) []byte { + msg := &Message{ + Type: "exit", + Payload: &Message_Exit{ + &Exit{exitStatus}, + }, + } + + return marshal(msg) +} + +func marshal(msg *Message) []byte { + buffer, err := proto.Marshal(msg) + if err != nil { + log.Fatalln("Failed marshalling a Protobuf message") + } + + return buffer +} diff --git a/server/command_executor.go b/server/command_executor.go new file mode 100644 index 000000000..284c74644 --- /dev/null +++ b/server/command_executor.go @@ -0,0 +1,123 @@ +package server + +import ( + "bytes" + "io" + "log" + "os/exec" + "syscall" + + "gitlab.com/gitlab-org/git-access-daemon/messaging" +) + +func CommandExecutor(chans *commChans) { + rawMsg, ok := <-chans.inChan + if !ok { + return + } + + msg, err := messaging.ParseMessage(rawMsg) + if err != nil { + return + } + if msg.Type != "command" { + return + } + + runCommand(chans, msg.GetCommand()) +} + +func runCommand(chans *commChans, commandMsg *messaging.Command) { + name := commandMsg.Name + args := commandMsg.Args + + log.Println("Executing command:", name, "with args", args) + + stdinReader, stdinWriter := io.Pipe() + stdoutReader, stdoutWriter := io.Pipe() + stderrReader, stderrWriter := io.Pipe() + + go streamOut("stdout", stdoutReader, chans) + go streamOut("stderr", stderrReader, chans) + go streamIn(stdinWriter, chans) + + cmd := exec.Command(name, args...) + + // Start the command in its own process group (nice for signalling) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Env = commandMsg.Environ + cmd.Stdin = stdinReader + cmd.Stdout = stdoutWriter + cmd.Stderr = stderrWriter + + err := cmd.Run() + + if err != nil { + exitStatus := int32(extractExitStatusFromError(err.(*exec.ExitError))) + chans.outChan <- messaging.NewExitMessage(exitStatus) + return + } + + chans.outChan <- messaging.NewExitMessage(0) +} + +func extractExitStatusFromError(err *exec.ExitError) int { + processState := err.ProcessState + status := processState.Sys().(syscall.WaitStatus) + + if status.Exited() { + return status.ExitStatus() + } + + return 255 +} + +func streamOut(streamName string, streamPipe io.Reader, chans *commChans) { + // TODO: Move buffer out of the loop and use defer instead of finished + finished := false + + for { + buffer := make([]byte, bytes.MinRead) + + n, err := streamPipe.Read(buffer) + if err == io.EOF { + finished = true + } + + if n < bytes.MinRead { + buffer = buffer[:n] + } + + chans.outChan <- messaging.NewOutputMessage(streamName, buffer) + + if finished { + return + } + } +} + +func streamIn(streamPipe *io.PipeWriter, chans *commChans) { + defer streamPipe.Close() + + for { + rawMsg, ok := <-chans.inChan + if !ok { + return + } + + msg, err := messaging.ParseMessage(rawMsg) + if msg.Type != "stdin" { + continue + } + + stdin := msg.GetInput().Stdin + if len(stdin) == 0 { + return + } + + _, err = streamPipe.Write(stdin) + if err != nil { + return + } + } +} diff --git a/server/command_executor_test.go b/server/command_executor_test.go new file mode 100644 index 000000000..4fb1fd0cf --- /dev/null +++ b/server/command_executor_test.go @@ -0,0 +1,55 @@ +package server + +import ( + "testing" + + "gitlab.com/gitlab-org/git-access-daemon/messaging" +) + +func TestRunningCommandSuccessfully(t *testing.T) { + chans := newCommChans() + defer chans.Close() + + go CommandExecutor(chans) + + chans.inChan <- messaging.NewCommandMessage([]string{}, "ls", "-hal") + chans.inChan <- messaging.NewInputMessage([]byte{}) + + for { + rawMsg := <-chans.outChan + msg, _ := messaging.ParseMessage(rawMsg) + + switch msg.Type { + case "exit": + exit_status := msg.GetExit().ExitStatus + if exit_status != 0 { + t.Error("Expected exit status of 0, found %v", exit_status) + } + return + } + } +} + +func TestRunningCommandUnsuccessfully(t *testing.T) { + chans := newCommChans() + defer chans.Close() + + go CommandExecutor(chans) + + chans.inChan <- messaging.NewCommandMessage([]string{}, "ls", "/file-that-does-not-exist") + chans.inChan <- messaging.NewInputMessage([]byte{}) + + for { + rawMsg := <-chans.outChan + msg, _ := messaging.ParseMessage(rawMsg) + + switch msg.Type { + case "exit": + exit_status := msg.GetExit().ExitStatus + if exit_status == 0 { + t.Error("Expected a failure exit status, got 0") + } + return + } + } +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 000000000..83b88e67f --- /dev/null +++ b/server/server.go @@ -0,0 +1,118 @@ +package server + +import ( + "io" + "log" + "net" + "sync" + "time" + + "gitlab.com/gitlab-org/git-access-daemon/messaging" +) + +type Server struct { + ch chan bool + waitGroup *sync.WaitGroup +} + +func NewServer() *Server { + server := &Server{ + ch: make(chan bool), + waitGroup: &sync.WaitGroup{}, + } + server.waitGroup.Add(1) + return server +} + +func (s *Server) Serve(address string, service Service) { + listener, err := newListener(address) + if err != nil { + log.Fatalln(err) + } + defer s.waitGroup.Done() + log.Println("Listening on address", address) + for { + select { + case <-s.ch: + log.Println("Received shutdown message, stopping server on", listener.Addr()) + listener.Close() + return + default: + } + listener.SetDeadline(time.Now().Add(1e9)) + conn, err := listener.AcceptTCP() + if err != nil { + if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { + continue + } + log.Println(err) + } + log.Println("Client connected from ", conn.RemoteAddr()) + s.waitGroup.Add(1) + + chans := newCommChans() + go service(chans) + go s.serve(conn, chans) + } +} + +func newListener(address string) (*net.TCPListener, error) { + tcpAddress, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return &net.TCPListener{}, err + } + return net.ListenTCP("tcp", tcpAddress) +} + +func (s *Server) Stop() { + close(s.ch) + s.waitGroup.Wait() +} + +func (s *Server) serve(conn *net.TCPConn, chans *commChans) { + defer conn.Close() + defer s.waitGroup.Done() + + messagesConn := messaging.NewMessagesConn(conn) + + go func() { + for { + ret, ok := <-chans.outChan + if !ok { + return + } + + if _, err := messagesConn.Write(ret); nil != err { + log.Println(err) + return + } + } + }() + + for { + select { + case <-s.ch: + log.Println("Received shutdown message, disconnecting client from", conn.RemoteAddr()) + return + default: + } + + conn.SetDeadline(time.Now().Add(1e9)) + + buffer, err := messagesConn.Read() + if err != nil { + if err == io.EOF { + log.Println("Client", conn.RemoteAddr(), "closed the connection") + return + } + if opError, ok := err.(*net.OpError); ok && opError.Timeout() { + continue + } + log.Println(err) + } + + chans.inChan <- buffer + } + + chans.Close() +} diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 000000000..81bb318e8 --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,42 @@ +package server + +import ( + "fmt" + "net" + "testing" + "time" + + "gitlab.com/gitlab-org/git-access-daemon/messaging" +) + +func TestServerStandingUp(t *testing.T) { + server := NewServer() + address := "127.0.0.1:6666" + + go server.Serve(address, func(chans *commChans) { + a := (<-chans.inChan) + chans.outChan <- a + }) + defer server.Stop() + + // Give server a little time to start listening for connections + time.Sleep(10 * time.Millisecond) + + conn, err := net.Dial("tcp", address) + if err != nil { + t.Fatal(err) + } + messagesConn := messaging.NewMessagesConn(conn) + + if _, err := messagesConn.Write([]byte("hola hola!")); err != nil { + t.Error(err) + } + + buffer, err := messagesConn.Read() + if err != nil { + t.Error(err) + } + messagesConn.Close() + + fmt.Println("Received from server:", string(buffer)) +} diff --git a/server/service.go b/server/service.go new file mode 100644 index 000000000..1dea68435 --- /dev/null +++ b/server/service.go @@ -0,0 +1,20 @@ +package server + +type commChans struct { + inChan chan []byte + outChan chan []byte +} + +type Service func(*commChans) + +func newCommChans() *commChans { + return &commChans{ + inChan: make(chan []byte), + outChan: make(chan []byte), + } +} + +func (chans *commChans) Close() { + close(chans.inChan) + close(chans.outChan) +} diff --git a/vendor/vendor.json b/vendor/vendor.json new file mode 100644 index 000000000..19ecc88d2 --- /dev/null +++ b/vendor/vendor.json @@ -0,0 +1,13 @@ +{ + "comment": "", + "ignore": "test", + "package": [ + { + "checksumSHA1": "kBeNcaKk56FguvPSUCEaH6AxpRc=", + "path": "github.com/golang/protobuf/proto", + "revision": "8ee79997227bf9b34611aee7946ae64735e6fd93", + "revisionTime": "2016-11-17T03:31:26Z" + } + ], + "rootPath": "gitlab.com/gitlab-org/git-access-daemon" +} |