ABSTRACT: A merger of two programming tools, one a library for writing portable distributed programming systems, the other a language that combines AI and OR methods for decision support software systems, has resulted in an programming environment for maximally exploiting message-passing parallel systems for solving difficult problems involving logic and optimization.
Key terms: Parallel computing, optimization, languages, integer programming
Publication Information: This Brooklyn College Technical Report was presented at the 6th IASTED-ISMM Conference on Parallel and Distributed Computing and Systems in Washington DC in 1994 and appeared in the conference proceedings: Disjunctive programming and distributed programming. Proceedings of the Sixth IASTED-ISMM Conference on Parallel and Distributed Computing and Systems, Washington DC, Oct. 1994.
Acknowledgment of Support: This work was in part supported by PSC-CUNY Grant number 6-63278, PSC-CUNY Grant number 6-64184, NSF grant number IRI-9115603 and ONR grant number N00014-94-1-0035
The support that DP provides is shown below. The startProcs() routine, called by all processes, directs the primary process to call primary() and, for the worker processes, receives the semantic packet, the contents of which are shown in the typedef for SEM. This packet includes the address of the pri mary, the local sample size, and a shift for the guaranteeing independent pseudo-random number generation. The primary() routine spawns all the worker processes, initializing and passing the appropriate semantic packet. It then receives messages from the workers as they generate results.... definitions ... extern double profit, price[MONTHS][KINDS]; extern void get_prices(), startProcs(), sendResult(); continuous Revenue, Plan[MONTHS][MODES][KINDS]; 2lp_main() { double cache; set_up(Plan); // underlying linear model start_procs(); or(;;) { sendResult(); // send profit to primary get_prices(); // exit if DP requests it objective(Revenue,Plan); // set up objective function if not hill_climb(Plan,cache); then fail; // No disjunctive solution possible Revenue >= cache; // cache is lower bound on // profit determined by hill_climb solve_model(); // do branch and bound profit = wp(Revenue); // record result fail; // undo solution and go to top of loop } } objective(continuous Revenue,Plan[][MODES][KINDS]) { Revenue == // objective function sigma(int i=0;i<MONTHS;i++) sigma(int j=0;j<KINDS;j++) ( TONPROFT*Plan[i][PROC][j] - price[i][j]*Plan[i][PURC][j] - STORCOST*Plan[i][STOR][j] ); }
#include "2lplink.h"
#include <dp/dp.h>
/* The semantic packet recvd by */
typedef struct semstr { /* worker processes */
DPID s_primaryAddress; /* DPID of the primary */
int s_jobsRequested; /* the local sample size */
int s_RandomSeedShift; /* guarantee stochastic independence
*/
} SEM;
static SEM *sp, s;
static int jobs_done = 0;
double profit;
double price[MONTHS][KINDS]; /* perturbed price data */
double iprice[MONTHS][KINDS] = {... initial price data ... };
void
get_prices() {
int i, j;
if (jobs_done++ == sp->s_jobsRequested)
dp2lp_exit();
/* distribute prn sequence by discarding different batch */
/* for each process */
for (i=0; i<1000*(sp->s_RandomSeedShift); i++)
(void) rannyu(0);
... loop over price data ...
/* normal perturbation of initial price data */
price[i][j] = normal(iprice[i][j]);
}
static void
primary(int nhosts, SEM *sp, int semsize, int myhid) {
int i, j=0, nprocs;
DPID dummy;
double profitavg=0.0;
dp2lp_getpid(&s.s_primaryAddress);
for (i=0; i<nprocs; i++) {
s.s_RandomSeedShift = i;
dp2lp_spawn(&dummy, (++myhid)%nhosts, (char *) &s, sizeof(s),
1);
}
for (i=0; i<nprocs*jobs_requested; i++) {
dp2lp_recv(&dummy, (char *)&profit, sizeof(profit), DPBLOCK);
if (profit>0.0) {
++;
profitavg += profit;
}
}
profitavg /= (double)j;
... open results file and write profitavg ...
dp2lp_exit("boss done");
}
void
startProcs() { /* CALLED by the 2lp application code */
int nhosts, semsize, myhid;
nhosts = dp2lp_init((char *) &sp, &semsize, &myhid);
if (semsize == -1) {
primary(nhosts,sp,semsize,myhid);
dp2lp_exit("boss done"); /* NEVER RETURN TO 2lp app CODE */
} /* secondary just returns */
}
void
sendResult() {
dp2lp_write(&sp->s_primaryAddress,
(char *) &profit, sizeof(profit), DPREL|DPRECV);
}
As a worker process searches its local tree, when a solution node is reached, it communicates the new bound on the objective function to the other processes via an interrupting message. This and other kinds of interrupting messages force the immediate invocation of a worker's message handling routine:void /* executed by worker when */ get_iter() { /* current task completed */ MSG m; ... fill in MSG text and send to primary ... dp2lp_write(&sp->s_primary, (char *) &m, sizeof(MSG), DPREL|DPGETMSG); dp2lp_block(); /* wait for state */ while (job_status==UNEMPLOYED) /* change, avoiding */ dp2lp_pause(0,0); /* race condition */ dp2lp_unblock(); /* with block/unblock*/ if (job_status == RETIRED) dp2lp_exit("florida"); ... access state information and make ... ... available to 2LP code ... }
static void wcatcher() { /* the worker message handler */ MSG m; /* basic application determined */ /* message structure */ while (dp2lp_getmsg(&src,(char *)&m,sizeof(m)) != DPNOMESSAGE) { switch (m.msg_type) { case M_NEWRESULT: updateBound(); break; case M_NEWJOB: job_status = WORKING; ... create local state ... break; case M_RETIRED: job_status = RETIRED; break; case M_SPLIT: ... split current search tree and ... ... export to another process ... break; } } }
TABLE 1. Cap71 Running Times
processors 4 8 16
seconds 452 345 169