parallel.c

Go to the documentation of this file.
00001 /*                      P A R A L L E L . C
00002  * BRL-CAD
00003  *
00004  * Copyright (c) 2004-2006 United States Government as represented by
00005  * the U.S. Army Research Laboratory.
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public License
00009  * as published by the Free Software Foundation; either version 2 of
00010  * the License, or (at your option) any later version.
00011  *
00012  * This library is distributed in the hope that it will be useful, but
00013  * WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Library General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this file; see the file named COPYING for more
00019  * information.
00020  */
00021 
00022 /** \addtogroup thread */
00023 /*@{*/
00024 
00025 /** @file parallel.c
00026  *
00027  * @brief routines for parallel processing
00028  *
00029  *  Machine-specific routines for parallel processing.
00030  *  Primarily calling functions in multiple threads on multiple CPUs.
00031  *
00032  *  @author  Michael John Muuss
00033  *      
00034  *
00035  *  @par Source -
00036  *      The U. S. Army Research Laboratory
00037  *  @n  Aberdeen Proving Ground, Maryland  21005-5068  USA
00038  *
00039  */
00040 
00041 #ifndef lint
00042 static const char RCSparallel[] = "@(#)$Header: /cvsroot/brlcad/brlcad/src/libbu/parallel.c,v 14.22 2006/09/03 15:14:07 lbutler Exp $ (ARL)";
00043 #endif
00044 
00045 #include "common.h"
00046 
00047 /* XXX header mess needs to be cleaned up */
00048 
00049 #include <stdio.h>
00050 #include <ctype.h>
00051 #include <math.h>
00052 #ifdef HAVE_SIGNAL_H
00053 #  include <signal.h>
00054 #endif
00055 #ifdef HAVE_STRING_H
00056 #  include <string.h>
00057 #else
00058 #  include <strings.h>
00059 #endif
00060 #include "machine.h"
00061 #include "bu.h"
00062 
00063 #ifdef linux
00064 #  include <sys/time.h>
00065 #  include <sys/types.h>
00066 #  include <sys/resource.h>
00067 #  ifdef HAVE_SYS_WAIT_H
00068 #    include <sys/wait.h>
00069 #  endif
00070 #  include <sys/stat.h>
00071 #  include <sys/sysinfo.h>
00072 #endif
00073 
00074 #ifdef __FreeBSD__
00075 #  include <sys/types.h>
00076 #  include <sys/time.h>
00077 #  include <sys/resource.h>
00078 #  ifdef HAVE_SYS_WAIT_H
00079 #    include <sys/wait.h>
00080 #  endif
00081 #  include <sys/stat.h>
00082 #endif
00083 
00084 #ifdef __APPLE__
00085 #  include <sys/types.h>
00086 #  include <sys/time.h>
00087 #  include <sys/resource.h>
00088 #  ifdef HAVE_SYS_WAIT_H
00089 #    include <sys/wait.h>
00090 #  endif
00091 #  include <sys/stat.h>
00092 #  include <sys/param.h>
00093 #  include <sys/sysctl.h>
00094 #endif
00095 
00096 #ifdef __sp3__
00097 #  include <sys/types.h>
00098 #  include <sys/sysconfig.h>
00099 #  include <sys/var.h>
00100 #endif
00101 
00102 #ifdef CRAY
00103 #  include <sys/category.h>
00104 #  include <sys/resource.h>
00105 #  include <sys/types.h>
00106 #  ifdef CRAY1
00107 #    include <sys/machd.h>      /* For HZ */
00108 #  endif
00109 #endif
00110 
00111 #ifdef CRAY2
00112 #  undef MAXINT
00113 #  include <sys/param.h>
00114 #endif
00115 
00116 #ifdef HEP
00117 #  include <synch.h>
00118 #  undef stderr
00119 #  define stderr stdout
00120 #endif /* HEP */
00121 
00122 #if defined(alliant) && !defined(i860)
00123 /* Alliant FX/8 */
00124 #  include <cncall.h>
00125 #endif
00126 
00127 #if (defined(sgi) && defined(mips)) || (defined(__sgi) && defined(__mips))
00128 /* XXX hack that should eventually go away when it can be verified */
00129 #  define SGI_4D        1
00130 #  define _SGI_SOURCE   1       /* IRIX 5.0.1 needs this to def M_BLKSZ */
00131 #  define _BSD_TYPES    1       /* IRIX 5.0.1 botch in sys/prctl.h */
00132 #endif
00133 
00134 #ifdef HAVE_SYS_TYPES_H
00135 #  include <sys/types.h>
00136 #endif
00137 #ifdef HAVE_ULOCKS_H
00138 #  include <ulocks.h>
00139 #endif
00140 #ifdef HAVE_SYS_SYSMP_H
00141 #  include <sys/sysmp.h> /* for sysmp() */
00142 #endif
00143 #ifdef HAVE_MALLOC_H
00144 #  include <malloc.h>
00145 #endif
00146 
00147 #ifdef HAVE_SYS_WAIT_H
00148 #  include <sys/wait.h>
00149 #endif
00150 
00151 #ifdef HAVE_SCHED_H
00152 #  include <sched.h>
00153 #else
00154 #  ifdef HAVE_SYS_SCHED_H
00155 #    include <sys/sched.h>
00156 #  endif
00157 #endif
00158 #if defined(IRIX64) && IRIX64 >= 64
00159 static struct sched_param bu_param;
00160 #endif
00161 
00162 #ifdef ardent
00163 #  include <thread.h>
00164 #endif
00165 
00166 #if defined(n16)
00167 #  include <parallel.h>
00168 #  include <sys/sysadmin.h>
00169 #endif
00170 
00171 /*
00172  * multithreading support for SunOS 5.X / Solaris 2.x
00173  */
00174 #if defined(SUNOS) && SUNOS >= 52
00175 #  include <sys/unistd.h>
00176 #  include <thread.h>
00177 #  include <synch.h>
00178 #  define rt_thread_t   thread_t
00179 #endif  /* SUNOS */
00180 
00181 /*
00182  * multithread support built on POSIX Threads (pthread) library.
00183  */
00184 #ifdef HAVE_UNISTD_H
00185 #  include <unistd.h>
00186 #else
00187 #  ifdef HAVE_SYS_UNISTD_H
00188 #    include <sys/unistd.h>
00189 #  endif
00190 #endif
00191 #ifdef HAVE_PTHREAD_H
00192 #  include <pthread.h>
00193 #  define rt_thread_t   pthread_t
00194 #endif
00195 
00196 #ifdef CRAY
00197 struct taskcontrol {
00198         int     tsk_len;
00199         int     tsk_id;
00200         int     tsk_value;
00201 } bu_taskcontrol[MAX_PSW];
00202 #endif
00203 
00204 /**
00205  *                      B U _ N I C E _ S E T
00206  *
00207  *  Without knowing what the current UNIX "nice" value is,
00208  *  change to a new absolute "nice" value.
00209  *  (The system routine makes a relative change).
00210  */
00211 void
00212 bu_nice_set(int newnice)
00213 {
00214 #ifdef _WIN32
00215   if (bu_debug)
00216     bu_log("bu_nice_set() Priority NOT changed\n");
00217 
00218   return;
00219 
00220 #else  /* not _WIN32 */
00221   int opri, npri;
00222 
00223 #  ifdef BSD
00224 #    ifndef PRIO_PROCESS  /* necessary for linux */
00225 #      define PRIO_PROCESS  0   /* From /usr/include/sys/resource.h */
00226 #    endif
00227   opri = getpriority( PRIO_PROCESS, 0 );
00228   setpriority( PRIO_PROCESS, 0, newnice );
00229   npri = getpriority( PRIO_PROCESS, 0 );
00230 
00231 #  else  /* not BSD */
00232   int bias, chg;
00233 
00234   /* " nice adds the value of incr to the nice value of the process" */
00235   /* "The default nice value is 20" */
00236   /* "Upon completion, nice returns the new nice value minus 20" */
00237   bias = 0;
00238   opri = nice(0) - bias;
00239   chg = newnice - opri;
00240   (void)nice(chg);
00241   npri = nice(0) - bias;
00242   if( npri != newnice )  bu_log("bu_nice_set() SysV error:  wanted nice %d! check bias=%d\n", newnice, bias );
00243 #  endif  /* BSD */
00244 
00245   if( bu_debug ) bu_log("bu_nice_set() Priority changed from %d to %d\n", opri, npri);
00246 
00247 #endif  /* _WIN32 */
00248 }
00249 
00250 
/**
 *                      B U _ C P U L I M I T _ G E T
 *
 *  Report the current CPU time limit, in seconds.
 *
 *  Only the CRAY build actually queries the system; there a missing
 *  or failed limit is reported as 999999 ("virtually unlimited").
 *  Every other build returns -1.
 *
 *  @return CPU limit in seconds; zero or negative means that no
 *          limit is in effect.
 */
