Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPablo Carranza <pcarranza@gmail.com>2016-12-16 20:21:44 +0300
committerPablo Carranza <pcarranza@gmail.com>2016-12-16 20:21:44 +0300
commit318b174c3acb67fdeedc4b31dd7bc7736c092f7e (patch)
treecac8070cd2ed24d14cb4bd724c66339a992f878d
parent29682e9670345e6b6a96e162f8ed10891ddf69ef (diff)
parent4e0be501d15e79fcef798ea1e384c743bd54fa14 (diff)
Merge branch 'feature/restructure-server-client-around-protobuf' into 'initial-server'
Restructure client/server communication around Protobuf-serialized messages Part of #7 See merge request !10
-rw-r--r--.gitignore1
-rw-r--r--Makefile15
-rw-r--r--client/client.go87
-rw-r--r--client/client_test.go70
-rw-r--r--cmd/client/main.go10
-rw-r--r--cmd/server/main.go8
-rw-r--r--messaging/conn.go67
-rw-r--r--messaging/messages.pb.go336
-rw-r--r--messaging/messages.proto31
-rw-r--r--messaging/messaging.go67
-rw-r--r--server/command_executor.go133
-rw-r--r--server/command_executor_test.go118
-rw-r--r--server/server.go53
-rw-r--r--server/server_test.go25
-rw-r--r--server/service.go20
-rw-r--r--vendor/vendor.json13
16 files changed, 818 insertions, 236 deletions
diff --git a/.gitignore b/.gitignore
index 88e3b36c4..c1c3d9944 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
client/testdata/data
_build
+vendor/*/
git-daemon-client
git-daemon-server
diff --git a/Makefile b/Makefile
index ba461f596..bf61ec5da 100644
--- a/Makefile
+++ b/Makefile
@@ -4,21 +4,26 @@ CLIENT_BIN=git-daemon-client
SERVER_BIN=git-daemon-server
export GOPATH=${BUILD_DIR}/_build
+export PATH:=${GOPATH}/bin:$(PATH)
.PHONY: ${BUILD_DIR}/_build
all: test build
-build:
- go build -o ${SERVER_BIN} cmd/server/main.go
- go build -o ${CLIENT_BIN} cmd/client/main.go
-
${BUILD_DIR}/_build:
mkdir -p $@/src/${PKG}
tar -cf - --exclude _build --exclude .git . | (cd $@/src/${PKG} && tar -xf -)
touch $@
-test: ${BUILD_DIR}/_build
+deps: ${BUILD_DIR}/_build
+ (which govendor) || go get -u github.com/kardianos/govendor
+ cd ${BUILD_DIR}/_build/src/${PKG} && govendor fetch +out
+
+build: deps
+ go build -o ${SERVER_BIN} cmd/server/main.go
+ go build -o ${CLIENT_BIN} cmd/client/main.go
+
+test: ${BUILD_DIR}/_build deps
cd ${BUILD_DIR}/_build/src/${PKG}/server && go test -v
cd ${BUILD_DIR}/_build/src/${PKG}/client && go test -v
diff --git a/client/client.go b/client/client.go
index fc943240f..246771573 100644
--- a/client/client.go
+++ b/client/client.go
@@ -2,65 +2,90 @@ package client
import (
"bufio"
- "encoding/json"
+ "bytes"
+ "io"
"log"
"net"
+ "os"
- "gitlab.com/gitlab-org/git-access-daemon/server"
+ "gitlab.com/gitlab-org/git-access-daemon/messaging"
)
type Client struct {
- conn net.Conn
+ messagesConn *messaging.MessagesConn
}
-func NewClient(serviceAddress string) *Client {
- conn, err := net.Dial("tcp", serviceAddress)
+func NewClient(serverAddress string) *Client {
+ conn, err := net.Dial("tcp", serverAddress)
if err != nil {
log.Fatalln(err)
}
- return &Client{conn}
+ messagesConn := messaging.NewMessagesConn(conn)
+ return &Client{messagesConn}
}
-func (client *Client) Request(cmd []string) server.CmdResponse {
- _, err := client.conn.Write(append(makeRequest(cmd), "\n"...))
+func (client *Client) Run(cmd []string) int {
+ rawMsg := messaging.NewCommandMessage(os.Environ(), cmd[0], cmd[1:]...)
+
+ _, err := client.messagesConn.Write(rawMsg)
if err != nil {
log.Fatalln(err)
}
- reader := bufio.NewReader(client.conn)
- buffer, err := reader.ReadBytes('\n')
- if err != nil {
- log.Fatalln(err)
+ go streamStdinToServer(client)
+
+ for {
+ rawMsg, err := client.messagesConn.Read()
+
+ if err != nil {
+ break
+ }
+
+ msg, err := messaging.ParseMessage(rawMsg)
+ if err != nil {
+ break
+ }
+
+ switch msg.Type {
+ case "stdout":
+ os.Stdout.Write(msg.GetOutput().Output)
+ case "stderr":
+ os.Stderr.Write(msg.GetOutput().Output)
+ case "exit":
+ return int(msg.GetExit().ExitStatus)
+ }
}
- return parseResponse(buffer)
+ return 255
}
func (client *Client) Close() {
- client.conn.Close()
+ client.messagesConn.Close()
}
-func makeRequest(cmd []string) []byte {
- req := server.CmdRequest{
- Cmd: cmd,
- }
+func streamStdinToServer(client *Client) {
+ finished := false
+ reader := bufio.NewReader(os.Stdin)
- buf, err := json.Marshal(&req)
- if err != nil {
- log.Fatalln("Failed marshalling a JSON request")
- }
+ for {
+ buffer := make([]byte, bytes.MinRead)
- return buf
-}
+ n, err := reader.Read(buffer)
-func parseResponse(rawResponse []byte) server.CmdResponse {
- res := server.CmdResponse{}
+ if err == io.EOF {
+ finished = true
+ }
- err := json.Unmarshal(rawResponse, &res)
- if err != nil {
- log.Fatalln("Failed parsing response")
- }
+ if n < bytes.MinRead {
+ buffer = buffer[:n]
+ }
+
+ rawMsg := messaging.NewInputMessage(buffer)
+ client.messagesConn.Write(rawMsg)
- return res
+ if finished {
+ return
+ }
+ }
}
diff --git a/client/client_test.go b/client/client_test.go
index 6bba82478..3b446a926 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -1,21 +1,24 @@
package client
import (
+ "bytes"
"log"
"os"
"os/exec"
"path"
- "strings"
"testing"
"time"
- "gitlab.com/gitlab-org/git-access-daemon/server"
+ serv "gitlab.com/gitlab-org/git-access-daemon/server"
)
-const serviceAddress = "127.0.0.1:6667"
+const serverAddress = "127.0.0.1:6667"
const testRepo = "group/test.git"
const testRepoRoot = "testdata/data"
+var origStdout = os.Stdout
+var origStderr = os.Stderr
+
func TestMain(m *testing.M) {
source := "https://gitlab.com/gitlab-org/gitlab-test.git"
clonePath := path.Join(testRepoRoot, testRepo)
@@ -28,20 +31,21 @@ func TestMain(m *testing.M) {
os.Exit(-1)
}
}
- service := server.NewService()
+ server := serv.NewServer()
- go service.Serve(serviceAddress, server.CommandExecutorCallback)
- defer service.Stop()
+ go server.Serve(serverAddress, serv.CommandExecutor)
+ defer server.Stop()
time.Sleep(10 * time.Millisecond)
os.Exit(m.Run())
}
func TestRunningGitCommandSuccessfully(t *testing.T) {
- client := NewClient(serviceAddress)
+ client := NewClient(serverAddress)
defer client.Close()
- res := client.Request([]string{
+ stdout, _ := redirectOutputStreams()
+ exitStatus := client.Run([]string{
"git",
"--git-dir",
path.Join(testRepoRoot, testRepo),
@@ -49,23 +53,27 @@ func TestRunningGitCommandSuccessfully(t *testing.T) {
"--count",
"b83d6e391c",
})
+ restoreOutputStreams()
- exit_status := 0
- if res.ExitStatus != exit_status {
- t.Fatalf("Expected response exit status to equal %d, got %d", exit_status, res.ExitStatus)
+ expectedExitStatus := 0
+ if exitStatus != expectedExitStatus {
+ t.Fatalf("Expected response exit status to equal %d, got %d", expectedExitStatus, exitStatus)
}
- msg := "37\n"
- if res.Message != msg {
- t.Fatalf("Expected response stdout to be \"%s\", got \"%s\"", msg, res.Message)
+ expectedStdout := []byte("37\n")
+ gotStdout := make([]byte, len(expectedStdout))
+ stdout.Read(gotStdout)
+ if !bytes.Equal(gotStdout, expectedStdout) {
+ t.Fatalf("Expected response stdout to be \"%s\", got \"%s\"", expectedStdout, gotStdout)
}
}
func TestRunningGitCommandUnsuccessfully(t *testing.T) {
- client := NewClient(serviceAddress)
+ client := NewClient(serverAddress)
defer client.Close()
- res := client.Request([]string{
+ _, stderr := redirectOutputStreams()
+ exitStatus := client.Run([]string{
"git",
"--git-dir",
path.Join(testRepoRoot, testRepo),
@@ -73,14 +81,32 @@ func TestRunningGitCommandUnsuccessfully(t *testing.T) {
"--count",
"babecafe",
})
+ restoreOutputStreams()
- exit_status := 128
- if res.ExitStatus != exit_status {
- t.Fatalf("Expected response exit status to equal %d, got %d", exit_status, res.ExitStatus)
+ expectedExitStatus := 128
+ if exitStatus != expectedExitStatus {
+ t.Fatalf("Expected response exit status to equal %d, got %d", expectedExitStatus, exitStatus)
}
- msg := "fatal: ambiguous argument 'babecafe': unknown revision or path not in the working tree."
- if !strings.Contains(res.Message, msg) {
- t.Fatalf("Expected stderr to contain \"%s\", found none in \"%s\"", msg, res.Message)
+ expectedStderr := []byte("fatal: ambiguous argument 'babecafe': unknown revision or path not in the working tree.")
+ gotStderr := make([]byte, len(expectedStderr))
+ stderr.Read(gotStderr)
+ if !bytes.Contains(gotStderr, expectedStderr) {
+ t.Fatalf("Expected stderr to contain \"%s\", found none in \"%s\"", expectedStderr, gotStderr)
}
}
+
+func redirectOutputStreams() (*os.File, *os.File) {
+ stdoutReader, stdoutWriter, _ := os.Pipe()
+ stderrReader, stderrWriter, _ := os.Pipe()
+
+ os.Stdout = stdoutWriter
+ os.Stderr = stderrWriter
+
+ return stdoutReader, stderrReader
+}
+
+func restoreOutputStreams() {
+ os.Stdout = origStdout
+ os.Stderr = origStderr
+}
diff --git a/cmd/client/main.go b/cmd/client/main.go
index a0d645c6a..b00413fe6 100644
--- a/cmd/client/main.go
+++ b/cmd/client/main.go
@@ -11,13 +11,7 @@ func main() {
defer client.Close()
os.Args[0] = "git"
- res := client.Request(os.Args)
- if res.ExitStatus == 0 {
- os.Stdout.Write([]byte(res.Message))
- } else {
- os.Stderr.Write([]byte(res.Message))
- }
-
- os.Exit(res.ExitStatus)
+ exitStatus := client.Run(os.Args)
+ os.Exit(exitStatus)
}
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 83aa5b558..e37665464 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -5,20 +5,20 @@ import (
"os/signal"
"syscall"
- "gitlab.com/gitlab-org/git-access-daemon/server"
+ serv "gitlab.com/gitlab-org/git-access-daemon/server"
)
func main() {
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
- service := server.NewService()
+ server := serv.NewServer()
go func() {
- service.Serve("0.0.0.0:6666", server.CommandExecutorCallback)
+ server.Serve("0.0.0.0:6666", serv.CommandExecutor)
}()
select {
case <-ch:
- service.Stop()
+ server.Stop()
}
}
diff --git a/messaging/conn.go b/messaging/conn.go
new file mode 100644
index 000000000..16415a075
--- /dev/null
+++ b/messaging/conn.go
@@ -0,0 +1,67 @@
+package messaging
+
+import (
+ "bufio"
+ "net"
+ "strconv"
+ "strings"
+)
+
+type MessagesConn struct {
+ Conn net.Conn
+ readWriter *bufio.ReadWriter
+}
+
+func NewMessagesConn(conn net.Conn) *MessagesConn {
+ reader := bufio.NewReader(conn)
+ writer := bufio.NewWriter(conn)
+ readWriter := bufio.NewReadWriter(reader, writer)
+ messagesConn := &MessagesConn{conn, readWriter}
+
+ return messagesConn
+}
+
+func (conn *MessagesConn) Read() ([]byte, error) {
+ lenStr, err := conn.readWriter.ReadString('$')
+ if err != nil {
+ return nil, err
+ }
+
+ lenStr = strings.TrimRight(lenStr, "$")
+ payloadLen, err := strconv.Atoi(lenStr)
+ if err != nil {
+ return nil, err
+ }
+
+ payload := make([]byte, 0)
+ for {
+ buffer := make([]byte, payloadLen-len(payload))
+ n, err := conn.readWriter.Read(buffer)
+ if err != nil {
+ return nil, err
+ }
+
+ payload = append(payload, buffer[:n]...)
+
+ if len(payload) == payloadLen {
+ break
+ }
+ }
+
+ return payload, nil
+}
+
+func (conn *MessagesConn) Write(buffer []byte) (int, error) {
+ newBuffer := make([]byte, 0)
+ newBuffer = strconv.AppendInt(newBuffer, int64(len(buffer)), 10)
+ newBuffer = append(newBuffer, "$"...)
+ newBuffer = append(newBuffer, buffer...)
+
+ n, err := conn.readWriter.Write(newBuffer)
+ conn.readWriter.Flush()
+ return n, err
+}
+
+func (conn *MessagesConn) Close() {
+ conn.Conn.Close()
+}
diff --git a/messaging/messages.pb.go b/messaging/messages.pb.go
new file mode 100644
index 000000000..ac5f24a39
--- /dev/null
+++ b/messaging/messages.pb.go
@@ -0,0 +1,336 @@
+// Code generated by protoc-gen-go.
+// source: messaging/messages.proto
+// DO NOT EDIT!
+
+/*
+Package messaging is a generated protocol buffer package.
+
+It is generated from these files:
+ messaging/messages.proto
+
+It has these top-level messages:
+ Message
+ Command
+ Output
+ Input
+ Exit
+*/
+package messaging
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Message struct {
+ Type string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"`
+ // Types that are valid to be assigned to Payload:
+ // *Message_Command
+ // *Message_Input
+ // *Message_Output
+ // *Message_Exit
+ Payload isMessage_Payload `protobuf_oneof:"payload"`
+}
+
+func (m *Message) Reset() { *m = Message{} }
+func (m *Message) String() string { return proto.CompactTextString(m) }
+func (*Message) ProtoMessage() {}
+func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+type isMessage_Payload interface {
+ isMessage_Payload()
+}
+
+type Message_Command struct {
+ Command *Command `protobuf:"bytes,2,opt,name=command,oneof"`
+}
+type Message_Input struct {
+ Input *Input `protobuf:"bytes,3,opt,name=input,oneof"`
+}
+type Message_Output struct {
+ Output *Output `protobuf:"bytes,4,opt,name=output,oneof"`
+}
+type Message_Exit struct {
+ Exit *Exit `protobuf:"bytes,5,opt,name=exit,oneof"`
+}
+
+func (*Message_Command) isMessage_Payload() {}
+func (*Message_Input) isMessage_Payload() {}
+func (*Message_Output) isMessage_Payload() {}
+func (*Message_Exit) isMessage_Payload() {}
+
+func (m *Message) GetPayload() isMessage_Payload {
+ if m != nil {
+ return m.Payload
+ }
+ return nil
+}
+
+func (m *Message) GetType() string {
+ if m != nil {
+ return m.Type
+ }
+ return ""
+}
+
+func (m *Message) GetCommand() *Command {
+ if x, ok := m.GetPayload().(*Message_Command); ok {
+ return x.Command
+ }
+ return nil
+}
+
+func (m *Message) GetInput() *Input {
+ if x, ok := m.GetPayload().(*Message_Input); ok {
+ return x.Input
+ }
+ return nil
+}
+
+func (m *Message) GetOutput() *Output {
+ if x, ok := m.GetPayload().(*Message_Output); ok {
+ return x.Output
+ }
+ return nil
+}
+
+func (m *Message) GetExit() *Exit {
+ if x, ok := m.GetPayload().(*Message_Exit); ok {
+ return x.Exit
+ }
+ return nil
+}
+
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*Message) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
+ return _Message_OneofMarshaler, _Message_OneofUnmarshaler, _Message_OneofSizer, []interface{}{
+ (*Message_Command)(nil),
+ (*Message_Input)(nil),
+ (*Message_Output)(nil),
+ (*Message_Exit)(nil),
+ }
+}
+
+func _Message_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+ m := msg.(*Message)
+ // payload
+ switch x := m.Payload.(type) {
+ case *Message_Command:
+ b.EncodeVarint(2<<3 | proto.WireBytes)
+ if err := b.EncodeMessage(x.Command); err != nil {
+ return err
+ }
+ case *Message_Input:
+ b.EncodeVarint(3<<3 | proto.WireBytes)
+ if err := b.EncodeMessage(x.Input); err != nil {
+ return err
+ }
+ case *Message_Output:
+ b.EncodeVarint(4<<3 | proto.WireBytes)
+ if err := b.EncodeMessage(x.Output); err != nil {
+ return err
+ }
+ case *Message_Exit:
+ b.EncodeVarint(5<<3 | proto.WireBytes)
+ if err := b.EncodeMessage(x.Exit); err != nil {
+ return err
+ }
+ case nil:
+ default:
+ return fmt.Errorf("Message.Payload has unexpected type %T", x)
+ }
+ return nil
+}
+
+func _Message_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
+ m := msg.(*Message)
+ switch tag {
+ case 2: // payload.command
+ if wire != proto.WireBytes {
+ return true, proto.ErrInternalBadWireType
+ }
+ msg := new(Command)
+ err := b.DecodeMessage(msg)
+ m.Payload = &Message_Command{msg}
+ return true, err
+ case 3: // payload.input
+ if wire != proto.WireBytes {
+ return true, proto.ErrInternalBadWireType
+ }
+ msg := new(Input)
+ err := b.DecodeMessage(msg)
+ m.Payload = &Message_Input{msg}
+ return true, err
+ case 4: // payload.output
+ if wire != proto.WireBytes {
+ return true, proto.ErrInternalBadWireType
+ }
+ msg := new(Output)
+ err := b.DecodeMessage(msg)
+ m.Payload = &Message_Output{msg}
+ return true, err
+ case 5: // payload.exit
+ if wire != proto.WireBytes {
+ return true, proto.ErrInternalBadWireType
+ }
+ msg := new(Exit)
+ err := b.DecodeMessage(msg)
+ m.Payload = &Message_Exit{msg}
+ return true, err
+ default:
+ return false, nil
+ }
+}
+
+func _Message_OneofSizer(msg proto.Message) (n int) {
+ m := msg.(*Message)
+ // payload
+ switch x := m.Payload.(type) {
+ case *Message_Command:
+ s := proto.Size(x.Command)
+ n += proto.SizeVarint(2<<3 | proto.WireBytes)
+ n += proto.SizeVarint(uint64(s))
+ n += s
+ case *Message_Input:
+ s := proto.Size(x.Input)
+ n += proto.SizeVarint(3<<3 | proto.WireBytes)
+ n += proto.SizeVarint(uint64(s))
+ n += s
+ case *Message_Output:
+ s := proto.Size(x.Output)
+ n += proto.SizeVarint(4<<3 | proto.WireBytes)
+ n += proto.SizeVarint(uint64(s))
+ n += s
+ case *Message_Exit:
+ s := proto.Size(x.Exit)
+ n += proto.SizeVarint(5<<3 | proto.WireBytes)
+ n += proto.SizeVarint(uint64(s))
+ n += s
+ case nil:
+ default:
+ panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+ }
+ return n
+}
+
+type Command struct {
+ Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
+ Args []string `protobuf:"bytes,2,rep,name=args" json:"args,omitempty"`
+ Environ []string `protobuf:"bytes,3,rep,name=environ" json:"environ,omitempty"`
+}
+
+func (m *Command) Reset() { *m = Command{} }
+func (m *Command) String() string { return proto.CompactTextString(m) }
+func (*Command) ProtoMessage() {}
+func (*Command) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *Command) GetName() string {
+ if m != nil {
+ return m.Name
+ }
+ return ""
+}
+
+func (m *Command) GetArgs() []string {
+ if m != nil {
+ return m.Args
+ }
+ return nil
+}
+
+func (m *Command) GetEnviron() []string {
+ if m != nil {
+ return m.Environ
+ }
+ return nil
+}
+
+type Output struct {
+ Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"`
+}
+
+func (m *Output) Reset() { *m = Output{} }
+func (m *Output) String() string { return proto.CompactTextString(m) }
+func (*Output) ProtoMessage() {}
+func (*Output) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+
+func (m *Output) GetOutput() []byte {
+ if m != nil {
+ return m.Output
+ }
+ return nil
+}
+
+type Input struct {
+ Stdin []byte `protobuf:"bytes,1,opt,name=stdin,proto3" json:"stdin,omitempty"`
+}
+
+func (m *Input) Reset() { *m = Input{} }
+func (m *Input) String() string { return proto.CompactTextString(m) }
+func (*Input) ProtoMessage() {}
+func (*Input) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
+
+func (m *Input) GetStdin() []byte {
+ if m != nil {
+ return m.Stdin
+ }
+ return nil
+}
+
+type Exit struct {
+ ExitStatus int32 `protobuf:"varint,1,opt,name=exit_status,json=exitStatus" json:"exit_status,omitempty"`
+}
+
+func (m *Exit) Reset() { *m = Exit{} }
+func (m *Exit) String() string { return proto.CompactTextString(m) }
+func (*Exit) ProtoMessage() {}
+func (*Exit) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
+
+func (m *Exit) GetExitStatus() int32 {
+ if m != nil {
+ return m.ExitStatus
+ }
+ return 0
+}
+
+func init() {
+ proto.RegisterType((*Message)(nil), "messaging.Message")
+ proto.RegisterType((*Command)(nil), "messaging.Command")
+ proto.RegisterType((*Output)(nil), "messaging.Output")
+ proto.RegisterType((*Input)(nil), "messaging.Input")
+ proto.RegisterType((*Exit)(nil), "messaging.Exit")
+}
+
+func init() { proto.RegisterFile("messaging/messages.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+ // 280 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x4c, 0x91, 0xdf, 0x4a, 0xb5, 0x40,
+ 0x14, 0xc5, 0xf5, 0xf8, 0x0f, 0xf7, 0xf9, 0xe0, 0xab, 0x4d, 0xc4, 0xdc, 0x44, 0x22, 0x44, 0x42,
+ 0x60, 0x50, 0x6f, 0x50, 0x04, 0x46, 0x44, 0x60, 0x0f, 0x10, 0x53, 0x8a, 0x0c, 0xe4, 0x8c, 0x38,
+ 0x63, 0x9c, 0xf3, 0xa2, 0x3d, 0x4f, 0xec, 0x19, 0x15, 0xef, 0xd6, 0x5e, 0xeb, 0xa7, 0xac, 0xc5,
+ 0x00, 0xeb, 0x5b, 0xad, 0x79, 0x27, 0x64, 0x77, 0xeb, 0x54, 0xab, 0xcb, 0x61, 0x54, 0x46, 0x61,
+ 0xba, 0x26, 0xf9, 0xaf, 0x0f, 0xc9, 0xab, 0x4b, 0x11, 0x21, 0x34, 0xc7, 0xa1, 0x65, 0x7e, 0xe6,
+ 0x17, 0x69, 0x6d, 0x35, 0x96, 0x90, 0x7c, 0xa9, 0xbe, 0xe7, 0xb2, 0x61, 0xbb, 0xcc, 0x2f, 0xf6,
+ 0x77, 0x58, 0xae, 0x1f, 0x97, 0x8f, 0x2e, 0xa9, 0xbc, 0x7a, 0x81, 0xb0, 0x80, 0x48, 0xc8, 0x61,
+ 0x32, 0x2c, 0xb0, 0xf4, 0xc9, 0x86, 0x7e, 0x26, 0xbf, 0xf2, 0x6a, 0x07, 0xe0, 0x0d, 0xc4, 0x6a,
+ 0x32, 0x84, 0x86, 0x16, 0x3d, 0xdd, 0xa0, 0x6f, 0x36, 0xa8, 0xbc, 0x7a, 0x46, 0xf0, 0x0a, 0xc2,
+ 0xf6, 0x20, 0x0c, 0x8b, 0x2c, 0xfa, 0x7f, 0x83, 0x3e, 0x1d, 0x04, 0x81, 0x36, 0x7e, 0x48, 0x21,
+ 0x19, 0xf8, 0xf1, 0x5b, 0xf1, 0x26, 0x7f, 0x81, 0x64, 0xae, 0x47, 0xbb, 0x24, 0xef, 0xd7, 0x5d,
+ 0xa4, 0xc9, 0xe3, 0x63, 0xa7, 0xd9, 0x2e, 0x0b, 0xc8, 0x23, 0x8d, 0x0c, 0x92, 0x56, 0xfe, 0x88,
+ 0x51, 0x49, 0x16, 0x58, 0x7b, 0x39, 0xf3, 0x0c, 0x62, 0x57, 0x09, 0xcf, 0xd7, 0xd6, 0xf4, 0xb7,
+ 0x7f, 0x4b, 0xc1, 0xfc, 0x02, 0x22, 0xbb, 0x0f, 0xcf, 0x20, 0xd2, 0xa6, 0x11, 0x72, 0xce, 0xdd,
+ 0x91, 0x5f, 0x43, 0x48, 0x45, 0xf1, 0x12, 0xf6, 0x54, 0xf4, 0x43, 0x1b, 0x6e, 0x26, 0x6d, 0x99,
+ 0xa8, 0x06, 0xb2, 0xde, 0xad, 0xf3, 0x19, 0xdb, 0x17, 0xba, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff,
+ 0x10, 0x99, 0x01, 0x7a, 0xbd, 0x01, 0x00, 0x00,
+}
diff --git a/messaging/messages.proto b/messaging/messages.proto
new file mode 100644
index 000000000..fea82625d
--- /dev/null
+++ b/messaging/messages.proto
@@ -0,0 +1,31 @@
+syntax = "proto3";
+
+package messaging;
+
+message Message {
+ string type = 1;
+ oneof payload {
+ Command command = 2;
+ Input input = 3;
+ Output output = 4;
+ Exit exit = 5;
+ }
+}
+
+message Command {
+ string name = 1;
+ repeated string args = 2;
+ repeated string environ = 3;
+}
+
+message Output {
+ bytes output = 1;
+}
+
+message Input {
+ bytes stdin = 1;
+}
+
+message Exit {
+ int32 exit_status = 1;
+}
diff --git a/messaging/messaging.go b/messaging/messaging.go
new file mode 100644
index 000000000..c40521232
--- /dev/null
+++ b/messaging/messaging.go
@@ -0,0 +1,67 @@
+package messaging
+
+import (
+ "log"
+
+ proto "github.com/golang/protobuf/proto"
+)
+
+func ParseMessage(rawMsg []byte) (*Message, error) {
+ msg := &Message{}
+
+ err := proto.Unmarshal(rawMsg, msg)
+ return msg, err
+}
+
+func NewCommandMessage(environ []string, name string, args ...string) []byte {
+ msg := &Message{
+ Type: "command",
+ Payload: &Message_Command{
+ &Command{name, args, environ},
+ },
+ }
+
+ return marshal(msg)
+}
+
+func NewInputMessage(input []byte) []byte {
+ msg := &Message{
+ Type: "stdin",
+ Payload: &Message_Input{
+ &Input{input},
+ },
+ }
+
+ return marshal(msg)
+}
+
+func NewOutputMessage(outputType string, output []byte) []byte {
+ msg := &Message{
+ Type: outputType,
+ Payload: &Message_Output{
+ &Output{output},
+ },
+ }
+
+ return marshal(msg)
+}
+
+func NewExitMessage(exitStatus int32) []byte {
+ msg := &Message{
+ Type: "exit",
+ Payload: &Message_Exit{
+ &Exit{exitStatus},
+ },
+ }
+
+ return marshal(msg)
+}
+
+func marshal(msg *Message) []byte {
+ buffer, err := proto.Marshal(msg)
+ if err != nil {
+ log.Fatalln("Failed marshalling a Protobuf message")
+ }
+
+ return buffer
+}
diff --git a/server/command_executor.go b/server/command_executor.go
index 49d118025..284c74644 100644
--- a/server/command_executor.go
+++ b/server/command_executor.go
@@ -2,68 +2,63 @@ package server
import (
"bytes"
- "encoding/json"
+ "io"
"log"
"os/exec"
"syscall"
-)
-
-type CmdRequest struct {
- Cmd []string `json:"cmd"`
-}
-type CmdResponse struct {
- Status string `json:"status"`
- Message string `json:"message"`
- ExitStatus int `json:"exit_status"`
-}
-
-func CommandExecutorCallback(input []byte) []byte {
- req := CmdRequest{}
+ "gitlab.com/gitlab-org/git-access-daemon/messaging"
+)
- err := json.Unmarshal(input, &req)
- if err != nil {
- return errorResponse("Error parsing JSON request", 255)
+func CommandExecutor(chans *commChans) {
+ rawMsg, ok := <-chans.inChan
+ if !ok {
+ return
}
- output, err := runCommand(req.Cmd[0], req.Cmd[1:]...)
-
+ msg, err := messaging.ParseMessage(rawMsg)
if err != nil {
- return errorResponse(
- string(output.Bytes()),
- extractExitStatusFromError(err.(*exec.ExitError)),
- )
+ return
+ }
+ if msg.Type != "command" {
+ return
}
- return successResponse(string(output.Bytes()))
+ runCommand(chans, msg.GetCommand())
}
-func runCommand(name string, args ...string) (bytes.Buffer, error) {
- var stdoutBuf bytes.Buffer
- var stderrBuf bytes.Buffer
+func runCommand(chans *commChans, commandMsg *messaging.Command) {
+ name := commandMsg.Name
+ args := commandMsg.Args
log.Println("Executing command:", name, "with args", args)
- cmd := makeCommand(name, args...)
- cmd.Stdout = &stdoutBuf
- cmd.Stderr = &stderrBuf
+ stdinReader, stdinWriter := io.Pipe()
+ stdoutReader, stdoutWriter := io.Pipe()
+ stderrReader, stderrWriter := io.Pipe()
- err := cmd.Run()
- if err != nil {
- return stderrBuf, err
- }
-
- return stdoutBuf, nil
-}
+ go streamOut("stdout", stdoutReader, chans)
+ go streamOut("stderr", stderrReader, chans)
+ go streamIn(stdinWriter, chans)
-// Based on git.gitCommand from gitlab-workhorse
-func makeCommand(name string, args ...string) *exec.Cmd {
cmd := exec.Command(name, args...)
// Start the command in its own process group (nice for signalling)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ cmd.Env = commandMsg.Environ
+ cmd.Stdin = stdinReader
+ cmd.Stdout = stdoutWriter
+ cmd.Stderr = stderrWriter
- return cmd
+ err := cmd.Run()
+
+ if err != nil {
+ exitStatus := int32(extractExitStatusFromError(err.(*exec.ExitError)))
+ chans.outChan <- messaging.NewExitMessage(exitStatus)
+ return
+ }
+
+ chans.outChan <- messaging.NewExitMessage(0)
}
func extractExitStatusFromError(err *exec.ExitError) int {
@@ -77,24 +72,52 @@ func extractExitStatusFromError(err *exec.ExitError) int {
return 255
}
-func errorResponse(message string, exit_status int) []byte {
- return makeResponse("error", message, exit_status)
-}
+func streamOut(streamName string, streamPipe io.Reader, chans *commChans) {
+ // TODO: Move buffer out of the loop and use defer instead of finished
+ finished := false
-func successResponse(message string) []byte {
- return makeResponse("success", message, 0)
-}
+ for {
+ buffer := make([]byte, bytes.MinRead)
-func makeResponse(status string, message string, exit_status int) []byte {
- res := CmdResponse{status, message, exit_status}
- tempBuf, err := json.Marshal(res)
+ n, err := streamPipe.Read(buffer)
+ if err == io.EOF {
+ finished = true
+ }
- if err != nil {
- log.Fatalln("Failed marshalling a JSON response")
- }
+ if n < bytes.MinRead {
+ buffer = buffer[:n]
+ }
+
+ chans.outChan <- messaging.NewOutputMessage(streamName, buffer)
- buf := bytes.NewBuffer(tempBuf)
- buf.WriteString("\n")
+ if finished {
+ return
+ }
+ }
+}
- return buf.Bytes()
+func streamIn(streamPipe *io.PipeWriter, chans *commChans) {
+ defer streamPipe.Close()
+
+ for {
+ rawMsg, ok := <-chans.inChan
+ if !ok {
+ return
+ }
+
+ msg, err := messaging.ParseMessage(rawMsg)
+ if msg.Type != "stdin" {
+ continue
+ }
+
+ stdin := msg.GetInput().Stdin
+ if len(stdin) == 0 {
+ return
+ }
+
+ _, err = streamPipe.Write(stdin)
+ if err != nil {
+ return
+ }
+ }
}
diff --git a/server/command_executor_test.go b/server/command_executor_test.go
index f983d9bd8..4fb1fd0cf 100644
--- a/server/command_executor_test.go
+++ b/server/command_executor_test.go
@@ -1,103 +1,55 @@
package server
import (
- "bufio"
- "bytes"
- "encoding/json"
- "net"
- "os"
- "strings"
"testing"
- "time"
-)
-
-const serviceAddress = "127.0.0.1:6667"
-
-func TestMain(m *testing.M) {
- service := NewService()
-
- go service.Serve(serviceAddress, CommandExecutorCallback)
- defer service.Stop()
- time.Sleep(10 * time.Millisecond)
- os.Exit(m.Run())
-}
+ "gitlab.com/gitlab-org/git-access-daemon/messaging"
+)
func TestRunningCommandSuccessfully(t *testing.T) {
- res := responseFromCommand(`{"cmd":["ls", "-hal"]}`, t)
-
- if res.ExitStatus != 0 {
- t.Error("Expected exit status of 0, found %v", res.ExitStatus)
- }
-
- if res.Status != "success" { // We should have these statuses as constants in the package
- t.Error("Expected status of success, found %v", res.Status)
- }
-}
-
-func TestRunningCommandUnsuccessfully(t *testing.T) {
- res := responseFromCommand(`{"cmd":["ls", "/file-that-does-not-exist"]}`, t)
-
- if res.ExitStatus == 0 {
- t.Error("Expected a failure exit status, got 0")
- }
+ chans := newCommChans()
+ defer chans.Close()
- if res.Status != "error" {
- t.Error("Expected error status, got %v", res.Status)
- }
+ go CommandExecutor(chans)
- if !(strings.Contains(res.Message, "cannot access") ||
- strings.Contains(res.Message, "No such file or directory")) {
- t.Error("Expected ls error message, got %v", res.Message)
- }
-}
-
-func TestMalformedCommand(t *testing.T) {
- res := responseFromCommand(`{"cmd":["ls", "/file-that-does-not-exist"}`, t)
-
- if res.Status != "error" {
- t.Error("Expected error status, got %s", res.Status)
- }
+ chans.inChan <- messaging.NewCommandMessage([]string{}, "ls", "-hal")
+ chans.inChan <- messaging.NewInputMessage([]byte{})
- if res.Message != "Error parsing JSON request" {
- t.Error("Expected parsing json error message, got %s", res.Message)
- }
+ for {
+ rawMsg := <-chans.outChan
+ msg, _ := messaging.ParseMessage(rawMsg)
- if res.ExitStatus != 255 {
- t.Error("Expected exit status 255, got %v", res.ExitStatus)
+ switch msg.Type {
+ case "exit":
+ exit_status := msg.GetExit().ExitStatus
+ if exit_status != 0 {
+ t.Error("Expected exit status of 0, found %v", exit_status)
+ }
+ return
+ }
}
}
-// These 2 functions could be interesting to reuse in the client
-// For this to happen we should remove the testing dependency and
-// then move them to an accessible place.
-func responseFromCommand(cmd string, t *testing.T) CmdResponse {
- var response CmdResponse
- buffer := bytesFromCommand(cmd, t)
- err := json.Unmarshal(buffer, &response)
- if err != nil {
- t.Error(err)
- }
- return response
-}
+func TestRunningCommandUnsuccessfully(t *testing.T) {
+ chans := newCommChans()
+ defer chans.Close()
-func bytesFromCommand(cmd string, t *testing.T) []byte {
- conn, err := net.Dial("tcp", serviceAddress)
- if err != nil {
- t.Fatal(err)
- }
+ go CommandExecutor(chans)
- defer conn.Close()
+ chans.inChan <- messaging.NewCommandMessage([]string{}, "ls", "/file-that-does-not-exist")
+ chans.inChan <- messaging.NewInputMessage([]byte{})
- if _, err := conn.Write([]byte(cmd + "\n")); err != nil {
- t.Error(err)
- }
+ for {
+ rawMsg := <-chans.outChan
+ msg, _ := messaging.ParseMessage(rawMsg)
- reader := bufio.NewReader(conn)
- buffer, err := reader.ReadBytes('\n')
- if err != nil {
- t.Error(err)
+ switch msg.Type {
+ case "exit":
+ exit_status := msg.GetExit().ExitStatus
+ if exit_status == 0 {
+ t.Error("Expected a failure exit status, got 0")
+ }
+ return
+ }
}
-
- return bytes.TrimSpace(buffer)
}
diff --git a/server/server.go b/server/server.go
index 72a06c3e5..83b88e67f 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1,31 +1,30 @@
package server
import (
- "bufio"
"io"
"log"
"net"
"sync"
"time"
+
+ "gitlab.com/gitlab-org/git-access-daemon/messaging"
)
-type Service struct {
+type Server struct {
ch chan bool
waitGroup *sync.WaitGroup
}
-type Callback func([]byte) []byte
-
-func NewService() *Service {
- service := &Service{
+func NewServer() *Server {
+ server := &Server{
ch: make(chan bool),
waitGroup: &sync.WaitGroup{},
}
- service.waitGroup.Add(1)
- return service
+ server.waitGroup.Add(1)
+ return server
}
-func (s *Service) Serve(address string, cb Callback) {
+func (s *Server) Serve(address string, service Service) {
listener, err := newListener(address)
if err != nil {
log.Fatalln(err)
@@ -50,7 +49,10 @@ func (s *Service) Serve(address string, cb Callback) {
}
log.Println("Client connected from ", conn.RemoteAddr())
s.waitGroup.Add(1)
- go s.serve(conn, cb)
+
+ chans := newCommChans()
+ go service(chans)
+ go s.serve(conn, chans)
}
}
@@ -62,15 +64,31 @@ func newListener(address string) (*net.TCPListener, error) {
return net.ListenTCP("tcp", tcpAddress)
}
-func (s *Service) Stop() {
+func (s *Server) Stop() {
close(s.ch)
s.waitGroup.Wait()
}
-func (s *Service) serve(conn *net.TCPConn, cb Callback) {
+func (s *Server) serve(conn *net.TCPConn, chans *commChans) {
defer conn.Close()
defer s.waitGroup.Done()
+ messagesConn := messaging.NewMessagesConn(conn)
+
+ go func() {
+ for {
+ ret, ok := <-chans.outChan
+ if !ok {
+ return
+ }
+
+ if _, err := messagesConn.Write(ret); nil != err {
+ log.Println(err)
+ return
+ }
+ }
+ }()
+
for {
select {
case <-s.ch:
@@ -81,8 +99,7 @@ func (s *Service) serve(conn *net.TCPConn, cb Callback) {
conn.SetDeadline(time.Now().Add(1e9))
- reader := bufio.NewReader(conn)
- buffer, err := reader.ReadBytes('\n')
+ buffer, err := messagesConn.Read()
if err != nil {
if err == io.EOF {
log.Println("Client", conn.RemoteAddr(), "closed the connection")
@@ -94,10 +111,8 @@ func (s *Service) serve(conn *net.TCPConn, cb Callback) {
log.Println(err)
}
- ret := cb(buffer)
- if _, err := conn.Write(ret); nil != err {
- log.Println(err)
- return
- }
+ chans.inChan <- buffer
}
+
+ chans.Close()
}
diff --git a/server/server_test.go b/server/server_test.go
index 83f2f5453..81bb318e8 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -1,35 +1,42 @@
package server
import (
- "bufio"
"fmt"
"net"
"testing"
"time"
+
+ "gitlab.com/gitlab-org/git-access-daemon/messaging"
)
func TestServerStandingUp(t *testing.T) {
- service := NewService()
+ server := NewServer()
address := "127.0.0.1:6666"
- go service.Serve(address, func(input []byte) []byte { return input })
- defer service.Stop()
+ go server.Serve(address, func(chans *commChans) {
+ a := (<-chans.inChan)
+ chans.outChan <- a
+ })
+ defer server.Stop()
- // Give service a little time to start listening for connections
+ // Give server a little time to start listening for connections
time.Sleep(10 * time.Millisecond)
conn, err := net.Dial("tcp", address)
if err != nil {
t.Fatal(err)
}
- if _, err := conn.Write([]byte("hola hola!\n")); err != nil {
+ messagesConn := messaging.NewMessagesConn(conn)
+
+ if _, err := messagesConn.Write([]byte("hola hola!")); err != nil {
t.Error(err)
}
- reader := bufio.NewReader(conn)
- buffer, err := reader.ReadBytes('\n')
+
+ buffer, err := messagesConn.Read()
if err != nil {
t.Error(err)
}
- conn.Close()
+ messagesConn.Close()
+
fmt.Println("Received from server:", string(buffer))
}
diff --git a/server/service.go b/server/service.go
new file mode 100644
index 000000000..1dea68435
--- /dev/null
+++ b/server/service.go
@@ -0,0 +1,20 @@
+package server
+
+type commChans struct {
+ inChan chan []byte
+ outChan chan []byte
+}
+
+type Service func(*commChans)
+
+func newCommChans() *commChans {
+ return &commChans{
+ inChan: make(chan []byte),
+ outChan: make(chan []byte),
+ }
+}
+
+func (chans *commChans) Close() {
+ close(chans.inChan)
+ close(chans.outChan)
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
new file mode 100644
index 000000000..19ecc88d2
--- /dev/null
+++ b/vendor/vendor.json
@@ -0,0 +1,13 @@
+{
+ "comment": "",
+ "ignore": "test",
+ "package": [
+ {
+ "checksumSHA1": "kBeNcaKk56FguvPSUCEaH6AxpRc=",
+ "path": "github.com/golang/protobuf/proto",
+ "revision": "8ee79997227bf9b34611aee7946ae64735e6fd93",
+ "revisionTime": "2016-11-17T03:31:26Z"
+ }
+ ],
+ "rootPath": "gitlab.com/gitlab-org/git-access-daemon"
+}