diff options
Diffstat (limited to 'run-command.c')
-rw-r--r-- | run-command.c | 236 |
1 files changed, 103 insertions, 133 deletions
diff --git a/run-command.c b/run-command.c index 5ec3a46dcc..c772acd743 100644 --- a/run-command.c +++ b/run-command.c @@ -1496,16 +1496,8 @@ enum child_state { GIT_CP_WAIT_CLEANUP, }; -int run_processes_parallel_ungroup; struct parallel_processes { - void *data; - - int max_processes; - int nr_processes; - - get_next_task_fn get_next_task; - start_failure_fn start_failure; - task_finished_fn task_finished; + size_t nr_processes; struct { enum child_state state; @@ -1520,81 +1512,60 @@ struct parallel_processes { struct pollfd *pfd; unsigned shutdown : 1; - unsigned ungroup : 1; - int output_owner; + size_t output_owner; struct strbuf buffered_output; /* of finished children */ }; -static int default_start_failure(struct strbuf *out, - void *pp_cb, - void *pp_task_cb) -{ - return 0; -} +struct parallel_processes_for_signal { + const struct run_process_parallel_opts *opts; + const struct parallel_processes *pp; +}; -static int default_task_finished(int result, - struct strbuf *out, - void *pp_cb, - void *pp_task_cb) +static void kill_children(const struct parallel_processes *pp, + const struct run_process_parallel_opts *opts, + int signo) { - return 0; + for (size_t i = 0; i < opts->processes; i++) + if (pp->children[i].state == GIT_CP_WORKING) + kill(pp->children[i].process.pid, signo); } -static void kill_children(struct parallel_processes *pp, int signo) +static void kill_children_signal(const struct parallel_processes_for_signal *pp_sig, + int signo) { - int i, n = pp->max_processes; - - for (i = 0; i < n; i++) - if (pp->children[i].state == GIT_CP_WORKING) - kill(pp->children[i].process.pid, signo); + kill_children(pp_sig->pp, pp_sig->opts, signo); } -static struct parallel_processes *pp_for_signal; +static struct parallel_processes_for_signal *pp_for_signal; static void handle_children_on_signal(int signo) { - kill_children(pp_for_signal, signo); + kill_children_signal(pp_for_signal, signo); sigchain_pop(signo); raise(signo); } static void pp_init(struct parallel_processes *pp, - int n, - get_next_task_fn get_next_task, - start_failure_fn start_failure, - task_finished_fn task_finished, - void *data, int ungroup) + const struct run_process_parallel_opts *opts, + struct parallel_processes_for_signal *pp_sig) { - int i; + const size_t n = opts->processes; - if (n < 1) - n = online_cpus(); + if (!n) + BUG("you must provide a non-zero number of processes!"); - pp->max_processes = n; + trace_printf("run_processes_parallel: preparing to run up to %"PRIuMAX" tasks", + (uintmax_t)n); - trace_printf("run_processes_parallel: preparing to run up to %d tasks", n); - - pp->data = data; - if (!get_next_task) + if (!opts->get_next_task) BUG("you need to specify a get_next_task function"); - pp->get_next_task = get_next_task; - - pp->start_failure = start_failure ? start_failure : default_start_failure; - pp->task_finished = task_finished ? task_finished : default_task_finished; - pp->nr_processes = 0; - pp->output_owner = 0; - pp->shutdown = 0; - pp->ungroup = ungroup; CALLOC_ARRAY(pp->children, n); - if (pp->ungroup) - pp->pfd = NULL; - else + if (!opts->ungroup) CALLOC_ARRAY(pp->pfd, n); - strbuf_init(&pp->buffered_output, 0); - for (i = 0; i < n; i++) { + for (size_t i = 0; i < n; i++) { strbuf_init(&pp->children[i].err, 0); child_process_init(&pp->children[i].process); if (pp->pfd) { @@ -1603,16 +1574,17 @@ static void pp_init(struct parallel_processes *pp, } } - pp_for_signal = pp; + pp_sig->pp = pp; + pp_sig->opts = opts; + pp_for_signal = pp_sig; sigchain_push_common(handle_children_on_signal); } -static void pp_cleanup(struct parallel_processes *pp) +static void pp_cleanup(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts) { - int i; - trace_printf("run_processes_parallel: done"); - for (i = 0; i < pp->max_processes; i++) { + for (size_t i = 0; i < opts->processes; i++) { strbuf_release(&pp->children[i].err); child_process_clear(&pp->children[i].process); } @@ -1637,39 +1609,45 @@ static void pp_cleanup(struct parallel_processes *pp) * <0 no new job was started, user wishes to shutdown early. Use negative code * to signal the children. */ -static int pp_start_one(struct parallel_processes *pp) +static int pp_start_one(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts) { - int i, code; + size_t i; + int code; - for (i = 0; i < pp->max_processes; i++) + for (i = 0; i < opts->processes; i++) if (pp->children[i].state == GIT_CP_FREE) break; - if (i == pp->max_processes) + if (i == opts->processes) BUG("bookkeeping is hard"); - code = pp->get_next_task(&pp->children[i].process, - pp->ungroup ? NULL : &pp->children[i].err, - pp->data, - &pp->children[i].data); + code = opts->get_next_task(&pp->children[i].process, + opts->ungroup ? NULL : &pp->children[i].err, + opts->data, + &pp->children[i].data); if (!code) { - if (!pp->ungroup) { + if (!opts->ungroup) { strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); strbuf_reset(&pp->children[i].err); } return 1; } - if (!pp->ungroup) { + if (!opts->ungroup) { pp->children[i].process.err = -1; pp->children[i].process.stdout_to_stderr = 1; } pp->children[i].process.no_stdin = 1; if (start_command(&pp->children[i].process)) { - code = pp->start_failure(pp->ungroup ? NULL : - &pp->children[i].err, - pp->data, - pp->children[i].data); - if (!pp->ungroup) { + if (opts->start_failure) + code = opts->start_failure(opts->ungroup ? NULL : + &pp->children[i].err, + opts->data, + pp->children[i].data); + else + code = 0; + + if (!opts->ungroup) { strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); strbuf_reset(&pp->children[i].err); } @@ -1685,19 +1663,21 @@ static int pp_start_one(struct parallel_processes *pp) return 0; } -static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout) +static void pp_buffer_stderr(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts, + int output_timeout) { int i; - while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) { + while ((i = poll(pp->pfd, opts->processes, output_timeout) < 0)) { if (errno == EINTR) continue; - pp_cleanup(pp); + pp_cleanup(pp, opts); die_errno("poll"); } /* Buffer output from all pipes. */ - for (i = 0; i < pp->max_processes; i++) { + for (size_t i = 0; i < opts->processes; i++) { if (pp->children[i].state == GIT_CP_WORKING && pp->pfd[i].revents & (POLLIN | POLLHUP)) { int n = strbuf_read_once(&pp->children[i].err, @@ -1712,9 +1692,9 @@ static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout) } } -static void pp_output(struct parallel_processes *pp) +static void pp_output(const struct parallel_processes *pp) { - int i = pp->output_owner; + size_t i = pp->output_owner; if (pp->children[i].state == GIT_CP_WORKING && pp->children[i].err.len) { @@ -1723,24 +1703,28 @@ static void pp_output(struct parallel_processes *pp) } } -static int pp_collect_finished(struct parallel_processes *pp) +static int pp_collect_finished(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts) { - int i, code; - int n = pp->max_processes; + int code; + size_t i; int result = 0; while (pp->nr_processes > 0) { - for (i = 0; i < pp->max_processes; i++) + for (i = 0; i < opts->processes; i++) if (pp->children[i].state == GIT_CP_WAIT_CLEANUP) break; - if (i == pp->max_processes) + if (i == opts->processes) break; code = finish_command(&pp->children[i].process); - code = pp->task_finished(code, pp->ungroup ? NULL : - &pp->children[i].err, pp->data, - pp->children[i].data); + if (opts->task_finished) + code = opts->task_finished(code, opts->ungroup ? NULL : + &pp->children[i].err, opts->data, + pp->children[i].data); + else + code = 0; if (code) result = code; @@ -1753,12 +1737,14 @@ static int pp_collect_finished(struct parallel_processes *pp) pp->pfd[i].fd = -1; child_process_init(&pp->children[i].process); - if (pp->ungroup) { + if (opts->ungroup) { ; /* no strbuf_*() work to do here */ } else if (i != pp->output_owner) { strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); strbuf_reset(&pp->children[i].err); } else { + const size_t n = opts->processes; + strbuf_write(&pp->children[i].err, stderr); strbuf_reset(&pp->children[i].err); @@ -1783,76 +1769,60 @@ static int pp_collect_finished(struct parallel_processes *pp) return result; } -int run_processes_parallel(int n, - get_next_task_fn get_next_task, - start_failure_fn start_failure, - task_finished_fn task_finished, - void *pp_cb) +void run_processes_parallel(const struct run_process_parallel_opts *opts) { int i, code; int output_timeout = 100; int spawn_cap = 4; - int ungroup = run_processes_parallel_ungroup; - struct parallel_processes pp; - - /* unset for the next API user */ - run_processes_parallel_ungroup = 0; - - pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb, - ungroup); + struct parallel_processes_for_signal pp_sig; + struct parallel_processes pp = { + .buffered_output = STRBUF_INIT, + }; + /* options */ + const char *tr2_category = opts->tr2_category; + const char *tr2_label = opts->tr2_label; + const int do_trace2 = tr2_category && tr2_label; + + if (do_trace2) + trace2_region_enter_printf(tr2_category, tr2_label, NULL, + "max:%d", opts->processes); + + pp_init(&pp, opts, &pp_sig); while (1) { for (i = 0; i < spawn_cap && !pp.shutdown && - pp.nr_processes < pp.max_processes; + pp.nr_processes < opts->processes; i++) { - code = pp_start_one(&pp); + code = pp_start_one(&pp, opts); if (!code) continue; if (code < 0) { pp.shutdown = 1; - kill_children(&pp, -code); + kill_children(&pp, opts, -code); } break; } if (!pp.nr_processes) break; - if (ungroup) { - int i; - - for (i = 0; i < pp.max_processes; i++) + if (opts->ungroup) { + for (size_t i = 0; i < opts->processes; i++) pp.children[i].state = GIT_CP_WAIT_CLEANUP; } else { - pp_buffer_stderr(&pp, output_timeout); + pp_buffer_stderr(&pp, opts, output_timeout); pp_output(&pp); } - code = pp_collect_finished(&pp); + code = pp_collect_finished(&pp, opts); if (code) { pp.shutdown = 1; if (code < 0) - kill_children(&pp, -code); + kill_children(&pp, opts,-code); } } - pp_cleanup(&pp); - return 0; -} - -int run_processes_parallel_tr2(int n, get_next_task_fn get_next_task, - start_failure_fn start_failure, - task_finished_fn task_finished, void *pp_cb, - const char *tr2_category, const char *tr2_label) -{ - int result; + pp_cleanup(&pp, opts); - trace2_region_enter_printf(tr2_category, tr2_label, NULL, "max:%d", - ((n < 1) ? online_cpus() : n)); - - result = run_processes_parallel(n, get_next_task, start_failure, - task_finished, pp_cb); - - trace2_region_leave(tr2_category, tr2_label, NULL); - - return result; + if (do_trace2) + trace2_region_leave(tr2_category, tr2_label, NULL); } int run_auto_maintenance(int quiet) |