Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.kernel.org/pub/scm/git/git.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÆvar Arnfjörð Bjarmason <avarab@gmail.com>2022-06-07 11:48:19 +0300
committerJunio C Hamano <gitster@pobox.com>2022-06-07 20:01:41 +0300
commitfd3aaf53f713d424d3c08cffa7e76e29b31638ba (patch)
tree153ad1ffcd02c1101faecc489cb09ab0d1b07af0 /run-command.c
parent6cd33dceed60949e2dbc32e3f0f5e67c4c882e1e (diff)
run-command: add an "ungroup" option to run_process_parallel()
Extend the parallel execution API added in c553c72eed6 (run-command: add an asynchronous parallel child processor, 2015-12-15) to support a mode where the stdout and stderr of the processes isn't captured and output in a deterministic order, instead we'll leave it to the kernel and stdio to sort it out. This gives the API same functionality as GNU parallel's --ungroup option. As we'll see in a subsequent commit the main reason to want this is to support stdout and stderr being connected to the TTY in the case of jobs=1, demonstrated here with GNU parallel: $ parallel --ungroup 'test -t {} && echo TTY || echo NTTY' ::: 1 2 TTY TTY $ parallel 'test -t {} && echo TTY || echo NTTY' ::: 1 2 NTTY NTTY Another is as GNU parallel's documentation notes a potential for optimization. As demonstrated in next commit our results with "git hook run" will be similar, but generally speaking this shows that if you want to run processes in parallel where the exact order isn't important this can be a lot faster: $ hyperfine -r 3 -L o ,--ungroup 'parallel {o} seq ::: 10000000 >/dev/null ' Benchmark 1: parallel seq ::: 10000000 >/dev/null Time (mean ± σ): 220.2 ms ± 9.3 ms [User: 124.9 ms, System: 96.1 ms] Range (min … max): 212.3 ms … 230.5 ms 3 runs Benchmark 2: parallel --ungroup seq ::: 10000000 >/dev/null Time (mean ± σ): 154.7 ms ± 0.9 ms [User: 136.2 ms, System: 25.1 ms] Range (min … max): 153.9 ms … 155.7 ms 3 runs Summary 'parallel --ungroup seq ::: 10000000 >/dev/null ' ran 1.42 ± 0.06 times faster than 'parallel seq ::: 10000000 >/dev/null ' A large part of the juggling in the API is to make the API safer for its maintenance and consumers alike. For the maintenance of the API we e.g. avoid malloc()-ing the "pp->pfd", ensuring that SANITIZE=address and other similar tools will catch any unexpected misuse. For API consumers we take pains to never pass the non-NULL "out" buffer to an API user that provided the "ungroup" option. The resulting code in t/helper/test-run-command.c isn't typical of such a user, i.e. they'd typically use one mode or the other, and would know whether they'd provided "ungroup" or not. We could also avoid the strbuf_init() for "buffered_output" by having "struct parallel_processes" use a static PARALLEL_PROCESSES_INIT initializer, but let's leave that cleanup for later. Using a global "run_processes_parallel_ungroup" variable to enable this option is rather nasty, but is being done here to produce as minimal of a change as possible for a subsequent regression fix. This change is extracted from a larger initial version[1] which ends up with a better end-state for the API, but in doing so needed to modify all existing callers of the API. Let's defer that for now, and narrowly focus on what we need for fixing the regression in the subsequent commit. It's safe to do this with a global variable because: A) hook.c is the only user of it that sets it to non-zero, and before we'll get any other API users we'll refactor away this method of passing in the option, i.e. re-roll [1]. B) Even if hook.c wasn't the only user we don't have callers of this API that concurrently invoke this parallel process starting API itself in parallel. As noted above "A" && "B" are rather nasty, and we don't want to live with those caveats long-term, but for now they should be an acceptable compromise. 1. https://lore.kernel.org/git/cover-v2-0.8-00000000000-20220518T195858Z-avarab@gmail.com/ Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com> Signed-off-by: Junio C Hamano <gitster@pobox.com>
Diffstat (limited to 'run-command.c')
-rw-r--r--run-command.c70
1 files changed, 51 insertions, 19 deletions
diff --git a/run-command.c b/run-command.c
index a8501e38ce..7ab2dd28f3 100644
--- a/run-command.c
+++ b/run-command.c
@@ -1471,6 +1471,7 @@ enum child_state {
GIT_CP_WAIT_CLEANUP,
};
+int run_processes_parallel_ungroup;
struct parallel_processes {
void *data;
@@ -1494,6 +1495,7 @@ struct parallel_processes {
struct pollfd *pfd;
unsigned shutdown : 1;
+ unsigned ungroup : 1;
int output_owner;
struct strbuf buffered_output; /* of finished children */
@@ -1537,7 +1539,7 @@ static void pp_init(struct parallel_processes *pp,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
task_finished_fn task_finished,
- void *data)
+ void *data, int ungroup)
{
int i;
@@ -1559,15 +1561,21 @@ static void pp_init(struct parallel_processes *pp,
pp->nr_processes = 0;
pp->output_owner = 0;
pp->shutdown = 0;
+ pp->ungroup = ungroup;
CALLOC_ARRAY(pp->children, n);
- CALLOC_ARRAY(pp->pfd, n);
+ if (pp->ungroup)
+ pp->pfd = NULL;
+ else
+ CALLOC_ARRAY(pp->pfd, n);
strbuf_init(&pp->buffered_output, 0);
for (i = 0; i < n; i++) {
strbuf_init(&pp->children[i].err, 0);
child_process_init(&pp->children[i].process);
- pp->pfd[i].events = POLLIN | POLLHUP;
- pp->pfd[i].fd = -1;
+ if (pp->pfd) {
+ pp->pfd[i].events = POLLIN | POLLHUP;
+ pp->pfd[i].fd = -1;
+ }
}
pp_for_signal = pp;
@@ -1615,24 +1623,31 @@ static int pp_start_one(struct parallel_processes *pp)
BUG("bookkeeping is hard");
code = pp->get_next_task(&pp->children[i].process,
- &pp->children[i].err,
+ pp->ungroup ? NULL : &pp->children[i].err,
pp->data,
&pp->children[i].data);
if (!code) {
- strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
- strbuf_reset(&pp->children[i].err);
+ if (!pp->ungroup) {
+ strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+ strbuf_reset(&pp->children[i].err);
+ }
return 1;
}
- pp->children[i].process.err = -1;
- pp->children[i].process.stdout_to_stderr = 1;
+ if (!pp->ungroup) {
+ pp->children[i].process.err = -1;
+ pp->children[i].process.stdout_to_stderr = 1;
+ }
pp->children[i].process.no_stdin = 1;
if (start_command(&pp->children[i].process)) {
- code = pp->start_failure(&pp->children[i].err,
+ code = pp->start_failure(pp->ungroup ? NULL :
+ &pp->children[i].err,
pp->data,
pp->children[i].data);
- strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
- strbuf_reset(&pp->children[i].err);
+ if (!pp->ungroup) {
+ strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+ strbuf_reset(&pp->children[i].err);
+ }
if (code)
pp->shutdown = 1;
return code;
@@ -1640,7 +1655,8 @@ static int pp_start_one(struct parallel_processes *pp)
pp->nr_processes++;
pp->children[i].state = GIT_CP_WORKING;
- pp->pfd[i].fd = pp->children[i].process.err;
+ if (pp->pfd)
+ pp->pfd[i].fd = pp->children[i].process.err;
return 0;
}
@@ -1674,6 +1690,7 @@ static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
static void pp_output(struct parallel_processes *pp)
{
int i = pp->output_owner;
+
if (pp->children[i].state == GIT_CP_WORKING &&
pp->children[i].err.len) {
strbuf_write(&pp->children[i].err, stderr);
@@ -1696,7 +1713,7 @@ static int pp_collect_finished(struct parallel_processes *pp)
code = finish_command(&pp->children[i].process);
- code = pp->task_finished(code,
+ code = pp->task_finished(code, pp->ungroup ? NULL :
&pp->children[i].err, pp->data,
pp->children[i].data);
@@ -1707,10 +1724,13 @@ static int pp_collect_finished(struct parallel_processes *pp)
pp->nr_processes--;
pp->children[i].state = GIT_CP_FREE;
- pp->pfd[i].fd = -1;
+ if (pp->pfd)
+ pp->pfd[i].fd = -1;
child_process_init(&pp->children[i].process);
- if (i != pp->output_owner) {
+ if (pp->ungroup) {
+ ; /* no strbuf_*() work to do here */
+ } else if (i != pp->output_owner) {
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
strbuf_reset(&pp->children[i].err);
} else {
@@ -1747,9 +1767,14 @@ int run_processes_parallel(int n,
int i, code;
int output_timeout = 100;
int spawn_cap = 4;
+ int ungroup = run_processes_parallel_ungroup;
struct parallel_processes pp;
- pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
+ /* unset for the next API user */
+ run_processes_parallel_ungroup = 0;
+
+ pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb,
+ ungroup);
while (1) {
for (i = 0;
i < spawn_cap && !pp.shutdown &&
@@ -1766,8 +1791,15 @@ int run_processes_parallel(int n,
}
if (!pp.nr_processes)
break;
- pp_buffer_stderr(&pp, output_timeout);
- pp_output(&pp);
+ if (ungroup) {
+ int i;
+
+ for (i = 0; i < pp.max_processes; i++)
+ pp.children[i].state = GIT_CP_WAIT_CLEANUP;
+ } else {
+ pp_buffer_stderr(&pp, output_timeout);
+ pp_output(&pp);
+ }
code = pp_collect_finished(&pp);
if (code) {
pp.shutdown = 1;