diff options
Diffstat (limited to 'core/src/dird/msgchan.cc')
-rw-r--r-- | core/src/dird/msgchan.cc | 91 |
1 files changed, 46 insertions, 45 deletions
diff --git a/core/src/dird/msgchan.cc b/core/src/dird/msgchan.cc index eea3cbd3f..e062f3ccf 100644 --- a/core/src/dird/msgchan.cc +++ b/core/src/dird/msgchan.cc @@ -37,7 +37,7 @@ #include "dird.h" #include "dird/getmsg.h" #include "dird/job.h" -#include "dird/jcr_private.h" +#include "dird/director_jcr_impl.h" #include "dird/msgchan.h" #include "dird/quota.h" #include "dird/sd_cmds.h" @@ -110,9 +110,9 @@ static inline bool SendBootstrapFileToSd(JobControlRecord* jcr, while (fgets(buf, sizeof(buf), bs)) { sd->fsend("%s", buf); } sd->signal(BNET_EOD); fclose(bs); - if (jcr->impl->unlink_bsr) { + if (jcr->dir_impl->unlink_bsr) { SecureErase(jcr, jcr->RestoreBootstrap); - jcr->impl->unlink_bsr = false; + jcr->dir_impl->unlink_bsr = false; } return true; } @@ -148,32 +148,32 @@ bool StartStorageDaemonJob(JobControlRecord* jcr, } // Now send JobId and permissions, and get back the authorization key. - PmStrcpy(job_name, jcr->impl->res.job->resource_name_); + PmStrcpy(job_name, jcr->dir_impl->res.job->resource_name_); BashSpaces(job_name); - if (jcr->impl->res.client) { - PmStrcpy(client_name, jcr->impl->res.client->resource_name_); + if (jcr->dir_impl->res.client) { + PmStrcpy(client_name, jcr->dir_impl->res.client->resource_name_); } else { PmStrcpy(client_name, "**None**"); } BashSpaces(client_name); - if (jcr->impl->res.fileset) { - PmStrcpy(fileset_name, jcr->impl->res.fileset->resource_name_); + if (jcr->dir_impl->res.fileset) { + PmStrcpy(fileset_name, jcr->dir_impl->res.fileset->resource_name_); } else { PmStrcpy(fileset_name, "**None**"); } BashSpaces(fileset_name); - PmStrcpy(backup_format, jcr->impl->backup_format); + PmStrcpy(backup_format, jcr->dir_impl->backup_format); BashSpaces(backup_format); - if (jcr->impl->res.fileset && jcr->impl->res.fileset->MD5[0] == 0) { - bstrncpy(jcr->impl->res.fileset->MD5, "**Dummy**", - sizeof(jcr->impl->res.fileset->MD5)); - fileset_md5 = jcr->impl->res.fileset->MD5; - } else if (jcr->impl->res.fileset) { - fileset_md5 = jcr->impl->res.fileset->MD5; + if (jcr->dir_impl->res.fileset && jcr->dir_impl->res.fileset->MD5[0] == 0) { + bstrncpy(jcr->dir_impl->res.fileset->MD5, "**Dummy**", + sizeof(jcr->dir_impl->res.fileset->MD5)); + fileset_md5 = jcr->dir_impl->res.fileset->MD5; + } else if (jcr->dir_impl->res.fileset) { + fileset_md5 = jcr->dir_impl->res.fileset->MD5; } else { fileset_md5 = "**Dummy**"; } @@ -184,7 +184,7 @@ bool StartStorageDaemonJob(JobControlRecord* jcr, * If we do not cancel it the SD will not accept a new connection * for the same jobid. */ - if (jcr->impl->reschedule_count) { + if (jcr->dir_impl->reschedule_count) { sd->fsend("cancel Job=%s\n", jcr->Job); while (sd->recv() >= 0) { continue; } } @@ -195,10 +195,11 @@ bool StartStorageDaemonJob(JobControlRecord* jcr, sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, job_name.c_str(), client_name.c_str(), jcr->getJobType(), jcr->getJobLevel(), - fileset_name.c_str(), !jcr->impl->res.pool->catalog_files, - jcr->impl->res.job->SpoolAttributes, fileset_md5, - jcr->impl->spool_data, jcr->impl->res.job->PreferMountedVolumes, - edit_int64(jcr->impl->spool_size, ed2), jcr->rerunning, + fileset_name.c_str(), !jcr->dir_impl->res.pool->catalog_files, + jcr->dir_impl->res.job->SpoolAttributes, fileset_md5, + jcr->dir_impl->spool_data, + jcr->dir_impl->res.job->PreferMountedVolumes, + edit_int64(jcr->dir_impl->spool_size, ed2), jcr->rerunning, jcr->VolSessionId, jcr->VolSessionTime, remainingquota, jcr->getJobProtocol(), backup_format.c_str()); @@ -251,11 +252,11 @@ bool StartStorageDaemonJob(JobControlRecord* jcr, /* For the moment, only migrate, copy and vbackup have rpool */ if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) || (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) { - PmStrcpy(pool_type, jcr->impl->res.rpool->pool_type); - PmStrcpy(pool_name, jcr->impl->res.rpool->resource_name_); + PmStrcpy(pool_type, jcr->dir_impl->res.rpool->pool_type); + PmStrcpy(pool_name, jcr->dir_impl->res.rpool->resource_name_); } else { - PmStrcpy(pool_type, jcr->impl->res.pool->pool_type); - PmStrcpy(pool_name, jcr->impl->res.pool->resource_name_); + PmStrcpy(pool_type, jcr->dir_impl->res.pool->pool_type); + PmStrcpy(pool_name, jcr->dir_impl->res.pool->resource_name_); } BashSpaces(pool_type); BashSpaces(pool_name); @@ -294,8 +295,8 @@ bool StartStorageDaemonJob(JobControlRecord* jcr, /* Do write side of storage daemon */ if (ok && write_storage) { - PmStrcpy(pool_type, jcr->impl->res.pool->pool_type); - PmStrcpy(pool_name, jcr->impl->res.pool->resource_name_); + PmStrcpy(pool_type, jcr->dir_impl->res.pool->pool_type); + PmStrcpy(pool_name, jcr->dir_impl->res.pool->resource_name_); BashSpaces(pool_type); BashSpaces(pool_name); foreach_alist (storage, write_storage) { @@ -358,8 +359,8 @@ bool StartStorageDaemonMessageThread(JobControlRecord* jcr) pthread_t thid; jcr->IncUseCount(); /* mark in use by msg thread */ - jcr->impl->sd_msg_thread_done = false; - jcr->impl->SD_msg_chan_started = false; + jcr->dir_impl->sd_msg_thread_done = false; + jcr->dir_impl->SD_msg_chan_started = false; Dmsg0(100, "Start SD msg_thread.\n"); if ((status = pthread_create(&thid, NULL, msg_thread, (void*)jcr)) != 0) { BErrNo be; @@ -367,9 +368,9 @@ bool StartStorageDaemonMessageThread(JobControlRecord* jcr) be.bstrerror(status)); } /* Wait for thread to start */ - while (!jcr->impl->SD_msg_chan_started) { + while (!jcr->dir_impl->SD_msg_chan_started) { Bmicrosleep(0, 50); - if (JobCanceled(jcr) || jcr->impl->sd_msg_thread_done) { return false; } + if (JobCanceled(jcr) || jcr->dir_impl->sd_msg_thread_done) { return false; } } Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->UseCount()); return true; @@ -381,13 +382,13 @@ extern "C" void MsgThreadCleanup(void* arg) jcr->db->EndTransaction(jcr); /* Terminate any open transaction */ jcr->lock(); - jcr->impl->sd_msg_thread_done = true; - jcr->impl->SD_msg_chan_started = false; + jcr->dir_impl->sd_msg_thread_done = true; + jcr->dir_impl->SD_msg_chan_started = false; jcr->unlock(); pthread_cond_broadcast( - &jcr->impl->nextrun_ready); /* wakeup any waiting threads */ + &jcr->dir_impl->nextrun_ready); /* wakeup any waiting threads */ pthread_cond_broadcast( - &jcr->impl->term_wait); /* wakeup any waiting threads */ + &jcr->dir_impl->term_wait); /* wakeup any waiting threads */ Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->UseCount()); jcr->db->ThreadCleanup(); /* remove thread specific data */ @@ -411,8 +412,8 @@ extern "C" void* msg_thread(void* arg) pthread_detach(pthread_self()); SetJcrInThreadSpecificData(jcr); - jcr->impl->SD_msg_chan = pthread_self(); - jcr->impl->SD_msg_chan_started = true; + jcr->dir_impl->SD_msg_chan = pthread_self(); + jcr->dir_impl->SD_msg_chan_started = true; pthread_cleanup_push(MsgThreadCleanup, arg); sd = jcr->store_bsock; @@ -429,7 +430,7 @@ extern "C" void* msg_thread(void* arg) if (jcr->sd_auth_key) { free(jcr->sd_auth_key); } jcr->sd_auth_key = strdup(auth_key); pthread_cond_broadcast( - &jcr->impl->nextrun_ready); /* wakeup any waiting threads */ + &jcr->dir_impl->nextrun_ready); /* wakeup any waiting threads */ continue; } @@ -443,10 +444,10 @@ extern "C" void* msg_thread(void* arg) if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles, &JobBytes, &JobErrors) == 5) { - jcr->impl->SDJobStatus = JobStatus; /* termination status */ - jcr->impl->SDJobFiles = JobFiles; - jcr->impl->SDJobBytes = JobBytes; - jcr->impl->SDErrors = JobErrors; + jcr->dir_impl->SDJobStatus = JobStatus; /* termination status */ + jcr->dir_impl->SDJobFiles = JobFiles; + jcr->dir_impl->SDJobBytes = JobBytes; + jcr->dir_impl->SDErrors = JobErrors; break; } Dmsg1(400, "end loop use=%d\n", jcr->UseCount()); @@ -460,7 +461,7 @@ extern "C" void* msg_thread(void* arg) */ Qmsg(jcr, M_FATAL, 0, _("Director's comm line to SD dropped.\n")); } - if (IsBnetError(sd)) { jcr->impl->SDJobStatus = JS_ErrorTerminated; } + if (IsBnetError(sd)) { jcr->dir_impl->SDJobStatus = JS_ErrorTerminated; } pthread_cleanup_pop(1); /* remove and execute the handler */ return NULL; } @@ -469,7 +470,7 @@ void WaitForStorageDaemonTermination(JobControlRecord* jcr) { int cancel_count = 0; /* Now wait for Storage daemon to Terminate our message thread */ - while (!jcr->impl->sd_msg_thread_done) { + while (!jcr->dir_impl->sd_msg_thread_done) { struct timeval tv; struct timezone tz; struct timespec timeout; @@ -479,10 +480,10 @@ void WaitForStorageDaemonTermination(JobControlRecord* jcr) timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */ Dmsg0(400, "I'm waiting for message thread termination.\n"); lock_mutex(mutex); - pthread_cond_timedwait(&jcr->impl->term_wait, &mutex, &timeout); + pthread_cond_timedwait(&jcr->dir_impl->term_wait, &mutex, &timeout); unlock_mutex(mutex); if (jcr->IsCanceled()) { - if (jcr->impl->SD_msg_chan_started) { + if (jcr->dir_impl->SD_msg_chan_started) { jcr->store_bsock->SetTimedOut(); jcr->store_bsock->SetTerminated(); SdMsgThreadSendSignal(jcr, TIMEOUT_SIGNAL); |