From e872757e33f17a375135a093e738d4fe01f388f8 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 26 Nov 2023 16:35:23 +0700 Subject: Add Housekeeping to the transaction log entry This commit adds Housekeeping to the transaction log entry protobuf. Up to this point, the pack-refs housekeeping task is the only supported task. The task entry includes the list of pruned references. The packed-refs file will be copied from the transaction's staging directory over log entry and then the destination repository. That file's name is static. Hence, we don't need to store its content at the moment. When the log entry is applied, Gitaly replaces the existing packed-refs file by the new one in the log entry. It also clears mentioned loose references in the disk. --- proto/go/gitalypb/log.pb.go | 259 +++++++++++++++++++++++++++++++++++--------- proto/log.proto | 18 +++ 2 files changed, 223 insertions(+), 54 deletions(-) diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go index ccc1f4f1b..10f881f87 100644 --- a/proto/go/gitalypb/log.pb.go +++ b/proto/go/gitalypb/log.pb.go @@ -51,6 +51,8 @@ type LogEntry struct { RepositoryCreation *LogEntry_RepositoryCreation `protobuf:"bytes,7,opt,name=repository_creation,json=repositoryCreation,proto3" json:"repository_creation,omitempty"` // alternate_update records a change to the repository's 'objects/info/alternates' file. AlternateUpdate *LogEntry_AlternateUpdate `protobuf:"bytes,8,opt,name=alternate_update,json=alternateUpdate,proto3" json:"alternate_update,omitempty"` + // housekeeping, when set, indicates this log entry contains a housekeeping task. + Housekeeping *LogEntry_Housekeeping `protobuf:"bytes,9,opt,name=housekeeping,proto3" json:"housekeeping,omitempty"` } func (x *LogEntry) Reset() { @@ -141,6 +143,13 @@ func (x *LogEntry) GetAlternateUpdate() *LogEntry_AlternateUpdate { return nil } +func (x *LogEntry) GetHousekeeping() *LogEntry_Housekeeping { + if x != nil { + return x.Housekeeping + } + return nil +} + // LSN serializes a log sequence number. It's used for storing a partition's // applied LSN in the database. // @@ -482,6 +491,57 @@ func (x *LogEntry_AlternateUpdate) GetPath() string { return "" } +// Housekeeping models a housekeeping run. It is supposed to handle housekeeping tasks for repositories such as the +// cleanup of unneeded files and optimizations for the repository's data structures. It is a collection of smaller +// tasks. +type LogEntry_Housekeeping struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // pack_refs signifies if the housekeeping run includes a pack-refs task. 
+ PackRefs *LogEntry_Housekeeping_PackRefs `protobuf:"bytes,1,opt,name=pack_refs,json=packRefs,proto3" json:"pack_refs,omitempty"` +} + +func (x *LogEntry_Housekeeping) Reset() { + *x = LogEntry_Housekeeping{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_Housekeeping) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_Housekeeping) ProtoMessage() {} + +func (x *LogEntry_Housekeeping) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_Housekeeping.ProtoReflect.Descriptor instead. +func (*LogEntry_Housekeeping) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 6} +} + +func (x *LogEntry_Housekeeping) GetPackRefs() *LogEntry_Housekeeping_PackRefs { + if x != nil { + return x.PackRefs + } + return nil +} + // Change models a single reference change. type LogEntry_ReferenceTransaction_Change struct { state protoimpl.MessageState @@ -500,7 +560,7 @@ type LogEntry_ReferenceTransaction_Change struct { func (x *LogEntry_ReferenceTransaction_Change) Reset() { *x = LogEntry_ReferenceTransaction_Change{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[8] + mi := &file_log_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -513,7 +573,7 @@ func (x *LogEntry_ReferenceTransaction_Change) String() string { func (*LogEntry_ReferenceTransaction_Change) ProtoMessage() {} func (x *LogEntry_ReferenceTransaction_Change) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[8] + mi := &file_log_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -543,12 +603,63 @@ func (x *LogEntry_ReferenceTransaction_Change) GetNewOid() []byte { return nil } +// PackRefs models a pack-refs housekeeping task. This task is to pack loose references into a singular packed-refs +// file to optimize ref accessing time. In other words, it's a wrapper for git-pack-refs command. +type LogEntry_Housekeeping_PackRefs struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // pruned_refs is the list of fully qualified references to be pruned. Gitaly removes the loose reference files on + // the disk. They still stay intact in the packed-refs. 
+ PrunedRefs [][]byte `protobuf:"bytes,1,rep,name=pruned_refs,json=prunedRefs,proto3" json:"pruned_refs,omitempty"` +} + +func (x *LogEntry_Housekeeping_PackRefs) Reset() { + *x = LogEntry_Housekeeping_PackRefs{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_Housekeeping_PackRefs) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_Housekeeping_PackRefs) ProtoMessage() {} + +func (x *LogEntry_Housekeeping_PackRefs) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_Housekeeping_PackRefs.ProtoReflect.Descriptor instead. +func (*LogEntry_Housekeeping_PackRefs) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 6, 0} +} + +func (x *LogEntry_Housekeeping_PackRefs) GetPrunedRefs() [][]byte { + if x != nil { + return x.PrunedRefs + } + return nil +} + var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0x8b, 0x08, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x23, + 0x6f, 0x22, 0xd1, 0x09, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x5c, 0x0a, 0x16, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, @@ -585,40 +696,52 @@ var file_log_proto_rawDesc = []byte{ 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x41, 0x6c, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0f, 0x61, 0x6c, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x1a, 0xa8, 0x01, 0x0a, 0x14, 0x52, 0x65, - 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, - 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x67, - 0x65, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x1a, 0x48, 0x0a, 0x06, 0x43, 0x68, - 0x61, 0x6e, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, - 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, - 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, - 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, - 0x77, 0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, - 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 
0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, - 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, - 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, - 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, 0x6f, - 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, - 0x72, 0x1a, 0x4f, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x0d, 0x6f, 0x62, 0x6a, 0x65, 0x63, - 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x46, 0x6f, - 0x72, 0x6d, 0x61, 0x74, 0x52, 0x0c, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x46, 0x6f, 0x72, 0x6d, - 0x61, 0x74, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x25, 0x0a, 0x0f, 0x41, 0x6c, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, - 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x22, - 0x1b, 0x0a, 0x03, 0x4c, 0x53, 0x4e, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x34, 0x5a, 0x32, - 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, - 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x41, 0x0a, 0x0c, 0x68, 0x6f, 0x75, + 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x52, 0x0c, + 0x68, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x1a, 0xa8, 0x01, 0x0a, + 0x14, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, + 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x68, + 0x61, 0x6e, 0x67, 0x65, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x1a, 0x48, 0x0a, + 0x06, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, + 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, + 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x06, 0x6e, 0x65, 0x77, 
0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, + 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, + 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, + 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, + 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, + 0x73, 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, + 0x73, 0x54, 0x61, 0x72, 0x1a, 0x4f, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, + 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x0d, 0x6f, 0x62, + 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x14, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4f, 0x62, 0x6a, 0x65, 0x63, + 0x74, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x52, 0x0c, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x46, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, + 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x25, 0x0a, 0x0f, 0x41, + 0x6c, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x12, + 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, + 0x74, 0x68, 0x1a, 0x80, 0x01, 0x0a, 0x0c, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, + 0x69, 0x6e, 0x67, 0x12, 0x43, 0x0a, 0x09, 0x70, 0x61, 0x63, 0x6b, 0x5f, 0x72, 0x65, 0x66, 0x73, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, + 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x52, 0x08, + 0x70, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x1a, 0x2b, 0x0a, 0x08, 0x50, 0x61, 0x63, 0x6b, + 0x52, 0x65, 0x66, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x72, 0x75, 0x6e, 0x65, 0x64, 0x5f, 0x72, + 0x65, 0x66, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, 0x70, 0x72, 0x75, 0x6e, 0x65, + 0x64, 0x52, 0x65, 0x66, 0x73, 0x22, 0x1b, 0x0a, 0x03, 0x4c, 0x53, 0x4e, 0x12, 0x14, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -633,7 +756,7 @@ func file_log_proto_rawDescGZIP() []byte { return file_log_proto_rawDescData } -var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_log_proto_goTypes = []interface{}{ (*LogEntry)(nil), // 0: gitaly.LogEntry (*LSN)(nil), // 1: gitaly.LSN @@ -643,23 +766,27 @@ var file_log_proto_goTypes = []interface{}{ (*LogEntry_RepositoryCreation)(nil), // 5: gitaly.LogEntry.RepositoryCreation (*LogEntry_RepositoryDeletion)(nil), // 6: 
gitaly.LogEntry.RepositoryDeletion (*LogEntry_AlternateUpdate)(nil), // 7: gitaly.LogEntry.AlternateUpdate - (*LogEntry_ReferenceTransaction_Change)(nil), // 8: gitaly.LogEntry.ReferenceTransaction.Change - (ObjectFormat)(0), // 9: gitaly.ObjectFormat + (*LogEntry_Housekeeping)(nil), // 8: gitaly.LogEntry.Housekeeping + (*LogEntry_ReferenceTransaction_Change)(nil), // 9: gitaly.LogEntry.ReferenceTransaction.Change + (*LogEntry_Housekeeping_PackRefs)(nil), // 10: gitaly.LogEntry.Housekeeping.PackRefs + (ObjectFormat)(0), // 11: gitaly.ObjectFormat } var file_log_proto_depIdxs = []int32{ - 2, // 0: gitaly.LogEntry.reference_transactions:type_name -> gitaly.LogEntry.ReferenceTransaction - 3, // 1: gitaly.LogEntry.default_branch_update:type_name -> gitaly.LogEntry.DefaultBranchUpdate - 4, // 2: gitaly.LogEntry.custom_hooks_update:type_name -> gitaly.LogEntry.CustomHooksUpdate - 6, // 3: gitaly.LogEntry.repository_deletion:type_name -> gitaly.LogEntry.RepositoryDeletion - 5, // 4: gitaly.LogEntry.repository_creation:type_name -> gitaly.LogEntry.RepositoryCreation - 7, // 5: gitaly.LogEntry.alternate_update:type_name -> gitaly.LogEntry.AlternateUpdate - 8, // 6: gitaly.LogEntry.ReferenceTransaction.changes:type_name -> gitaly.LogEntry.ReferenceTransaction.Change - 9, // 7: gitaly.LogEntry.RepositoryCreation.object_format:type_name -> gitaly.ObjectFormat - 8, // [8:8] is the sub-list for method output_type - 8, // [8:8] is the sub-list for method input_type - 8, // [8:8] is the sub-list for extension type_name - 8, // [8:8] is the sub-list for extension extendee - 0, // [0:8] is the sub-list for field type_name + 2, // 0: gitaly.LogEntry.reference_transactions:type_name -> gitaly.LogEntry.ReferenceTransaction + 3, // 1: gitaly.LogEntry.default_branch_update:type_name -> gitaly.LogEntry.DefaultBranchUpdate + 4, // 2: gitaly.LogEntry.custom_hooks_update:type_name -> gitaly.LogEntry.CustomHooksUpdate + 6, // 3: gitaly.LogEntry.repository_deletion:type_name -> gitaly.LogEntry.RepositoryDeletion + 5, // 4: gitaly.LogEntry.repository_creation:type_name -> gitaly.LogEntry.RepositoryCreation + 7, // 5: gitaly.LogEntry.alternate_update:type_name -> gitaly.LogEntry.AlternateUpdate + 8, // 6: gitaly.LogEntry.housekeeping:type_name -> gitaly.LogEntry.Housekeeping + 9, // 7: gitaly.LogEntry.ReferenceTransaction.changes:type_name -> gitaly.LogEntry.ReferenceTransaction.Change + 11, // 8: gitaly.LogEntry.RepositoryCreation.object_format:type_name -> gitaly.ObjectFormat + 10, // 9: gitaly.LogEntry.Housekeeping.pack_refs:type_name -> gitaly.LogEntry.Housekeeping.PackRefs + 10, // [10:10] is the sub-list for method output_type + 10, // [10:10] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_log_proto_init() } @@ -766,6 +893,18 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LogEntry_Housekeeping); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_log_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*LogEntry_ReferenceTransaction_Change); i { case 0: return &v.state @@ -777,6 +916,18 @@ func file_log_proto_init() { return nil } } + file_log_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := 
v.(*LogEntry_Housekeeping_PackRefs); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -784,7 +935,7 @@ func file_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_log_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/log.proto b/proto/log.proto index 1df859a7e..5339a725a 100644 --- a/proto/log.proto +++ b/proto/log.proto @@ -60,6 +60,22 @@ message LogEntry { string path = 1; } + // Housekeeping models a housekeeping run. It is supposed to handle housekeeping tasks for repositories such as the + // cleanup of unneeded files and optimizations for the repository's data structures. It is a collection of smaller + // tasks. + message Housekeeping { + // PackRefs models a pack-refs housekeeping task. This task is to pack loose references into a singular packed-refs + // file to optimize ref accessing time. In other words, it's a wrapper for git-pack-refs command. + message PackRefs { + // pruned_refs is the list of fully qualified references to be pruned. Gitaly removes the loose reference files on + // the disk. They still stay intact in the packed-refs. + repeated bytes pruned_refs = 1; + } + + // pack_refs signifies if the housekeeping run includes a pack-refs task. + PackRefs pack_refs = 1; + } + // relative_path is the relative path of the repository the changes in the // log entry are targeting. string relative_path = 1; @@ -82,6 +98,8 @@ message LogEntry { RepositoryCreation repository_creation = 7; // alternate_update records a change to the repository's 'objects/info/alternates' file. AlternateUpdate alternate_update = 8; + // housekeeping, when set, indicates this log entry contains a housekeeping task. + Housekeeping housekeeping = 9; } // LSN serializes a log sequence number. It's used for storing a partition's -- cgit v1.2.3 From 081014df5deb8522ff7fa452eeb898d79ef5397d Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 6 Dec 2023 17:05:46 +0700 Subject: Add support for custom setup to transaction manager test At the moment, all the sub-tests of the transaction manager share the same test setup. In some later commits, we'll need to implement a new set of tests with totally different setup. This commit lays the foundation for custom setup. 
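For illustration, a later test case could supply its own setup roughly as
follows. This is a minimal sketch against the customSetup field added in this
commit; the function body shown here is only an example, not part of the
change itself:

    {
        desc: "test needing a custom repository state",
        customSetup: func(t *testing.T, ctx context.Context, testPartitionID partitionID, relativePath string) testTransactionSetup {
            // Start from the shared setup and adjust it as needed, for
            // example by writing extra commits or references into the
            // repository before the manager is started.
            setup := setupTest(t, ctx, testPartitionID, relativePath)
            return setup
        },
        steps: steps{
            StartManager{},
        },
    }

Test cases that don't set customSetup keep using the shared setupTest as
before.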
--- .../gitaly/storage/storagemgr/testhelper_test.go | 1 + .../storage/storagemgr/transaction_manager_test.go | 161 +++++++++++---------- 2 files changed, 84 insertions(+), 78 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 28e8aee9d..2f8cb9234 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -399,6 +399,7 @@ type steps []any type transactionTestCase struct { desc string steps steps + customSetup func(*testing.T, context.Context, partitionID, string) testTransactionSetup expectedState StateAssertion } diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go index a4b613497..2a2b4e41e 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go @@ -153,99 +153,99 @@ func reverseIndexFileDirectoryEntry(cfg config.Cfg) testhelper.DirectoryEntry { } } -func TestTransactionManager(t *testing.T) { - t.Parallel() +func setupTest(t *testing.T, ctx context.Context, testPartitionID partitionID, relativePath string) testTransactionSetup { + t.Helper() + + cfg := testcfg.Build(t) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: relativePath, + }) + + firstCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) + secondCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(firstCommitOID)) + thirdCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(secondCommitOID)) + divergingCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(firstCommitOID), gittest.WithMessage("diverging commit")) + + cmdFactory := gittest.NewCommandFactory(t, cfg) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + logger := testhelper.NewLogger(t) + locator := config.NewLocator(cfg) + localRepo := localrepo.New( + logger, + locator, + cmdFactory, + catfileCache, + repo, + ) - ctx := testhelper.Context(t) + objectHash, err := localRepo.ObjectHash(ctx) + require.NoError(t, err) - // testPartitionID is the partition ID used in the tests for the TransactionManager. 
- const testPartitionID partitionID = 1 + hasher := objectHash.Hash() + _, err = hasher.Write([]byte("content does not matter")) + require.NoError(t, err) + nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil))) + require.NoError(t, err) - setupTest := func(t *testing.T, relativePath string) testTransactionSetup { + packCommit := func(oid git.ObjectID) []byte { t.Helper() - cfg := testcfg.Build(t) - - repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - RelativePath: relativePath, - }) - - firstCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) - secondCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(firstCommitOID)) - thirdCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(secondCommitOID)) - divergingCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(firstCommitOID), gittest.WithMessage("diverging commit")) - - cmdFactory := gittest.NewCommandFactory(t, cfg) - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - logger := testhelper.NewLogger(t) - locator := config.NewLocator(cfg) - localRepo := localrepo.New( - logger, - locator, - cmdFactory, - catfileCache, - repo, + var pack bytes.Buffer + require.NoError(t, + localRepo.PackObjects(ctx, strings.NewReader(oid.String()), &pack), ) - objectHash, err := localRepo.ObjectHash(ctx) - require.NoError(t, err) + return pack.Bytes() + } - hasher := objectHash.Hash() - _, err = hasher.Write([]byte("content does not matter")) - require.NoError(t, err) - nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil))) - require.NoError(t, err) + return testTransactionSetup{ + PartitionID: testPartitionID, + RelativePath: relativePath, + RepositoryPath: repoPath, + Repo: localRepo, + Config: cfg, + ObjectHash: objectHash, + CommandFactory: cmdFactory, + RepositoryFactory: localrepo.NewFactory(logger, locator, cmdFactory, catfileCache), + NonExistentOID: nonExistentOID, + Commits: testTransactionCommits{ + First: testTransactionCommit{ + OID: firstCommitOID, + Pack: packCommit(firstCommitOID), + }, + Second: testTransactionCommit{ + OID: secondCommitOID, + Pack: packCommit(secondCommitOID), + }, + Third: testTransactionCommit{ + OID: thirdCommitOID, + Pack: packCommit(thirdCommitOID), + }, + Diverging: testTransactionCommit{ + OID: divergingCommitOID, + Pack: packCommit(divergingCommitOID), + }, + }, + } +} - packCommit := func(oid git.ObjectID) []byte { - t.Helper() +func TestTransactionManager(t *testing.T) { + t.Parallel() - var pack bytes.Buffer - require.NoError(t, - localRepo.PackObjects(ctx, strings.NewReader(oid.String()), &pack), - ) + ctx := testhelper.Context(t) - return pack.Bytes() - } - - return testTransactionSetup{ - PartitionID: testPartitionID, - RelativePath: relativePath, - RepositoryPath: repoPath, - Repo: localRepo, - Config: cfg, - ObjectHash: objectHash, - CommandFactory: cmdFactory, - RepositoryFactory: localrepo.NewFactory(logger, locator, cmdFactory, catfileCache), - NonExistentOID: nonExistentOID, - Commits: testTransactionCommits{ - First: testTransactionCommit{ - OID: firstCommitOID, - Pack: packCommit(firstCommitOID), - }, - Second: testTransactionCommit{ - OID: secondCommitOID, - Pack: packCommit(secondCommitOID), - }, - Third: testTransactionCommit{ - OID: thirdCommitOID, - Pack: packCommit(thirdCommitOID), - }, - Diverging: testTransactionCommit{ - OID: divergingCommitOID, - Pack: 
packCommit(divergingCommitOID), - }, - }, - } - } + // testPartitionID is the partition ID used in the tests for the TransactionManager. + const testPartitionID partitionID = 1 // A clean repository is setup for each test. We build a setup ahead of the tests here once to // get deterministic commit IDs, relative path and object hash we can use to build the declarative // test cases. relativePath := gittest.NewRepositoryName(t) - setup := setupTest(t, relativePath) + setup := setupTest(t, ctx, testPartitionID, relativePath) var testCases []transactionTestCase subTests := [][]transactionTestCase{ @@ -268,7 +268,12 @@ func TestTransactionManager(t *testing.T) { t.Parallel() // Setup the repository with the exact same state as what was used to build the test cases. - setup := setupTest(t, relativePath) + var setup testTransactionSetup + if tc.customSetup != nil { + setup = tc.customSetup(t, ctx, testPartitionID, relativePath) + } else { + setup = setupTest(t, ctx, testPartitionID, relativePath) + } runTransactionTest(t, ctx, tc, setup) }) } -- cgit v1.2.3 From 90622c7f1f80692acb4763c5590c8482b637b983 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 4 Dec 2023 12:59:25 +0700 Subject: Add keeparound entries to Transaction Manager Some types of transactions, such as housekeeping, operate on a snapshot repository. There is a gap between the transaction doing its work and the time when it is committed. They need to verify if concurrent operations can cause conflict. This commit lets the transaction manager maintain a list of keeparound log entries. These log entries are still kept around even after they are applied. They are removed when no active readers are accessing the corresponding snapshots. --- .../gitaly/storage/storagemgr/testhelper_test.go | 5 + .../storage/storagemgr/transaction_manager.go | 87 ++++++- .../storage/storagemgr/transaction_manager_test.go | 254 +++++++++++++++++++++ 3 files changed, 345 insertions(+), 1 deletion(-) diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 2f8cb9234..0618b1146 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -392,6 +392,9 @@ type StateAssertion struct { Repositories RepositoryStates } +// AdhocAssertion allows a test to add some custom assertions apart from the built-in assertions above. +type AdhocAssertion func(*testing.T, context.Context, *TransactionManager) + // steps defines execution steps in a test. Each test case can define multiple steps to exercise // more complex behavior. 
type steps []any @@ -694,6 +697,8 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas }), ) }, step.Repositories) + case AdhocAssertion: + step(t, ctx, transactionManager) default: t.Fatalf("unhandled step type: %T", step) } diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 584cd56fa..bc42e05d6 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -2,6 +2,7 @@ package storagemgr import ( "bytes" + "container/list" "context" "encoding/binary" "errors" @@ -259,6 +260,16 @@ func (mgr *TransactionManager) Begin(ctx context.Context, relativePath string, s mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Add(1) defer mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Done() readReady := mgr.snapshotLocks[txn.snapshotLSN].applied + + var entry *committedEntry + if !txn.readOnly { + var err error + entry, err = mgr.updateCommittedEntry(txn.snapshotLSN) + if err != nil { + return nil, err + } + } + mgr.mutex.Unlock() txn.finish = func() error { @@ -270,6 +281,12 @@ func (mgr *TransactionManager) Begin(ctx context.Context, relativePath string, s } } + if !txn.readOnly { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + mgr.cleanCommittedEntry(entry) + } + return nil } @@ -599,6 +616,17 @@ type snapshotLock struct { activeSnapshotters sync.WaitGroup } +// committedEntry is a wrapper for a log entry. It is used to keep track of entries in which their snapshots are still +// accessed by other transactions. +type committedEntry struct { + // lsn is the associated LSN of the entry + lsn LSN + // entry is the pointer to the corresponding log entry. + entry *gitalypb.LogEntry + // snapshotReaders accounts for the number of transaction readers of the snapshot. + snapshotReaders int +} + // TransactionManager is responsible for transaction management of a single repository. Each repository has // a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from // the admissionQueue. Each admitted write is processed in three steps: @@ -683,7 +711,8 @@ type TransactionManager struct { // Run and Begin which are ran in different goroutines. mutex sync.Mutex - // snapshotLocks contains state used for synchronizing snapshotters with the log application. + // snapshotLocks contains state used for synchronizing snapshotters with the log application. The + // lock is released after the corresponding log entry is applied. snapshotLocks map[LSN]*snapshotLock // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log. @@ -697,6 +726,12 @@ type TransactionManager struct { // the partition. It's keyed by the LSN the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. awaitingTransactions map[LSN]resultChannel + // committedEntries keeps some latest appended log entries around. Some types of transactions, such as + // housekeeping, operate on snapshot repository. There is a gap between transaction doing its work and the time + // when it is committed. They need to verify if concurrent operations can cause conflict. These log entries are + // still kept around even after they are applied. They are removed when there are no active readers accessing + // the corresponding snapshots. + committedEntries *list.List } // NewTransactionManager returns a new TransactionManager for the given repository. 
@@ -730,6 +765,7 @@ func NewTransactionManager( stagingDirectory: stagingDir, housekeepingManager: housekeepingManager, awaitingTransactions: make(map[LSN]resultChannel), + committedEntries: list.New(), } } @@ -1713,6 +1749,10 @@ func (mgr *TransactionManager) appendLogEntry(nextLSN LSN, logEntry *gitalypb.Lo mgr.mutex.Lock() mgr.appendedLSN = nextLSN mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} + mgr.committedEntries.PushBack(&committedEntry{ + lsn: nextLSN, + entry: logEntry, + }) mgr.mutex.Unlock() return nil @@ -2154,6 +2194,51 @@ func (mgr *TransactionManager) deleteKey(key []byte) error { }) } +// updateCommittedEntry updates the reader counter of the committed entry of the snapshot that this transaction depends on. +func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committedEntry, error) { + // Since the goroutine doing this is holding the lock, the snapshotLSN shouldn't change and no new transactions + // can be committed or added. That should guarantee .Back() is always the latest transaction and the one we're + // using to base our snapshot on. + if elm := mgr.committedEntries.Back(); elm != nil { + entry := elm.Value.(*committedEntry) + entry.snapshotReaders++ + return entry, nil + } + + entry := &committedEntry{ + lsn: snapshotLSN, + snapshotReaders: 1, + // The log entry is left nil. This doesn't matter as the conflict checking only + // needs it when checking for conflicts with transactions committed after we took + // our snapshot. + // + // This `committedEntry` only exists to record the `snapshotReaders` at this LSN. + } + + mgr.committedEntries.PushBack(entry) + + return entry, nil +} + +// cleanCommittedEntry reduces the snapshot readers counter of the committed entry. It also removes entries with no more +// readers at the head of the list. +func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) { + entry.snapshotReaders-- + + elm := mgr.committedEntries.Front() + for elm != nil { + front := mgr.committedEntries.Front().Value.(*committedEntry) + if front.snapshotReaders > 0 { + // If the first entry had still some snapshot readers, that means + // our transaction was not the oldest reader. We can't remove any entries + // as they'll still be needed for conlict checking the older transactions. + return + } + mgr.committedEntries.Remove(elm) + elm = mgr.committedEntries.Front() + } +} + // keyAppliedLSN returns the database key storing a partition's last applied log entry's LSN. 
func keyAppliedLSN(ptnID partitionID) []byte { return []byte(fmt.Sprintf("partition/%s/applied_lsn", ptnID.MarshalBinary())) diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go index 2a2b4e41e..eed2ff37b 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go @@ -3,6 +3,7 @@ package storagemgr import ( "archive/tar" "bytes" + "container/list" "context" "encoding/hex" "errors" @@ -250,6 +251,7 @@ func TestTransactionManager(t *testing.T) { var testCases []transactionTestCase subTests := [][]transactionTestCase{ generateCommonTests(t, ctx, setup), + generateCommittedEntriesTests(t, setup), generateInvalidReferencesTests(t, setup), generateModifyReferencesTests(t, setup), generateCreateRepositoryTests(t, setup), @@ -1508,6 +1510,258 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio } } +func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { + assertCommittedEntries := func(t *testing.T, expected []*committedEntry, actualList *list.List) { + require.Equal(t, len(expected), actualList.Len()) + + i := 0 + for elm := actualList.Front(); elm != nil; elm = elm.Next() { + actual := elm.Value.(*committedEntry) + require.Equal(t, expected[i].lsn, actual.lsn) + require.Equal(t, expected[i].snapshotReaders, actual.snapshotReaders) + testhelper.ProtoEqual(t, expected[i].entry, actual.entry) + i++ + } + } + + refChangeLogEntry := func(ref string, oid git.ObjectID) *gitalypb.LogEntry { + return &gitalypb.LogEntry{ + RelativePath: setup.RelativePath, + ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{ + { + Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{ + { + ReferenceName: []byte(ref), + NewOid: []byte(oid), + }, + }, + }, + }, + } + } + + return []transactionTestCase{ + { + desc: "manager has just initialized", + steps: steps{ + StartManager{}, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + }), + }, + }, + { + desc: "a transaction has one reader", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{ + { + lsn: 0, + snapshotReaders: 1, + }, + }, tm.committedEntries) + }), + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + }), + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{ + { + lsn: 1, + snapshotReaders: 1, + }, + }, tm.committedEntries) + }), + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + }), + }, + expectedState: 
StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: string(setup.Commits.First.OID)}, + {Name: "refs/heads/main", Target: string(setup.Commits.First.OID)}, + }, + }, + }, + }, + }, + { + desc: "a transaction has multiple readers", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + Begin{ + TransactionID: 3, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{ + { + lsn: 1, + snapshotReaders: 2, + }, + }, tm.committedEntries) + }), + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{ + { + lsn: 1, + snapshotReaders: 1, + }, + { + lsn: 2, + entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID), + }, + }, tm.committedEntries) + }), + Begin{ + TransactionID: 4, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 2, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{ + { + lsn: 1, + snapshotReaders: 1, + }, + { + lsn: 2, + snapshotReaders: 1, + entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID), + }, + }, tm.committedEntries) + }), + Commit{ + TransactionID: 3, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-2": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{ + { + lsn: 2, + entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID), + snapshotReaders: 1, + }, + { + lsn: 3, + entry: refChangeLogEntry("refs/heads/branch-2", setup.Commits.First.OID), + }, + }, tm.committedEntries) + }), + Rollback{ + TransactionID: 4, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + }), + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(3).toProto(), + }, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: string(setup.Commits.First.OID)}, + {Name: "refs/heads/branch-2", Target: string(setup.Commits.First.OID)}, + {Name: "refs/heads/main", Target: string(setup.Commits.First.OID)}, + }, + }, + }, + }, + }, + { + desc: "committed read-only transaction are not kept", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + ReadOnly: true, + }, + Commit{ + TransactionID: 1, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + 
assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + }), + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ReadOnly: true, + }, + Commit{ + TransactionID: 2, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + }), + }, + expectedState: StateAssertion{ + Database: DatabaseState{}, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + }, + }, + }, + }, + } +} + // BenchmarkTransactionManager benchmarks the transaction throughput of the TransactionManager at various levels // of concurrency and transaction sizes. func BenchmarkTransactionManager(b *testing.B) { -- cgit v1.2.3 From 1b6d98de7877a286e068e2a1c985071d5da20386 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 4 Dec 2023 20:58:14 +0700 Subject: Add pack-refs housekeeping task support to the transaction manager This MR adds support for the pack-refs housekeeping task to the transaction manager. The caller calls (*Transactionmanager).PackRefs() to signify a pack-refs task. The task spans different states of the transaction: - When the transaction is committed, the manager runs `git-pack-refs` command. The artifacts are collected and attached to the transaction's staging directory. - When the transaction is admitted, the manager verifies whether the result of `git-pack-refs` conflicts with the current repository state. If everything is fine, the manager appends a log entry with the corresponding housekeeping sub-entry. The transaction's staging directory is renamed to the WAL entry's files. - When the log entry is applied, the manager copies the packed-refs file to the repository via hard-linking and removes all associated loose references. --- .../gitaly/storage/storagemgr/testhelper_test.go | 94 ++ .../storage/storagemgr/transaction_manager.go | 379 +++++- .../transaction_manager_housekeeping_test.go | 1214 ++++++++++++++++++++ .../storage/storagemgr/transaction_manager_test.go | 11 + 4 files changed, 1679 insertions(+), 19 deletions(-) create mode 100644 internal/gitaly/storage/storagemgr/transaction_manager_housekeeping_test.go diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 0618b1146..73b2cdd71 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -3,12 +3,14 @@ package storagemgr import ( "bytes" "context" + "errors" "fmt" "io/fs" "os" "path/filepath" "reflect" "sort" + "strings" "sync" "testing" @@ -48,6 +50,17 @@ type RepositoryState struct { Objects []git.ObjectID // Alternate is the content of 'objects/info/alternates'. Alternate string + // PackedRefs is the expected state of the packed-refs and loose references. + PackedRefs *PackedRefsState +} + +// PackedRefsState describes the asserted state of packed-refs and loose references. It's mostly used for verifying +// pack-refs housekeeping task. +type PackedRefsState struct { + // PackedRefsContent is the content of pack-refs file, line by line + PackedRefsContent []string + // LooseReferences is the exact list of loose references outside packed-refs. + LooseReferences map[git.ReferenceName]git.ObjectID } // RequireRepositoryState asserts the given repository matches the expected state. 
@@ -63,6 +76,27 @@ func RequireRepositoryState(tb testing.TB, ctx context.Context, cfg config.Cfg, actualReferences, err := repo.GetReferences(ctx) require.NoError(tb, err) + actualPackedRefsState, err := collectPackedRefsState(tb, expected, repoPath) + require.NoError(tb, err) + + // Assert if there is any empty directory in the refs hierarchy excepts for heads and tags + rootRefsDir := filepath.Join(repoPath, "refs") + ignoredDirs := map[string]struct{}{ + rootRefsDir: {}, + filepath.Join(rootRefsDir, "heads"): {}, + filepath.Join(rootRefsDir, "tags"): {}, + } + require.NoError(tb, filepath.WalkDir(rootRefsDir, func(path string, entry fs.DirEntry, err error) error { + if entry.IsDir() { + if _, exist := ignoredDirs[path]; !exist { + isEmpty, err := isDirEmpty(path) + require.NoError(tb, err) + require.Falsef(tb, isEmpty, "there shouldn't be any empty directory in the refs hierarchy %s", path) + } + } + return nil + })) + expectedObjects := []git.ObjectID{} if expected.Objects != nil { expectedObjects = expected.Objects @@ -90,17 +124,57 @@ func RequireRepositoryState(tb testing.TB, ctx context.Context, cfg config.Cfg, References: expected.References, Objects: expectedObjects, Alternate: expected.Alternate, + PackedRefs: expected.PackedRefs, }, RepositoryState{ DefaultBranch: headReference, References: actualReferences, Objects: actualObjects, Alternate: string(alternate), + PackedRefs: actualPackedRefsState, }, ) testhelper.RequireDirectoryState(tb, filepath.Join(repoPath, repoutil.CustomHooksDir), "", expected.CustomHooks) } +func collectPackedRefsState(tb testing.TB, expected RepositoryState, repoPath string) (*PackedRefsState, error) { + if expected.PackedRefs == nil { + return nil, nil + } + + packRefsFile, err := os.ReadFile(filepath.Join(repoPath, "packed-refs")) + if errors.Is(err, os.ErrNotExist) { + // Treat missing packed-refs file as empty. + packRefsFile = nil + } else { + require.NoError(tb, err) + } + // Walk and collect loose refs. + looseReferences := map[git.ReferenceName]git.ObjectID{} + refsPath := filepath.Join(repoPath, "refs") + require.NoError(tb, filepath.WalkDir(refsPath, func(path string, entry fs.DirEntry, err error) error { + if err != nil { + return err + } + if !entry.IsDir() { + ref, err := filepath.Rel(repoPath, path) + if err != nil { + return fmt.Errorf("extracting ref name: %w", err) + } + oid, err := os.ReadFile(path) + require.NoError(tb, err) + + looseReferences[git.ReferenceName(ref)] = git.ObjectID(strings.TrimSpace(string(oid))) + } + return nil + })) + + return &PackedRefsState{ + PackedRefsContent: strings.Split(strings.TrimSpace(string(packRefsFile)), "\n"), + LooseReferences: looseReferences, + }, nil +} + type repositoryBuilder func(relativePath string) *localrepo.Repo // RepositoryStates describes the state of repositories in a storage. The key is the relative path of a repository that @@ -210,6 +284,11 @@ type testTransactionCommit struct { Pack []byte } +type testTransactionTag struct { + Name string + OID git.ObjectID +} + type testTransactionCommits struct { First testTransactionCommit Second testTransactionCommit @@ -228,6 +307,7 @@ type testTransactionSetup struct { ObjectHash git.ObjectHash NonExistentOID git.ObjectID Commits testTransactionCommits + AnnotatedTags []testTransactionTag } type testTransactionHooks struct { @@ -307,6 +387,12 @@ type CreateRepository struct { Alternate string } +// RunPackRefs calls pack-refs housekeeping task on a transaction. 
+type RunPackRefs struct { + // TransactionID is the transaction for which the pack-refs task runs. + TransactionID int +} + // Commit calls Commit on a transaction. type Commit struct { // TransactionID identifies the transaction to commit. @@ -681,6 +767,11 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas }, repoutil.WithObjectHash(setup.ObjectHash), )) + case RunPackRefs: + require.Contains(t, openTransactions, step.TransactionID, "test error: pack-refs housekeeping task aborted on committed before beginning it") + + transaction := openTransactions[step.TransactionID] + transaction.PackRefs() case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] @@ -727,6 +818,9 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas setup.Commits.Third.OID, setup.Commits.Diverging.OID, } + for _, tag := range setup.AnnotatedTags { + state.Objects = append(state.Objects, tag.OID) + } } if state.DefaultBranch == "" { diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index bc42e05d6..59dce61cd 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -57,6 +57,25 @@ var ( // errAlternateAlreadyLinked is returned when attempting to set an alternate on a repository that // already has one. errAlternateAlreadyLinked = errors.New("repository already has an alternate") + // errConflictRepositoryDeletion is returned when an operation conflicts with repository deletion in another + // transaction. + errConflictRepositoryDeletion = errors.New("detected an update conflicting with repository deletion") + // errPackRefsConflictRefDeletion is returned when there is a committed ref deletion before pack-refs + // task is committed. The transaction should be aborted. + errPackRefsConflictRefDeletion = errors.New("detected a conflict with reference deletion when committing packed-refs") + // errHousekeepingConflictOtherUpdates is returned when the transaction includes housekeeping alongside + // with other updates. + errHousekeepingConflictOtherUpdates = errors.New("housekeeping in the same transaction with other updates") + // errHousekeepingConflictConcurrent is returned when there are another concurrent housekeeping task. + errHousekeepingConflictConcurrent = errors.New("conflict with another concurrent housekeeping task") + + // Below errors are used to error out in cases when updates have been staged in a read-only transaction. + errReadOnlyReferenceUpdates = errors.New("reference updates staged in a read-only transaction") + errReadOnlyDefaultBranchUpdate = errors.New("default branch update staged in a read-only transaction") + errReadOnlyCustomHooksUpdate = errors.New("custom hooks update staged in a read-only transaction") + errReadOnlyRepositoryDeletion = errors.New("repository deletion staged in a read-only transaction") + errReadOnlyObjectsIncluded = errors.New("objects staged in a read-only transaction") + errReadOnlyHousekeeping = errors.New("housekeeping in a read-only transaction") ) // InvalidReferenceFormatError is returned when a reference name was invalid. @@ -127,6 +146,19 @@ type repositoryCreation struct { objectHash git.ObjectHash } +// runHousekeeping models housekeeping tasks. 
It is supposed to handle housekeeping tasks for repositories +// such as the cleanup of unneeded files and optimizations for the repository's data structures. +type runHousekeeping struct { + packRefs *runPackRefs +} + +// runPackRefs models refs packing housekeeping task. It packs heads and tags for efficient repository access. +type runPackRefs struct { + // PrunedRefs contain a list of references pruned by the `git-pack-refs` command. They are used + // for comparing to the ref list of the destination repository + PrunedRefs map[git.ReferenceName]struct{} +} + // ReferenceUpdates contains references to update. Reference name is used as the key and the value // is the expected old tip and the desired new tip. type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate @@ -214,6 +246,7 @@ type Transaction struct { deleteRepository bool includedObjects map[git.ObjectID]struct{} alternateUpdate *alternateUpdate + runHousekeeping *runHousekeeping } // Begin opens a new transaction. The caller must call either Commit or Rollback to release @@ -381,15 +414,6 @@ func (txn *Transaction) updateState(newState transactionState) error { } } -// Below errors are used to error out in cases when updates have been staged in a read-only transaction. -var ( - errReadOnlyReferenceUpdates = errors.New("reference updates staged in a read-only transaction") - errReadOnlyDefaultBranchUpdate = errors.New("default branch update staged in a read-only transaction") - errReadOnlyCustomHooksUpdate = errors.New("custom hooks update staged in a read-only transaction") - errReadOnlyRepositoryDeletion = errors.New("repository deletion staged in a read-only transaction") - errReadOnlyObjectsIncluded = errors.New("objects staged in a read-only transaction") -) - // Commit performs the changes. If no error is returned, the transaction was successful and the changes // have been performed. If an error was returned, the transaction may or may not be persisted. func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { @@ -418,11 +442,21 @@ func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { return errReadOnlyRepositoryDeletion case txn.includedObjects != nil: return errReadOnlyObjectsIncluded + case txn.runHousekeeping != nil: + return errReadOnlyHousekeeping default: return nil } } + if txn.runHousekeeping != nil && (txn.referenceUpdates != nil || + txn.defaultBranchUpdate != nil || + txn.customHooksUpdate != nil || + txn.deleteRepository || + txn.includedObjects != nil) { + return errHousekeepingConflictOtherUpdates + } + return txn.commit(ctx, txn) } @@ -579,6 +613,17 @@ func (txn *Transaction) SetCustomHooks(customHooksTAR []byte) { txn.customHooksUpdate = &CustomHooksUpdate{CustomHooksTAR: customHooksTAR} } +// PackRefs sets pack-refs housekeeping task as a part of the transaction. The transaction can only runs other +// housekeeping tasks in the same transaction. No other updates are allowed. +func (txn *Transaction) PackRefs() { + if txn.runHousekeeping == nil { + txn.runHousekeeping = &runHousekeeping{} + } + txn.runHousekeeping.packRefs = &runPackRefs{ + PrunedRefs: map[git.ReferenceName]struct{}{}, + } +} + // IncludeObject includes the given object and its dependencies in the transaction's logged pack file even // if the object is unreachable from the references. func (txn *Transaction) IncludeObject(oid git.ObjectID) { @@ -790,6 +835,11 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact } } + // Create a directory to store all staging files. 
+ if err := os.Mkdir(transaction.walFilesPath(), perm.PrivateDir); err != nil { + return fmt.Errorf("create wal files directory: %w", err) + } + if err := mgr.setupStagingRepository(ctx, transaction); err != nil { return fmt.Errorf("setup staging repository: %w", err) } @@ -802,6 +852,10 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact return fmt.Errorf("pack objects: %w", err) } + if err := mgr.prepareHousekeeping(ctx, transaction); err != nil { + return fmt.Errorf("preparing housekeeping: %w", err) + } + select { case mgr.admissionQueue <- transaction: transaction.admitted = true @@ -1053,10 +1107,6 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra group.Go(func() (returnedErr error) { defer packReader.CloseWithError(returnedErr) - if err := os.Mkdir(transaction.walFilesPath(), perm.PrivateDir); err != nil { - return fmt.Errorf("create wal files directory: %w", err) - } - // index-pack places the pack, index, and reverse index into the repository's object directory. // The staging repository is configured with a quarantine so we execute it there. var stdout, stderr bytes.Buffer @@ -1088,6 +1138,92 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return group.Wait() } +// prepareHousekeeping composes and prepares necessary steps on the staging repository before the changes are staged and +// applied. All commands run in the scope of the staging repository. Thus, we can avoid any impact on other concurrent +// transactions. +func (mgr *TransactionManager) prepareHousekeeping(ctx context.Context, transaction *Transaction) error { + if transaction.runHousekeeping == nil { + return nil + } + if err := mgr.preparePackRefs(ctx, transaction); err != nil { + return err + } + return nil +} + +// preparePackRefs runs git-pack-refs command against the snapshot repository. It collects the resulting packed-refs +// file and the list of pruned references. Unfortunately, git-pack-refs doesn't output which refs are pruned. So, we +// performed two ref walkings before and after running the command. The difference between the two walks is the list of +// pruned refs. This workaround works but is not performant on large repositories with huge amount of loose references. +// Smaller repositories or ones that run housekeeping frequent won't have this issue. +// The work of adding pruned refs dump to `git-pack-refs` is tracked here: +// https://gitlab.com/gitlab-org/git/-/issues/222 +func (mgr *TransactionManager) preparePackRefs(ctx context.Context, transaction *Transaction) error { + if transaction.runHousekeeping.packRefs == nil { + return nil + } + + runPackRefs := transaction.runHousekeeping.packRefs + repoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + + if err := mgr.removePackedRefsLocks(mgr.ctx, repoPath); err != nil { + return fmt.Errorf("remove stale packed-refs locks: %w", err) + } + // First walk to collect the list of loose refs. + looseReferences := make(map[git.ReferenceName]struct{}) + if err := filepath.WalkDir(filepath.Join(repoPath, "refs"), func(path string, entry fs.DirEntry, err error) error { + if err != nil { + return err + } + if !entry.IsDir() { + // Get fully qualified refs. 
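+ // For example, a loose ref stored at "<repoPath>/refs/heads/main" yields the
+ // fully qualified name "refs/heads/main" relative to repoPath.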
+ ref, err := filepath.Rel(repoPath, path) + if err != nil { + return fmt.Errorf("extracting ref name: %w", err) + } + looseReferences[git.ReferenceName(ref)] = struct{}{} + } + return nil + }); err != nil { + return fmt.Errorf("initial walking refs directory: %w", err) + } + + // Execute git-pack-refs command. The command runs in the scope of the snapshot repository. Thus, we can + // let it prune the ref references without causing any impact to other concurrent transactions. + var stderr bytes.Buffer + if err := transaction.snapshotRepository.ExecAndWait(ctx, git.Command{ + Name: "pack-refs", + Flags: []git.Option{git.Flag{Name: "--all"}}, + }, git.WithStderr(&stderr)); err != nil { + return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String()) + } + + // Copy the resulting packed-refs file to the WAL directory. + if err := os.Link( + filepath.Join(filepath.Join(repoPath, "packed-refs")), + filepath.Join(transaction.walFilesPath(), "packed-refs"), + ); err != nil { + return fmt.Errorf("copying packed-refs file to WAL directory: %w", err) + } + if err := safe.NewSyncer().Sync(transaction.walFilesPath()); err != nil { + return fmt.Errorf("sync: %w", err) + } + + // Second walk and compare with the initial list of loose references. Any disappeared refs are pruned. + for ref := range looseReferences { + _, err := os.Stat(filepath.Join(repoPath, ref.String())) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + runPackRefs.PrunedRefs[ref] = struct{}{} + } else { + return fmt.Errorf("second walk refs directory: %w", err) + } + } + } + + return nil +} + // unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller. func unwrapExpectedError(err error) error { // The manager controls its own execution context and it is canceled only when Stop is called. @@ -1217,9 +1353,27 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } nextLSN := mgr.appendedLSN + 1 + var shouldStoreWALFiles bool + if transaction.packPrefix != "" { + shouldStoreWALFiles = true logEntry.PackPrefix = transaction.packPrefix + } + + if transaction.deleteRepository { + logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} + } + + if transaction.runHousekeeping != nil { + shouldStoreWALFiles = true + housekeepingEntry, err := mgr.verifyHousekeeping(mgr.ctx, transaction) + if err != nil { + return fmt.Errorf("verifying pack refs: %w", err) + } + logEntry.Housekeeping = housekeepingEntry + } + if shouldStoreWALFiles { removeFiles, err := mgr.storeWALFiles(mgr.ctx, nextLSN, transaction) cleanUps = append(cleanUps, func() error { // The transaction's files might have been moved successfully in to the log. @@ -1239,10 +1393,6 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } } - if transaction.deleteRepository { - logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} - } - return mgr.appendLogEntry(nextLSN, logEntry) }(); err != nil { transaction.result <- err @@ -1439,7 +1589,7 @@ func (mgr *TransactionManager) storeWALFiles(ctx context.Context, lsn LSN, trans } removeFiles = func() error { - if err := os.Remove(destinationPath); err != nil { + if err := os.RemoveAll(destinationPath); err != nil { return fmt.Errorf("remove wal files: %w", err) } @@ -1649,6 +1799,95 @@ func (mgr *TransactionManager) verifyDefaultBranchUpdate(ctx context.Context, tr return nil } +// verifyHousekeeping verifies if all included housekeeping tasks can be performed. 
Although it's feasible for multiple +// housekeeping tasks running at the same time, it's not guaranteed they are conflict-free. So, we need to ensure there +// is no other concurrent housekeeping task. Each sub-task also needs specific verification. +func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transaction *Transaction) (*gitalypb.LogEntry_Housekeeping, error) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN. + elm := mgr.committedEntries.Front() + for elm != nil { + entry := elm.Value.(*committedEntry) + if entry.lsn > transaction.snapshotLSN && entry.entry.RelativePath == transaction.relativePath { + if entry.entry.GetHousekeeping() != nil { + return nil, errHousekeepingConflictConcurrent + } + if entry.entry.GetRepositoryDeletion() != nil { + return nil, errConflictRepositoryDeletion + } + } + elm = elm.Next() + } + + packRefsEntry, err := mgr.verifyPackRefs(mgr.ctx, transaction) + if err != nil { + return nil, fmt.Errorf("verifying pack refs: %w", err) + } + + return &gitalypb.LogEntry_Housekeeping{ + PackRefs: packRefsEntry, + }, nil +} + +// verifyPackRefs verifies if the pack-refs housekeeping task can be logged. Ideally, we can just apply the packed-refs +// file and prune the loose references. Unfortunately, there could be a ref modification between the time the pack-refs +// command runs and the time this transaction is logged. Thus, we need to verify if the transaction conflicts with the +// current state of the repository. +// +// There are three cases when a reference is modified: +// - Reference creation: this is the easiest case. The new reference exists as a loose reference on disk and shadows the +// one in the packed-ref. +// - Reference update: similarly, the loose reference shadows the one in packed-refs with the new OID. However, we need +// to remove it from the list of pruned references. Otherwise, the repository continues to use the old OID. +// - Reference deletion. When a reference is deleted, both loose reference and the entry in the packed-refs file are +// removed. The reflogs are also removed. In addition, we don't use reflogs in Gitaly as core.logAllRefUpdates defaults +// to false in bare repositories. It could of course be that an admin manually enabled it by modifying the config +// on-disk directly. There is no way to extract reference deletion between two states. +// +// In theory, if there is any reference deletion, it can be removed from the packed-refs file. However, it requires +// parsing and regenerating the packed-refs file. So, let's settle down with a conflict error at this point. +func (mgr *TransactionManager) verifyPackRefs(ctx context.Context, transaction *Transaction) (*gitalypb.LogEntry_Housekeeping_PackRefs, error) { + if transaction.runHousekeeping.packRefs == nil { + return nil, nil + } + + objectHash, err := transaction.stagingRepository.ObjectHash(ctx) + if err != nil { + return nil, fmt.Errorf("object hash: %w", err) + } + packRefs := transaction.runHousekeeping.packRefs + + // Check for any concurrent ref deletion between this transaction's snapshot LSN to the end. 
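+ // A concurrent change whose new OID is the zero OID is a deletion and fails the
+ // commit with errPackRefsConflictRefDeletion. Any other change is an update: the
+ // ref is dropped from PrunedRefs so its loose ref keeps shadowing the
+ // now-outdated entry in the packed-refs file.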
+ elm := mgr.committedEntries.Front() + for elm != nil { + entry := elm.Value.(*committedEntry) + if entry.lsn > transaction.snapshotLSN && entry.entry.RelativePath == transaction.relativePath { + for _, refTransaction := range entry.entry.ReferenceTransactions { + for _, change := range refTransaction.Changes { + if objectHash.IsZeroOID(git.ObjectID(change.GetNewOid())) { + // Oops, there is a reference deletion. Bail out. + return nil, errPackRefsConflictRefDeletion + } + // Ref update. Remove the updated ref from the list of pruned refs so that the + // new OID in loose reference shadows the outdated OID in packed-refs. + delete(packRefs.PrunedRefs, git.ReferenceName(change.GetReferenceName())) + } + } + } + elm = elm.Next() + } + + var prunedRefs [][]byte + for ref := range packRefs.PrunedRefs { + prunedRefs = append(prunedRefs, []byte(ref)) + } + return &gitalypb.LogEntry_Housekeeping_PackRefs{ + PrunedRefs: prunedRefs, + }, nil +} + // applyDefaultBranchUpdate applies the default branch update to the repository from the log entry. func (mgr *TransactionManager) applyDefaultBranchUpdate(ctx context.Context, logEntry *gitalypb.LogEntry) error { if logEntry.DefaultBranchUpdate == nil { @@ -1807,6 +2046,10 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn LSN) error if err := mgr.applyCustomHooks(ctx, logEntry); err != nil { return fmt.Errorf("apply custom hooks: %w", err) } + + if err := mgr.applyHousekeeping(ctx, lsn, logEntry); err != nil { + return fmt.Errorf("apply housekeeping: %w", err) + } } if err := mgr.storeAppliedLSN(lsn); err != nil { @@ -2126,6 +2369,104 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logEntry *g return nil } +// applyHousekeeping applies housekeeping results to the target repository. +func (mgr *TransactionManager) applyHousekeeping(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error { + if logEntry.Housekeeping == nil { + return nil + } + repositoryPath := mgr.getAbsolutePath(logEntry.RelativePath) + if logEntry.Housekeeping.PackRefs != nil { + // Remove packed-refs lock. While we shouldn't be producing any new stale locks, it makes sense to have + // this for historic state until we're certain none of the repositories contain stale locks anymore. + // This clean up is not needed afterward. + if err := mgr.removePackedRefsLocks(ctx, repositoryPath); err != nil { + return fmt.Errorf("applying pack-refs: %w", err) + } + + packedRefsPath := filepath.Join(repositoryPath, "packed-refs") + // Replace the packed-refs file. + if err := os.Remove(packedRefsPath); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("removing existing pack-refs: %w", err) + } + } + if err := os.Link( + filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "packed-refs"), + packedRefsPath, + ); err != nil { + return fmt.Errorf("linking new packed-refs: %w", err) + } + + modifiedDirs := map[string]struct{}{} + // Prune loose references. The log entry carries the list of fully qualified references to prune. + for _, ref := range logEntry.Housekeeping.PackRefs.PrunedRefs { + path := filepath.Join(repositoryPath, string(ref)) + if err := os.Remove(path); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return structerr.New("pruning loose reference: %w", err).WithMetadata("ref", path) + } + } + modifiedDirs[filepath.Dir(path)] = struct{}{} + } + + syncer := safe.NewSyncer() + // Traverse all modified dirs back to the root "refs" dir of the repository. Remove any empty directory + // along the way. 
It prevents leaving empty dirs around after a loose ref is pruned. `git-pack-refs` + // command does dir removal for us, but in staginge repository during preparation stage. In the actual + // repository, we need to do it ourselves. + rootRefDir := filepath.Join(repositoryPath, "refs") + for dir := range modifiedDirs { + for dir != rootRefDir { + if isEmpty, err := isDirEmpty(dir); err != nil { + // If a dir does not exist, it properly means a directory may already be deleted by a + // previous interrupted attempt on applying the log entry. We simply ignore the error + // and move up the directory hierarchy. + if errors.Is(err, fs.ErrNotExist) { + dir = filepath.Dir(dir) + continue + } else { + return fmt.Errorf("checking empty ref dir: %w", err) + } + } else if !isEmpty { + break + } + + if err := os.Remove(dir); err != nil { + return fmt.Errorf("removing empty ref dir: %w", err) + } + dir = filepath.Dir(dir) + } + // If there is any empty dir along the way, it's removed and dir pointer moves up until the dir + // is not empty or reaching the root dir. That one should be fsynced to flush the dir removal. + // If there is no empty dir, it stays at the dir of pruned refs, which also needs a flush. + if err := syncer.Sync(dir); err != nil { + return fmt.Errorf("sync dir: %w", err) + } + } + + // Sync the root of the repository to flush packed-refs replacement. + if err := syncer.SyncParent(packedRefsPath); err != nil { + return fmt.Errorf("sync parent: %w", err) + } + } + return nil +} + +// isDirEmpty checks if a directory is empty. +func isDirEmpty(dir string) (bool, error) { + f, err := os.Open(dir) + if err != nil { + return false, err + } + defer f.Close() + + // Read at most one entry from the directory. If we get EOF, the directory is empty + if _, err = f.Readdirnames(1); errors.Is(err, io.EOF) { + return true, nil + } + return false, err +} + // deleteLogEntry deletes the log entry at the given LSN from the log. func (mgr *TransactionManager) deleteLogEntry(lsn LSN) error { return mgr.deleteKey(keyLogEntry(mgr.partitionID, lsn)) @@ -2227,7 +2568,7 @@ func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) { elm := mgr.committedEntries.Front() for elm != nil { - front := mgr.committedEntries.Front().Value.(*committedEntry) + front := elm.Value.(*committedEntry) if front.snapshotReaders > 0 { // If the first entry had still some snapshot readers, that means // our transaction was not the oldest reader. 
We can't remove any entries diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_housekeeping_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_housekeeping_test.go new file mode 100644 index 000000000..a2b5ad5ea --- /dev/null +++ b/internal/gitaly/storage/storagemgr/transaction_manager_housekeeping_test.go @@ -0,0 +1,1214 @@ +package storagemgr + +import ( + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func generateHousekeepingTests(t *testing.T, ctx context.Context, testPartitionID partitionID, relativePath string) []transactionTestCase { + customSetup := func(t *testing.T, ctx context.Context, testPartitionID partitionID, relativePath string) testTransactionSetup { + setup := setupTest(t, ctx, testPartitionID, relativePath) + gittest.WriteRef(t, setup.Config, setup.RepositoryPath, "refs/heads/main", setup.Commits.First.OID) + gittest.WriteRef(t, setup.Config, setup.RepositoryPath, "refs/heads/branch-1", setup.Commits.Second.OID) + gittest.WriteRef(t, setup.Config, setup.RepositoryPath, "refs/heads/branch-2", setup.Commits.Third.OID) + + gittest.WriteTag(t, setup.Config, setup.RepositoryPath, "v1.0.0", setup.Commits.Diverging.OID.Revision()) + annotatedTag := gittest.WriteTag(t, setup.Config, setup.RepositoryPath, "v2.0.0", setup.Commits.Diverging.OID.Revision(), gittest.WriteTagConfig{ + Message: "annotated tag", + }) + setup.AnnotatedTags = append(setup.AnnotatedTags, testTransactionTag{ + Name: "v2.0.0", + OID: annotatedTag, + }) + + return setup + } + setup := customSetup(t, ctx, testPartitionID, relativePath) + lightweightTag := setup.Commits.Diverging.OID + annotatedTag := setup.AnnotatedTags[0] + + directoryStateWithPackedRefs := func(lsn LSN) testhelper.DirectoryState { + return testhelper.DirectoryState{ + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + // LSN is when a log entry is appended, it's different from transaction ID. 
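+ // For example, transaction 1's pack-refs entry lands at LSN 2 when another
+ // transaction manages to commit first.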
+ fmt.Sprintf("/wal/%d", lsn): {Mode: fs.ModeDir | perm.PrivateDir}, + fmt.Sprintf("/wal/%s/packed-refs", lsn): packRefsDirectoryEntry(setup.Config), + } + } + + defaultRefs := []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: lightweightTag.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + } + + return []transactionTestCase{ + { + desc: "run pack-refs on a repository without packed-refs", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Second.OID}, + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(1), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + // `main` points to the second commit now + {Name: "refs/heads/main", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: lightweightTag.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), + // But `main` in packed-refs file points to the first + // commit. + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{ + // It's shadowed by the loose reference. + "refs/heads/main": setup.Commits.Second.OID, + }, + }, + }, + }, + }, + }, + { + desc: "run pack-refs on a repository with an existing packed-refs", + customSetup: customSetup, + steps: steps{ + StartManager{ + ModifyStorage: func(tb testing.TB, cfg config.Cfg, storagePath string) { + repoPath := filepath.Join(storagePath, setup.RelativePath) + // Execute pack-refs command without going through transaction manager + gittest.Exec(tb, cfg, "-C", repoPath, "pack-refs", "--all") + + // Add artifactual packed-refs.lock. The pack-refs task should ignore + // the lock and move on. 
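+ // The manager's removePackedRefsLocks, which runs both while preparing
+ // pack-refs and while applying the log entry, is expected to clear such
+ // stale packed-refs.lock files.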
+ require.NoError(t, os.WriteFile( + filepath.Join(repoPath, "packed-refs.lock"), + []byte{}, + perm.PrivateFile, + )) + require.NoError(t, os.WriteFile( + filepath.Join(repoPath, "packed-refs.new"), + []byte{}, + perm.PrivateFile, + )) + }, + }, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Second.OID}, + "refs/heads/branch-3": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.Diverging.OID}, + }, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + RunPackRefs{ + TransactionID: 2, + }, + Commit{ + TransactionID: 2, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(2), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/branch-3", Target: setup.Commits.Diverging.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: lightweightTag.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + // All refs are packed to the packed-refs file. + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), + fmt.Sprintf("%s refs/heads/branch-3", setup.Commits.Diverging.OID.String()), + fmt.Sprintf("%s refs/heads/main", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + }, + }, + }, + { + desc: "run pack-refs, all refs outside refs/heads and refs/tags are packed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/keep-around/1": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + "refs/merge-requests/1": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + "refs/very/deep/nested/ref": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.Third.OID}, + }, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + RunPackRefs{ + TransactionID: 2, + }, + Commit{ + TransactionID: 2, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(2), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + 
{Name: "refs/keep-around/1", Target: setup.Commits.First.OID.String()}, + {Name: "refs/merge-requests/1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: lightweightTag.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + {Name: "refs/very/deep/nested/ref", Target: setup.Commits.Third.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/keep-around/1", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/merge-requests/1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + fmt.Sprintf("%s refs/very/deep/nested/ref", setup.Commits.Third.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + }, + }, + }, + { + desc: "concurrent ref creation before pack-refs task is committed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-3": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.Diverging.OID}, + "refs/keep-around/1": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + Commit{ + TransactionID: 1, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(2), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/branch-3", Target: setup.Commits.Diverging.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + {Name: "refs/keep-around/1", Target: setup.Commits.First.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: lightweightTag.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{ + // Although ref creation commits beforehand, pack-refs + // task is unaware of these new refs. It keeps them as + // loose refs. 
+ "refs/heads/branch-3": setup.Commits.Diverging.OID, + "refs/keep-around/1": setup.Commits.First.OID, + }, + }, + }, + }, + }, + }, + { + desc: "concurrent ref creation after pack-refs task is committed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 1, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-3": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.Diverging.OID}, + "refs/keep-around/1": {OldOID: gittest.DefaultObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(1), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/branch-3", Target: setup.Commits.Diverging.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + {Name: "refs/keep-around/1", Target: setup.Commits.First.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: lightweightTag.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{ + // pack-refs task is unaware of these new refs. It keeps + // them as loose refs. 
+ "refs/heads/branch-3": setup.Commits.Diverging.OID, + "refs/keep-around/1": setup.Commits.First.OID, + }, + }, + }, + }, + }, + }, + { + desc: "concurrent ref updates before pack-refs task is committed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Second.OID}, + "refs/heads/branch-1": {OldOID: setup.Commits.Second.OID, NewOID: setup.Commits.Third.OID}, + "refs/heads/branch-2": {OldOID: setup.Commits.Third.OID, NewOID: setup.Commits.Diverging.OID}, + "refs/tags/v1.0.0": {OldOID: setup.Commits.Diverging.OID, NewOID: setup.Commits.First.OID}, + }, + }, + Commit{ + TransactionID: 1, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(2), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Diverging.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: setup.Commits.First.OID.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), // Outdated + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), // Outdated + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), // Outdated + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), // Outdated + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), // Still up-to-date + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{ + // Updated refs shadow the ones in the packed-refs file. 
+ "refs/heads/main": setup.Commits.Second.OID, + "refs/heads/branch-1": setup.Commits.Third.OID, + "refs/heads/branch-2": setup.Commits.Diverging.OID, + "refs/tags/v1.0.0": setup.Commits.First.OID, + }, + }, + }, + }, + }, + }, + { + desc: "concurrent ref updates after pack-refs task is committed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 1, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Second.OID}, + "refs/heads/branch-1": {OldOID: setup.Commits.Second.OID, NewOID: setup.Commits.Third.OID}, + "refs/heads/branch-2": {OldOID: setup.Commits.Third.OID, NewOID: setup.Commits.Diverging.OID}, + "refs/tags/v1.0.0": {OldOID: setup.Commits.Diverging.OID, NewOID: setup.Commits.First.OID}, + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(1), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Diverging.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: setup.Commits.First.OID.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), // Outdated + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), // Outdated + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), // Outdated + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), // Outdated + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{ + "refs/heads/main": setup.Commits.Second.OID, + "refs/heads/branch-1": setup.Commits.Third.OID, + "refs/heads/branch-2": setup.Commits.Diverging.OID, + "refs/tags/v1.0.0": setup.Commits.First.OID, + }, + }, + }, + }, + }, + }, + { + desc: "concurrent ref deletion before pack-refs is committed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.Commits.Second.OID, NewOID: gittest.DefaultObjectHash.ZeroOID}, + "refs/tags/v1.0.0": {OldOID: lightweightTag, NewOID: gittest.DefaultObjectHash.ZeroOID}, + }, + }, + Commit{ + TransactionID: 1, + ExpectedError: errPackRefsConflictRefDeletion, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(1).toProto(), + }, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-2", Target: 
setup.Commits.Third.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + // Empty packed-refs. It means the pack-refs task is not + // executed. + PackedRefsContent: []string{""}, + // Deleted refs went away. + LooseReferences: map[git.ReferenceName]git.ObjectID{ + "refs/heads/branch-2": setup.Commits.Third.OID, + "refs/heads/main": setup.Commits.First.OID, + "refs/tags/v2.0.0": annotatedTag.OID, + }, + }, + }, + }, + }, + }, + { + desc: "concurrent ref deletion before pack-refs is committed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID}, + }, + }, + Begin{ + TransactionID: 3, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + Commit{ + TransactionID: 3, + }, + Commit{ + TransactionID: 1, + ExpectedError: errPackRefsConflictRefDeletion, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-1", Target: setup.Commits.Second.OID.String()}, + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/tags/v1.0.0", Target: lightweightTag.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{""}, + LooseReferences: map[git.ReferenceName]git.ObjectID{ + "refs/heads/branch-1": setup.Commits.Second.OID, + "refs/heads/branch-2": setup.Commits.Third.OID, + "refs/tags/v1.0.0": lightweightTag, + "refs/tags/v2.0.0": annotatedTag.OID, + }, + }, + }, + }, + }, + }, + { + desc: "concurrent ref deletion in other repository of a pool", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: "pool", + }, + CreateRepository{ + TransactionID: 1, + References: map[git.ReferenceName]git.ObjectID{ + "refs/heads/main": setup.Commits.First.OID, + }, + Packs: [][]byte{setup.Commits.First.Pack}, + }, + Commit{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: "member", + ExpectedSnapshotLSN: 1, + }, + CreateRepository{ + TransactionID: 2, + Alternate: "../../pool/objects", + }, + Commit{ + TransactionID: 2, + }, + Begin{ + TransactionID: 3, + RelativePath: "member", + ExpectedSnapshotLSN: 2, + }, + Commit{ + TransactionID: 3, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + Begin{ + TransactionID: 4, + RelativePath: "member", + ExpectedSnapshotLSN: 3, + }, + Begin{ + TransactionID: 5, + RelativePath: "pool", + ExpectedSnapshotLSN: 3, + }, + RunPackRefs{ + TransactionID: 5, + }, + Commit{ + TransactionID: 4, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.Commits.First.OID, NewOID: gittest.DefaultObjectHash.ZeroOID}, + }, + }, + Commit{ + TransactionID: 5, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(5).toProto(), + }, + 
Repositories: RepositoryStates{ + "pool": { + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + "member": { + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + Alternate: "../../pool/objects", + }, + }, + Directory: testhelper.DirectoryState{ + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), + "/wal/1/objects.pack": packFileDirectoryEntry( + setup.Config, + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + "/wal/1/objects.rev": reverseIndexFileDirectoryEntry(setup.Config), + "/wal/5": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/5/packed-refs": packRefsDirectoryEntry(setup.Config), + }, + }, + }, + { + desc: "concurrent ref deletion after pack-refs is committed", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 1, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.Commits.Second.OID, NewOID: gittest.DefaultObjectHash.ZeroOID}, + "refs/tags/v1.0.0": {OldOID: lightweightTag, NewOID: gittest.DefaultObjectHash.ZeroOID}, + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: directoryStateWithPackedRefs(1), + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/branch-2", Target: setup.Commits.Third.OID.String()}, + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + {Name: "refs/tags/v2.0.0", Target: annotatedTag.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + }, + }, + }, + { + desc: "empty directories are pruned after interrupted log application", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/empty-dir/parent/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + CloseManager{}, + StartManager{ + Hooks: testTransactionHooks{ + BeforeStoreAppliedLSN: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + 
RunPackRefs{ + TransactionID: 2, + }, + Commit{ + TransactionID: 2, + ExpectedError: ErrTransactionProcessingStopped, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{ + ModifyStorage: func(tb testing.TB, cfg config.Cfg, storagePath string) { + // Create the directory that was removed already by the pack-refs task. + // This way we can assert reapplying the log entry will successfully remove + // the all directories even if the reference deletion was already applied. + require.NoError(tb, os.MkdirAll( + filepath.Join(storagePath, setup.RelativePath, "refs", "heads", "empty-dir"), + perm.PrivateDir, + )) + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/2": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/2/packed-refs": packRefsDirectoryEntry(setup.Config), + }, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/empty-dir/parent/main", Target: setup.Commits.First.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/empty-dir/parent/main", setup.Commits.First.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + }, + }, + }, + { + desc: "housekeeping fails in read-only transaction", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + RelativePath: setup.RelativePath, + ReadOnly: true, + }, + RunPackRefs{}, + Commit{ + ExpectedError: errReadOnlyHousekeeping, + }, + }, + expectedState: StateAssertion{ + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + References: defaultRefs, + }, + }, + }, + }, + { + desc: "housekeeping fails when there are other updates in transaction", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + RelativePath: setup.RelativePath, + }, + RunPackRefs{}, + Commit{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Second.OID}, + }, + ExpectedError: errHousekeepingConflictOtherUpdates, + }, + }, + expectedState: StateAssertion{ + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + References: defaultRefs, + }, + }, + }, + }, + { + desc: "housekeeping transaction runs concurrently with another housekeeping transaction", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + }, + Commit{ + TransactionID: 2, + ExpectedError: errHousekeepingConflictConcurrent, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(1).toProto(), + }, + Directory: directoryStateWithPackedRefs(1), + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + References: defaultRefs, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s 
refs/heads/branch-2", setup.Commits.Third.OID.String()), + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + }, + }, + }, + { + desc: "housekeeping transaction runs after another housekeeping transaction in other repository of a pool", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: "pool", + }, + CreateRepository{ + TransactionID: 1, + References: map[git.ReferenceName]git.ObjectID{ + "refs/heads/main": setup.Commits.First.OID, + }, + Packs: [][]byte{setup.Commits.First.Pack}, + }, + Commit{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: "member", + ExpectedSnapshotLSN: 1, + }, + CreateRepository{ + TransactionID: 2, + Alternate: "../../pool/objects", + }, + Commit{ + TransactionID: 2, + }, + Begin{ + TransactionID: 3, + RelativePath: "member", + ExpectedSnapshotLSN: 2, + }, + Begin{ + TransactionID: 4, + RelativePath: "pool", + ExpectedSnapshotLSN: 2, + }, + RunPackRefs{ + TransactionID: 3, + }, + RunPackRefs{ + TransactionID: 4, + }, + Commit{ + TransactionID: 3, + }, + Commit{ + TransactionID: 4, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(4).toProto(), + }, + Repositories: RepositoryStates{ + "pool": { + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + "member": { + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + Alternate: "../../pool/objects", + }, + }, + Directory: testhelper.DirectoryState{ + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), + "/wal/1/objects.pack": packFileDirectoryEntry( + setup.Config, + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + "/wal/1/objects.rev": reverseIndexFileDirectoryEntry(setup.Config), + "/wal/3": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/3/packed-refs": packRefsDirectoryEntry(setup.Config), + "/wal/4": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/4/packed-refs": packRefsDirectoryEntry(setup.Config), + }, + }, + }, + { + desc: "housekeeping transaction runs after another housekeeping transaction", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + RunPackRefs{ + TransactionID: 2, + }, + Commit{ + TransactionID: 2, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/": {Mode: 
fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1/packed-refs": packRefsDirectoryEntry(setup.Config), + "/wal/2": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/2/packed-refs": packRefsDirectoryEntry(setup.Config), + }, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + References: defaultRefs, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{ + "# pack-refs with: peeled fully-peeled sorted ", + fmt.Sprintf("%s refs/heads/branch-1", setup.Commits.Second.OID.String()), + fmt.Sprintf("%s refs/heads/branch-2", setup.Commits.Third.OID.String()), + fmt.Sprintf("%s refs/heads/main", setup.Commits.First.OID.String()), + fmt.Sprintf("%s refs/tags/v1.0.0", lightweightTag.String()), + fmt.Sprintf("%s refs/tags/v2.0.0", annotatedTag.OID.String()), + fmt.Sprintf("^%s", setup.Commits.Diverging.OID.String()), + }, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + }, + }, + }, + }, + { + desc: "housekeeping transaction runs concurrently with a repository deletion", + customSetup: customSetup, + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + RelativePath: setup.RelativePath, + }, + RunPackRefs{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + RelativePath: setup.RelativePath, + }, + Commit{ + TransactionID: 2, + DeleteRepository: true, + }, + Begin{ + TransactionID: 3, + RelativePath: setup.RelativePath, + ExpectedSnapshotLSN: 1, + }, + CreateRepository{ + TransactionID: 3, + }, + Commit{ + TransactionID: 3, + }, + Commit{ + TransactionID: 1, + ExpectedError: errConflictRepositoryDeletion, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + }, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + References: nil, + PackedRefs: &PackedRefsState{ + PackedRefsContent: []string{""}, + LooseReferences: map[git.ReferenceName]git.ObjectID{}, + }, + Objects: []git.ObjectID{}, + }, + }, + }, + }, + } +} diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go index eed2ff37b..3383e289c 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go @@ -115,6 +115,16 @@ func packFileDirectoryEntry(cfg config.Cfg, expectedObjects []git.ObjectID) test } } +// packRefsDirectoryEntry returns a DirectoryEntry that checks for the existence of packed-refs file. The content does +// not matter because it will be asserted in the repository state insteaad. +func packRefsDirectoryEntry(cfg config.Cfg) testhelper.DirectoryEntry { + return testhelper.DirectoryEntry{ + Mode: perm.SharedFile, + Content: "", + ParseContent: func(testing.TB, string, []byte) any { return "" }, + } +} + // indexFileDirectoryEntry returns a DirectoryEntry that asserts the given pack file index is valid. 
func indexFileDirectoryEntry(cfg config.Cfg) testhelper.DirectoryEntry { return testhelper.DirectoryEntry{ @@ -259,6 +269,7 @@ func TestTransactionManager(t *testing.T) { generateDefaultBranchTests(t, setup), generateAlternateTests(t, setup), generateCustomHooksTests(t, setup), + generateHousekeepingTests(t, ctx, testPartitionID, relativePath), } for _, subCases := range subTests { testCases = append(testCases, subCases...) -- cgit v1.2.3