int
bu_cpulimit_get(void)
{
#ifdef CRAY
    extern long limit();
    long ticks = limit(C_PROC, 0, L_CPU, -1);   /* 64-bit clock counts */

    if (ticks < 0) {
        perror("bu_cpulimit_get(): CPU limit(get)");
    }
    if (ticks <= 0) {
        return 999999;          /* virtually unlimited */
    }

    /* round up to whole seconds */
    return (ticks + HZ - 1) / HZ;
#else
    return -1;
#endif
}
00274 
/**
 *                      B U _ C P U L I M I T _ S E T
 *
 *  Set CPU time limit, in seconds.
 *
 *  Only the CRAY build does anything: the requested number of seconds
 *  is added to the value reported by bu_cpulimit_get(), a result that
 *  is non-positive or above 999999 is treated as "no practical limit"
 *  (999999), and any memory limit is lifted as well.  On every other
 *  platform this is a no-op.
 *
 *  @param sec  seconds to add to the current CPU limit
 */
/* ARGSUSED */
void
bu_cpulimit_set(int sec)
{
#ifdef CRAY
    long        old;            /* seconds */
    long        new;            /* seconds */
    long        newtick;        /* 64-bit clock counts */
    extern long limit();

    old = bu_cpulimit_get();
    new = old + sec;
    if (new <= 0 || new > 999999)
        new = 999999;   /* no limit, for practical purposes */
    newtick = new * HZ;
    if (limit(C_PROC, 0, L_CPU, newtick) < 0) {
        perror("bu_cpulimit_set: CPU limit(set)");
    }
    /* both values are long, so they must be printed with %ld */
    bu_log("Cray CPU limit changed from %ld to %ld seconds\n",
           old, newtick/HZ);

    /* Eliminate any memory limit */
    if (limit(C_PROC, 0, L_MEM, 0) < 0) {
        /* Hopefully, not fatal if memory limits are imposed */
        perror("bu_cpulimit_set: MEM limit(set)");
    }
#endif
    /* keeps compilers quiet about the parameter on non-CRAY builds */
    (void)sec;
}
00309 
00310 
00311 
00312 /**
00313  *                      B U _ A V A I L _ C P U S
00314  *
00315  *  Return the maximum number of physical CPUs that are considered to be
00316  *  available to this process now.
00317  */
00318 int
00319 bu_avail_cpus(void)
00320 {
00321         int ncpu = -1;
00322 
00323 
00324 #if defined(_SC_NPROCESSORS_ONLN)
00325         /* SUNOS and linux */
00326         ncpu = sysconf(_SC_NPROCESSORS_ONLN);
00327         if (ncpu < 0) {
00328                 perror("Unable to get the number of available CPUs");
00329                 ncpu = 1;
00330         }
00331         goto DONE_NCPU;
00332 #elif defined(_SC_NPROC_ONLN)
00333         ncpu = sysconf(_SC_NPROC_ONLN);
00334         if (ncpu < 0) {
00335                 perror("Unable to get the number of available CPUs");
00336                 ncpu = 1;
00337         }
00338         goto DONE_NCPU;
00339 #elif defined(_SC_CRAY_NCPU)
00340         /* cray */
00341         ncpu = sysconf(_SC_CRAY_NCPU);
00342         if (ncpu < 0) {
00343                 perror("Unable to get the number of available CPUs");
00344                 ncpu = 1;
00345         }
00346         goto DONE_NCPU;
00347 #endif
00348 
00349 
00350 #ifdef SGI_4D
00351         /* XXX LAB 04 June 2002
00352          * The call prctl(PR_MAXPPROCS) is supposed to indicate the number
00353          * of processors this process can use.  Unfortuantely, this returns
00354          * 0 when running under a CPU set.  A bug report has been filed with
00355          * SGI.
00356          *
00357          * The sysmp(MP_NPROCS) call returns the number of physically
00358          * configured processors.  This will have to suffice until SGI
00359          * comes up with a fix.
00360          */
00361 #  ifdef HAVE_SYSMP
00362         ncpu = sysmp(MP_NPROCS);
00363 #  elif defined(HAVE_PRCTL)
00364         ncpu = (int)prctl(PR_MAXPPROCS);
00365 #  endif
00366         goto DONE_NCPU;
00367 #endif /* SGI_4D */
00368 
00369 
00370 #ifdef alliant
00371         {
00372           long  memsize, ipnum, cenum, detnum, attnum;
00373 
00374 #  if !defined(i860)
00375           /* FX/8 */
00376           lib_syscfg( &memsize, &ipnum, &cenum, &detnum, &attnum );
00377 #  else
00378           /* FX/2800 */
00379           attnum = 28;
00380 #  endif /* i860 */
00381           ncpu = attnum;                /* # of CEs attached to parallel Complex */
00382           goto DONE_NCPU;
00383         }
00384 #endif /* alliant */
00385 
00386 
00387 #if defined(__sp3__)
00388         {
00389           int status;
00390           int cmd;
00391           int parmlen;
00392           struct var p;
00393 
00394           cmd = SYS_GETPARMS;
00395           parmlen = sizeof(struct var);
00396           if ( sysconfig(cmd, &p, parmlen) != 0 ) {
00397             bu_bomb("bu_parallel(): sysconfig error for sp3");
00398           }
00399           ncpu = p.v_ncpus;
00400           goto DONE_NCPU;
00401         }
00402 #endif  /* __sp3__ */
00403 
00404 
00405 #if defined(n16)
00406         if( (ncpu = sysadmin( SADMIN_NUMCPUS, 0 )) < 0 )
00407           perror("sysadmin");
00408         goto DONE_NCPU;
00409 #endif /* n16 */
00410 
00411 
00412 #ifdef __FreeBSD__
00413         {
00414           int maxproc;
00415           size_t len;
00416           len = 4;
00417           if (sysctlbyname("hw.ncpu", &maxproc, &len, NULL, 0) == -1) {
00418             ncpu = 1;
00419             perror("sysctlbyname");
00420           } else {
00421             ncpu = maxproc;
00422           }
00423           goto DONE_NCPU;
00424         }
00425 #endif
00426 
00427 
00428 #if defined(__ppc__)
00429         {
00430           int mib[2], maxproc;
00431           size_t len;
00432 
00433           mib[0] = CTL_HW;
00434           mib[1] = HW_NCPU;
00435           len = sizeof(maxproc);
00436           if (sysctl(mib, 2, &maxproc, &len, NULL, 0) == -1) {
00437             ncpu = 1;
00438             perror("sysctl");
00439           } else {
00440             ncpu = maxproc; /* should be able to get sysctl to return maxproc */
00441           }
00442           goto DONE_NCPU;
00443         }
00444 #endif /* __ppc__ */
00445 
00446 
00447 #if defined(HAVE_GET_NPROCS)
00448         ncpu = get_nprocs(); /* GNU extension from sys/sysinfo.h */
00449         goto DONE_NCPU;
00450 #endif
00451 
00452 
00453 #if defined(linux) && 0
00454         {
00455           /* old retired linux method */
00456           /*
00457            * Ultra-kludgey way to determine the number of cpus in a
00458            * linux box--count the number of processor entries in
00459            * /proc/cpuinfo!
00460            */
00461 
00462 #       define CPUINFO_FILE "/proc/cpuinfo"
00463           FILE *fp;
00464           char buf[128];
00465 
00466           ncpu = 0;
00467 
00468           fp = fopen (CPUINFO_FILE,"r");
00469 
00470           if (fp == NULL) {
00471             ncpu = 1;
00472             perror (CPUINFO_FILE);
00473           } else {
00474             while (fgets (buf, 80, fp) != NULL) {
00475               if (strncmp (buf, "processor",9) == 0) {
00476                 ++ ncpu;
00477               }
00478             }
00479             fclose (fp);
00480 
00481             if (ncpu <= 0) {
00482               ncpu = 1;
00483             }
00484           }
00485           goto DONE_NCPU;
00486         }
00487 #endif
00488 
00489 #if defined(_WIN32)
00490         /* Windows */
00491         {
00492             SYSTEM_INFO sysinfo;
00493 
00494             GetSystemInfo(&sysinfo);
00495             ncpu = (int)sysinfo.dwNumberOfProcessors;
00496             goto DONE_NCPU;
00497         }
00498 #endif
00499 
00500 DONE_NCPU:  ; /* allows debug and final validity check */
00501 
00502 
00503 #if defined(HAVE_PTHREAD_H)
00504         /* if they have threading and we could not detect properly, claim two */
00505         if (ncpu < 0) {
00506                 ncpu = 2;
00507         }
00508 #endif
00509 
00510         if (bu_debug & BU_DEBUG_PARALLEL) {
00511                 /* do not use bu_log() here, this can get called before semaphores are initialized */
00512                 fprintf( stderr, "bu_avail_cpus: counted %d cpus.\n", ncpu);
00513         }
00514 
00515         if (ncpu > 0) {
00516                 return ncpu;
00517         }
00518 
00519         return( DEFAULT_PSW );
00520 }
00521 
00522 
00523 /**
00524  *                      B U _ G E T _ L O A D _ A V E R A G E
00525  *
00526  *  A generally portable method for obtaining the 1-minute load average.
00527  *  Vendor-specific methods which don't involve a fork/exec sequence
00528  *  would be preferable.
00529  *  Alas, very very few systems put the load average in /proc,
00530  *  most still grunge the avenrun[3] array out of /dev/kmem,
00531  *  which requires special privleges to open.
00532  */
00533 fastf_t
00534 bu_get_load_average(void)
00535 {
00536         double  load = -1.0;
00537 #ifndef _WIN32
00538         FILE    *fp;
00539 
00540         fp = popen("PATH=/bin:/usr/bin:/usr/ucb:/usr/bsd; export PATH; uptime|sed -e 's/.*average: //' -e 's/,.*//' ", "r");
00541         if( !fp )
00542                 return -1.0;
00543 
00544         fscanf( fp, "%lf", &load );
00545         fclose(fp);
00546 
00547         while( wait(NULL) != -1 )  ;    /* NIL */
00548 #endif
00549         return load;
00550 }
00551 
/**
 *                      B U _ G E T _ P U B L I C _ C P U S
 *
 *  A general mechanism for non-privileged users of a server system to control
 *  how many processors of their server get consumed by multi-thread
 *  cruncher processes, by leaving a world-writable file.
 *
 *  If the number in the file is negative, it means "all but that many."
 *  The result is kept within 0..bu_avail_cpus(), so a large negative
 *  entry cannot produce a negative processor count.  A file that does
 *  not start with a number counts as 1.
 *
 *  When neither file can be read, an attempt is made to create one
 *  holding the full CPU count and to make it world-writable.
 *
 *  Returns the number of processors presently available for "public" use.
 */
