
Annotation of /MITgcm_contrib/llc_hires/llc_4320/code-async/recvTask.c



Revision 1.2
Fri Jan 10 16:50:36 2014 UTC by dimitri
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +70 -38 lines
File MIME type: text/plain
syncing llc_4320 with llc_2160 changes

1 dimitri 1.1
2     // Code to do the m-on-n fan-in, recompositing, and output of data
3     // tiles for MITgcm.
4     //
5     // There are aspects of this code that would be simpler in C++, but
6     // we deliberately wrote the code in ANSI C to make linking it with
7     // the Fortran main code easier (should be able to just include the
8     // .o on the link line).
9    
10    
11     #include "PACKAGES_CONFIG.h"
12    
13     #include <stdio.h>
14     #include <unistd.h>
15     #include <stdlib.h>
16     #include <string.h>
17     #include <sys/types.h>
18     #include <sys/stat.h>
19     #include <fcntl.h>
20    
21     #include <mpi.h>
22     #include <alloca.h>
23    
24    
25     #include <stdint.h>
26     #include <errno.h>
27    
28     #define DEBUG 1
29    
30     #if (DEBUG >= 2)
31     #define FPRINTF fprintf
32     #else
33     #include <stdarg.h>
34     void FPRINTF(FILE *fp,...){return;}
35     #endif
36    
37    
38     // Define our own version of "assert", where we sleep rather than abort.
39     // This makes it easier to attach the debugger.
40     #if (DEBUG >= 1)
41     #define ASSERT(_expr) \
42     if (!(_expr)) { \
43     fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
44     getpid(), #_expr, __FILE__, __LINE__, __func__); \
45     sleep(9999); \
46     }
47     #else
48     #define ASSERT assert
49     #endif
50    
51    
52    
53     // If numRanksPerNode is set to be > 0, we just use that setting.
54     // If it is <= 0, we dynamically determine the number of cores on
55     // a node, and use that. (N.B.: we assume *all* nodes being used in
56     // the run have the *same* number of cores.)
57     int numRanksPerNode = 0;
58    
59     // Just an error check; can be zero (if you're confident it's all correct).
60     #define numCheckBits 2
61     // This bitMask definition works for 2's complement
62     #define bitMask ((1 << numCheckBits) - 1)
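      // For example, with numCheckBits == 2 the receive side below decodes an
      // incoming MPI tag as
      //     tileID = tag >> numCheckBits;   // upper bits: which tile
      //     check  = tag &  bitMask;        // low bits:   epochID mod 4
      // (see tryToReceiveDataTile), so the compute-side senders are assumed
      // to pack the tag as (tileID << numCheckBits) | (epochID & bitMask).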
63    
64    
65     ///////////////////////////////////////////////////////////////////////
66     // Info about the data fields
67    
68     // double for now. Might also be float someday
69     #define datumType double
70     #define datumSize (sizeof(datumType))
71    
72    
73     // Info about the data fields. We assume that all the fields have the
74     // same X and Y characteristics, but may have either 1 or NUM_Z levels.
75     //
76     // the following 3 values are required here. Could be sent over or read in
77     // with some rearrangement in init routines
78     //
79     #define NUM_X 4320
80     #define NUM_Y 56160L // get rid of this someday
81     #define NUM_Z 90
82     #define MULTDIM 7
83     #define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
84     #define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
85     #define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)
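      // For scale: with NUM_X = 4320 and NUM_Y = 56160, one 2-D field is
      // 4320 * 56160 * 8 bytes ~= 1.9 GB, a full 3-D field (NUM_Z = 90) is
      // ~175 GB, and a MULTDIM (7-level) field is ~13.6 GB.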
86    
87     // Info about the data tiles. We assume that all the tiles are the same
88     // size (no odd-sized last piece), they all have the same X and Y
89     // characteristics (including ghosting), and are full depth in Z
90 dimitri 1.2 // (either 1, or NUM_Z, or MULTDIM, as appropriate).
91 dimitri 1.1 //
92     // all these data now sent over from compute ranks
93     //
94     int TILE_X = -1;
95     int TILE_Y = -1;
96     int XGHOSTS = -1;
97     int YGHOSTS = -1;
98    
99     // Size of one Z level of a tile (NOT one Z level of a whole field)
100     int tileOneZLevelItemCount = -1;
101     int tileOneZLevelSizeInBytes = -1;
102    
103    
104     typedef struct dataFieldDepth {
105     char dataFieldID;
106     int numZ;
107     } dataFieldDepth_t;
108    
109     dataFieldDepth_t fieldDepths[] = {
110 dimitri 1.2 { 'A', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h
111    
112     { 'B', 1 },
113     { 'C', 1 },
114     { 'D', 1 },
115     { 'E', 1 },
116     { 'F', 1 },
117     { 'G', 1 },
118     { 'H', 1 },
119     { 'I', 1 },
120     { 'J', 1 },
121     { 'K', 1 },
122     { 'L', 1 },
123     { 'M', 1 },
124     { 'N', 1 },
125     { 'O', 1 },
126     { 'P', 1 },
127     { 'Q', 1 },
128     { 'R', 1 },
129    
130     { 'S', NUM_Z },
131     { 'T', NUM_Z },
132     { 'U', NUM_Z },
133     { 'V', NUM_Z },
134     { 'W', NUM_Z },
135     { 'X', NUM_Z },
136     { 'Y', NUM_Z },
137 dimitri 1.1 };
138     #define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
139    
140    
141    
142     ///////////////////////////////////////////////////////////////////////
143     // Info about the various kinds of i/o epochs
144     typedef struct epochFieldInfo {
145     char dataFieldID;
146     MPI_Comm registrationIntercomm;
147     MPI_Comm dataIntercomm;
148     MPI_Comm ioRanksIntracomm;
149     int tileCount;
150     int zDepth; // duplicates the fieldDepth entry; filled in automatically
151     char filenameTemplate[128];
152     long int offset;
153     int pickup;
154     } fieldInfoThisEpoch_t;
155    
156     // The normal i/o dump
157     fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
158 dimitri 1.2 { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "U.%010d.%s", 0, 0 },
159     { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "V.%010d.%s", 0, 0 },
160     { 'W', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "W.%010d.%s", 0, 0 },
161     { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Salt.%010d.%s", 0, 0 },
162     { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Theta.%010d.%s", 0,0 },
163     { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Eta.%010d.%s", 0,0 },
164    
165     { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIarea.%010d.%s", 0,0 },
166     { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIheff.%010d.%s", 0,0 },
167     { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsnow.%010d.%s", 0,0 },
168     { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIuice.%010d.%s", 0,0 },
169     { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIvice.%010d.%s", 0,0 },
170     { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsalt.%010d.%s", 0,0 },
171    
172     //{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "EtaHnm1.%010d.%s", 0,0 },
173     { 'I', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUX.%010d.%s", 0,0 },
174     { 'J', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUY.%010d.%s", 0,0 },
175     { 'K', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "KPPhbl.%010d.%s", 0,0 },
176     { 'L', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceSflux.%010d.%s", 0,0 },
177    
178     { 'M', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceFWflx.%010d.%s", 0,0 },
179     { 'O', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQnet.%010d.%s", 0,0 },
180     { 'P', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "PhiBot.%010d.%s", 0,0 },
181     { 'Q', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQsw.%010d.%s", 0,0 },
182     //{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "dEtaHdt.%010d.%s", 0,0 },
183    
184 dimitri 1.1 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0,0 },
185     };
186    
187    
188     // pickup file
189     fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
190 dimitri 1.2 { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
191     { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
192     { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
193     { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
194     { 'X', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
195     { 'Y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
196     { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
197     { 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
198     { 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
199 dimitri 1.1 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 ,1},
200     };
201    
202    
203     // seaice pickup
204     fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
205 dimitri 1.2 { 'A', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
206     { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
207     { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
208     { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
209     { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
210     { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
211     { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
212 dimitri 1.1 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 },
213     };
214    
215    
216     fieldInfoThisEpoch_t *epochStyles[] = {
217     fieldsForEpochStyle_0,
218     fieldsForEpochStyle_1,
219     fieldsForEpochStyle_2,
220     };
221     int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));
222    
223    
224     typedef enum {
225     cmd_illegal,
226     cmd_newEpoch,
227     cmd_epochComplete,
228     cmd_exit,
229     } epochCmd_t;
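      // These commands arrive at the i/o ranks as a 4-int message from rank 0
      // on the compute side of globalIntercomm (see ioRankMain below):
      //     cmd[0] = one of the epochCmd_t values above
      //     cmd[1] = i/o epoch ID
      //     cmd[2] = epoch style index (into epochStyles[])
      //     cmd[3] = gcmIter (timestep number used in the output filenames)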
230    
231    
232     ///////////////////////////////////////////////////////////////////////
233    
234     // Note that a rank will only have access to one of the Intracomms,
235     // but all ranks will define the Intercomm.
236     MPI_Comm ioIntracomm = MPI_COMM_NULL;
237     int ioIntracommRank = -1;
238     MPI_Comm computeIntracomm = MPI_COMM_NULL;
239     int computeIntracommRank = -1;
240     MPI_Comm globalIntercomm = MPI_COMM_NULL;
241    
242    
243     #define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
244     #define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
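      // e.g. divCeil(90,32) == 3 and roundUp(90,32) == 96; doNewEpoch() uses
      // divCeil(zDepth, intracommSize) to decide how many z-slabs each i/o
      // rank in a field group reassembles.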
245    
246    
247    
248     typedef enum {
249     bufState_illegal,
250     bufState_Free,
251     bufState_InUse,
252     } bufState_t;
253    
254    
255     typedef struct buf_header{
256     struct buf_header *next;
257     bufState_t state;
258     int requestsArraySize;
259     MPI_Request *requests;
260     uint64_t ck0;
261     char *payload; // [tileOneZLevelSizeInBytes * NUM_Z]; // Max payload size
262     uint64_t ck1; // now rec'vd from compute ranks & dynamically allocated in
263     } bufHdr_t; // allocateTileBufs
264    
265    
266     bufHdr_t *freeTileBufs_ptr = NULL;
267     bufHdr_t *inUseTileBufs_ptr = NULL;
268    
269     int maxTagValue = -1;
270     int totalNumTiles = -1;
271    
272     // routine to convert double to float during memcpy
273     // need to get byteswapping in here as well
274     void memcpy_r8_2_r4 (float *f, double *d, long long *n)
275     {
276     long long i, rem;
277     rem = *n%16LL;
278     for (i = 0; i < rem; i++) {
279     f [i] = d [i];
280     }
281     for (i = rem; i < *n; i += 16) {
282     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) );
283     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) );
284     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0");
285     __asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory");
286     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1");
287     __asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory");
288     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2");
289     __asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory");
290     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3");
291     __asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory");
292     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4");
293     __asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory");
294     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5");
295     __asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory");
296     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6");
297     __asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory");
298     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7");
299     __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) );
300     __asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory");
301     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8");
302     __asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory");
303     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9");
304     __asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory");
305     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10");
306     __asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory");
307     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11");
308     __asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory");
309     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12");
310     __asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory");
311     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13");
312     __asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory");
313     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14");
314     __asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory");
315     __asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15");
316     __asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory");
317     }
318     }
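      // For reference: ignoring the prefetch and SSE scheduling, the routine
      // above is just a narrowing copy (no byteswap yet, per the comment at
      // the top). A portable, unoptimized sketch with the same Fortran-style
      // count argument would be:
      //
      //     void memcpy_r8_2_r4_portable(float *f, double *d, long long *n)
      //     {
      //         long long i;
      //         for (i = 0; i < *n; i++)
      //             f[i] = (float) d[i];
      //     }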
319    
320     // Debug routine
321     void countBufs(int nbufs)
322     {
323     int nInUse, nFree;
324     bufHdr_t *bufPtr;
325     static int target = -1;
326    
327     if (-1 == target) {
328     ASSERT(-1 != nbufs);
329     target = nbufs;
330     }
331    
332     bufPtr = freeTileBufs_ptr;
333     for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
334    
335     bufPtr = inUseTileBufs_ptr;
336     for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
337    
338     if (nInUse + nFree != target) {
339     fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
340     ioIntracommRank, nFree, nInUse, target);
341     sleep(5000);
342     }
343     }
344    
345     int readn(int fd, void *p, int nbytes)
346     {
347    
348     char *ptr = (char*)(p);
349    
350     int nleft, nread;
351    
352     nleft = nbytes;
353     while (nleft > 0){
354     nread = read(fd, ptr, nleft);
355     if (nread < 0)
356     return(nread); // error
357     else if (nread == 0)
358     break; // EOF
359    
360     nleft -= nread;
361     ptr += nread;
362     }
363     return (nbytes - nleft);
364     }
365    
366    
367     ssize_t writen(int fd, void *p, size_t nbytes)
368     {
369     char *ptr = (char*)(p);
370    
371     size_t nleft;
372     ssize_t nwritten;
373    
374     nleft = nbytes;
375     while (nleft > 0){
376     nwritten = write(fd, ptr, nleft);
377     if (nwritten <= 0){
378     if (errno==EINTR) continue; // POSIX, not SVr4
379     return(nwritten); // non-EINTR error
380     }
381    
382     nleft -= nwritten;
383     ptr += nwritten;
384     }
385     return(nbytes - nleft);
386     }
387    
388    
389     void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
390     {
391     int i;
392     int ndims = 2;
393     int nrecords,nfields;
394     char**f;
395     char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
396     char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
397     // for now, just list the fields here. When the whole field specification apparatus
398     // is cleaned up, pull the names out of the epochstyle definition or whatever
399    
400     ASSERT(1==pickup||2==pickup);
401    
402     if (1==pickup){
403     nrecords = 6*NUM_Z+3;
404     nfields = sizeof(fld1)/sizeof(char*);
405     f = fld1;
406     }
407     else if (2==pickup){
408     nrecords = MULTDIM+6;
409     nfields = sizeof(fld2)/sizeof(char*);
410     f = fld2;
411     }
412    
413     fprintf(fp," nDims = [ %3d ];\n",ndims);
414     fprintf(fp," dimList = [\n");
415     fprintf(fp," %10u,%10d,%10u,\n",NUM_X,1,NUM_X);
416     fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
417     fprintf(fp," ];\n");
418     fprintf(fp," dataprec = [ 'float64' ];\n");
419     fprintf(fp," nrecords = [ %5d ];\n",nrecords);
420     fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
421     fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
422     fprintf(fp," nFlds = [ %4d ];\n",nfields);
423     fprintf(fp," fldList = {\n");
424     for (i=0;i<nfields;++i)
425     fprintf(fp," '%-8s'",f[i]);
426     fprintf(fp,"\n };\n");
427     }
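      // Schematically, for pickup == 1 this writes a .meta file of the form
      // (spacing abbreviated; nrecords = 6*NUM_Z + 3 = 543, nFlds = 9):
      //
      //     nDims = [   2 ];
      //     dimList = [ 4320, 1, 4320,  56160, 1, 56160 ];
      //     dataprec = [ 'float64' ];
      //     nrecords = [   543 ];
      //     timeStepNumber = [ <gcmIter> ];
      //     timeInterval = [ 0.000000000000E+00 ];
      //     nFlds = [    9 ];
      //     fldList = { 'Uvel' 'Vvel' 'Theta' 'Salt' 'GuNm1' 'GvNm1' 'EtaN' 'dEtaHdt' 'EtaH' };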
428    
429    
430    
431     double *outBuf=NULL;//[NUM_X*NUM_Y*NUM_Z]; // only needs to be myNumZSlabs
432     size_t outBufSize=0;
433    
434    
435     void
436     do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int firstZ, int numZ, int gcmIter)
437     {
438     if (0==numZ) return; // this is *not* global NUM_Z : change name of parameter to avoid grief!
439    
440     int pickup = whichField->pickup;
441    
442     ///////////////////////////////
443     // swap here, if necessary
444     // int i,j;
445    
446     //i = NUM_X*NUM_Y*numZ;
447     //mds_byteswapr8_(&i,outBuf);
448    
449     // mds_byteswapr8 expects an integer count, which is gonna overflow someday
450     // can't redefine to long without affecting a bunch of other calls
451     // so do a slab at a time here, to delay the inevitable
452     // i = NUM_X*NUM_Y;
453     //for (j=0;j<numZ;++j)
454     // mds_byteswapr8_(&i,&outBuf[i*j]);
455    
456     // gnu builtin evidently honored by intel compilers
457    
458     if (pickup) {
459     uint64_t *alias = (uint64_t*)outBuf;
460     size_t i;
461     for (i=0;i<NUM_X*NUM_Y*numZ;++i)
462     alias[i] = __builtin_bswap64(alias[i]);
463     }
464     else {
465     uint32_t *alias = (uint32_t*)outBuf;
466     size_t i;
467     for (i=0;i<NUM_X*NUM_Y*numZ;++i)
468     alias[i] = __builtin_bswap32(alias[i]);
469     }
470    
471     // end of swappiness
472     //////////////////////////////////
473    
474     char s[1024];
475     //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);
476    
477     sprintf(s,whichField->filenameTemplate,gcmIter,"data");
478    
479     int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
480     ASSERT(fd!=-1);
481    
482     size_t nbytes;
483    
484     if (pickup) {
485     lseek(fd,whichField->offset,SEEK_SET);
486     lseek(fd,firstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
487     nbytes = NUM_X*NUM_Y*numZ*datumSize;
488     }
489     else {
490     lseek(fd,firstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
491     nbytes = NUM_X*NUM_Y*numZ*sizeof(float);
492     }
493    
494     ssize_t bwrit = writen(fd,outBuf,nbytes);
495    
496     if (-1==bwrit) perror("Henze : file write problem : ");
497    
498     FPRINTF(stderr,"Wrote %zd of %zu bytes (%d -> %d) \n",bwrit,nbytes,firstZ,numZ);
499    
500     // ASSERT(nbytes == bwrit);
501    
502     if (nbytes!=bwrit)
503     fprintf(stderr,"WROTE %ld /%ld\n",bwrit,nbytes);
504    
505     close(fd);
506    
507    
508     return;
509     }
510    
511    
512     int NTILES = -1;
513    
514     typedef struct {
515     int off;
516     int skip;
517     } tile_layout_t;
518    
519     tile_layout_t *offsetTable;
520    
521     void
522     processSlabSection(
523     fieldInfoThisEpoch_t *whichField,
524     int tileID,
525     void *data,
526     int myNumZSlabs)
527     {
528     int intracommSize,intracommRank;
529     // MPI_Comm_size(whichField->ioRanksIntracomm, &intracommSize);
530     MPI_Comm_rank(whichField->ioRanksIntracomm, &intracommRank);
531     //printf("i/o rank %d/%d recv'd %d::%d (%d->%d)\n",intracommRank,intracommSize,whichField,tileID,firstZ,lastZ);
532     // printf("rank %d : tile %d is gonna go at %d and stride with %d, z = %d\n",intracommRank,tileID,offsetTable[tileID].off,
533     // offsetTable[tileID].skip,myNumZSlabs);
534    
535     int z;
536    
537     int pickup = whichField->pickup;
538    
539     //ASSERT((tileID > 0) && (tileID < (sizeof(offsetTable)/sizeof(tile_layout_t))));
540     ASSERT( (tileID > 0) && (tileID <= NTILES) ); // offsetTable now dynamically allocated
541    
542     if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){
543    
544     free(outBuf);
545    
546     outBufSize = myNumZSlabs * twoDFieldSizeInBytes;
547    
548     outBuf = malloc(outBufSize);
549     ASSERT(outBuf);
550    
551     memset(outBuf,0,outBufSize);
552     }
553    
554     for (z=0;z<myNumZSlabs;++z){
555    
556     off_t zoffset = z*TILE_X*TILE_Y*NTILES; //NOT totalNumTiles;
557     off_t hoffset = offsetTable[tileID].off;
558     off_t skipdst = offsetTable[tileID].skip;
559    
560     //double *dst = outBuf + zoffset + hoffset;
561     void *dst;
562     if (pickup)
563     dst = outBuf + zoffset + hoffset;
564     else
565     dst = (float*)outBuf + zoffset + hoffset;
566    
567     off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
568     off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS;
569     double *src = (double*)data + zoff + hoff;
570    
571     off_t skipsrc = TILE_X+2*XGHOSTS;
572    
573     //fprintf(stderr,"rank %d tile %d offset %d skip %d dst %x src %x\n",intracommRank,tileID,hoffset,skipdst,dst,src);
574    
575     long long n = TILE_X;
576    
577     int y;
578     if (pickup)
579     for (y=0;y<TILE_Y;++y)
580     memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
581     else
582     for (y=0;y<TILE_Y;++y)
583     memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
584     }
585    
586     return;
587     }
588    
589    
590    
591     // Allocate some buffers to receive tile-data from the compute ranks
592     void
593     allocateTileBufs(int numTileBufs, int maxIntracommSize)
594     {
595    
596     ASSERT(tileOneZLevelSizeInBytes>0); // be sure we have rec'vd values by now
597    
598     int i, j;
599     for (i = 0; i < numTileBufs; ++i) {
600    
601     bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
602     ASSERT(NULL != newBuf);
603    
604     newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
605     ASSERT(NULL != newBuf->payload);
606    
607     newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
608     ASSERT(NULL != newBuf->requests);
609    
610     // Init some values
611     newBuf->requestsArraySize = maxIntracommSize;
612     for (j = 0; j < maxIntracommSize; ++j) {
613     newBuf->requests[j] = MPI_REQUEST_NULL;
614     }
615     newBuf->ck0 = newBuf->ck1 = 0xdeadbeefdeadbeef;
616    
617     // Put the buf into the free list
618     newBuf->state = bufState_Free;
619     newBuf->next = freeTileBufs_ptr;
620     freeTileBufs_ptr = newBuf;
621     }
622     }
623    
624    
625    
626     bufHdr_t *
627     getFreeBuf()
628     {
629     int j;
630     bufHdr_t *rtnValue = freeTileBufs_ptr;
631    
632     if (NULL != rtnValue) {
633     ASSERT(bufState_Free == rtnValue->state);
634     freeTileBufs_ptr = rtnValue->next;
635     rtnValue->next = NULL;
636    
637     // Paranoia. This should already be the case
638     for (j = 0; j < rtnValue->requestsArraySize; ++j) {
639     rtnValue->requests[j] = MPI_REQUEST_NULL;
640     }
641     }
642     return rtnValue;
643     }
644    
645    
646     /////////////////////////////////////////////////////////////////
647    
648    
649     bufHdr_t *
650     tryToReceiveDataTile(
651     MPI_Comm dataIntercomm,
652     int epochID,
653     size_t expectedMsgSize,
654     int *tileID)
655     {
656     bufHdr_t *bufHdr;
657     int pending, i, count;
658     MPI_Status mpiStatus;
659    
660     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
661     &pending, &mpiStatus);
662    
663     // If no data are pending, or if we can't get a buffer to
664     // put the pending data into, return NULL
665     if (!pending) return NULL;
666     if (((bufHdr = getFreeBuf()) == NULL)) {
667     FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
668     mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
669     return NULL;
670     }
671    
672     // Do sanity checks on the pending message
673     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
674     ASSERT(count == expectedMsgSize);
675     ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));
676    
677     // Receive the data we saw in the iprobe
678     MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
679     mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
680     dataIntercomm, &mpiStatus);
681     bufHdr->state = bufState_InUse;
682    
683     // Overrun check
684     ASSERT(bufHdr->ck0==0xdeadbeefdeadbeef);
685     ASSERT(bufHdr->ck0==bufHdr->ck1);
686    
687     // Return values
688     *tileID = mpiStatus.MPI_TAG >> numCheckBits;
689    
690    
691     FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
692     mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);
693    
694     return bufHdr;
695     }
696    
697    
698    
699     int
700     tryToReceiveZSlab(
701     void *buf,
702     int expectedMsgSize,
703     MPI_Comm intracomm)
704     {
705     MPI_Status mpiStatus;
706     int pending, count;
707    
708     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
709     if (!pending) return -1;
710    
711     MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
712     ASSERT(count == expectedMsgSize);
713    
714     MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
715     mpiStatus.MPI_TAG, intracomm, &mpiStatus);
716    
717     FPRINTF(stderr, "recv slab %d from rank %d\n",
718     mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);
719    
720     // return the tileID
721     return mpiStatus.MPI_TAG;
722     }
723    
724    
725    
726     void
727     redistributeZSlabs(
728     bufHdr_t *bufHdr,
729     int tileID,
730     int zSlabsPer,
731     int thisFieldNumZ,
732     MPI_Comm intracomm)
733     {
734     int pieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
735     int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
736     int offset = 0;
737     int recvRank = 0;
738    
739     while ((offset + pieceSize) <= tileSizeInBytes) {
740    
741     MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
742     tileID, intracomm, &(bufHdr->requests[recvRank]));
743     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
744    
745     offset += pieceSize;
746     recvRank += 1;
747     }
748    
749     // There might be one last odd-sized piece
750     if (offset < tileSizeInBytes) {
751     pieceSize = tileSizeInBytes - offset;
752     ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);
753    
754     MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
755     tileID, intracomm, &(bufHdr->requests[recvRank]));
756     ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
757    
758     offset += pieceSize;
759     recvRank += 1;
760     }
761    
762     // Sanity check
763     ASSERT(recvRank <= bufHdr->requestsArraySize);
764     while (recvRank < bufHdr->requestsArraySize) {
765     ASSERT(MPI_REQUEST_NULL == bufHdr->requests[recvRank++])
766     }
767     }
768    
769    
770    
771     /////////////////////////////////////////////////////////////////
772     // This is called by the i/o ranks
773     void
774     doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
775     {
776     // In an i/o epoch, the i/o ranks are partitioned into groups,
777     // each group dealing with one field. The ranks within a group:
778     // (1) receive data tiles from the compute ranks
779     // (2) slice the received data tiles into slabs in the 'z'
780     // dimension, and redistribute the tile-slabs among the group.
781     // (3) receive redistributed tile-slabs, and reconstruct a complete
782     // field-slab for the whole field.
783     // (4) Write the completed field-slab to disk.
784    
785     fieldInfoThisEpoch_t *fieldInfo;
786     int intracommRank, intracommSize;
787    
788     int zSlabsPer;
789     int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;
790    
791     int numTilesRecvd, numSlabPiecesRecvd;
792    
793    
794     // Find the descriptor for my assigned field for this epoch style.
795     // It is the one whose dataIntercomm is not null
796     fieldInfo = epochStyles[epochStyleIndex];
797     while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
798     ASSERT('\0' != fieldInfo->dataFieldID);
799    
800    
801    
802     MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
803     MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);
804    
805    
806     // Compute which z slab(s) we will be reassembling.
807     zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
808     myNumZSlabs = zSlabsPer;
809    
810     // Adjust myNumZSlabs in case it didn't divide evenly
811     myFirstZSlab = intracommRank * myNumZSlabs;
812     if (myFirstZSlab >= fieldInfo->zDepth) {
813     myNumZSlabs = 0;
814     } else if ((myFirstZSlab + myNumZSlabs) > fieldInfo->zDepth) {
815     myNumZSlabs = fieldInfo->zDepth - myFirstZSlab;
816     } else {
817     myNumZSlabs = zSlabsPer;
818     }
819     myLastZSlab = myFirstZSlab + myNumZSlabs - 1;
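      // Worked example: zDepth == NUM_Z == 90 and intracommSize == 32 gives
      // zSlabsPer == 3, so ranks 0..29 each reassemble 3 z-levels (rank r
      // gets levels 3r..3r+2) while ranks 30 and 31 get myNumZSlabs == 0 and
      // therefore receive no redistributed slab pieces.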
820    
821     // If we were not assigned any z-slabs, we don't get any redistributed
822     // tile-slabs. If we were assigned one or more slabs, we get
823     // redistributed tile-slabs for every tile.
824     myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : totalNumTiles;
825    
826    
827     numTilesRecvd = 0;
828     numSlabPiecesRecvd = 0;
829    
830     ////////////////////////////////////////////////////////////////////
831     // Main loop. Handle tiles from the current epoch
832     for(;;) {
833     bufHdr_t *bufHdr;
834    
835     ////////////////////////////////////////////////////////////////
836     // Check for tiles from the computational tasks
837     while (numTilesRecvd < fieldInfo->tileCount) {
838     int tileID = -1;
839     size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
840     bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
841     msgSize, &tileID);
842     if (NULL == bufHdr) break; // No tile was received
843    
844     numTilesRecvd += 1;
845     redistributeZSlabs(bufHdr, tileID, zSlabsPer,
846     fieldInfo->zDepth, fieldInfo->ioRanksIntracomm);
847    
848     // Add the bufHdr to the "in use" list
849     bufHdr->next = inUseTileBufs_ptr;
850     inUseTileBufs_ptr = bufHdr;
851    
852    
853     }
854    
855    
856     ////////////////////////////////////////////////////////////////
857     // Check for tile-slabs redistributed from the i/o processes
858     while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
859     int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
860     char data[msgSize];
861    
862     int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
863     if (tileID < 0) break; // No slab was received
864    
865    
866     numSlabPiecesRecvd += 1;
867     processSlabSection(fieldInfo, tileID, data, myNumZSlabs);
868    
869    
870     // Can do the write here, or at the end of the epoch.
871     // Probably want to make it asynchronous (waiting for
872     // completion at the barrier at the start of the next epoch).
873     //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
874     // ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
875     // do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
876     //}
877     }
878    
879    
880     ////////////////////////////////////////////////////////////////
881     // Check if we can release any of the inUse buffers (i.e. the
882     // isends we used to redistribute those z-slabs have completed).
883     // We do this by detaching the inUse list, then examining each
884     // element in the list, putting each element either into the
885     // free list or back into the inUse list, as appropriate.
886     bufHdr = inUseTileBufs_ptr;
887     inUseTileBufs_ptr = NULL;
888     while (NULL != bufHdr) {
889     int count = 0;
890     int completions[intracommSize];
891     MPI_Status statuses[intracommSize];
892    
893     // Acquire the "next" bufHdr now, before we change anything.
894     bufHdr_t *nextBufHeader = bufHdr->next;
895    
896     // Check to see if the Isend requests have completed for this buf
897     MPI_Testsome(intracommSize, bufHdr->requests,
898     &count, completions, statuses);
899    
900     if (MPI_UNDEFINED == count) {
901     // A return of UNDEFINED means that none of the requests
902     // is still outstanding, i.e. they have all completed.
903     // Put the buf on the free list.
904     bufHdr->state = bufState_Free;
905     bufHdr->next = freeTileBufs_ptr;
906     freeTileBufs_ptr = bufHdr;
907     FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
908     } else {
909     // At least one request is still outstanding.
910     // Put the buf on the inUse list.
911     bufHdr->next = inUseTileBufs_ptr;
912     inUseTileBufs_ptr = bufHdr;
913     }
914    
915     // Continue with the next buffer
916     bufHdr = nextBufHeader;
917     }
918    
919    
920     ////////////////////////////////////////////////////////////////
921     // Check if the epoch is complete
922     if ((numTilesRecvd >= fieldInfo->tileCount) &&
923     (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
924     (NULL == inUseTileBufs_ptr))
925     {
926     ASSERT(numTilesRecvd == fieldInfo->tileCount);
927     ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
928    
929     //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);
930    
931     // Ok, wrap up the current i/o epoch
932     // Probably want to make this asynchronous (waiting for
933     // completion at the start of the next epoch).
934     do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);
935    
936     // The epoch is complete
937     return;
938     }
939     }
940    
941     }
942    
943    
944    
945    
946    
947    
948    
949     // Main routine for the i/o tasks. Callers DO NOT RETURN from
950     // this routine; they MPI_Finalize and exit.
951     void
952     ioRankMain(int totalNumTiles)
953     {
954     // This is one of a group of i/o processes that will be receiving tiles
955     // from the computational processes. This same process is also
956     // responsible for compositing slices of tiles together into one or
957     // more complete slabs, and then writing those slabs out to disk.
958     //
959     // When a tile is received, it is cut up into slices, and each slice is
960     // sent to the appropriate compositing task that is dealing with those
961     // "z" level(s) (possibly including ourselves). These are tagged with
962     // the tile-id. When we *receive* such a message (from ourselves, or
963     // from any other process in this group), we unpack the data (stripping
964     // the ghost cells), and put them into the slab we are assembling. Once
965     // a slab is fully assembled, it is written to disk.
966    
967     int i, size, count, numTileBufs;
968     MPI_Status status;
969     int currentEpochID = 0;
970     int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;
971    
972     int numComputeRanks, epochStyleIndex;
973     MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);
974    
975    
976     ////////////////////////////////////////////////////////////
977     //// get global tile info via bcast over the global intercomm
978    
979    
980     // int ierr = MPI_Bcast(&NTILES,1,MPI_INT,0,globalIntercomm);
981    
982     int data[9];
983    
984     int ierr = MPI_Bcast(data,9,MPI_INT,0,globalIntercomm);
985    
986     NTILES = data[3];
987     TILE_X = data[5];
988     TILE_Y = data[6];
989     XGHOSTS = data[7];
990     YGHOSTS = data[8];
991    
992     tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
993     tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // also calculated in compute ranks, urghh
994    
995     int *xoff = malloc(NTILES*sizeof(int));
996     ASSERT(xoff);
997     int *yoff = malloc(NTILES*sizeof(int));
998     ASSERT(yoff);
999     int *xskip = malloc(NTILES*sizeof(int));
1000     ASSERT(xskip);
1001    
1002     offsetTable = malloc((NTILES+1)*sizeof(tile_layout_t));
1003     ASSERT(offsetTable);
1004    
1005     ierr = MPI_Bcast(xoff,NTILES,MPI_INT,0,globalIntercomm);
1006     ierr = MPI_Bcast(yoff,NTILES,MPI_INT,0,globalIntercomm);
1007     ierr = MPI_Bcast(xskip,NTILES,MPI_INT,0,globalIntercomm);
1008    
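      // xoff/yoff are presumably 1-based (Fortran-style) global coordinates of
      // each tile's origin; the loop below converts them to a 0-based linear
      // offset into a NUM_X-wide 2-D slab, with .skip giving the destination
      // row stride used when copying tile rows in processSlabSection().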
1009     for (i=0;i<NTILES;++i){
1010     offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
1011     offsetTable[i+1].skip = xskip[i];
1012     }
1013    
1014     free(xoff);
1015     free(yoff);
1016     free(xskip);
1017    
1018     ///////////////////////////////////////////////////////////////
1019    
1020    
1021     // At this point, the multitude of various inter-communicators have
1022     // been created and stored in the various epoch "style" arrays.
1023     // The next step is to have the computational tasks "register" the
1024     // data tiles they will send. In this version of the code, we only
1025     // track and store *counts*, not the actual tileIDs.
1026     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1027     fieldInfoThisEpoch_t *p = NULL;
1028     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1029     int thisIntracommSize;
1030    
1031     // Find the field we were assigned for this epoch style
1032     int curFieldIndex = -1;
1033     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1034     p = &(thisEpochStyle[curFieldIndex]);
1035     if (MPI_COMM_NULL != p->registrationIntercomm) break;
1036     }
1037     ASSERT(NULL != p);
1038     ASSERT('\0' != p->dataFieldID);
1039     MPI_Comm_size(p->ioRanksIntracomm, &size);
1040     if (size > maxIntracommSize) maxIntracommSize = size;
1041    
1042    
1043     // Receive a message from *each* computational rank telling us
1044     // a count of how many tiles that rank will send during epochs
1045     // of this style. Note that some of these counts may be zero
1046     for (i = 0; i < numComputeRanks; ++i) {
1047     MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
1048     p->registrationIntercomm, &status);
1049     p->tileCount += status.MPI_TAG;
1050     }
1051     if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
1052     if (p->zDepth > 1) {
1053     if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
1054     }
1055    
1056     // Sanity check
1057     MPI_Allreduce (&(p->tileCount), &count, 1,
1058     MPI_INT, MPI_SUM, p->ioRanksIntracomm);
1059     ASSERT(count == totalNumTiles);
1060     }
1061    
1062    
1063     // In some as-of-yet-undetermined fashion, we decide how many
1064     // buffers to allocate to hold tiles received from the computational
1065     // tasks. The number of such buffers is more-or-less arbitrary (the
1066     // code should be able to function with any number greater than zero).
1067     // More buffers means more parallelism, but uses more space.
1068     // Note that maxTileCount is an upper bound on the number of buffers
1069     // that we can usefully use. Note also that if we are assigned a 2D
1070     // field, we might be assigned to receive a very large number of tiles
1071     // for that field in that epoch style.
1072    
1073     // Hack - for now, just pick a value for numTileBufs
1074     numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
1075     if (numTileBufs < 4) numTileBufs = 4;
1076     if (numTileBufs > 15) numTileBufs = 15;
1077     // if (numTileBufs > 25) numTileBufs = 25;
1078    
1079     allocateTileBufs(numTileBufs, maxIntracommSize);
1080     countBufs(numTileBufs);
1081    
1082    
1083     ////////////////////////////////////////////////////////////////////
1084     // Main loop.
1085     ////////////////////////////////////////////////////////////////////
1086    
1087     for (;;) {
1088     int cmd[4];
1089    
1090     // Make sure all the i/o ranks have completed processing the prior
1091     // cmd (i.e. the prior i/o epoch is complete).
1092     MPI_Barrier(ioIntracomm);
1093    
1094     if (0 == ioIntracommRank) {
1095     fprintf(stderr, "I/O ranks waiting for new epoch\n");
1096     MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);
1097    
1098     MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
1099     fprintf(stderr, "I/O ranks beginning new epoch: %d, gcmIter = %d\n", cmd[1],cmd[3]);
1100    
1101     // before we start a new epoch, have i/o rank 0:
1102     // determine output filenames for this epoch
1103     // clean up any extant files with same names
1104     // write .meta files
1105    
1106     if (cmd_exit != cmd[0]){
1107    
1108     fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);
1109    
1110     int epochStyle = cmd[2];
1111     int gcmIter = cmd[3];
1112    
1113     fieldInfoThisEpoch_t *fieldInfo;
1114     fieldInfo = epochStyles[epochStyle];
1115     char s[1024];
1116     int res;
1117     FILE *fp;
1118    
1119     if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
1120     char f;
1121     while (f = fieldInfo->dataFieldID){
1122     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1123     fprintf(stderr,"%s\n",s);
1124     res = unlink(s);
1125     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1126    
1127     // skip writing meta files for non-pickup fields
1128     /*
1129     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1130     fp = fopen(s,"w+");
1131     fclose(fp);
1132     */
1133    
1134     ++fieldInfo;
1135    
1136     }
1137     }
1138    
1139     else { // single pickup or pickup_seaice file
1140    
1141     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1142     fprintf(stderr,"%s\n",s);
1143     res = unlink(s);
1144     if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1145    
1146     sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1147     fp = fopen(s,"w+");
1148     write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
1149     fclose(fp);
1150    
1151     }
1152     }
1153     }
1154     MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);
1155    
1156     switch (cmd[0]) {
1157    
1158     case cmd_exit:
1159     // Don't bother trying to disconnect and free the
1160     // plethora of communicators; just exit.
1161     FPRINTF(stderr,"Received shutdown message\n");
1162     MPI_Finalize();
1163     exit(0);
1164     break;
1165    
1166     case cmd_newEpoch:
1167     if ((currentEpochID + 1) != cmd[1]) {
1168     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
1169     "but is now %d ??\n", currentEpochID, cmd[1]);
1170     sleep(1); // Give the message a chance to propagate
1171     abort();
1172     }
1173     currentEpochID = cmd[1];
1174    
1175     memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined
1176    
1177     doNewEpoch(cmd[1], cmd[2], cmd[3]);
1178     break;
1179    
1180     default:
1181     fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
1182     cmd[0], cmd[1], cmd[2], cmd[3]);
1183     sleep(1);
1184     abort();
1185     break;
1186     }
1187    
1188     }
1189    
1190     }
1191    
1192    
1193    
1194     ///////////////////////////////////////////////////////////////////////////////////
1195    
1196     int
1197     findField(const char c)
1198     {
1199     int i;
1200     for (i = 0; i < numAllFields; ++i) {
1201     if (c == fieldDepths[i].dataFieldID) return i;
1202     }
1203    
1204     // Give the error message a chance to propagate before exiting.
1205     fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1206     sleep(1);
1207     abort();
1208     }
1209    
1210    
1211    
1212     // Given a number of ranks and a set of fields we will want to output,
1213     // figure out how to distribute the ranks among the fields.
1214     void
1215     computeIORankAssigments(
1216     int numComputeRanks,
1217     int numIORanks,
1218     int numFields,
1219     fieldInfoThisEpoch_t *fields,
1220     int assignments[])
1221     {
1222     int i,j,k, sum, count;
1223    
1224     int numIONodes = numIORanks / numRanksPerNode;
1225     int numIORanksThisField[numFields];
1226     long int bytesPerIORankThisField[numFields];
1227     int expectedMessagesPerRankThisField[numFields];
1228     long int fieldSizes[numFields];
1229    
1230     struct ioNodeInfo_t {
1231     int expectedNumMessagesThisNode;
1232     int numCoresAssigned;
1233     int *assignedFieldThisCore;
1234     } ioNodeInfo[numIONodes];
1235    
1236     // Since numRanksPerNode might be dynamically determined, we have
1237     // to dynamically allocate the assignedFieldThisCore arrays.
1238     for (i = 0; i < numIONodes; ++i) {
1239     ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
1240     ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
1241     }
1242    
1243     ASSERT((numIONodes*numRanksPerNode) == numIORanks);
1244    
1245    
1246     //////////////////////////////////////////////////////////////
1247     // Compute the size for each field in this epoch style
1248     for (i = 0; i < numFields; ++i) {
1249     ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
1250     fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
1251     }
1252    
1253    
1254     /////////////////////////////////////////////////////////
1255     // Distribute the available i/o ranks among the fields,
1256     // proportionally based on field size.
1257    
1258     // First, assign one rank per field
1259     ASSERT(numIORanks >= numFields);
1260     for (i = 0; i < numFields; ++i) {
1261     numIORanksThisField[i] = 1;
1262     bytesPerIORankThisField[i] = fieldSizes[i];
1263     }
1264    
1265     // Now apportion any extra ranks
1266     for (; i < numIORanks; ++i) {
1267    
1268     // Find the field 'k' with the most bytesPerIORank
1269     k = 0;
1270     for (j = 1; j < numFields; ++j) {
1271     if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
1272     k = j;
1273     }
1274     }
1275    
1276     // Assign an i/o rank to that field
1277     numIORanksThisField[k] += 1;
1278     bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
1279     }
1280    
1281     ////////////////////////////////////////////////////////////
1282     // At this point, all the i/o ranks should be apportioned
1283     // among the fields. Check we didn't screw up the count.
1284     for (sum = 0, i = 0; i < numFields; ++i) {
1285     sum += numIORanksThisField[i];
1286     }
1287     ASSERT(sum == numIORanks);
1288    
1289    
1290    
1291     //////////////////////////////////////////////////////////////////
1292     // The *number* of i/o ranks assigned to a field is based on the
1293     // field size. But the *placement* of those ranks is based on
1294     // the expected number of messages the process will receive.
1295     // [In particular, if we have several fields each with only one
1296     // assigned i/o rank, we do not want to place them all on the
1297     // same node if we don't have to.] This traffic-load balance
1298     // strategy is an approximation at best since the messages may
1299     // not all be the same size (e.g. 2D fields vs. 3D fields), so
1300     // "number of messages" is not the same thing as "traffic".
1301     // But it should suffice.
1302    
1303     // Init a couple of things
1304     for (i = 0; i < numFields; ++i) {
1305     expectedMessagesPerRankThisField[i] =
1306     numComputeRanks / numIORanksThisField[i];
1307     }
1308     for (i = 0; i < numIONodes; ++i) {
1309     ioNodeInfo[i].expectedNumMessagesThisNode = 0;
1310     ioNodeInfo[i].numCoresAssigned = 0;
1311     for (j = 0; j < numRanksPerNode; ++j) {
1312     ioNodeInfo[i].assignedFieldThisCore[j] = -1;
1313     }
1314     }
1315    
1316     ///////////////////////////////////////////////////////////////////
1317     // Select the i/o node with the smallest expectedNumMessages, and
1318     // assign it a rank from the field with the highest remaining
1319     // expectedMessagesPerRank. Repeat until everything is assigned.
1320     // (Yes, this could be a lot faster, but isn't worth the trouble.)
1321    
1322     for (count = 0; count < numIORanks; ++count) {
1323    
1324     // Make an initial choice of node
1325     for (i = 0; i < numIONodes; ++i) {
1326     if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
1327     }
1328     j = i;
1329     ASSERT(j < numIONodes);
1330    
1331     // Search for a better choice
1332     for (++i; i < numIONodes; ++i) {
1333     if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
1334     if (ioNodeInfo[i].expectedNumMessagesThisNode <
1335     ioNodeInfo[j].expectedNumMessagesThisNode)
1336     {
1337     j = i;
1338     }
1339     }
1340    
1341    
1342     // Make an initial choice of field
1343     for (i = 0; i < numFields; ++i) {
1344     if (numIORanksThisField[i] > 0) break;
1345     }
1346     k = i;
1347     ASSERT(k < numFields);
1348    
1349     // Search for a better choice
1350     for (++i; i < numFields; ++i) {
1351     if (numIORanksThisField[i] <= 0) continue;
1352     if (expectedMessagesPerRankThisField[i] >
1353     expectedMessagesPerRankThisField[k])
1354     {
1355     k = i;
1356     }
1357     }
1358    
1359     // Put one rank from the chosen field onto the chosen node
1360     ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
1361     ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
1362     ioNodeInfo[j].numCoresAssigned += 1;
1363     numIORanksThisField[k] -= 1;
1364     }
1365    
1366     // Sanity check - all ranks were assigned to a core
1367     for (i = 1; i < numFields; ++i) {
1368     ASSERT(0 == numIORanksThisField[i]);
1369     }
1370     // Sanity check - all cores were assigned a rank
1371     for (i = 1; i < numIONodes; ++i) {
1372     ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
1373     }
1374    
1375    
1376     /////////////////////////////////////
1377     // Return the computed assignments
1378     for (i = 0; i < numIONodes; ++i) {
1379     for (j = 0; j < numRanksPerNode; ++j) {
1380     assignments[i*numRanksPerNode + j] =
1381     ioNodeInfo[i].assignedFieldThisCore[j];
1382     }
1383     }
1384    
1385     // Clean up
1386     for (i = 0; i < numIONodes; ++i) {
1387     free(ioNodeInfo[i].assignedFieldThisCore);
1388     }
1389    
1390     }
1391    
1392     //////////////////////////////////////////////////////////////////////////////////
1393    
1394    
1395    
1396    
1397     int
1398     isIORank(int commRank, int totalNumNodes, int numIONodes)
1399     {
1400     // Figure out if this rank is on a node that will do i/o.
1401     // Note that the i/o nodes are distributed throughout the
1402     // task, not clustered together.
1403     int ioNodeStride = totalNumNodes / numIONodes;
1404     int thisRankNode = commRank / numRanksPerNode;
1405     int n = thisRankNode / ioNodeStride;
1406     return (((n * ioNodeStride) == thisRankNode) && (n < numIONodes)) ? 1 : 0;
1407     }
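      // Example: totalNumNodes == 100 and numIONodes == 10 gives
      // ioNodeStride == 10, so the ranks on nodes 0, 10, 20, ..., 90 are i/o
      // ranks and all other ranks compute.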
1408    
1409    
1410     // Find the number of *physical cores* on the current node
1411     // (ignore hyperthreading). This should work for pretty much
1412     // any Linux based system (and fail horribly for anything else).
1413     int
1414     getNumCores(void)
1415     {
1416     return 20; // until we figure out why this cratered
1417    
1418     char *magic = "cat /proc/cpuinfo | egrep \"core id|physical id\" | "
1419     "tr -d \"\\n\" | sed s/physical/\\\\nphysical/g | "
1420     "grep -v ^$ | sort -u | wc -l";
1421    
1422     FILE *fp = popen(magic,"r");
1423     ASSERT(fp);
1424    
1425     int rtnValue = -1;
1426    
1427     int res = fscanf(fp,"%d",&rtnValue);
1428    
1429     ASSERT(1==res);
1430    
1431     pclose(fp);
1432    
1433     ASSERT(rtnValue > 0);
1434     return rtnValue;
1435     }
1436    
1437    
1438    
1439     ////////////////////////////////////////////////////////////////////////
1440     ////////////////////////////////////////////////////////////////////////
1441     // Routines called by the MITgcm code
1442    
1443    
1444     ////////////////////////////////////////
1445     // Various "one time" initializations
1446     void
1447     f1(
1448     MPI_Comm parentComm,
1449     int numComputeRanks,
1450     int numTiles,
1451     MPI_Comm *rtnComputeComm)
1452     {
1453     MPI_Comm newIntracomm, newIntercomm, dupParentComm;
1454     int newIntracommRank, thisIsIORank;
1455     int parentSize, parentRank;
1456     int numIONodes, numIORanks;
1457     int mpiIsInitialized, *intPtr, tagUBexists;
1458     int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
1459     int totalNumNodes, numComputeNodes, newIntracommSize;
1460    
1461     // Init globals
1462     totalNumTiles = numTiles;
1463     if (numRanksPerNode <= 0) {
1464     // Might also want to check for an env var (or something)
1465     numRanksPerNode = getNumCores();
1466     }
1467    
1468     // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors
1469     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1470     fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];
1471    
1472     int curFieldIndex = -1;
1473     while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
1474     i = findField(f->dataFieldID);
1475     if (-1 != f->zDepth) {
1476     if (f->zDepth != fieldDepths[i].numZ) {
1477     fprintf(stderr, "Inconsistent z-depth for field '%c': "
1478     "fieldDepths[%d].numZ does not match epoch style %d\n",
1479     f->dataFieldID, i, epochStyleIndex);
1480     }
1481     }
1482     f->zDepth = fieldDepths[i].numZ;
1483     }
1484     }
1485    
1486    
1487     // Find the max MPI "tag" value
1488     MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
1489     ASSERT(tagUBexists);
1490     maxTagValue = *intPtr;
1491    
1492    
1493     // Info about the parent communicator
1494     MPI_Initialized(&mpiIsInitialized);
1495     ASSERT(mpiIsInitialized);
1496     MPI_Comm_size(parentComm, &parentSize);
1497     MPI_Comm_rank(parentComm, &parentRank);
1498    
1499    
1500     // Figure out how many nodes we can use for i/o.
1501     // To make things (e.g. memory calculations) simpler, we want
1502     // a node to have either all compute ranks, or all i/o ranks.
1503     // If numRanksPerNode does not evenly divide numComputeRanks, we have
1504     // to round up in favor of the compute side.
1505    
1506     totalNumNodes = divCeil(parentSize, numRanksPerNode);
1507     numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
1508     numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
1509     ASSERT(numIONodes > 0);
1510     ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
1511     numIORanks = numIONodes * numRanksPerNode;
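    // Worked example (hypothetical numbers): parentSize = 1740,
    // numComputeRanks = 1680, and numRanksPerNode = 20 give
    // totalNumNodes = 87, numComputeNodes = 84, numIONodes = 3,
    // and numIORanks = 60.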
1512    
1513    
1514     // It is surprisingly easy to launch the job with the wrong number
1515     // of ranks, particularly if the number of compute ranks is not a
1516     // multiple of numRanksPerNode (the tendency is to just launch one
1517     // rank per core for all available cores). So we introduce a third
1518     // option for a rank: besides being an i/o rank or a compute rank,
1519     // it might instead be an "excess" rank, in which case it just
1520     // calls MPI_Finalize and exits.
1521    
1522     typedef enum { isCompute, isIO, isExcess } rankType;
1523     rankType thisRanksType;
1524    
1525     if (isIORank(parentRank, totalNumNodes, numIONodes)) {
1526     thisRanksType = isIO;
1527     } else if (parentRank >= numComputeRanks + numIORanks) {
1528     thisRanksType = isExcess;
1529     } else {
1530     thisRanksType = isCompute;
1531     }
1532    
1533     // Split the parent communicator into parts
1534     MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
1535     MPI_Comm_rank(newIntracomm, &newIntracommRank);
1536    
1537     // Excess ranks disappear
1538     // N.B. "parentSize" still includes these ranks.
1539     if (isExcess == thisRanksType) {
1540     MPI_Finalize();
1541     exit(0);
1542     }
1543    
1544     // Sanity check
1545     MPI_Comm_size(newIntracomm, &newIntracommSize);
1546     if (isIO == thisRanksType) {
1547     ASSERT(newIntracommSize == numIORanks);
1548     } else {
1549     ASSERT(newIntracommSize == numComputeRanks);
1550     }
1551    
1552    
1553     // Create a special intercomm from the i/o and compute parts
1554     if (isIO == thisRanksType) {
1555     // Set globals
1556     ioIntracomm = newIntracomm;
1557     MPI_Comm_rank(ioIntracomm, &ioIntracommRank);
1558    
1559     // Find the lowest computation rank
1560     for (i = 0; i < parentSize; ++i) {
1561     if (!isIORank(i, totalNumNodes, numIONodes)) break;
1562     }
1563     } else { // isCompute
1564     // Set globals
1565     computeIntracomm = newIntracomm;
1566     MPI_Comm_rank(computeIntracomm, &computeIntracommRank);
1567    
1568     // Find the lowest IO rank
1569     for (i = 0; i < parentSize; ++i) {
1570     if (isIORank(i, totalNumNodes, numIONodes)) break;
1571     }
1572     }
1573     MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);
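    // (The loop above left "i" holding the lowest-numbered rank of the other
    // group, which is used here as the remote leader for the intercomm create.)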
1574    
1575    
1576    
1577     ///////////////////////////////////////////////////////////////////
1578     // For each different i/o epoch style, split the i/o ranks among
1579     // the fields, and create an inter-communicator for each split.
1580    
1581     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1582     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1583     int fieldAssignments[numIORanks];
1584     int rankAssignments[numComputeRanks + numIORanks];
1585    
1586     // Count the number of fields in this epoch style
1587     for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;
1588    
1589     // Decide how to apportion the i/o ranks among the fields
1590     for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
1591     computeIORankAssigments(numComputeRanks, numIORanks, nF,
1592     thisEpochStyle, fieldAssignments);
1593     // Sanity check
1594     for (i=0; i < numIORanks; ++i) {
1595     ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
1596     }
1597    
1598     // Embed the i/o rank assignments into an array holding
1599     // the assignments for *all* the ranks (i/o and compute).
1600     // Rank assignment of "-1" means "compute".
1601     j = 0;
1602     for (i = 0; i < numComputeRanks + numIORanks; ++i) {
1603     rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
1604     fieldAssignments[j++] : -1;
1605     }
1606     // Sanity check. Check the assignment for this rank.
1607     if (isIO == thisRanksType) {
1608     ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
1609     } else {
1610     ASSERT(-1 == rankAssignments[parentRank]);
1611     }
1612     ASSERT(j == numIORanks);
1613    
1614     if (0 == parentRank) {
1615     printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
1616     for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
1617     if (rankAssignments[i*numRanksPerNode] >= 0) {
1618     // i/o node
1619     for (j = 0; j < numRanksPerNode; ++j) {
1620     printf(" %1d", rankAssignments[i*numRanksPerNode + j]);
1621     }
1622     } else {
1623     // compute node
1624     for (j = 0; j < numRanksPerNode; ++j) {
1625     if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
1626     ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
1627     }
1628     printf(" #");
1629     for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
1630     printf("X");
1631     }
1632     }
1633     printf(" ");
1634     }
1635     printf("\n\n");
1636     }
1637    
1638     // Find the lowest rank assigned to computation; use it as
1639     // the "root" for the upcoming intercomm creates.
1640     for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
1641     // Sanity check
1642     if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);
1643    
1644    
1645     // If this is an I/O rank, split the newIntracomm (which, for
1646     // the i/o ranks, is a communicator among all the i/o ranks)
1647     // into the pieces as assigned.
1648    
1649     if (isIO == thisRanksType) {
1650     MPI_Comm fieldIntracomm;
1651     int myField = rankAssignments[parentRank];
1652     ASSERT(myField >= 0);
1653    
1654     MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
1655     thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;
1656    
1657     // Now create an inter-communicator between the computational
1658     // ranks, and each of the newly split off field communicators.
1659     for (i = 0; i < nF; ++i) {
1660    
1661     // Do one field at a time to avoid clashes on parentComm
1662     MPI_Barrier(newIntracomm);
1663    
1664     if (myField != i) continue;
1665    
1666     // Create the intercomm for this field for this epoch style
1667     MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
1668     compRoot, i, &(thisEpochStyle[myField].dataIntercomm));
1669    
1670     // ... and dup a separate one for tile registrations
1671     MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
1672     &(thisEpochStyle[myField].registrationIntercomm));
1673     }
1674     }
1675     else {
1676     // This is a computational rank; create the intercommunicators
1677     // to the various split off separate field communicators.
1678    
1679     for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {
1680    
1681     // Find the remote "root" process for this field
1682     int fieldRoot = -1;
1683     while (rankAssignments[++fieldRoot] != fieldIndex);
1684    
1685     // Create the intercomm for this field for this epoch style
1686     MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
1687     fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));
1688    
1689     // ... and dup a separate one for tile registrations
1690     MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
1691     &(thisEpochStyle[fieldIndex].registrationIntercomm));
1692     }
1693     }
1694    
1695     } // epoch style loop
1696    
1697    
1698     // I/O processes split off and start receiving data
1699     // NOTE: the I/O processes DO NOT RETURN from this call
1700     if (isIO == thisRanksType) ioRankMain(totalNumTiles);
1701    
1702    
1703     // The "compute" processes now return with their new intra-communicator.
1704     *rtnComputeComm = newIntracomm;
1705    
1706     // But before returning, call the mpiio initialization routine.
1707     initSizesAndTypes();
1708    
1709     return;
1710     }
1711    
1712    
1713    
1714    
1715     // "Register" the tile-id(s) that this process will be sending
1716     void
1717     f2(int tileID)
1718     {
1719     int i, epochStyleIndex;
1720    
1721     static int count = 0;
1722    
1723     // This code assumes that the same tileID will apply to all the
1724     // fields. Note that we use the tileID as a tag, so we require it
1725     // be an int with a legal tag value, but we do *NOT* actually use
1726     // the tag *value* for anything (e.g. it is NOT used as an index
1727     // into an array). It is treated as an opaque handle that is
1728     // passed from the compute task(s) to the compositing routines.
1729     // Those two end-points likely assign meaning to the tileID (i.e.
1730     // use it to identify where the tile belongs within the domain),
1731     // but that is their business.
1732     //
1733     // A tileID of -1 signifies the end of registration.
1734    
1735     ASSERT(computeIntracommRank >= 0);
1736    
1737     if (tileID >= 0) {
1738     // In this version of the code, in addition to the tileID, we also
1739     // multiplex in the low order bit(s) of the epoch number as an
1740     // error check. So the tile value must be small enough to allow that.
1741     ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);
1742    
1743     // In this version of the code, we do not send the actual tileID's
1744     // to the i/o processes during the registration procedure. We only
1745     // send a count of the number of tiles that we will send.
1746     ++count;
1747     return;
1748     }
1749    
1750    
1751     // We get here when we are called with a negative tileID, signifying
1752     // the end of registration. We now need to figure out and inform
1753     // *each* i/o process in *each* field just how many tiles we will be
1754     // sending them in *each* epoch style.
1755    
1756     for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1757     fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1758    
1759     int curFieldIndex = -1;
1760     while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1761     fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
1762     int numRemoteRanks, *tileCounts, remainder;
1763     MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);
1764    
1765     tileCounts = alloca(numRemoteRanks * sizeof(int));
1766    
1767     memset(tileCounts,0,numRemoteRanks * sizeof(int));
1768    
1769     // Distribute the tiles among the i/o ranks.
1770     for (i = 0; i < numRemoteRanks; ++i) {
1771     tileCounts[i] = count / numRemoteRanks;
1772     }
1773     // Deal with any remainder
1774     remainder = count % numRemoteRanks;
1775     for (i = 0; i < remainder; ++i) {
1776     int target = (computeIntracommRank + i) % numRemoteRanks;
1777     tileCounts[target] += 1;
1778     }
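    // Worked example (hypothetical numbers): if this compute rank registered
    // count = 7 tiles and there are numRemoteRanks = 3 i/o ranks for this
    // field, each i/o rank is assigned 2 tiles and the one leftover tile goes
    // to i/o rank (computeIntracommRank % 3).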
1779    
1780     // Communicate these counts to the i/o processes for this
1781     // field. Note that we send a message to each process,
1782     // even if the count is zero.
1783     for (i = 0; i < numRemoteRanks; ++i) {
1784     MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
1785     thisField->registrationIntercomm);
1786     }
1787    
1788     } // field loop
1789     } // epoch-style loop
1790    
1791     }
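// Minimal sketch (these helpers are not used by the existing code) of how a
// tileID and an epoch number are multiplexed into a single MPI tag, mirroring
// the ASSERT in f2() above and the tag construction in f3() below.
static int
packTileTag(int tileID, int epochID)
{
    // The low numCheckBits bits carry the epoch check bits; the rest is the tileID.
    return (tileID << numCheckBits) | (epochID & bitMask);
}

static int
unpackTileID(int tag)
{
    return tag >> numCheckBits;
}

static int
unpackEpochCheckBits(int tag)
{
    return tag & bitMask;
}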
1792    
1793    
1794    
1795    
1796     int currentEpochID = 0;
1797     int currentEpochStyle = 0;
1798    
1799     void
1800     beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
1801     {
1802     fieldInfoThisEpoch_t *p;
1803    
1804     if (newEpochID != (currentEpochID + 1)) {
1805     fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
1806     currentEpochID, newEpochID);
1807     sleep(1); // Give the message a chance to propagate
1808     abort();
1809     }
1810    
1811     ////////////////////////////////////////////////////////////////////////
1812     // Need to be sure the i/o tasks are done processing the previous epoch
1813     // before any compute tasks start sending tiles from a new epoch.
1814    
1815     if (0 == computeIntracommRank) {
1816     int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };
1817    
1818     // Wait to get an ack that the i/o tasks are done with the prior epoch.
1819     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1820     globalIntercomm, MPI_STATUS_IGNORE);
1821    
1822     // Tell the i/o ranks about the new epoch.
1823     // (Just tell rank 0; it will bcast to the others)
1824     MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
1825     }
1826    
1827     // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
1828     MPI_Barrier(computeIntracomm);
1829    
1830    
1831     currentEpochID = newEpochID;
1832     currentEpochStyle = epochStyle;
1833    
1834     // Reset the tileCount (used by f3)
1835     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1836     p->tileCount = 0;
1837     }
1838     }
1839    
1840    
1841     void
1842     f3(char dataFieldID, int tileID, int epochID, void *data)
1843     {
1844     int whichField, receiver, tag;
1845    
1846     static char priorDataFieldID = '\0';
1847     static int priorEpoch = -1;
1848     static int remoteCommSize;
1849     static fieldInfoThisEpoch_t *p;
1850    
1851     int flag=0;
1852    
1853     // Check that this global has been set
1854     ASSERT(computeIntracommRank >= 0);
1855    
1856    
1857     // Figure out some info about this dataFieldID. It is
1858     // likely to be another tile from the same field as last time
1859     // we were called, in which case we can reuse the "static" values
1860     // retained from the prior call. If not, we need to look it up.
1861    
1862     if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {
1863    
1864     // It's not the same; we need to look it up.
1865    
1866     for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1867     if (p->dataFieldID == dataFieldID) break;
1868     }
1869     // Make sure we found a valid field
1870    
1871     ASSERT(p->dataIntercomm != MPI_COMM_NULL);
1872    
1873     MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);
1874    
1875     flag=1;
1876    
1877     }
1878    
1879     ASSERT(p->dataFieldID == dataFieldID);
1880    
1881     receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;
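    // For example (hypothetical numbers): with remoteCommSize = 20 i/o ranks
    // for this field, compute rank 5 sends its first tile to i/o rank 5, its
    // second to rank 6, and so on, wrapping around modulo 20.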
1882    
1883     tag = (tileID << numCheckBits) | (epochID & bitMask);
1884    
1885     //fprintf(stderr,"%d %d\n",flag,tileID);
1886    
1887     /*
1888     if (tileID==189){
1889     int i,j;
1890     for (i=0;i<TILE_Y;++i){
1891     for (j=0;j<TILE_X;++j)
1892     fprintf(stderr,"%f ",((double*)data)[872+i*108+j]);
1893     fprintf(stderr,"\n");
1894     }
1895     }
1896    
1897     */
1898    
1899    
1900    
1901     FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
1902     computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);
1903    
1904     MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
1905     MPI_BYTE, receiver, tag, p->dataIntercomm);
1906    
1907     p->tileCount += 1;
1908     priorDataFieldID = dataFieldID;
1909     priorEpoch = epochID;
1910     }
1911    
1912    
1913    
1914     void
1915     f4(int epochID)
1916     {
1917     int i;
1918     ASSERT(computeIntracommRank >= 0);
1919    
1920     if (0 == computeIntracommRank) {
1921     int cmd[3] = { cmd_exit, -1, -1 };
1922    
1923     // Recv the ack that the prior i/o epoch is complete
1924     MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1925     globalIntercomm, MPI_STATUS_IGNORE);
1926    
1927     // Tell the i/o ranks to exit
1928     // Just tell rank 0; it will bcast to the others
1929     MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
1930     }
1931    
1932     }
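// Hedged usage sketch (not called anywhere; the numbers and the field ID are
// made up for illustration): the order in which a compute rank is expected to
// drive the routines above.  In a real run this sequence is driven from the
// Fortran side through the wrappers below.
static void
exampleComputeSideUsage(MPI_Comm parentComm)
{
    MPI_Comm computeComm;

    // One-time setup: split off the i/o ranks and get back the compute
    // intra-communicator (i/o ranks never return from this call).
    f1(parentComm, 1680, 1036800, &computeComm);

    // Register the tiles this rank owns, then end registration with -1.
    f2(42);
    f2(43);
    f2(-1);

    // For each i/o epoch: announce it, send every tile of every field with
    // f3() (omitted here because it needs real tile buffers), and finally
    // shut the i/o ranks down with f4().
    beginNewEpoch(1, 10, 0);
    /* f3('U', 42, 1, tileBuffer42);  f3('U', 43, 1, tileBuffer43); */
    f4(1);
}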
1933    
1934    
1935    
1936     void myasyncio_set_global_sizes_(int *nx, int *ny, int *nz,
1937     int *nt, int *nb,
1938     int *tx, int *ty,
1939     int *ox, int *oy)
1940     {
1941    
1942    
1943     int data[] = {*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy};
1944    
1945     int items = sizeof(data)/sizeof(int);
1946    
1947    
1948     NTILES = *nt; // total number of tiles
1949    
1950    
1951     TILE_X = *tx;
1952     TILE_Y = *ty;
1953     XGHOSTS = *ox;
1954     YGHOSTS = *oy;
1955     tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
1956     tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // needed by compute ranks in f3
1957    
1958    
1959     int rank,ierr;
1960     MPI_Comm_rank(globalIntercomm,&rank);
1961    
1962    
1963     if (0==rank)
1964     printf("%d %d %d\n%d %d\n%d %d\n%d %d\n",*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy);
1965    
1966    
1967     /*
1968     if (0==rank)
1969     ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_ROOT,globalIntercomm);
1970     else
1971     ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1972     */
1973    
1974     if (0==rank)
1975     ierr=MPI_Bcast(data,items,MPI_INT,MPI_ROOT,globalIntercomm);
1976     else
1977     ierr=MPI_Bcast(data,items,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1978    
1979     }
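// Hedged sketch of the matching receive side (assumption: the real receive
// lives in the i/o-rank code, e.g. ioRankMain(), defined elsewhere in this
// file).  For a broadcast across an inter-communicator, every rank in the
// receiving group passes the root's rank in the *other* group -- 0 here, the
// lowest compute rank -- as the root argument.
static void
exampleIntercommBcastReceive(int *buf, int items)
{
    MPI_Bcast(buf, items, MPI_INT, 0, globalIntercomm);
}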
1980    
1981     void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
1982     {
1983     int rank,ierr;
1984     MPI_Comm_rank(globalIntercomm,&rank);
1985    
1986     if (0==rank)
1987     ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1988     else
1989     ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1990    
1991     if (0==rank)
1992     ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1993     else
1994     ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1995    
1996     if (0==rank)
1997     ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1998     else
1999     ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
2000    
2001     }
2002    
2003    
2004    
2005    
2006    
2007     //////////////////////////////////////////////////////////////////////
2008     // Fortran interface
2009    
2010     void
2011     bron_f1(
2012     MPI_Fint *ptr_parentComm,
2013     int *ptr_numComputeRanks,
2014     int *ptr_totalNumTiles,
2015     MPI_Fint *rtnComputeComm)
2016     {
2017     // Convert the Fortran handle into a C handle
2018     MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
2019    
2020     f1(parentComm, *ptr_numComputeRanks, *ptr_totalNumTiles, &newComm);
2021    
2022     // Convert the C handle into a Fortran handle
2023     *rtnComputeComm = MPI_Comm_c2f(newComm);
2024     }
2025    
2026    
2027    
2028     void
2029     bron_f2(int *ptr_tileID)
2030     {
2031     f2(*ptr_tileID);
2032     }
2033    
2034    
2035     void
2036     beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
2037     {
2038     beginNewEpoch(*newEpochID, *gcmIter, *epochStyle);
2039     }
2040    
2041    
2042     void
2043     bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
2044     {
2045     f3(*ptr_dataFieldID, *ptr_tileID, *ptr_epochID, data);
2046     }
2047    
2048    
2049    
2050     void
2051     bron_f4(int *ptr_epochID)
2052     {
2053     f4(*ptr_epochID);
2054     }
2055    
