/[MITgcm]/MITgcm_contrib/llc_hires/llc_4320/code-async/recvTask.c

Contents of /MITgcm_contrib/llc_hires/llc_4320/code-async/recvTask.c



Revision 1.2
Fri Jan 10 16:50:36 2014 UTC (11 years, 6 months ago) by dimitri
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +70 -38 lines
File MIME type: text/plain
syncing llc_4320 with llc_2160 changes

1
2 // Code to do the m-on-n fan-in, recompositing, and output of data
3 // tiles for MITgcm.
4 //
5 // There are aspects of this code that would be simpler in C++, but
6 // we deliberately wrote it in ANSI C to make linking it with the
7 // Fortran main code easier (it should be possible to just include
8 // the .o on the link line).
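//
// Rough call structure (a sketch inferred from the routines below, not a
// formal interface spec):
//   f1()         - one-time setup: splits the parent communicator into
//                  compute, i/o, and "excess" ranks and builds the per-field
//                  intercomms; i/o ranks never return (they enter ioRankMain).
//   f2(tileID)   - called by each compute rank to register the tiles it will
//                  send; a tileID of -1 ends registration.
//   ioRankMain() - i/o rank main loop: waits for "new epoch" commands, then
//                  doNewEpoch() receives tiles, redistributes z-slabs,
//                  reassembles whole-field slabs, and writes them to disk.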
9
10
11 #include "PACKAGES_CONFIG.h"
12
13 #include <stdio.h>
14 #include <unistd.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20
21 #include <mpi.h>
22 #include <alloca.h>
23
24
25 #include <stdint.h>
26 #include <errno.h>
27
28 #define DEBUG 1
29
30 #if (DEBUG >= 2)
31 #define FPRINTF fprintf
32 #else
33 #include <stdarg.h>
34 void FPRINTF(FILE *fp,...){return;}
35 #endif
36
37
38 // Define our own version of "assert", where we sleep rather than abort.
39 // This makes it easier to attach the debugger.
40 #if (DEBUG >= 1)
41 #define ASSERT(_expr) \
42 if (!(_expr)) { \
43 fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
44 getpid(), #_expr, __FILE__, __LINE__, __func__); \
45 sleep(9999); \
46 }
47 #else
48 #define ASSERT assert
49 #endif
50
51
52
53 // If numRanksPerNode is set to be > 0, we just use that setting.
54 // If it is <= 0, we dynamically determine the number of cores on
55 // a node, and use that. (N.B.: we assume *all* nodes being used in
56 // the run have the *same* number of cores.)
57 int numRanksPerNode = 0;
58
59 // Just an error check; can be zero (if you're confident it's all correct).
60 #define numCheckBits 2
61 // This bitMask definition works for 2's complement
62 #define bitMask ((1 << numCheckBits) - 1)
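
// Tags on tile messages multiplex the tileID with the low-order bits of the
// epoch number as a consistency check (see f2() and tryToReceiveDataTile()).
// Roughly, as a sketch of the encoding implied by the code below:
//
//   tag    = (tileID << numCheckBits) | (epochID & bitMask);  // sender side
//   tileID = tag >> numCheckBits;                             // receiver side
//   ASSERT((tag & bitMask) == (epochID & bitMask));           // sanity check
//
// With numCheckBits == 2, bitMask is 0x3 and the tileID must fit within
// (maxTagValue >> 2).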
63
64
65 ///////////////////////////////////////////////////////////////////////
66 // Info about the data fields
67
68 // double for now. Might also be float someday
69 #define datumType double
70 #define datumSize (sizeof(datumType))
71
72
73 // Info about the data fields. We assume that all the fields have the
74 // same X and Y characteristics, but may have either 1 or NUM_Z levels.
75 //
76 // the following values are required here; they could be sent over or
77 // read in with some rearrangement in the init routines
78 //
79 #define NUM_X 4320
80 #define NUM_Y 56160L // get rid of this someday
81 #define NUM_Z 90
82 #define MULTDIM 7
83 #define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
84 #define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
85 #define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)
86
87 // Info about the data tiles. We assume that all the tiles are the same
88 // size (no odd-sized last piece), they all have the same X and Y
89 // characteristics (including ghosting), and are full depth in Z
90 // (either 1, or NUM_Z, or MULTDIM, as appropriate).
91 //
92 // all these data now sent over from compute ranks
93 //
94 int TILE_X = -1;
95 int TILE_Y = -1;
96 int XGHOSTS = -1;
97 int YGHOSTS = -1;
98
99 // Size of one Z level of a tile (NOT one Z level of a whole field)
100 int tileOneZLevelItemCount = -1;
101 int tileOneZLevelSizeInBytes = -1;
102
103
104 typedef struct dataFieldDepth {
105 char dataFieldID;
106 int numZ;
107 } dataFieldDepth_t;
108
109 dataFieldDepth_t fieldDepths[] = {
110 { 'A', MULTDIM }, // seaice, 7 == MULTDIM in SEAICE_SIZE.h
111
112 { 'B', 1 },
113 { 'C', 1 },
114 { 'D', 1 },
115 { 'E', 1 },
116 { 'F', 1 },
117 { 'G', 1 },
118 { 'H', 1 },
119 { 'I', 1 },
120 { 'J', 1 },
121 { 'K', 1 },
122 { 'L', 1 },
123 { 'M', 1 },
124 { 'N', 1 },
125 { 'O', 1 },
126 { 'P', 1 },
127 { 'Q', 1 },
128 { 'R', 1 },
129
130 { 'S', NUM_Z },
131 { 'T', NUM_Z },
132 { 'U', NUM_Z },
133 { 'V', NUM_Z },
134 { 'W', NUM_Z },
135 { 'X', NUM_Z },
136 { 'Y', NUM_Z },
137 };
138 #define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
139
140
141
142 ///////////////////////////////////////////////////////////////////////
143 // Info about the various kinds of i/o epochs
144 typedef struct epochFieldInfo {
145 char dataFieldID;
146 MPI_Comm registrationIntercomm;
147 MPI_Comm dataIntercomm;
148 MPI_Comm ioRanksIntracomm;
149 int tileCount;
150 int zDepth; // duplicates the fieldDepth entry; filled in automatically
151 char filenameTemplate[128];
152 long int offset;
153 int pickup;
154 } fieldInfoThisEpoch_t;
155
156 // The normal i/o dump
157 fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
158 { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "U.%010d.%s", 0, 0 },
159 { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "V.%010d.%s", 0, 0 },
160 { 'W', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "W.%010d.%s", 0, 0 },
161 { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Salt.%010d.%s", 0, 0 },
162 { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Theta.%010d.%s", 0,0 },
163 { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "Eta.%010d.%s", 0,0 },
164
165 { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIarea.%010d.%s", 0,0 },
166 { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIheff.%010d.%s", 0,0 },
167 { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsnow.%010d.%s", 0,0 },
168 { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIuice.%010d.%s", 0,0 },
169 { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIvice.%010d.%s", 0,0 },
170 { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "SIhsalt.%010d.%s", 0,0 },
171
172 //{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "EtaHnm1.%010d.%s", 0,0 },
173 { 'I', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUX.%010d.%s", 0,0 },
174 { 'J', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceTAUY.%010d.%s", 0,0 },
175 { 'K', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "KPPhbl.%010d.%s", 0,0 },
176 { 'L', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceSflux.%010d.%s", 0,0 },
177
178 { 'M', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceFWflx.%010d.%s", 0,0 },
179 { 'O', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQnet.%010d.%s", 0,0 },
180 { 'P', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "PhiBot.%010d.%s", 0,0 },
181 { 'Q', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "oceQsw.%010d.%s", 0,0 },
182 //{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "dEtaHdt.%010d.%s", 0,0 },
183
184 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0,0 },
185 };
186
187
188 // pickup file
189 fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
190 { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
191 { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
192 { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
193 { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
194 { 'X', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
195 { 'Y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
196 { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
197 { 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
198 { 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
199 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 ,1},
200 };
201
202
203 // seaice pickup
204 fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
205 { 'A', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
206 { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
207 { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
208 { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
209 { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
210 { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
211 { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
212 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, "", 0 },
213 };
214
215
216 fieldInfoThisEpoch_t *epochStyles[] = {
217 fieldsForEpochStyle_0,
218 fieldsForEpochStyle_1,
219 fieldsForEpochStyle_2,
220 };
221 int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));
222
223
224 typedef enum {
225 cmd_illegal,
226 cmd_newEpoch,
227 cmd_epochComplete,
228 cmd_exit,
229 } epochCmd_t;
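
// The compute side drives the i/o ranks with a 4-int command message over the
// global intercomm; ioRankMain() interprets it as follows (this describes the
// receive side below, not the sender's packing code, which is not in this file):
//
//   int cmd[4];
//   // cmd[0] : an epochCmd_t value (cmd_newEpoch, cmd_exit, ...)
//   // cmd[1] : epoch ID (expected to increase by 1 each epoch)
//   // cmd[2] : epoch style, an index into epochStyles[]
//   // cmd[3] : gcm iteration number (used in the output filenames)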
230
231
232 ///////////////////////////////////////////////////////////////////////
233
234 // Note that a rank will only have access to one of the Intracomms,
235 // but all ranks will define the Intercomm.
236 MPI_Comm ioIntracomm = MPI_COMM_NULL;
237 int ioIntracommRank = -1;
238 MPI_Comm computeIntracomm = MPI_COMM_NULL;
239 int computeIntracommRank = -1;
240 MPI_Comm globalIntercomm = MPI_COMM_NULL;
241
242
243 #define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
244 #define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
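
// e.g. divCeil(90,4) == 23 and roundUp(90,4) == 92; divCeil() is used in
// doNewEpoch() to split a field's NUM_Z levels across its i/o ranks.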
245
246
247
248 typedef enum {
249 bufState_illegal,
250 bufState_Free,
251 bufState_InUse,
252 } bufState_t;
253
254
255 typedef struct buf_header{
256 struct buf_header *next;
257 bufState_t state;
258 int requestsArraySize;
259 MPI_Request *requests;
260 uint64_t ck0;
261 char *payload; // [tileOneZLevelSizeInBytes * NUM_Z]; // Max payload size
262 uint64_t ck1; // now rec'vd from compute ranks & dynamically allocated in
263 } bufHdr_t; // allocateTileBufs
264
265
266 bufHdr_t *freeTileBufs_ptr = NULL;
267 bufHdr_t *inUseTileBufs_ptr = NULL;
268
269 int maxTagValue = -1;
270 int totalNumTiles = -1;
271
272 // routine to convert double to float during memcpy
273 // need to get byteswapping in here as well
274 void memcpy_r8_2_r4 (float *f, double *d, long long *n)
275 {
276 long long i, rem;
277 rem = *n%16LL;
278 for (i = 0; i < rem; i++) {
279 f [i] = d [i];
280 }
281 for (i = rem; i < *n; i += 16) {
282 __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) );
283 __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) );
284 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0");
285 __asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory");
286 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1");
287 __asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory");
288 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2");
289 __asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory");
290 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3");
291 __asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory");
292 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4");
293 __asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory");
294 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5");
295 __asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory");
296 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6");
297 __asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory");
298 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7");
299 __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) );
300 __asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory");
301 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8");
302 __asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory");
303 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9");
304 __asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory");
305 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10");
306 __asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory");
307 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11");
308 __asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory");
309 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12");
310 __asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory");
311 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13");
312 __asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory");
313 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14");
314 __asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory");
315 __asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15");
316 __asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory");
317 }
318 }
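
// For reference, the hand-scheduled SSE loop above is equivalent to the plain
// C conversion sketched below (no prefetching, and still no byte swapping);
// this sketch is illustrative only and is not compiled:
//
//   void memcpy_r8_2_r4_portable(float *f, const double *d, const long long *n)
//   {
//       long long i;
//       for (i = 0; i < *n; ++i)
//           f[i] = (float)d[i];   // narrowing double -> float copy
//   }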
319
320 // Debug routine
321 void countBufs(int nbufs)
322 {
323 int nInUse, nFree;
324 bufHdr_t *bufPtr;
325 static int target = -1;
326
327 if (-1 == target) {
328 ASSERT(-1 != nbufs);
329 target = nbufs;
330 }
331
332 bufPtr = freeTileBufs_ptr;
333 for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
334
335 bufPtr = inUseTileBufs_ptr;
336 for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
337
338 if (nInUse + nFree != target) {
339 fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
340 ioIntracommRank, nFree, nInUse, target);
341 sleep(5000);
342 }
343 }
344
345 int readn(int fd, void *p, int nbytes)
346 {
347
348 char *ptr = (char*)(p);
349
350 int nleft, nread;
351
352 nleft = nbytes;
353 while (nleft > 0){
354 nread = read(fd, ptr, nleft);
355 if (nread < 0)
356 return(nread); // error
357 else if (nread == 0)
358 break; // EOF
359
360 nleft -= nread;
361 ptr += nread;
362 }
363 return (nbytes - nleft);
364 }
365
366
367 ssize_t writen(int fd, void *p, size_t nbytes)
368 {
369 char *ptr = (char*)(p);
370
371 size_t nleft;
372 ssize_t nwritten;
373
374 nleft = nbytes;
375 while (nleft > 0){
376 nwritten = write(fd, ptr, nleft);
377 if (nwritten <= 0){
378 if (errno==EINTR) continue; // POSIX, not SVr4
379 return(nwritten); // non-EINTR error
380 }
381
382 nleft -= nwritten;
383 ptr += nwritten;
384 }
385 return(nbytes - nleft);
386 }
387
388
389 void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
390 {
391 int i;
392 int ndims = 2;
393 int nrecords,nfields;
394 char**f;
395 char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
396 char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
397 // for now, just list the fields here. When the whole field specification apparatus
398 // is cleaned up, pull the names out of the epochstyle definition or whatever
399
400 ASSERT(1==pickup||2==pickup);
401
402 if (1==pickup){
403 nrecords = 6*NUM_Z+3;
404 nfields = sizeof(fld1)/sizeof(char*);
405 f = fld1;
406 }
407 else if (2==pickup){
408 nrecords = MULTDIM+6;
409 nfields = sizeof(fld2)/sizeof(char*);
410 f = fld2;
411 }
412
413 fprintf(fp," nDims = [ %3d ];\n",ndims);
414 fprintf(fp," dimList = [\n");
415 fprintf(fp," %10u,%10d,%10u,\n",NUM_X,1,NUM_X);
416 fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
417 fprintf(fp," ];\n");
418 fprintf(fp," dataprec = [ 'float64' ];\n");
419 fprintf(fp," nrecords = [ %5d ];\n",nrecords);
420 fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
421 fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
422 fprintf(fp," nFlds = [ %4d ];\n",nfields);
423 fprintf(fp," fldList = {\n");
424 for (i=0;i<nfields;++i)
425 fprintf(fp," '%-8s'",f[i]);
426 fprintf(fp,"\n };\n");
427 }
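
// For a 3-D pickup (pickup == 1, NUM_Z == 90) the routine above produces a
// .meta file roughly like the following (an illustrative sketch with field
// widths abbreviated, not output captured from a run; 123456 stands in for
// an actual gcmIter):
//
//    nDims = [   2 ];
//    dimList = [
//        4320,     1,  4320,
//       56160,     1, 56160
//    ];
//    dataprec = [ 'float64' ];
//    nrecords = [   543 ];
//    timeStepNumber = [     123456 ];
//    timeInterval = [  0.000000000000E+00 ];
//    nFlds = [    9 ];
//    fldList = {
//     'Uvel    ' 'Vvel    ' 'Theta   ' 'Salt    ' ...
//    };
//
// where nrecords == 6*NUM_Z + 3 == 543 and nFlds == 9 is the length of fld1.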
428
429
430
431 double *outBuf=NULL;//[NUM_X*NUM_Y*NUM_Z]; // only needs to be myNumZSlabs
432 size_t outBufSize=0;
433
434
435 void
436 do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int firstZ, int numZ, int gcmIter)
437 {
438 if (0==numZ) return; // this is *not* global NUM_Z : change name of parameter to avoid grief!
439
440 int pickup = whichField->pickup;
441
442 ///////////////////////////////
443 // swap here, if necessary
444 // int i,j;
445
446 //i = NUM_X*NUM_Y*numZ;
447 //mds_byteswapr8_(&i,outBuf);
448
449 // mds_byteswapr8 expects an integer count, which is gonna overflow someday
450 // can't redefine to long without affecting a bunch of other calls
451 // so do a slab at a time here, to delay the inevitable
452 // i = NUM_X*NUM_Y;
453 //for (j=0;j<numZ;++j)
454 // mds_byteswapr8_(&i,&outBuf[i*j]);
455
456 // gnu builtin evidently honored by intel compilers
457
458 if (pickup) {
459 uint64_t *alias = (uint64_t*)outBuf;
460 size_t i;
461 for (i=0;i<NUM_X*NUM_Y*numZ;++i)
462 alias[i] = __builtin_bswap64(alias[i]);
463 }
464 else {
465 uint32_t *alias = (uint32_t*)outBuf;
466 size_t i;
467 for (i=0;i<NUM_X*NUM_Y*numZ;++i)
468 alias[i] = __builtin_bswap32(alias[i]);
469 }
470
471 // end of swappiness
472 //////////////////////////////////
473
474 char s[1024];
475 //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);
476
477 sprintf(s,whichField->filenameTemplate,gcmIter,"data");
478
479 int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
480 ASSERT(fd!=-1);
481
482 size_t nbytes;
483
484 if (pickup) {
485 lseek(fd,whichField->offset,SEEK_SET);
486 lseek(fd,firstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
487 nbytes = NUM_X*NUM_Y*numZ*datumSize;
488 }
489 else {
490 lseek(fd,firstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
491 nbytes = NUM_X*NUM_Y*numZ*sizeof(float);
492 }
493
494 ssize_t bwrit = writen(fd,outBuf,nbytes);
495
496 if (-1==bwrit) perror("Henze : file write problem : ");
497
498 FPRINTF(stderr,"Wrote %zd of %zu bytes (%d -> %d) \n",bwrit,nbytes,firstZ,numZ);
499
500 // ASSERT(nbytes == bwrit);
501
502 if (nbytes!=bwrit)
503 fprintf(stderr,"WROTE %zd /%zu\n",bwrit,nbytes);
504
505 close(fd);
506
507
508 return;
509 }
510
511
512 int NTILES = -1;
513
514 typedef struct {
515 int off;
516 int skip;
517 } tile_layout_t;
518
519 tile_layout_t *offsetTable;
520
521 void
522 processSlabSection(
523 fieldInfoThisEpoch_t *whichField,
524 int tileID,
525 void *data,
526 int myNumZSlabs)
527 {
528 int intracommSize,intracommRank;
529 // MPI_Comm_size(whichField->ioRanksIntracomm, &intracommSize);
530 MPI_Comm_rank(whichField->ioRanksIntracomm, &intracommRank);
531 //printf("i/o rank %d/%d recv'd %d::%d (%d->%d)\n",intracommRank,intracommSize,whichField,tileID,firstZ,lastZ);
532 // printf("rank %d : tile %d is gonna go at %d and stride with %d, z = %d\n",intracommRank,tileID,offsetTable[tileID].off,
533 // offsetTable[tileID].skip,myNumZSlabs);
534
535 int z;
536
537 int pickup = whichField->pickup;
538
539 //ASSERT((tileID > 0) && (tileID < (sizeof(offsetTable)/sizeof(tile_layout_t))));
540 ASSERT( (tileID > 0) && (tileID <= NTILES) ); // offsetTable now dynamically allocated
541
542 if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){
543
544 free(outBuf);
545
546 outBufSize = myNumZSlabs * twoDFieldSizeInBytes;
547
548 outBuf = malloc(outBufSize);
549 ASSERT(outBuf);
550
551 memset(outBuf,0,outBufSize);
552 }
553
554 for (z=0;z<myNumZSlabs;++z){
555
556 off_t zoffset = z*TILE_X*TILE_Y*NTILES; //NOT totalNumTiles;
557 off_t hoffset = offsetTable[tileID].off;
558 off_t skipdst = offsetTable[tileID].skip;
559
560 //double *dst = outBuf + zoffset + hoffset;
561 void *dst;
562 if (pickup)
563 dst = outBuf + zoffset + hoffset;
564 else
565 dst = (float*)outBuf + zoffset + hoffset;
566
567 off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
568 off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS;
569 double *src = (double*)data + zoff + hoff;
570
571 off_t skipsrc = TILE_X+2*XGHOSTS;
572
573 //fprintf(stderr,"rank %d tile %d offset %d skip %d dst %x src %x\n",intracommRank,tileID,hoffset,skipdst,dst,src);
574
575 long long n = TILE_X;
576
577 int y;
578 if (pickup)
579 for (y=0;y<TILE_Y;++y)
580 memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
581 else
582 for (y=0;y<TILE_Y;++y)
583 memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
584 }
585
586 return;
587 }
588
589
590
591 // Allocate some buffers to receive tile-data from the compute ranks
592 void
593 allocateTileBufs(int numTileBufs, int maxIntracommSize)
594 {
595
596 ASSERT(tileOneZLevelSizeInBytes>0); // be sure we have rec'vd values by now
597
598 int i, j;
599 for (i = 0; i < numTileBufs; ++i) {
600
601 bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
602 ASSERT(NULL != newBuf);
603
604 newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
605 ASSERT(NULL != newBuf->payload);
606
607 newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
608 ASSERT(NULL != newBuf->requests);
609
610 // Init some values
611 newBuf->requestsArraySize = maxIntracommSize;
612 for (j = 0; j < maxIntracommSize; ++j) {
613 newBuf->requests[j] = MPI_REQUEST_NULL;
614 }
615 newBuf->ck0 = newBuf->ck1 = 0xdeadbeefdeadbeef;
616
617 // Put the buf into the free list
618 newBuf->state = bufState_Free;
619 newBuf->next = freeTileBufs_ptr;
620 freeTileBufs_ptr = newBuf;
621 }
622 }
623
624
625
626 bufHdr_t *
627 getFreeBuf()
628 {
629 int j;
630 bufHdr_t *rtnValue = freeTileBufs_ptr;
631
632 if (NULL != rtnValue) {
633 ASSERT(bufState_Free == rtnValue->state);
634 freeTileBufs_ptr = rtnValue->next;
635 rtnValue->next = NULL;
636
637 // Paranoia. This should already be the case
638 for (j = 0; j < rtnValue->requestsArraySize; ++j) {
639 rtnValue->requests[j] = MPI_REQUEST_NULL;
640 }
641 }
642 return rtnValue;
643 }
644
645
646 /////////////////////////////////////////////////////////////////
647
648
649 bufHdr_t *
650 tryToReceiveDataTile(
651 MPI_Comm dataIntercomm,
652 int epochID,
653 size_t expectedMsgSize,
654 int *tileID)
655 {
656 bufHdr_t *bufHdr;
657 int pending, i, count;
658 MPI_Status mpiStatus;
659
660 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
661 &pending, &mpiStatus);
662
663 // If no data are pending, or if we can't get a buffer to
664 // put the pending data into, return NULL
665 if (!pending) return NULL;
666 if ((bufHdr = getFreeBuf()) == NULL) {
667 FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
668 mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
669 return NULL;
670 }
671
672 // Do sanity checks on the pending message
673 MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
674 ASSERT(count == expectedMsgSize);
675 ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));
676
677 // Receive the data we saw in the iprobe
678 MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
679 mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
680 dataIntercomm, &mpiStatus);
681 bufHdr->state = bufState_InUse;
682
683 // Overrun check
684 ASSERT(bufHdr->ck0==0xdeadbeefdeadbeef);
685 ASSERT(bufHdr->ck0==bufHdr->ck1);
686
687 // Return values
688 *tileID = mpiStatus.MPI_TAG >> numCheckBits;
689
690
691 FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
692 mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);
693
694 return bufHdr;
695 }
696
697
698
699 int
700 tryToReceiveZSlab(
701 void *buf,
702 int expectedMsgSize,
703 MPI_Comm intracomm)
704 {
705 MPI_Status mpiStatus;
706 int pending, count;
707
708 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
709 if (!pending) return -1;
710
711 MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
712 ASSERT(count == expectedMsgSize);
713
714 MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
715 mpiStatus.MPI_TAG, intracomm, &mpiStatus);
716
717 FPRINTF(stderr, "recv slab %d from rank %d\n",
718 mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);
719
720 // return the tileID
721 return mpiStatus.MPI_TAG;
722 }
723
724
725
726 void
727 redistributeZSlabs(
728 bufHdr_t *bufHdr,
729 int tileID,
730 int zSlabsPer,
731 int thisFieldNumZ,
732 MPI_Comm intracomm)
733 {
734 int pieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
735 int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
736 int offset = 0;
737 int recvRank = 0;
738
739 while ((offset + pieceSize) <= tileSizeInBytes) {
740
741 MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
742 tileID, intracomm, &(bufHdr->requests[recvRank]));
743 ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
744
745 offset += pieceSize;
746 recvRank += 1;
747 }
748
749 // There might be one last odd-sized piece
750 if (offset < tileSizeInBytes) {
751 pieceSize = tileSizeInBytes - offset;
752 ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);
753
754 MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
755 tileID, intracomm, &(bufHdr->requests[recvRank]));
756 ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
757
758 offset += pieceSize;
759 recvRank += 1;
760 }
761
762 // Sanity check
763 ASSERT(recvRank <= bufHdr->requestsArraySize);
764 while (recvRank < bufHdr->requestsArraySize) {
765 ASSERT(MPI_REQUEST_NULL == bufHdr->requests[recvRank++]);
766 }
767 }
768
769
770
771 /////////////////////////////////////////////////////////////////
772 // This is called by the i/o ranks
773 void
774 doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
775 {
776 // In an i/o epoch, the i/o ranks are partitioned into groups,
777 // each group dealing with one field. The ranks within a group:
778 // (1) receive data tiles from the compute ranks
779 // (2) slice the received data tiles into slabs in the 'z'
780 // dimension, and redistribute the tile-slabs among the group.
781 // (3) receive redistributed tile-slabs, and reconstruct a complete
782 // field-slab for the whole field.
783 // (4) Write the completed field-slab to disk.
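//
// The z-slab split is computed below; as a sketch of the arithmetic only:
//   zSlabsPer    = divCeil(fieldInfo->zDepth, intracommSize);
//   myFirstZSlab = intracommRank * zSlabsPer;
// so e.g. zDepth == 90 split over 4 ranks gives 23,23,23,21 levels, and a
// 2-D field (zDepth == 1) leaves every rank except rank 0 with zero slabs.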
784
785 fieldInfoThisEpoch_t *fieldInfo;
786 int intracommRank, intracommSize;
787
788 int zSlabsPer;
789 int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;
790
791 int numTilesRecvd, numSlabPiecesRecvd;
792
793
794 // Find the descriptor for my assigned field for this epoch style.
795 // It is the one whose dataIntercomm is not null
796 fieldInfo = epochStyles[epochStyleIndex];
797 while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
798 ASSERT('\0' != fieldInfo->dataFieldID);
799
800
801
802 MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
803 MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);
804
805
806 // Compute which z slab(s) we will be reassembling.
807 zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
808 myNumZSlabs = zSlabsPer;
809
810 // Adjust myNumZSlabs in case it didn't divide evenly
811 myFirstZSlab = intracommRank * myNumZSlabs;
812 if (myFirstZSlab >= fieldInfo->zDepth) {
813 myNumZSlabs = 0;
814 } else if ((myFirstZSlab + myNumZSlabs) > fieldInfo->zDepth) {
815 myNumZSlabs = fieldInfo->zDepth - myFirstZSlab;
816 } else {
817 myNumZSlabs = zSlabsPer;
818 }
819 myLastZSlab = myFirstZSlab + myNumZSlabs - 1;
820
821 // If we were not assigned any z-slabs, we don't get any redistributed
822 // tile-slabs. If we were assigned one or more slabs, we get
823 // redistributed tile-slabs for every tile.
824 myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : totalNumTiles;
825
826
827 numTilesRecvd = 0;
828 numSlabPiecesRecvd = 0;
829
830 ////////////////////////////////////////////////////////////////////
831 // Main loop. Handle tiles from the current epoch
832 for(;;) {
833 bufHdr_t *bufHdr;
834
835 ////////////////////////////////////////////////////////////////
836 // Check for tiles from the computational tasks
837 while (numTilesRecvd < fieldInfo->tileCount) {
838 int tileID = -1;
839 size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
840 bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
841 msgSize, &tileID);
842 if (NULL == bufHdr) break; // No tile was received
843
844 numTilesRecvd += 1;
845 redistributeZSlabs(bufHdr, tileID, zSlabsPer,
846 fieldInfo->zDepth, fieldInfo->ioRanksIntracomm);
847
848 // Add the bufHdr to the "in use" list
849 bufHdr->next = inUseTileBufs_ptr;
850 inUseTileBufs_ptr = bufHdr;
851
852
853 }
854
855
856 ////////////////////////////////////////////////////////////////
857 // Check for tile-slabs redistributed from the i/o processes
858 while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
859 int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
860 char data[msgSize];
861
862 int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
863 if (tileID < 0) break; // No slab was received
864
865
866 numSlabPiecesRecvd += 1;
867 processSlabSection(fieldInfo, tileID, data, myNumZSlabs);
868
869
870 // Can do the write here, or at the end of the epoch.
871 // Probably want to make it asynchronous (waiting for
872 // completion at the barrier at the start of the next epoch).
873 //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
874 // ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
875 // do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
876 //}
877 }
878
879
880 ////////////////////////////////////////////////////////////////
881 // Check if we can release any of the inUse buffers (i.e. the
882 // isends we used to redistribute those z-slabs have completed).
883 // We do this by detaching the inUse list, then examining each
884 // element in the list, putting each element either into the
885 // free list or back into the inUse list, as appropriate.
886 bufHdr = inUseTileBufs_ptr;
887 inUseTileBufs_ptr = NULL;
888 while (NULL != bufHdr) {
889 int count = 0;
890 int completions[intracommSize];
891 MPI_Status statuses[intracommSize];
892
893 // Acquire the "next" bufHdr now, before we change anything.
894 bufHdr_t *nextBufHeader = bufHdr->next;
895
896 // Check to see if the Isend requests have completed for this buf
897 MPI_Testsome(intracommSize, bufHdr->requests,
898 &count, completions, statuses);
899
900 if (MPI_UNDEFINED == count) {
901 // A return of UNDEFINED means that none of the requests
902 // is still outstanding, i.e. they have all completed.
903 // Put the buf on the free list.
904 bufHdr->state = bufState_Free;
905 bufHdr->next = freeTileBufs_ptr;
906 freeTileBufs_ptr = bufHdr;
907 FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
908 } else {
909 // At least one request is still outstanding.
910 // Put the buf on the inUse list.
911 bufHdr->next = inUseTileBufs_ptr;
912 inUseTileBufs_ptr = bufHdr;
913 }
914
915 // Continue with the next buffer
916 bufHdr = nextBufHeader;
917 }
918
919
920 ////////////////////////////////////////////////////////////////
921 // Check if the epoch is complete
922 if ((numTilesRecvd >= fieldInfo->tileCount) &&
923 (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
924 (NULL == inUseTileBufs_ptr))
925 {
926 ASSERT(numTilesRecvd == fieldInfo->tileCount);
927 ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
928
929 //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);
930
931 // Ok, wrap up the current i/o epoch
932 // Probably want to make this asynchronous (waiting for
933 // completion at the start of the next epoch).
934 do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);
935
936 // The epoch is complete
937 return;
938 }
939 }
940
941 }
942
943
944
945
946
947
948
949 // Main routine for the i/o tasks. Callers DO NOT RETURN from
950 // this routine; they MPI_Finalize and exit.
951 void
952 ioRankMain(int totalNumTiles)
953 {
954 // This is one of a group of i/o processes that will be receiving tiles
955 // from the computational processes. This same process is also
956 // responsible for compositing slices of tiles together into one or
957 // more complete slabs, and then writing those slabs out to disk.
958 //
959 // When a tile is received, it is cut up into slices, and each slice is
960 // sent to the appropriate compositing task that is dealing with those
961 // "z" level(s) (possibly including ourselves). These are tagged with
962 // the tile-id. When we *receive* such a message (from ourselves, or
963 // from any other process in this group), we unpack the data (stripping
964 // the ghost cells), and put them into the slab we are assembling. Once
965 // a slab is fully assembled, it is written to disk.
966
967 int i, size, count, numTileBufs;
968 MPI_Status status;
969 int currentEpochID = 0;
970 int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;
971
972 int numComputeRanks, epochStyleIndex;
973 MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);
974
975
976 ////////////////////////////////////////////////////////////
977 //// get global tile info via bcast over the global intercomm
978
979
980 // int ierr = MPI_Bcast(&NTILES,1,MPI_INT,0,globalIntercomm);
981
982 int data[9];
983
984 int ierr = MPI_Bcast(data,9,MPI_INT,0,globalIntercomm);
985
986 NTILES = data[3];
987 TILE_X = data[5];
988 TILE_Y = data[6];
989 XGHOSTS = data[7];
990 YGHOSTS = data[8];
991
992 tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
993 tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // also calculated in compute ranks, urghh
994
995 int *xoff = malloc(NTILES*sizeof(int));
996 ASSERT(xoff);
997 int *yoff = malloc(NTILES*sizeof(int));
998 ASSERT(yoff);
999 int *xskip = malloc(NTILES*sizeof(int));
1000 ASSERT(xskip);
1001
1002 offsetTable = malloc((NTILES+1)*sizeof(tile_layout_t));
1003 ASSERT(offsetTable);
1004
1005 ierr = MPI_Bcast(xoff,NTILES,MPI_INT,0,globalIntercomm);
1006 ierr = MPI_Bcast(yoff,NTILES,MPI_INT,0,globalIntercomm);
1007 ierr = MPI_Bcast(xskip,NTILES,MPI_INT,0,globalIntercomm);
1008
1009 for (i=0;i<NTILES;++i){
1010 offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
1011 offsetTable[i+1].skip = xskip[i];
1012 }
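
// offsetTable[] is indexed by tileID, starting at 1. xoff/yoff are treated
// here as 1-based (Fortran-style) coordinates of each tile's first interior
// point in the global NUM_X x NUM_Y field, so for example (xoff,yoff) == (1,1)
// gives offset 0 and (xoff,yoff) == (1,81) gives offset NUM_X*80 == 345600.
// The skip value is the row stride used when copying tile rows into the
// assembled slab in processSlabSection().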
1013
1014 free(xoff);
1015 free(yoff);
1016 free(xskip);
1017
1018 ///////////////////////////////////////////////////////////////
1019
1020
1021 // At this point, the multitude of various inter-communicators have
1022 // been created and stored in the various epoch "style" arrays.
1023 // The next step is to have the computational tasks "register" the
1024 // data tiles they will send. In this version of the code, we only
1025 // track and store *counts*, not the actual tileIDs.
1026 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1027 fieldInfoThisEpoch_t *p = NULL;
1028 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1029 int thisIntracommSize;
1030
1031 // Find the field we were assigned for this epoch style
1032 int curFieldIndex = -1;
1033 while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1034 p = &(thisEpochStyle[curFieldIndex]);
1035 if (MPI_COMM_NULL != p->registrationIntercomm) break;
1036 }
1037 ASSERT(NULL != p);
1038 ASSERT('\0' != p->dataFieldID);
1039 MPI_Comm_size(p->ioRanksIntracomm, &size);
1040 if (size > maxIntracommSize) maxIntracommSize = size;
1041
1042
1043 // Receive a message from *each* computational rank telling us
1044 // a count of how many tiles that rank will send during epochs
1045 // of this style. Note that some of these counts may be zero
1046 for (i = 0; i < numComputeRanks; ++i) {
1047 MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
1048 p->registrationIntercomm, &status);
1049 p->tileCount += status.MPI_TAG;
1050 }
1051 if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
1052 if (p->zDepth > 1) {
1053 if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
1054 }
1055
1056 // Sanity check
1057 MPI_Allreduce (&(p->tileCount), &count, 1,
1058 MPI_INT, MPI_SUM, p->ioRanksIntracomm);
1059 ASSERT(count == totalNumTiles);
1060 }
1061
1062
1063 // In some as-of-yet-undetermined fashion, we decide how many
1064 // buffers to allocate to hold tiles received from the computational
1065 // tasks. The number of such buffers is more-or-less arbitrary (the
1066 // code should be able to function with any number greater than zero).
1067 // More buffers means more parallelism, but uses more space.
1068 // Note that maxTileCount is an upper bound on the number of buffers
1069 // that we can usefully use. Note also that if we are assigned a 2D
1070 // field, we might be assigned to receive a very large number of tiles
1071 // for that field in that epoch style.
1072
1073 // Hack - for now, just pick a value for numTileBufs
1074 numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
1075 if (numTileBufs < 4) numTileBufs = 4;
1076 if (numTileBufs > 15) numTileBufs = 15;
1077 // if (numTileBufs > 25) numTileBufs = 25;
1078
1079 allocateTileBufs(numTileBufs, maxIntracommSize);
1080 countBufs(numTileBufs);
1081
1082
1083 ////////////////////////////////////////////////////////////////////
1084 // Main loop.
1085 ////////////////////////////////////////////////////////////////////
1086
1087 for (;;) {
1088 int cmd[4];
1089
1090 // Make sure all the i/o ranks have completed processing the prior
1091 // cmd (i.e. the prior i/o epoch is complete).
1092 MPI_Barrier(ioIntracomm);
1093
1094 if (0 == ioIntracommRank) {
1095 fprintf(stderr, "I/O ranks waiting for new epoch\n");
1096 MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);
1097
1098 MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
1099 fprintf(stderr, "I/O ranks beginning new epoch: %d, gcmIter = %d\n", cmd[1],cmd[3]);
1100
1101 // before we start a new epoch, have i/o rank 0:
1102 // determine output filenames for this epoch
1103 // clean up any extant files with same names
1104 // write .meta files
1105
1106 if (cmd_exit != cmd[0]){
1107
1108 fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);
1109
1110 int epochStyle = cmd[2];
1111 int gcmIter = cmd[3];
1112
1113 fieldInfoThisEpoch_t *fieldInfo;
1114 fieldInfo = epochStyles[epochStyle];
1115 char s[1024];
1116 int res;
1117 FILE *fp;
1118
1119 if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
1120 char f;
1121 while ((f = fieldInfo->dataFieldID) != '\0'){
1122 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1123 fprintf(stderr,"%s\n",s);
1124 res = unlink(s);
1125 if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1126
1127 // skip writing meta files for non-pickup fields
1128 /*
1129 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1130 fp = fopen(s,"w+");
1131 fclose(fp);
1132 */
1133
1134 ++fieldInfo;
1135
1136 }
1137 }
1138
1139 else { // single pickup or pickup_seaice file
1140
1141 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1142 fprintf(stderr,"%s\n",s);
1143 res = unlink(s);
1144 if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1145
1146 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1147 fp = fopen(s,"w+");
1148 write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
1149 fclose(fp);
1150
1151 }
1152 }
1153 }
1154 MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);
1155
1156 switch (cmd[0]) {
1157
1158 case cmd_exit:
1159 // Don't bother trying to disconnect and free the
1160 // plethora of communicators; just exit.
1161 FPRINTF(stderr,"Received shutdown message\n");
1162 MPI_Finalize();
1163 exit(0);
1164 break;
1165
1166 case cmd_newEpoch:
1167 if ((currentEpochID + 1) != cmd[1]) {
1168 fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
1169 "but is now %d ??\n", currentEpochID, cmd[1]);
1170 sleep(1); // Give the message a chance to propagate
1171 abort();
1172 }
1173 currentEpochID = cmd[1];
1174
1175 memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined
1176
1177 doNewEpoch(cmd[1], cmd[2], cmd[3]);
1178 break;
1179
1180 default:
1181 fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
1182 cmd[0], cmd[1], cmd[2], cmd[3]);
1183 sleep(1);
1184 abort();
1185 break;
1186 }
1187
1188 }
1189
1190 }
1191
1192
1193
1194 ///////////////////////////////////////////////////////////////////////////////////
1195
1196 int
1197 findField(const char c)
1198 {
1199 int i;
1200 for (i = 0; i < numAllFields; ++i) {
1201 if (c == fieldDepths[i].dataFieldID) return i;
1202 }
1203
1204 // Give the error message a chance to propagate before exiting.
1205 fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1206 sleep(1);
1207 abort();
1208 }
1209
1210
1211
1212 // Given a number of ranks and a set of fields we will want to output,
1213 // figure out how to distribute the ranks among the fields.
1214 void
1215 computeIORankAssigments(
1216 int numComputeRanks,
1217 int numIORanks,
1218 int numFields,
1219 fieldInfoThisEpoch_t *fields,
1220 int assignments[])
1221 {
1222 int i,j,k, sum, count;
1223
1224 int numIONodes = numIORanks / numRanksPerNode;
1225 int numIORanksThisField[numFields];
1226 long int bytesPerIORankThisField[numFields];
1227 int expectedMessagesPerRankThisField[numFields];
1228 long int fieldSizes[numFields];
1229
1230 struct ioNodeInfo_t {
1231 int expectedNumMessagesThisNode;
1232 int numCoresAssigned;
1233 int *assignedFieldThisCore;
1234 } ioNodeInfo[numIONodes];
1235
1236 // Since numRanksPerNode might be dynamically determined, we have
1237 // to dynamically allocate the assignedFieldThisCore arrays.
1238 for (i = 0; i < numIONodes; ++i) {
1239 ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
1240 ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
1241 }
1242
1243 ASSERT((numIONodes*numRanksPerNode) == numIORanks);
1244
1245
1246 //////////////////////////////////////////////////////////////
1247 // Compute the size for each field in this epoch style
1248 for (i = 0; i < numFields; ++i) {
1249 ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
1250 fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
1251 }
1252
1253
1254 /////////////////////////////////////////////////////////
1255 // Distribute the available i/o ranks among the fields,
1256 // proportionally based on field size.
1257
1258 // First, assign one rank per field
1259 ASSERT(numIORanks >= numFields);
1260 for (i = 0; i < numFields; ++i) {
1261 numIORanksThisField[i] = 1;
1262 bytesPerIORankThisField[i] = fieldSizes[i];
1263 }
1264
1265 // Now apportion any extra ranks
1266 for (; i < numIORanks; ++i) {
1267
1268 // Find the field 'k' with the most bytesPerIORank
1269 k = 0;
1270 for (j = 1; j < numFields; ++j) {
1271 if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
1272 k = j;
1273 }
1274 }
1275
1276 // Assign an i/o rank to that field
1277 numIORanksThisField[k] += 1;
1278 bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
1279 }
1280
1281 ////////////////////////////////////////////////////////////
1282 // At this point, all the i/o ranks should be apportioned
1283 // among the fields. Check we didn't screw up the count.
1284 for (sum = 0, i = 0; i < numFields; ++i) {
1285 sum += numIORanksThisField[i];
1286 }
1287 ASSERT(sum == numIORanks);
1288
1289
1290
1291 //////////////////////////////////////////////////////////////////
1292 // The *number* of i/o ranks assigned to a field is based on the
1293 // field size. But the *placement* of those ranks is based on
1294 // the expected number of messages the process will receive.
1295 // [In particular, if we have several fields each with only one
1296 // assigned i/o rank, we do not want to place them all on the
1297 // same node if we don't have to.] This traffic-load balance
1298 // strategy is an approximation at best since the messages may
1299 // not all be the same size (e.g. 2D fields vs. 3D fields), so
1300 // "number of messages" is not the same thing as "traffic".
1301 // But it should suffice.
1302
1303 // Init a couple of things
1304 for (i = 0; i < numFields; ++i) {
1305 expectedMessagesPerRankThisField[i] =
1306 numComputeRanks / numIORanksThisField[i];
1307 }
1308 for (i = 0; i < numIONodes; ++i) {
1309 ioNodeInfo[i].expectedNumMessagesThisNode = 0;
1310 ioNodeInfo[i].numCoresAssigned = 0;
1311 for (j = 0; j < numRanksPerNode; ++j) {
1312 ioNodeInfo[i].assignedFieldThisCore[j] = -1;
1313 }
1314 }
1315
1316 ///////////////////////////////////////////////////////////////////
1317 // Select the i/o node with the smallest expectedNumMessages, and
1318 // assign it a rank from the field with the highest remaining
1319 // expectedMessagesPerRank. Repeat until everything is assigned.
1320 // (Yes, this could be a lot faster, but isn't worth the trouble.)
1321
1322 for (count = 0; count < numIORanks; ++count) {
1323
1324 // Make an initial choice of node
1325 for (i = 0; i < numIONodes; ++i) {
1326 if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
1327 }
1328 j = i;
1329 ASSERT(j < numIONodes);
1330
1331 // Search for a better choice
1332 for (++i; i < numIONodes; ++i) {
1333 if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
1334 if (ioNodeInfo[i].expectedNumMessagesThisNode <
1335 ioNodeInfo[j].expectedNumMessagesThisNode)
1336 {
1337 j = i;
1338 }
1339 }
1340
1341
1342 // Make an initial choice of field
1343 for (i = 0; i < numFields; ++i) {
1344 if (numIORanksThisField[i] > 0) break;
1345 }
1346 k = i;
1347 ASSERT(k < numFields);
1348
1349 // Search for a better choice
1350 for (++i; i < numFields; ++i) {
1351 if (numIORanksThisField[i] <= 0) continue;
1352 if (expectedMessagesPerRankThisField[i] >
1353 expectedMessagesPerRankThisField[k])
1354 {
1355 k = i;
1356 }
1357 }
1358
1359 // Put one rank from the chosen field onto the chosen node
1360 ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
1361 ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
1362 ioNodeInfo[j].numCoresAssigned += 1;
1363 numIORanksThisField[k] -= 1;
1364 }
1365
1366 // Sanity check - all ranks were assigned to a core
1367 for (i = 0; i < numFields; ++i) {
1368 ASSERT(0 == numIORanksThisField[i]);
1369 }
1370 // Sanity check - all cores were assigned a rank
1371 for (i = 0; i < numIONodes; ++i) {
1372 ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
1373 }
1374
1375
1376 /////////////////////////////////////
1377 // Return the computed assignments
1378 for (i = 0; i < numIONodes; ++i) {
1379 for (j = 0; j < numRanksPerNode; ++j) {
1380 assignments[i*numRanksPerNode + j] =
1381 ioNodeInfo[i].assignedFieldThisCore[j];
1382 }
1383 }
1384
1385 // Clean up
1386 for (i = 0; i < numIONodes; ++i) {
1387 free(ioNodeInfo[i].assignedFieldThisCore);
1388 }
1389
1390 }
1391
1392 //////////////////////////////////////////////////////////////////////////////////
1393
1394
1395
1396
1397 int
1398 isIORank(int commRank, int totalNumNodes, int numIONodes)
1399 {
1400 // Figure out if this rank is on a node that will do i/o.
1401 // Note that the i/o nodes are distributed throughout the
1402 // task, not clustered together.
1403 int ioNodeStride = totalNumNodes / numIONodes;
1404 int thisRankNode = commRank / numRanksPerNode;
1405 int n = thisRankNode / ioNodeStride;
1406 return (((n * ioNodeStride) == thisRankNode) && (n < numIONodes)) ? 1 : 0;
1407 }
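
// e.g. with numRanksPerNode == 20, totalNumNodes == 10 and numIONodes == 2,
// ioNodeStride == 5, so nodes 0 and 5 (ranks 0-19 and 100-119) do i/o and all
// other nodes compute; the numbers here are just an illustration.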
1408
1409
1410 // Find the number of *physical cores* on the current node
1411 // (ignore hyperthreading). This should work for pretty much
1412 // any Linux based system (and fail horribly for anything else).
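// (The shell pipeline below counts distinct "physical id"/"core id" pairs in
// /proc/cpuinfo, i.e. physical cores rather than hyperthreads; note that the
// early "return 20" currently short-circuits it.)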
1413 int
1414 getNumCores(void)
1415 {
1416 return 20; // until we figure out why this cratered
1417
1418 char *magic = "cat /proc/cpuinfo | egrep \"core id|physical id\" | "
1419 "tr -d \"\\n\" | sed s/physical/\\\\nphysical/g | "
1420 "grep -v ^$ | sort -u | wc -l";
1421
1422 FILE *fp = popen(magic,"r");
1423 ASSERT(fp);
1424
1425 int rtnValue = -1;
1426
1427 int res = fscanf(fp,"%d",&rtnValue);
1428
1429 ASSERT(1==res);
1430
1431 pclose(fp);
1432
1433 ASSERT(rtnValue > 0);
1434 return rtnValue;
1435 }
1436
1437
1438
1439 ////////////////////////////////////////////////////////////////////////
1440 ////////////////////////////////////////////////////////////////////////
1441 // Routines called by the MITgcm code
1442
1443
1444 ////////////////////////////////////////
1445 // Various "one time" initializations
1446 void
1447 f1(
1448 MPI_Comm parentComm,
1449 int numComputeRanks,
1450 int numTiles,
1451 MPI_Comm *rtnComputeComm)
1452 {
1453 MPI_Comm newIntracomm, newIntercomm, dupParentComm;
1454 int newIntracommRank, thisIsIORank;
1455 int parentSize, parentRank;
1456 int numIONodes, numIORanks;
1457 int mpiIsInitialized, *intPtr, tagUBexists;
1458 int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
1459 int totalNumNodes, numComputeNodes, newIntracommSize;
1460
1461 // Init globals
1462 totalNumTiles = numTiles;
1463 if (numRanksPerNode <= 0) {
1464 // Might also want to check for an env var (or something)
1465 numRanksPerNode = getNumCores();
1466 }
1467
1468 // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors
1469 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1470 fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];
1471
1472 int curFieldIndex = -1;
1473 while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
1474 i = findField(f->dataFieldID);
1475 if (-1 != f->zDepth) {
1476 if (f->zDepth != fieldDepths[i].numZ) {
1477 fprintf(stderr, "Inconsistent z-depth for field '%c': "
1478 "fieldDepths[%d] and epoch style %d\n",
1479 f->dataFieldID, i, epochStyleIndex);
1480 }
1481 }
1482 f->zDepth = fieldDepths[i].numZ;
1483 }
1484 }
1485
1486
1487 // Find the max MPI "tag" value
1488 MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
1489 ASSERT(tagUBexists);
1490 maxTagValue = *intPtr;
1491
1492
1493 // Info about the parent communicator
1494 MPI_Initialized(&mpiIsInitialized);
1495 ASSERT(mpiIsInitialized);
1496 MPI_Comm_size(parentComm, &parentSize);
1497 MPI_Comm_rank(parentComm, &parentRank);
1498
1499
1500 // Figure out how many nodes we can use for i/o.
1501 // To make things (e.g. memory calculations) simpler, we want
1502 // a node to have either all compute ranks, or all i/o ranks.
1503 // If numComputeRanks does not evenly divide numRanksPerNode, we have
1504 // to round up in favor of the compute side.
1505
1506 totalNumNodes = divCeil(parentSize, numRanksPerNode);
1507 numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
1508 numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
1509 ASSERT(numIONodes > 0);
1510 ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
1511 numIORanks = numIONodes * numRanksPerNode;
1512
1513
1514 // It is surprisingly easy to launch the job with the wrong number
1515 // of ranks, particularly if the number of compute ranks is not a
1516 // multiple of numRanksPerNode (the tendency is to just launch one
1517 // rank per core for all available cores). So we introduce a third
1518 // option for a rank: besides being an i/o rank or a compute rank,
1519 // it might instead be an "excess" rank, in which case it just
1520 // calls MPI_Finalize and exits.
1521
1522 typedef enum { isCompute, isIO, isExcess } rankType;
1523 rankType thisRanksType;
1524
1525 if (isIORank(parentRank, totalNumNodes, numIONodes)) {
1526 thisRanksType = isIO;
1527 } else if (parentRank >= numComputeRanks + numIORanks) {
1528 thisRanksType = isExcess;
1529 } else {
1530 thisRanksType = isCompute;
1531 }
1532
1533 // Split the parent communicator into parts
1534 MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
1535 MPI_Comm_rank(newIntracomm, &newIntracommRank);
1536
1537 // Excess ranks disappear
1538 // N.B. "parentSize" still includes these ranks.
1539 if (isExcess == thisRanksType) {
1540 MPI_Finalize();
1541 exit(0);
1542 }
1543
1544 // Sanity check
1545 MPI_Comm_size(newIntracomm, &newIntracommSize);
1546 if (isIO == thisRanksType) {
1547 ASSERT(newIntracommSize == numIORanks);
1548 } else {
1549 ASSERT(newIntracommSize == numComputeRanks);
1550 }
1551
1552
1553 // Create a special intercomm from the i/o and compute parts
1554 if (isIO == thisRanksType) {
1555 // Set globals
1556 ioIntracomm = newIntracomm;
1557 MPI_Comm_rank(ioIntracomm, &ioIntracommRank);
1558
1559 // Find the lowest computation rank
1560 for (i = 0; i < parentSize; ++i) {
1561 if (!isIORank(i, totalNumNodes, numIONodes)) break;
1562 }
1563 } else { // isCompute
1564 // Set globals
1565 computeIntracomm = newIntracomm;
1566 MPI_Comm_rank(computeIntracomm, &computeIntracommRank);
1567
1568 // Find the lowest IO rank
1569 for (i = 0; i < parentSize; ++i) {
1570 if (isIORank(i, totalNumNodes, numIONodes)) break;
1571 }
1572 }
1573 MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);
1574
1575
1576
1577 ///////////////////////////////////////////////////////////////////
1578 // For each different i/o epoch style, split the i/o ranks among
1579 // the fields, and create an inter-communicator for each split.
1580
1581 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1582 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1583 int fieldAssignments[numIORanks];
1584 int rankAssignments[numComputeRanks + numIORanks];
1585
1586 // Count the number of fields in this epoch style
1587 for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;
1588
1589 // Decide how to apportion the i/o ranks among the fields
1590 for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
1591 computeIORankAssigments(numComputeRanks, numIORanks, nF,
1592 thisEpochStyle, fieldAssignments);
1593 // Sanity check
1594 for (i=0; i < numIORanks; ++i) {
1595 ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
1596 }
1597
1598 // Embed the i/o rank assignments into an array holding
1599 // the assignments for *all* the ranks (i/o and compute).
1600 // Rank assignment of "-1" means "compute".
1601 j = 0;
1602 for (i = 0; i < numComputeRanks + numIORanks; ++i) {
1603 rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
1604 fieldAssignments[j++] : -1;
1605 }
1606 // Sanity check. Check the assignment for this rank.
1607 if (isIO == thisRanksType) {
1608 ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
1609 } else {
1610 ASSERT(-1 == rankAssignments[parentRank]);
1611 }
1612 ASSERT(j == numIORanks);
1613
1614 if (0 == parentRank) {
1615 printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
1616 for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
1617 if (rankAssignments[i*numRanksPerNode] >= 0) {
1618 // i/o node
1619 for (j = 0; j < numRanksPerNode; ++j) {
1620 printf(" %1d", rankAssignments[i*numRanksPerNode + j]);
1621 }
1622 } else {
1623 // compute node
1624 for (j = 0; j < numRanksPerNode; ++j) {
1625 if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
1626 ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
1627 }
1628 printf(" #");
1629 for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
1630 printf("X");
1631 }
1632 }
1633 printf(" ");
1634 }
1635 printf("\n\n");
1636 }
1637
1638 // Find the lowest rank assigned to computation; use it as
1639 // the "root" for the upcoming intercomm creates.
1640 for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
1641 // Sanity check
1642 if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);
1643
1644
1645 // If this is an I/O rank, split the newIntracomm (which, for
1646 // the i/o ranks, is a communicator among all the i/o ranks)
1647 // into the pieces as assigned.
1648
1649 if (isIO == thisRanksType) {
1650 MPI_Comm fieldIntracomm;
1651 int myField = rankAssignments[parentRank];
1652 ASSERT(myField >= 0);
1653
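            // MPI_Comm_split(comm, color, key, &newcomm): using the assigned
            // field index as the "color" puts all i/o ranks serving the same
            // field into one sub-communicator, and using parentRank as the
            // "key" keeps them in their original relative rank order.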
1654 MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
1655 thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;
1656
1657 // Now create an inter-communicator between the computational
1658 // ranks, and each of the newly split off field communicators.
1659 for (i = 0; i < nF; ++i) {
1660
1661 // Do one field at a time to avoid clashes on parentComm
1662 MPI_Barrier(newIntracomm);
1663
1664 if (myField != i) continue;
1665
1666 // Create the intercomm for this field for this epoch style
1667 MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
1668 compRoot, i, &(thisEpochStyle[myField].dataIntercomm));
1669
1670 // ... and dup a separate one for tile registrations
1671 MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
1672 &(thisEpochStyle[myField].registrationIntercomm));
1673 }
1674 }
1675 else {
1676 // This is a compute rank; create an intercommunicator to each of
1677 // the newly split-off field communicators.
1678
1679 for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {
1680
1681 // Find the remote "root" process for this field
1682 int fieldRoot = -1;
1683 while (rankAssignments[++fieldRoot] != fieldIndex);
1684
1685 // Create the intercomm for this field for this epoch style
1686 MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
1687 fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));
1688
1689 // ... and dup a separate one for tile registrations
1690 MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
1691 &(thisEpochStyle[fieldIndex].registrationIntercomm));
1692 }
1693 }
1694
1695 } // epoch style loop
1696
1697
1698 // I/O processes split off and start receiving data
1699 // NOTE: the I/O processes DO NOT RETURN from this call
1700 if (isIO == thisRanksType) ioRankMain(totalNumTiles);
1701
1702
1703 // The "compute" processes now return with their new intra-communicator.
1704 *rtnComputeComm = newIntracomm;
1705
1706 // But first, call the MPI-IO initialization routine.
1707 initSizesAndTypes();
1708
1709 return;
1710 }
1711
1712
1713
1714
1715 // "Register" the tile-id(s) that this process will be sending
1716 void
1717 f2(int tileID)
1718 {
1719 int i, epochStyleIndex;
1720
1721 static int count = 0;
1722
1723 // This code assumes that the same tileID will apply to all the
1724 // fields. Note that we use the tileID as a tag, so we require it
1725 // be an int with a legal tag value, but we do *NOT* actually use
1726 // the tag *value* for anything (e.g. it is NOT used as an index
1727 // into an array). It is treated as an opaque handle that is
1728 // passed from the compute task(s) to the compositing routines.
1729 // Those two end-points likely assign meaning to the tileID (i.e.
1730 // use it to identify where the tile belongs within the domain),
1731 // but that is their business.
1732 //
1733 // A tileID of -1 signifies the end of registration.
1734
1735 ASSERT(computeIntracommRank >= 0);
1736
1737 if (tileID >= 0) {
1738 // In this version of the code, in addition to the tileID, we also
1739 // multiplex in the low order bit(s) of the epoch number as an
1740 // error check. So the tile value must be small enough to allow that.
1741 ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);
1742
1743 // In this version of the code, we do not send the actual tileID's
1744 // to the i/o processes during the registration procedure. We only
1745 // send a count of the number of tiles that we will send.
1746 ++count;
1747 return;
1748 }
1749
1750
1751 // We get here when we are called with a negative tileID, signifying
1752 // the end of registration. We now need to figure out and inform
1753 // *each* i/o process in *each* field just how many tiles we will be
1754 // sending them in *each* epoch style.
1755
1756 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1757 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1758
1759 int curFieldIndex = -1;
1760 while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1761 fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
1762 int numRemoteRanks, *tileCounts, remainder;
1763 MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);
1764
1765 tileCounts = alloca(numRemoteRanks * sizeof(int));
1766
1767 memset(tileCounts,0,numRemoteRanks * sizeof(int));
1768
1769 // Distribute the tiles among the i/o ranks.
1770 for (i = 0; i < numRemoteRanks; ++i) {
1771 tileCounts[i] = count / numRemoteRanks;
1772 }
1773 // Deal with any remainder
1774 remainder = count - ((count / numRemoteRanks) * numRemoteRanks);
1775 for (i = 0; i < remainder; ++i) {
1776 int target = (computeIntracommRank + i) % numRemoteRanks;
1777 tileCounts[target] += 1;
1778 }
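            // Worked example: with count == 10 tiles and numRemoteRanks == 4,
            // every i/o rank gets 10/4 == 2 tiles and remainder == 2.  A sender
            // with computeIntracommRank == 3 then adds one extra tile to remote
            // ranks (3+0)%4 == 3 and (3+1)%4 == 0, i.e. tileCounts == {3,2,2,3}.
            // Offsetting by computeIntracommRank spreads the remainder tiles
            // across different i/o ranks for different senders.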
1779
1780 // Communicate these counts to the i/o processes for this
1781 // field. Note that we send a message to each process,
1782 // even if the count is zero.
1783 for (i = 0; i < numRemoteRanks; ++i) {
1784 MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
1785 thisField->registrationIntercomm);
1786 }
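            // Note that the tile count itself travels in the *tag* of a
            // zero-length message.  A minimal sketch of the matching receive
            // (illustrative only; the real code lives on the i/o side, and the
            // names used here are placeholders):
            //
            //   MPI_Status status;
            //   MPI_Recv(NULL, 0, MPI_BYTE, senderRank, MPI_ANY_TAG,
            //            registrationIntercomm, &status);
            //   int nTilesFromSender = status.MPI_TAG;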
1787
1788 } // field loop
1789 } // epoch-style loop
1790
1791 }
1792
1793
1794
1795
1796 int currentEpochID = 0;
1797 int currentEpochStyle = 0;
1798
1799 void
1800 beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
1801 {
1802 fieldInfoThisEpoch_t *p;
1803
1804 if (newEpochID != (currentEpochID + 1)) {
1805 fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
1806 currentEpochID, newEpochID);
1807 sleep(1); // Give the message a chance to propagate
1808 abort();
1809 }
1810
1811 ////////////////////////////////////////////////////////////////////////
1812 // Need to be sure the i/o tasks are done processing the previous epoch
1813 // before any compute tasks start sending tiles from a new epoch.
1814
1815 if (0 == computeIntracommRank) {
1816 int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };
1817
1818 // Wait to get an ack that the i/o tasks are done with the prior epoch.
1819 MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1820 globalIntercomm, MPI_STATUS_IGNORE);
1821
1822 // Tell the i/o ranks about the new epoch.
1823 // (Just tell rank 0; it will bcast to the others)
1824 MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
1825 }
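         // A rough sketch of the i/o-root counterpart of this handshake
         // (illustrative only; the real logic lives in ioRankMain()):
         //
         //   MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
         //            globalIntercomm);                    // ack prior epoch
         //   int cmd[4];
         //   MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
         //   MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);   // relay to the other
         //                                                 // i/o ranks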
1826
1827 // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
1828 MPI_Barrier(computeIntracomm);
1829
1830
1831 currentEpochID = newEpochID;
1832 currentEpochStyle = epochStyle;
1833
1834 // Reset the tileCount (used by f3)
1835 for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1836 p->tileCount = 0;
1837 }
1838 }
1839
1840
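      // Send one tile of field dataFieldID for the given epoch to the
      // appropriate i/o rank of the current epoch style.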
1841 void
1842 f3(char dataFieldID, int tileID, int epochID, void *data)
1843 {
1844 int whichField, receiver, tag;
1845
1846 static char priorDataFieldID = '\0';
1847 static int priorEpoch = -1;
1848 static int remoteCommSize;
1849 static fieldInfoThisEpoch_t *p;
1850
1851 int flag=0;
1852
1853 // Check that this global has been set
1854 ASSERT(computeIntracommRank >= 0);
1855
1856
1857 // Figure out some info about this dataFieldID. It is
1858 // likely to be another tile from the same field as last time
1859 // we were called, in which case we can reuse the "static" values
1860 // retained from the prior call. If not, we need to look it up.
1861
1862 if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {
1863
1864 // It's not the same; we need to look it up.
1865
1866 for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
1867 if (p->dataFieldID == dataFieldID) break;
1868 }
1869 // Make sure we found a valid field
1870
1871 ASSERT(p->dataIntercomm != MPI_COMM_NULL);
1872
1873 MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);
1874
1875 flag=1;
1876
1877 }
1878
1879 ASSERT(p->dataFieldID == dataFieldID);
1880
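      // Round-robin the tiles over the remote i/o ranks, offset by this
      // sender's rank so that different senders favor different receivers.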
1881 receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;
1882
1883 tag = (tileID << numCheckBits) | (epochID & bitMask);
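      // Worked example of the tag encoding above (numCheckBits == 2, so
      // bitMask == 3): tileID 189 in epoch 5 gives
      // tag = (189 << 2) | (5 & 3) = 756 | 1 = 757.  The receiver recovers the
      // tile as tag >> 2 == 189 and can check that (tag & 3) matches the low
      // bits of the current epoch.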
1884
1885 //fprintf(stderr,"%d %d\n",flag,tileID);
1886
1887 /*
1888 if (tileID==189){
1889 int i,j;
1890 for (i=0;i<TILE_Y;++i){
1891 for (j=0;j<TILE_X;++j)
1892 fprintf(stderr,"%f ",((double*)data)[872+i*108+j]);
1893 fprintf(stderr,"\n");
1894 }
1895 }
1896
1897 */
1898
1899
1900
1901 FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
1902 computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);
1903
1904 MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
1905 MPI_BYTE, receiver, tag, p->dataIntercomm);
1906
1907 p->tileCount += 1;
1908 priorDataFieldID = dataFieldID;
1909 priorEpoch = epochID;
1910 }
1911
1912
1913
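// Called at the end of the run: wait for the ack that the final i/o epoch is
// complete, then tell the i/o ranks (via their rank 0) to exit.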
1914 void
1915 f4(int epochID)
1916 {
1917 int i;
1918 ASSERT(computeIntracommRank >= 0);
1919
1920 if (0 == computeIntracommRank) {
1921 int cmd[3] = { cmd_exit, -1, -1 };
1922
1923 // Recv the ack that the prior i/o epoch is complete
1924 MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
1925 globalIntercomm, MPI_STATUS_IGNORE);
1926
1927 // Tell the i/o ranks to exit
1928 // Just tell rank 0; it will bcast to the others
1929 MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
1930 }
1931
1932 }
1933
1934
1935
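// Fortran-callable: record the global field and tile geometry on the compute
// ranks, then broadcast it to the i/o ranks over globalIntercomm.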
1936 void myasyncio_set_global_sizes_(int *nx, int *ny, int *nz,
1937 int *nt, int *nb,
1938 int *tx, int *ty,
1939 int *ox, int *oy)
1940 {
1941
1942
1943 int data[] = {*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy};
1944
1945 int items = sizeof(data)/sizeof(int);
1946
1947
1948 NTILES = *nt; // total number of tiles
1949
1950
1951 TILE_X = *tx;
1952 TILE_Y = *ty;
1953 XGHOSTS = *ox;
1954 YGHOSTS = *oy;
1955 tileOneZLevelItemCount = ((TILE_X + 2*XGHOSTS) * (TILE_Y + 2*YGHOSTS));
1956 tileOneZLevelSizeInBytes = (tileOneZLevelItemCount * datumSize); // needed by compute ranks in f3
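   // Hypothetical example (the numbers are illustrative, not taken from the
   // run configuration): a 104 x 104 interior tile with 2-point overlaps on
   // each side gives (104+4)*(104+4) = 11664 points per Z level, i.e.
   // 11664 * 8 = 93312 bytes per level for doubles.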
1957
1958
1959 int rank,ierr;
1960 MPI_Comm_rank(globalIntercomm,&rank);
1961
1962
1963 if (0==rank)
1964 printf("%d %d %d\n%d %d\n%d %d\n%d %d\n",*nx,*ny,*nz,*nt,*nb,*tx,*ty,*ox,*oy);
1965
1966
1967 /*
1968 if (0==rank)
1969 ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_ROOT,globalIntercomm);
1970 else
1971 ierr=MPI_Bcast(&NTILES,1,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1972 */
1973
1974 if (0==rank)
1975 ierr=MPI_Bcast(data,items,MPI_INT,MPI_ROOT,globalIntercomm);
1976 else
1977 ierr=MPI_Bcast(data,items,MPI_INT,MPI_PROC_NULL,globalIntercomm);
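   // In an intercommunicator broadcast like the one above, data flows from one
   // group to the other: the sending group's root passes MPI_ROOT, the rest of
   // that group passes MPI_PROC_NULL, and every rank of the receiving group
   // passes the root's rank within the sending group.  A minimal sketch of the
   // matching call on the i/o side (assuming the i/o ranks receive this array;
   // the real code is in the i/o routines):
   //
   //   MPI_Bcast(data, items, MPI_INT, /* root in remote group */ 0,
   //             globalIntercomm);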
1978
1979 }
1980
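// Fortran-callable: broadcast the per-tile offset/stride arrays (NTILES
// entries each) to the i/o ranks over globalIntercomm.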
1981 void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
1982 {
1983 int rank,ierr;
1984 MPI_Comm_rank(globalIntercomm,&rank);
1985
1986 if (0==rank)
1987 ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1988 else
1989 ierr=MPI_Bcast(xoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1990
1991 if (0==rank)
1992 ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1993 else
1994 ierr=MPI_Bcast(yoff,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
1995
1996 if (0==rank)
1997 ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_ROOT,globalIntercomm);
1998 else
1999 ierr=MPI_Bcast(xskip,NTILES,MPI_INT,MPI_PROC_NULL,globalIntercomm);
2000
2001 }
2002
2003
2004
2005
2006
2007 //////////////////////////////////////////////////////////////////////
2008 // Fortran interface
2009
2010 void
2011 bron_f1(
2012 MPI_Fint *ptr_parentComm,
2013 int *ptr_numComputeRanks,
2014 int *ptr_totalNumTiles,
2015 MPI_Fint *rtnComputeComm)
2016 {
2017 // Convert the Fortran handle into a C handle
2018 MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
2019
2020 f1(parentComm, *ptr_numComputeRanks, *ptr_totalNumTiles, &newComm);
2021
2022 // Convert the C handle into a Fortran handle
2023 *rtnComputeComm = MPI_Comm_c2f(newComm);
2024 }
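// From the Fortran side this wrapper is presumably invoked with the model's
// parent communicator handle and receives the new compute communicator back,
// roughly (argument names here are hypothetical):
//
//   CALL BRON_F1(parentComm, numComputeRanks, totalNumTiles, newComputeComm)
//
// The exact spelling depends on the compiler's name-mangling convention
// (note that bron_f1 carries no trailing underscore, unlike beginnewepoch_).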
2025
2026
2027
2028 void
2029 bron_f2(int *ptr_tileID)
2030 {
2031 f2(*ptr_tileID);
2032 }
2033
2034
2035 void
2036 beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
2037 {
2038 beginNewEpoch(*newEpochID, *gcmIter, *epochStyle);
2039 }
2040
2041
2042 void
2043 bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
2044 {
2045 f3(*ptr_dataFieldID, *ptr_tileID, *ptr_epochID, data);
2046 }
2047
2048
2049
2050 void
2051 bron_f4(int *ptr_epochID)
2052 {
2053 f4(*ptr_epochID);
2054 }
2055