#ifndef _WIN32
#  define PUBLIC_CPUS1  "/var/tmp/public_cpus"
#  define PUBLIC_CPUS2  "/usr/tmp/public_cpus"
#endif
int
bu_get_public_cpus(void)
{
    int         avail_cpus = bu_avail_cpus();
#ifndef _WIN32
    int         public_cpus = 1;
    const char  *created = NULL;        /* path of the file written below */
    FILE        *fp;

    if ((fp = fopen(PUBLIC_CPUS1, "r")) != NULL ||
        (fp = fopen(PUBLIC_CPUS2, "r")) != NULL) {
        if (fscanf(fp, "%d", &public_cpus) != 1)
            public_cpus = 1;            /* unparsable file */
        fclose(fp);

        if (public_cpus < 0)
            public_cpus = avail_cpus + public_cpus;
        if (public_cpus < 0)
            public_cpus = 0;            /* "all but N" with N > available */
        if (public_cpus > avail_cpus)
            public_cpus = avail_cpus;
        return public_cpus;
    }

    /* No readable file: replace whatever is there with a fresh one. */
    (void)unlink(PUBLIC_CPUS1);
    (void)unlink(PUBLIC_CPUS2);
    if ((fp = fopen(PUBLIC_CPUS1, "w")) != NULL) {
        created = PUBLIC_CPUS1;
    } else if ((fp = fopen(PUBLIC_CPUS2, "w")) != NULL) {
        created = PUBLIC_CPUS2;
    }
    if (fp != NULL) {
        fprintf(fp, "%d\n", avail_cpus);
        fclose(fp);
        /* world-writable so any user may adjust the number */
        (void)chmod(created, 0666);
    }
#endif
    return avail_cpus;
}
00598 
/**
 *                      B U _ S E T _ R E A L T I M E
 *
 *  If possible, mark this process for real-time scheduler priority.
 *  Will often need root privs to succeed.
 *
 *  Only builds with IRIX64 >= 64 attempt anything; all other builds
 *  simply report non-realtime behavior.
 *
 *  Returns -
 *      1       realtime priority obtained
 *      0       running with non-realtime scheduler behavior
 */
int
bu_set_realtime(void)
{
#if defined(IRIX64) && IRIX64 >= 64
    int current = sched_getscheduler(0);

    /* Already under a realtime policy?  Then there is nothing to do. */
    if (current >= 0 && (current == SCHED_RR || current == SCHED_FIFO)) {
        return 1;
    }

    /* Keep the present scheduling parameters, switch policy only. */
    sched_getparam(0, &bu_param);
    if (sched_setscheduler(0, SCHED_RR, &bu_param) >= 0) {
        return 1;               /* realtime */
    }
#endif
    return 0;
}
00634 
00635 /**********************************************************************/
00636 
00637 #if defined(unix) || defined(__unix)
00638         /*
00639          * Cray is known to wander among various pids, perhaps others.
00640          */
00641 #       define  CHECK_PIDS      1
00642 #endif
00643 
00644 #if defined(PARALLEL)
00645 
00646 /* bu_worker_tbl_not_empty and bu_kill_workers are only used by the sgi arch */
00647 #  ifdef SGI_4D
00648 
00649 /**
00650  *                      B U _ W O R K E R _ T B L _ N O T _ E M P T Y
00651  */
00652 static int
00653 bu_worker_tbl_not_empty(tbl)
00654 int tbl[MAX_PSW];
00655 {
00656         register int i;
00657         register int children=0;
00658 
00659         for (i=1 ; i < MAX_PSW ; ++i)
00660                 if (tbl[i]) children++;
00661 
00662         return(children);
00663 }
00664 
00665 /**
00666  *                      B U _ K I L L _ W O R K E R S
00667  */
00668 static void
00669 bu_kill_workers(tbl)
00670 int tbl[MAX_PSW];
00671 {
00672   register int i;
00673 
00674   for (i=1 ; i < MAX_PSW ; ++i) {
00675     if ( tbl[i] ) {
00676       if( kill(tbl[i], 9) ) {
00677         perror("bu_kill_workers(): SIGKILL to child process");
00678       }
00679       else {
00680         bu_log("bu_kill_workers(): child pid %d killed\n", tbl[i]);
00681       }
00682     }
00683   }
00684 
00685   bzero( (char *)tbl, sizeof(tbl) );
00686 }
00687 #  endif   /* end check if sgi_4d defined */
00688 
00689 extern int      bu_pid_of_initiating_thread;    /* From ispar.c */
00690 
00691 static int      bu_nthreads_started = 0;        /* # threads started */
00692 static int      bu_nthreads_finished = 0;       /* # threads properly finished */
00693 static void     (*bu_parallel_func) BU_ARGS((int,genptr_t));    /* user function to run in parallel */
00694 static genptr_t bu_parallel_arg;                /* User's arg to his threads */
00695 
00696 /**
00697  *                      B U _ P A R A L L E L _ I N T E R F A C E
00698  *
00699  *  Interface layer between bu_parallel and the user's function.
00700  *  Necessary so that we can provide unique thread numbers as a
00701  *  parameter to the user's function, and to decrement the global
00702  *  counter when the user's function returns to us (as opposed to
00703  *  dumping core or longjmp'ing too far).
00704  *
00705  *  Note that not all architectures can pass an argument
00706  *  (e.g. the pointer to the user's function), so we depend on
00707  *  using a global variable to communicate this.
00708  *  This is no problem, since only one copy of bu_parallel()
00709  *  may be active at any one time.
00710  */
00711 static void
00712 bu_parallel_interface(void)
00713 {
00714         register int    cpu;            /* our CPU (thread) number */
00715 
00716 #if 0
00717 #ifdef HAVE_PTHREAD_H
00718         {
00719                 pthread_t       pt;
00720                 pt = pthread_self();
00721                 fprintf(stderr,"bu_parallel_interface, Thread ID = 0x%x\n", (unsigned int)pt);
00722         }
00723 #endif
00724 #endif
00725         bu_semaphore_acquire( BU_SEM_SYSCALL );
00726         cpu = bu_nthreads_started++;
00727         bu_semaphore_release( BU_SEM_SYSCALL );
00728 
00729         (*bu_parallel_func)(cpu, bu_parallel_arg);
00730 
00731         bu_semaphore_acquire( BU_SEM_SYSCALL );
00732         bu_nthreads_finished++;
00733         bu_semaphore_release( BU_SEM_SYSCALL );
00734 
00735 #       if defined(SGI_4D) || defined(IRIX)
00736         /*
00737          *  On an SGI, a process/thread created with the "sproc" syscall has
00738          *  all of it's file descriptors closed when it "returns" to sproc.
00739          *  Since this trashes file descriptors which may still be in use by
00740          *  other processes, we avoid ever returning to sproc.
00741          */
00742         if(cpu) _exit(0);
00743 #       endif /* SGI */
00744 }
00745 #endif /* PARALLEL */
00746 
00747 #ifdef SGI_4D
00748 /**
00749  *                      B U _ P R _ F I L E
00750  *
00751  *  SGI-specific.  Formatted printing of stdio's FILE struct.
00752  */
00753 void
00754 bu_pr_FILE(title, fp)
00755 char    *title;
00756 FILE    *fp;
00757 {
00758         bu_log("FILE structure '%s', at x%x:\n", title, fp );
00759         bu_log(" _cnt = x%x\n", fp->_cnt);
00760         bu_log(" _ptr = x%x\n", fp->_ptr);
00761         bu_log(" _base = x%x\n", fp->_base);
00762         bu_log(" _file = x%x\n", fp->_file);
00763         bu_printb(" _flag ", fp->_flag & 0xFF,
00764                 "\010\010_IORW\7_100\6_IOERR\5_IOEOF\4_IOMYBUF\3_004\2_IOWRT\1_IOREAD" );
00765         bu_log("\n");
00766 }
00767 #endif
00768 
00769 /**
00770  *                      B U _ P A R A L L E L
00771  *
00772  *  Create 'ncpu' copies of function 'func' all running in parallel,
00773  *  with private stack areas.  Locking and work dispatching are
00774  *  handled by 'func' using a "self-dispatching" paradigm.
00775  *
00776  *  'func' is called with its thread number and the caller's 'arg'.
00777  *  Threads are given increasing numbers, starting with zero.
00778  *
00779  *  This function will not return control until all invocations
00780  *  of the subroutine are finished.
00781  *
00782  *  Don't use registers in this function (bu_parallel).  At least on the Alliant,
00783  *  register context is NOT preserved when exiting the parallel mode,
00784  *  because the serial portion resumes on some arbitrary processor,
00785  *  not necessarily the one that serial execution started on.
00786  *  The registers are not shared.
00787  */
00788 void
00789 bu_parallel( func, ncpu, arg )
00790 void            (*func) BU_ARGS((int, genptr_t));
00791 int             ncpu;
00792 genptr_t        arg;
00793 {
00794 #if defined(PARALLEL)
00795         int     avail_cpus;
00796 
00797 #  if defined(alliant) && !defined(i860) && !__STDC__
00798         register int d7;        /* known to be in d7 */
00799         register int d6 = ncpu; /* known to be in d6 */
00800 #  endif
00801         int     x;
00802 
00803 #  if defined(SGI_4D) || defined(CRAY)
00804         int     new;
00805 #  endif
00806 
00807 #  ifdef sgi
00808         long    stdin_pos;
00809         FILE    stdin_save;
00810         int     worker_pid_tbl[MAX_PSW];
00811 #  endif
00812 
00813 /*
00814  * multithreading support for SunOS 5.X / Solaris 2.x
00815  */
00816 #  if defined(SUNOS) && SUNOS >= 52
00817         static int      concurrency = 0; /* Max concurrency we have set */
00818 #  endif
00819 #  if (defined(SUNOS) && SUNOS >= 52) || defined(HAVE_PTHREAD_H)
00820         int             nthreadc;
00821         int             nthreade;
00822         rt_thread_t     thread;
00823         rt_thread_t     thread_tbl[MAX_PSW];
00824         int             i;
00825 #  endif        /* SUNOS */
00826 
00827 #  ifdef sgi
00828         bzero(worker_pid_tbl, sizeof(worker_pid_tbl) );
00829 #  endif
00830 
00831         if( bu_debug & BU_DEBUG_PARALLEL )
00832                 bu_log("bu_parallel(0x%lx, %d, x%lx)\n", (long)func, ncpu, (long)arg );
00833 
00834         if( bu_pid_of_initiating_thread )
00835                 bu_bomb("bu_parallel() called from within parallel section\n");
00836 
00837         bu_pid_of_initiating_thread = getpid();
00838 
00839         if (ncpu > MAX_PSW) {
00840                 bu_log("WARNING: bu_parallel() ncpu(%d) > MAX_PSW(%d), adjusting ncpu\n", ncpu, MAX_PSW);
00841                 ncpu = MAX_PSW;
00842         }
00843         bu_nthreads_started = 0;
00844         bu_nthreads_finished = 0;
00845         bu_parallel_func = func;
00846         bu_parallel_arg = arg;
00847         avail_cpus = bu_avail_cpus();
00848         if( ncpu > avail_cpus ) {
00849                 bu_log( "%d cpus requested, but only %d available\n", ncpu, avail_cpus );
00850                 ncpu = avail_cpus;
00851         }
00852 
00853 
00854 #  ifdef HEP
00855         bu_nthreads_started = 1;
00856         bu_nthreads_finished = 1;
00857         for( x=1; x<ncpu; x++ )  {
00858                 /* This is more expensive when GEMINUS>1 */
00859                 Dcreate( bu_parallel_interface );
00860         }
00861         (*func)(0,arg); /* avoid wasting this task */
00862 #  endif /* HEP */
00863 
00864 #  ifdef CRAY
00865 #    if 0
00866         /* Try to give up processors as soon as they are un needed */
00867         new = 0;
00868         TSKTUNE( "DBRELEAS", &new );
00869 #    endif
00870 
00871         bu_nthreads_started = 1;
00872         bu_nthreads_finished = 1;
00873         /* Create any extra worker tasks */
00874         for( x=1; x<ncpu; x++ ) {
00875                 bu_taskcontrol[x].tsk_len = 3;
00876                 bu_taskcontrol[x].tsk_value = x;
00877                 TSKSTART( &bu_taskcontrol[x], bu_parallel_interface );
00878         }
00879         (*func)(0,arg); /* avoid wasting this task */
00880 
00881         /* Wait for them to finish */
00882         for( x=1; x<ncpu; x++ )  {
00883                 TSKWAIT( &bu_taskcontrol[x] );
00884         }
00885         /* There needs to be some way to kill the tfork()'ed processes here */
00886 #  endif
00887 
00888 #  if defined(alliant) && !defined(i860)
00889 #       if defined(__STDC__)    /* fxc defines it == 0 !! */
00890 #       undef __STDC__
00891 #       define __STDC__ 2
00892 
00893         /* Calls bu_parallel_interface in parallel "ncpu" times */
00894         concurrent_call(CNCALL_COUNT|CNCALL_NO_QUIT, bu_parallel_interface, ncpu);
00895 
00896 #       else
00897         {
00898                 asm("   movl            d6,d0");
00899                 asm("   subql           #1,d0");
00900                 asm("   cstart          d0");
00901                 asm("super_loop:");
00902                 bu_parallel_interface();                /* d7 has current index, like magic */
00903                 asm("   crepeat         super_loop");
00904         }
00905 #       endif
00906 #  endif
00907 
00908 #  if defined(alliant) && defined(i860)
00909         #pragma loop cncall
00910         for( x=0; x<ncpu; x++) {
00911                 bu_parallel_interface();
00912         }
00913 #  endif
00914 
00915 #  if defined(convex) || defined(__convex__)
00916         /*$dir force_parallel */
00917         for( x=0; x<ncpu; x++ )  {
00918                 bu_parallel_interface();
00919         }
00920 #  endif /* convex */
00921 
00922 #  ifdef ardent
00923         /* The stack size parameter is pure guesswork */
00924         parstack( bu_parallel_interface, 1024*1024, ncpu );
00925 #  endif /* ardent */
00926 
00927 #  ifdef SGI_4D
00928         stdin_pos = ftell(stdin);
00929         stdin_save = *(stdin);          /* struct copy */
00930         bu_nthreads_started = 1;
00931         bu_nthreads_finished = 1;
00932 
00933         /* Note:  it may be beneficial to call prctl(PR_SETEXITSIG); */
00934         /* prctl(PR_TERMCHILD) could help when parent dies.  But SIGHUP??? hmmm */
00935         for( x = 1; x < ncpu; x++)  {
00936                 /*
00937                  *  Start a share-group process, sharing ALL resources.
00938                  *  This direct sys-call can be used because none of the
00939                  *  task-management services of, eg, taskcreate() are needed.
00940                  */
00941 #    if defined(IRIX) && IRIX <= 4
00942                 /*  Stack size per proc comes from RLIMIT_STACK (typ 64MBytes). */
00943                 new = sproc( bu_parallel_interface, PR_SALL, 0 );
00944 #    else
00945                 /* State maximum stack size.
00946                  * Be generous, as this mainly costs address space.
00947                  * RAM is allocated only to those pages used.
00948                  * On the other hand, don't be too generous, because each
00949                  * proc needs this much space on, e.g. a 64 processor system.
00950                  * Don't go quite for an even number of megabytes,
00951                  * in the hopes of creating a small 32k "buffer zone"
00952                  * to catch stack overflows.
00953                  */
00954                 new = sprocsp( (void (*)(void *, size_t))bu_parallel_interface,
00955                         PR_SALL, 0, NULL,
00956 #                       if defined(IRIX64)
00957                                 64*1024*1024 - 32*1024
00958 #                       else
00959                                 4*1024*1024 - 32*1024
00960 #                       endif
00961                         );
00962 #    endif
00963                 if( new < 0 )  {
00964                         perror("sproc");
00965                         bu_log("ERROR bu_parallel(): sproc(x%x, x%x, )=%d failed on processor %d\n",
00966                                 bu_parallel_interface, PR_SALL,
00967                                 new, x );
00968                         bu_log("sbrk(0)=x%x\n", sbrk(0) );
00969                         bu_bomb("bu_parallel() failure");
00970                 } else {
00971                         worker_pid_tbl[x] = new;
00972                 }
00973 
00974         }
00975         (*func)(0,arg); /* don't waste this thread */
00976         {
00977                 int     pid;
00978                 int     pstat;
00979                 int     children;
00980 
00981                 /*
00982                  * Make sure all children are done.
00983                  */
00984                 while ( children=bu_worker_tbl_not_empty(worker_pid_tbl) ) {
00985                         pstat = 0;
00986                         if ( (pid = wait(&pstat)) < 0) {
00987                                 perror("bu_parallel() wait()");
00988                                 bu_kill_workers(worker_pid_tbl);
00989                                 bu_bomb("parallelism error");
00990                         } else if (pid == 0) {
00991                                 bu_log("bu_parallel() wait() == 0 with %d children remaining\n", children);
00992                                 bu_kill_workers(worker_pid_tbl);
00993                                 bu_bomb("Missing worker");
00994                         } else {
00995                                 if( (pstat & 0xFF) != 0 )  {
00996                                         bu_log("***ERROR: bu_parallel() worker %d exited with status x%x!\n", pid, pstat);
00997                                         /* XXX How to cope with this;  can't back out work that was lost at this level. */
00998 #    ifdef IRIX
00999         if (WIFEXITED(pstat))
01000                 bu_log ("Child terminated normally with status %d 0x%0x\n",
01001                         WEXITSTATUS(pstat));
01002 
01003         if (WIFSIGNALED(pstat)) {
01004                 bu_log("child terminated on signal %d %0x\n", WTERMSIG(pstat));
01005                 if (pstat & 0200)
01006                         bu_log("core dumped\n");
01007                 else
01008                         bu_log("No core dump\n");
01009         }
01010         if (WIFSTOPPED(pstat))
01011                 bu_log("child is stopped on signal %d 0x%x\n", WSTOPSIG(pstat));
01012 
01013         if ( (pstat & 0177777) == 0177777 )
01014                 bu_log("child has continued\n");
01015 
01016 #    endif
01017                                         bu_kill_workers(worker_pid_tbl);
01018                                         bu_bomb("A worker blew out");
01019                                 }
01020                                 /* remove pid from worker_pid_tbl */
01021                                 for (x=1 ; x < ncpu ; x++)
01022                                         if (worker_pid_tbl[x] == pid) {
01023                                                 worker_pid_tbl[x] = 0;
01024                                                 break;
01025                                         }
01026 
01027                                 if (x >= ncpu) {
01028                                         bu_log("WARNING: bu_parallel(): wait() returned non-child process, pid %d\n", pid);
01029                                 }
01030                         }
01031                 }
01032         }
01033         if( ftell(stdin) != stdin_pos )  {
01034                 /*
01035                  *  Gross SGI bug:  when a thread is finished, it returns
01036                  *  to the stack frame created by sproc(), which
01037                  *  just calls exit(0), resulting in all STDIO file buffers
01038                  *  being fflush()ed.  This zaps the stdin position, and
01039                  *  may wreak additional havoc.
01040                  *  Exists in IRIX 3.3.1, Irix 4.0.5,
01041                  *  should be fixed in a later release.  Maybe.
01042                  */
01043                 bu_log("\nWarning:  stdin file pointer has been corrupted by SGI multi-processor bug!\n");
01044                 if( bu_debug & BU_DEBUG_PARALLEL )  {
01045                         bu_log("Original position was x%x, now position is x%x!\n", stdin_pos, ftell(stdin) );
01046                         bu_pr_FILE("saved stdin", &stdin_save);
01047                         bu_pr_FILE("current stdin", stdin);
01048                 }
01049                 fseek(stdin, stdin_pos, SEEK_SET);
01050                 if( ftell(stdin) != stdin_pos )  {
01051                         bu_log("WARNING: fseek() did not recover proper position.\n");
01052                 } else {
01053                         bu_log("It was fixed by fseek()\n");
01054                 }
01055         }
01056 #  endif /* sgi */
01057 
01058 #  if defined(n16)
01059         /* The shared memory size requirement is sheer guesswork */
01060         /* The stack size is also guesswork */
01061         if( task_init( 8*1024*1024, ncpu, bu_parallel_interface, 128*1024, 0 ) < 0 )
01062                 perror("bu_parallel()/task_init()");
01063 #  endif
01064 
01065         /*
01066          * multithreading support for SunOS 5.X / Solaris 2.x
01067          */
01068 #  if defined(SUNOS) && SUNOS >= 52
01069 
01070         thread = 0;
01071         nthreadc = 0;
01072 
01073         /* Give the thread system a hint... */
01074         if (ncpu > concurrency) {
01075                 if (thr_setconcurrency(ncpu)) {
01076                         fprintf(stderr, "ERROR parallel.c/bu_parallel(): thr_setconcurrency(%d) failed\n",
01077                                 ncpu);
01078                         bu_log("ERROR parallel.c/bu_parallel(): thr_setconcurrency(%d) failed\n",
01079                                ncpu);
01080                         /* Not much to do, lump it */
01081                 } else {
01082                         concurrency = ncpu;
01083                 }
01084         }
01085 
01086         /* Create the threads */
01087         for (x = 0; x < ncpu; x++)  {
01088 
01089                 if (thr_create(0, 0, (void *(*)(void *))bu_parallel_interface, 0, 0, &thread)) {
01090                         fprintf(stderr, "ERROR parallel.c/bu_parallel(): thr_create(0x0, 0x0, 0x%x, 0x0, 0, 0x%x) failed on processor %d\n",
01091                                 bu_parallel_interface, &thread, x);
01092                         bu_log("ERROR parallel.c/bu_parallel(): thr_create(0x0, 0x0, 0x%x, 0x0, 0, 0x%x) failed on processor %d\n",
01093                                 bu_parallel_interface, &thread, x);
01094                         /* Not much to do, lump it */
01095                 } else {
01096                         if( bu_debug & BU_DEBUG_PARALLEL )
01097                                 bu_log("bu_parallel(): created thread: (thread: 0x%x) (loop:%d) (nthreadc:%d)\n",
01098                                        thread, x, nthreadc);
01099 
01100                         thread_tbl[nthreadc] = thread;
01101                         nthreadc++;
01102                 }
01103         }
01104 
01105         if( bu_debug & BU_DEBUG_PARALLEL )
01106                 for (i = 0; i < nthreadc; i++)
01107                         bu_log("bu_parallel(): thread_tbl[%d] = 0x%x\n",
01108                                i, thread_tbl[i]);
01109 
01110         /*
01111          * Wait for completion of all threads.  We don't wait for
01112          * threads in order.  We wait for any old thread but we keep
01113          * track of how many have returned and whether it is one that we
01114          * started
01115          */
01116         thread = 0;
01117         nthreade = 0;
01118         for (x = 0; x < nthreadc; x++)  {
01119                 if( bu_debug & BU_DEBUG_PARALLEL )
01120                         bu_log("bu_parallel(): waiting for thread to complete:\t(loop:%d) (nthreadc:%d) (nthreade:%d)\n",
01121                                x, nthreadc, nthreade);
01122 
01123                 if (thr_join((rt_thread_t)0, &thread, NULL)) {
01124                         /* badness happened */
01125                         fprintf(stderr, "thr_join()");
01126                 }
01127 
01128                 /* Check to see if this is one the threads we created */
01129                 for (i = 0; i < nthreadc; i++) {
01130                         if (thread_tbl[i] == thread) {
01131                                 thread_tbl[i] = (rt_thread_t)-1;
01132                                 nthreade++;
01133                                 break;
01134                         }
01135                 }
01136 
01137                 if ((thread_tbl[i] != (rt_thread_t)-1) && i < nthreadc) {
01138                         bu_log("bu_parallel(): unknown thread %d completed.\n",
01139                                thread);
01140                 }
01141 
01142                 if( bu_debug & BU_DEBUG_PARALLEL )
01143                         bu_log("bu_parallel(): thread completed: (thread: %d)\t(loop:%d) (nthreadc:%d) (nthreade:%d)\n",
01144                                thread, x, nthreadc, nthreade);
01145         }
01146 
01147         if( bu_debug & BU_DEBUG_PARALLEL )
01148                 bu_log("bu_parallel(): %d threads created.  %d threads exited.\n",
01149                        nthreadc, nthreade);
01150 #  endif        /* SUNOS */
01151 
01152 #  if defined(HAVE_PTHREAD_H) && !defined(sgi)
01153 
01154         thread = 0;
01155         nthreadc = 0;
01156 
01157         /* XXX How to advise thread library that we need 'ncpu' processors? */
01158 
01159         /* Create the threads */
01160         for (x = 0; x < ncpu; x++)  {
01161                 pthread_attr_t attrs;
01162                 pthread_attr_init(&attrs);
01163                 pthread_attr_setstacksize(&attrs,10*1024*1024);
01164 
01165                 if (pthread_create(&thread, &attrs,
01166                     (void *(*)(void *))bu_parallel_interface, NULL)) {
01167                         fprintf(stderr, "ERROR parallel.c/bu_parallel(): thr_create(0x0, 0x0, 0x%lx, 0x0, 0, 0x%lx) failed on processor %d\n",
01168                                 (unsigned long int)bu_parallel_interface, (unsigned long int)&thread, x);
01169                         bu_log("ERROR parallel.c/bu_parallel(): thr_create(0x0, 0x0, 0x%x, 0x0, 0, 0x%x) failed on processor %d\n",
01170                                 bu_parallel_interface, &thread, x);
01171                         /* Not much to do, lump it */
01172                 } else {
01173                         if( bu_debug & BU_DEBUG_PARALLEL ) {
01174                                 bu_log("bu_parallel(): created thread: (thread: %d) (loop:%d) (nthreadc:%d)\n",
01175                                        thread, x, nthreadc);
01176                         }
01177 
01178                         thread_tbl[nthreadc] = thread;
01179                         nthreadc++;
01180                 }
01181         }
01182 
01183 
01184         if( bu_debug & BU_DEBUG_PARALLEL ) {
01185                 for (i = 0; i < nthreadc; i++) {
01186                         bu_log("bu_parallel(): thread_tbl[%d] = %d\n",
01187                                i, thread_tbl[i]);
01188                 }
01189 #    if defined(HAVE_RAISE) && defined(SIGINFO)
01190                 /* may be BSD-only (calls _thread_dump_info()) */
01191                 raise(SIGINFO);
01192 #    endif
01193         }
01194 
01195         /*
01196          * Wait for completion of all threads.
01197          * Wait for them in order.
01198          */
01199         thread = 0;
01200         nthreade = 0;
01201         for (x = 0; x < nthreadc; x++)  {
01202                 int ret;
01203 
01204                 if( bu_debug & BU_DEBUG_PARALLEL )
01205                         bu_log("bu_parallel(): waiting for thread x%x to complete:\t(loop:%d) (nthreadc:%d) (nthreade:%d)\n",
01206                                 thread_tbl[x], x, nthreadc, nthreade);
01207 
01208                 if ( (ret = pthread_join(thread_tbl[x], NULL)) != 0) {
01209                         /* badness happened */
01210                         fprintf(stderr, "pthread_join(thread_tbl[%d]=0x%x) ret=%d\n", x, (unsigned int)thread_tbl[x], ret);
01211                 }
01212                 nthreade++;
01213                 thread_tbl[x] = (rt_thread_t)-1;
01214 
01215                 if( bu_debug & BU_DEBUG_PARALLEL )
01216                         bu_log("bu_parallel(): thread completed: (thread: %d)\t(loop:%d) (nthreadc:%d) (nthreade:%d)\n",
01217                                thread, x, nthreadc, nthreade);
01218         }
01219 
01220         if( bu_debug & BU_DEBUG_PARALLEL )
01221                 bu_log("bu_parallel(): %d threads created.  %d threads exited.\n",
01222                        nthreadc, nthreade);
01223 
01224 #  endif /* end if posix threads */
01225 
01226         /*
01227          *  Ensure that all the threads are REALLY finished.
01228          *  On some systems, if threads core dump, the rest of
01229          *  the gang keeps going, so this can actually happen (sigh).
01230          */
01231         if( bu_nthreads_finished != bu_nthreads_started )  {
01232                 bu_log("*** ERROR bu_parallel(%d): %d workers did not finish!\n\n",
01233                         ncpu, ncpu - bu_nthreads_finished);
01234         }
01235         if( bu_nthreads_started != ncpu )  {
01236                 bu_log("bu_parallel() NOTICE:  only %d workers started, expected %d\n",
01237                         bu_nthreads_started, ncpu );
01238         }
01239 
01240         if( bu_debug & BU_DEBUG_PARALLEL )
01241                 bu_log("bu_parallel(%d) complete, now serial\n", ncpu);
01242 
01243 #  ifdef CHECK_PIDS
01244         /*
01245          * At this point, all multi-tasking activity should have ceased,
01246          * and we should be just a single UNIX process with our original
01247          * PID and open file table (kernel struct u).  If not, then any
01248          * output may be written into the wrong file.
01249          */
01250         if( bu_pid_of_initiating_thread != (x=getpid()) )  {
01251                 bu_log("WARNING: bu_parallel():  PID of initiating thread changed from %d to %d, open file table may be botched!\n",
01252                         bu_pid_of_initiating_thread, x );
01253         }
01254 #  endif
01255         bu_pid_of_initiating_thread = 0;        /* No threads any more */
01256 #else   /* PARALLEL */
01257         bu_log("bu_parallel( x%lx, %d., x%lx ):  Not compiled for PARALLEL machine, running single-threaded\n", (long)func, ncpu, (long)arg );
01258         /* do the work anyways */
01259         (*func)(0,arg);
01260 #endif  /* PARALLEL */
01261 
01262         return;
01263 }
01264 
01265 #if defined(sgi) && !defined(mips)
01266 /* Horrible bug in 3.3.1 and 3.4 and 3.5 -- hypot ruins stack! */
01267 long float
01268 hypot(a,b)
01269 double a,b;
01270 {
01271         return(sqrt(a*a+b*b));
01272 }
01273 #endif /* sgi */
01274 
01275 /*@}*/
01276 
01277 /*
01278  * Local Variables:
01279  * mode: C
01280  * tab-width: 8
01281  * c-basic-offset: 4
01282  * indent-tabs-mode: t
01283  * End:
01284  * ex: shiftwidth=4 tabstop=8
01285  */

Generated on Mon Sep 18 01:24:48 2006 for BRL-CAD by  doxygen 1.4.6