/[MITgcm]/MITgcm_contrib/llc_hires/llc_1080/code-async/recvTask.c
ViewVC logotype

Contents of /MITgcm_contrib/llc_hires/llc_1080/code-async/recvTask.c

Parent Directory Parent Directory | Revision Log Revision Log | View Revision Graph Revision Graph


Revision 1.3 - (show annotations) (download)
Sat Mar 2 19:31:13 2019 UTC (6 years, 5 months ago) by dimitri
Branch: MAIN
CVS Tags: HEAD
Changes since 1.2: +632 -365 lines
File MIME type: text/plain
updated asyncio code from Bron Nelson, which can now be used to read pickup files

1
2 // Code to do the m-on-n fan-in, recompositing, and output, of data
3 // tiles for MITgcm.
4 //
5 // There are aspects of this code that would be simpler in C++, but
6 // we deliberately wrote the code in ansi-C to make linking it with
7 // the Fortran main code easier (should be able to just include the
8 // .o on the link line).
9
10
11 #include "PACKAGES_CONFIG.h"
12 #include "SIZE.h"
13
14 #include <stdio.h>
15 #include <unistd.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <fcntl.h>
21 #include <assert.h>
22
23 #include <mpi.h>
24 #include <alloca.h>
25
26
27 #include <stdint.h>
28 #include <limits.h>
29 #include <errno.h>
30
#define DEBUG 1

#if (DEBUG >= 3)
#define FPRINTF fprintf
#else
#include <stdarg.h>
// Stub that swallows all debug output when verbose tracing is disabled.
void FPRINTF(FILE *fp,...){return;}
#endif


#if (DEBUG >= 2)
// Define our own version of "assert", where we sleep rather than abort.
// This makes it easier to attach the debugger.
// Wrapped in do { } while (0) so the macro is a single statement and is
// safe inside unbraced if/else chains (avoids the dangling-else hazard).
#define ASSERT(_expr) \
    do { \
        if (!(_expr)) { \
            fprintf(stderr, "ASSERT failed for pid %d : `%s': %s: %d: %s\n", \
                    getpid(), #_expr, __FILE__, __LINE__, __func__); \
            sleep(9999); \
        } \
    } while (0)
#else
// Use the standard version of "assert"
#define ASSERT assert
#endif
54
55
// The hostname that each rank is running on. Currently, only rank 0
// uses this. It is allocated and deallocated in routine "f1".
// One fixed-size string per rank (HOST_NAME_MAX from <limits.h>).
char (*allHostnames)[HOST_NAME_MAX+1] = NULL;


// If numRanksPerNode is set to be > 0, we just use that setting.
// If it is <= 0, we dynamically determine the number of cores on
// a node, and use that. (N.B.: we assume *all* nodes being used in
// the run have the *same* number of MPI processes on each.)
int numRanksPerNode = 0;

// Just an error check; can be zero (if you're confident it's all correct).
// The low numCheckBits bits of each tile message's MPI tag carry the
// epoch id (tag layout: tileID << numCheckBits | epoch bits), letting a
// receiver verify that a message belongs to the current i/o epoch
// (see tryToReceiveDataTile).
#define numCheckBits 2
// This bitMask definition works for 2's complement
#define bitMask ((1 << numCheckBits) - 1)
71
72
///////////////////////////////////////////////////////////////////////
// Fundamental info about the data fields. Most (but not all) of
// this is copied or derived from the values in "SIZE.h"

// double for now. Might also be float someday
#define datumType double
#define datumSize (sizeof(datumType))


// Info about the data fields. We assume that all the fields have the
// same X and Y characteristics, but may have either 1 or NUM_Z levels.
// bcn: or MULTDIM levels (evidently used by "sea ice").

// N.B. Please leave the seemingly unneeded "(long int)" casts in
// place. These individual values may not need them, but they get
// used in calculations where the extra length might be important.
#define NUM_X ((long int) sFacet)
// Global field Y extent; presumably the 13 faces of the llc grid
// stacked in Y -- confirm against the grid layout.
#define NUM_Y (((long int) sFacet) * 13)
#define NUM_Z ((long int) Nr)
#define MULTDIM ((long int) 7)   // seaice layer count; 7 == MULTDIM in SEAICE_SIZE.h

// Some values derived from the above constants
#define twoDFieldSizeInBytes (NUM_X * NUM_Y * 1 * datumSize)
#define threeDFieldSizeInBytes (twoDFieldSizeInBytes * NUM_Z)
#define multDFieldSizeInBytes (twoDFieldSizeInBytes * MULTDIM)

// Info about the data tiles. We assume that all the tiles are the same
// size (no odd-sized last piece), they all have the same X and Y
// characteristics (including ghosting), and are full depth in Z
// (either 1, or NUM_Z, or MULTDIM, as appropriate).
#define TILE_X sNx
#define TILE_Y sNy
#define XGHOSTS OLx
#define YGHOSTS OLy
#define TOTAL_NUM_TILES ((sFacet / sNx) * (sFacet / sNy) * 13)
#define NUM_WET_TILES (nPx * nPy)

// Size of one Z level of an individual tile. This is the "in memory"
// size, including ghosting.
#define tileOneZLevelItemCount (((long)(TILE_X + 2*XGHOSTS)) * (TILE_Y + 2*YGHOSTS))
#define tileOneZLevelSizeInBytes (tileOneZLevelItemCount * datumSize)
114
115
116
//////////////////////////////////////////////////////////////////////
// Define the depth (i.e. number of Z-levels) of the various fields.

typedef struct dataFieldDepth {
    char dataFieldID;   // single-letter field tag, matching the epoch tables below
    int numZ;           // Z depth: 1 (2-D), NUM_Z (3-D), or MULTDIM (seaice)
} dataFieldDepth_t;

dataFieldDepth_t fieldDepths[] = {
    { 'A', MULTDIM },  // seaice, 7 == MULTDIM in SEAICE_SIZE.h

    // Single-level (2-D) fields
    { 'B', 1 },
    { 'C', 1 },
    { 'D', 1 },
    { 'E', 1 },
    { 'F', 1 },
    { 'G', 1 },
    { 'H', 1 },
    { 'I', 1 },
    { 'J', 1 },
    { 'K', 1 },
    { 'L', 1 },
    { 'M', 1 },
    { 'N', 1 },
    { 'O', 1 },
    { 'P', 1 },
    { 'Q', 1 },
    { 'R', 1 },

    // Full-depth (3-D) fields
    { 'S', NUM_Z },
    { 'T', NUM_Z },
    { 'U', NUM_Z },
    { 'V', NUM_Z },
    { 'W', NUM_Z },
    { 'X', NUM_Z },
    { 'Y', NUM_Z },
};
#define numAllFields (sizeof(fieldDepths)/sizeof(dataFieldDepth_t))
155
156
///////////////////////////////////////////////////////////////////////
// Info about the various kinds of i/o epochs.
// One instance describes one output field within one epoch style.
typedef struct epochFieldInfo {
    char dataFieldID;                // single-letter field tag; '\0' terminates a table
    MPI_Comm registrationIntercomm;  // intercomm used during registration
    MPI_Comm dataIntercomm;          // intercomm on which compute ranks send tiles
    MPI_Comm ioRanksIntracomm;       // i/o ranks for *this* field
    long int tileCount;              // number of tiles expected for this field
    int zDepth;  // duplicates the fieldDepth entry; filled in automatically
    int *chunkWriters;  // Which rank writes the i'th chunk of z-levels
    int nChunks;        // length of chunkWriters array
    char filenameTemplate[128];      // printf template taking (gcmIter, suffix)
    long int offset;                 // byte offset of this field within a pickup file
    int pickup;                      // 0: normal float32 dump; 1/2: float64 pickup style
} fieldInfoThisEpoch_t;
172
// The normal i/o dump - style 0.
// One row per field: tag, three communicators (filled in at registration),
// tile count, z-depth (filled in from fieldDepths), chunk-writer table,
// filename template (takes gcmIter and a suffix), file offset, and the
// pickup flag (0 here: data are converted to float32 on output -- see do_write).
// The '\0' row terminates the table.
fieldInfoThisEpoch_t fieldsForEpochStyle_0[] = {
    { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "U.%010d.%s", 0, 0 },
    { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "V.%010d.%s", 0, 0 },
    { 'W', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "W.%010d.%s", 0, 0 },
    { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "Salt.%010d.%s", 0, 0 },
    { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "Theta.%010d.%s", 0,0 },
    { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "Eta.%010d.%s", 0,0 },

    { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIarea.%010d.%s", 0,0 },
    { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIheff.%010d.%s", 0,0 },
    { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIhsnow.%010d.%s", 0,0 },
    { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIuice.%010d.%s", 0,0 },
    { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIvice.%010d.%s", 0,0 },
    { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "SIhsalt.%010d.%s", 0,0 },

    //{ 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "EtaHnm1.%010d.%s", 0,0 },
    { 'I', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceTAUX.%010d.%s", 0,0 },
    { 'J', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceTAUY.%010d.%s", 0,0 },
    { 'K', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "KPPhbl.%010d.%s", 0,0 },
    { 'L', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceSflux.%010d.%s", 0,0 },

    { 'M', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceFWflx.%010d.%s", 0,0 },
    { 'O', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceQnet.%010d.%s", 0,0 },
    { 'P', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "PhiBot.%010d.%s", 0,0 },
    { 'Q', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "oceQsw.%010d.%s", 0,0 },
    //{ 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "dEtaHdt.%010d.%s", 0,0 },

    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "", 0,0 },
};
203
204
// pickup file dump - style 1
// NOTE: if this changes, write_pickup_meta will also need to be changed.
// All fields share one "pickup_*.data" file; each row's offset is the
// byte position of that field within the file (six 3-D fields followed
// by three 2-D fields, in this order).
fieldInfoThisEpoch_t fieldsForEpochStyle_1[] = {
    { 'U', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 1},
    { 'V', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 1},
    { 'T', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 2 + twoDFieldSizeInBytes * 0, 1},
    { 'S', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 3 + twoDFieldSizeInBytes * 0, 1},
    { 'X', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 4 + twoDFieldSizeInBytes * 0, 1},
    { 'Y', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 5 + twoDFieldSizeInBytes * 0, 1},
    { 'N', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 0, 1},
    { 'R', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 1, 1},
    { 'H', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_%010d.%s", threeDFieldSizeInBytes * 6 + twoDFieldSizeInBytes * 2, 1},
    {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "", 0 ,1},
};
219
220
221 // seaice pickup dump - style 2
222 // NOTE: if this changes, write_pickup_meta will also need to be changed
223 fieldInfoThisEpoch_t fieldsForEpochStyle_2[] = {
224 { 'A', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 0 + twoDFieldSizeInBytes * 0, 2},
225 { 'B', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 0, 2},
226 { 'C', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 1, 2},
227 { 'D', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 2, 2},
228 { 'G', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 3, 2},
229 { 'E', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 4, 2},
230 { 'F', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "pickup_seaice_%010d.%s", multDFieldSizeInBytes * 1 + twoDFieldSizeInBytes * 5, 2},
231 {'\0', MPI_COMM_NULL, MPI_COMM_NULL, MPI_COMM_NULL, 0, -1, NULL, 0, "", 0 },
232 };
233
234
// Index-by-style lookup table for the field tables above.
// The epoch "style" integer selects which set of fields gets dumped.
fieldInfoThisEpoch_t *epochStyles[] = {
    fieldsForEpochStyle_0,
    fieldsForEpochStyle_1,
    fieldsForEpochStyle_2,
};
int numEpochStyles = (sizeof(epochStyles) / sizeof(fieldInfoThisEpoch_t *));
241
242
// Commands exchanged to coordinate i/o epochs.
// NOTE(review): used in code outside this chunk; meanings inferred from
// the names only -- confirm against the sender.
typedef enum {
    cmd_illegal,
    cmd_newEpoch,
    cmd_epochComplete,
    cmd_exit,
} epochCmd_t;
249
250
///////////////////////////////////////////////////////////////////////

// Note that a rank will only have access to one of the Intracomms,
// but all ranks will define the Intercomm.
MPI_Comm ioIntracomm = MPI_COMM_NULL;        // intracomm among the i/o ranks
int ioIntracommRank = -1;                    // this rank's id within ioIntracomm
MPI_Comm computeIntracomm = MPI_COMM_NULL;   // intracomm among the compute ranks
int computeIntracommRank = -1;               // this rank's id within computeIntracomm
MPI_Comm globalIntercomm = MPI_COMM_NULL;    // intercomm joining the two groups


// Integer ceiling-division and round-up-to-a-multiple helpers.
// (Macro arguments are evaluated more than once -- avoid side effects.)
#define divCeil(_x,_y) (((_x) + ((_y) - 1)) / (_y))
#define roundUp(_x,_y) (divCeil((_x),(_y)) * (_y))
264
265
typedef enum {
    bufState_illegal,
    bufState_Free,    // on the free list, available for a new tile receive
    bufState_InUse,   // on the in-use list; redistribution isends may be pending
} bufState_t;


// Header for one tile-receive buffer. A buffer lives on exactly one of
// two singly-linked lists (free or in-use) at any time.
typedef struct buf_header{
    struct buf_header *next;   // next buffer on the same list; NULL terminates
    bufState_t state;
    int requestsArraySize;     // length of the "requests" array
    MPI_Request *requests;     // one slot per intracomm rank, for redistribution isends
    char *payload;             // tile data (tileOneZLevelSizeInBytes * NUM_Z bytes)
} bufHdr_t;


bufHdr_t *freeTileBufs_ptr = NULL;    // head of the free-buffer list
bufHdr_t *inUseTileBufs_ptr = NULL;   // head of the in-use-buffer list

// Highest legal MPI tag value. NOTE(review): initialized to -1 here;
// presumably set during registration -- not visible in this chunk.
int maxTagValue = -1;
286
287
// Copy *n doubles from "d" into "f", converting each to float
// (x86-64 SSE2 assembly, hand-unrolled 16-wide with prefetching).
// The leading *n % 16 elements are converted with a plain C loop, then
// the remainder in unrolled blocks of 16. "n" is passed by pointer for
// Fortran call compatibility.
// need to get byteswapping in here as well
// Fix: declared return type "void" (was implicit int, invalid since C99).
void
memcpy_r8_2_r4 (float *f, double *d, long long *n)
{
    long long i, rem;
    rem = *n%16LL;
    for (i = 0; i < rem; i++) {
        f [i] = d [i];
    }
    for (i = rem; i < *n; i += 16) {
        __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 10" : : "m" (d [i + 256 + 0]) );
        __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 11" : : "m" (f [i + 256 + 0]) );
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm0 # memcpy_r8_2_r4.c 12" : : "m" (d [i + 0]) : "%xmm0");
        __asm__ __volatile__ ("movss %%xmm0, %0 # memcpy_r8_2_r4.c 13" : "=m" (f [i + 0]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm1 # memcpy_r8_2_r4.c 14" : : "m" (d [i + 1]) : "%xmm1");
        __asm__ __volatile__ ("movss %%xmm1, %0 # memcpy_r8_2_r4.c 15" : "=m" (f [i + 1]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm2 # memcpy_r8_2_r4.c 16" : : "m" (d [i + 2]) : "%xmm2");
        __asm__ __volatile__ ("movss %%xmm2, %0 # memcpy_r8_2_r4.c 17" : "=m" (f [i + 2]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm3 # memcpy_r8_2_r4.c 18" : : "m" (d [i + 3]) : "%xmm3");
        __asm__ __volatile__ ("movss %%xmm3, %0 # memcpy_r8_2_r4.c 19" : "=m" (f [i + 3]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm4 # memcpy_r8_2_r4.c 20" : : "m" (d [i + 4]) : "%xmm4");
        __asm__ __volatile__ ("movss %%xmm4, %0 # memcpy_r8_2_r4.c 21" : "=m" (f [i + 4]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm5 # memcpy_r8_2_r4.c 22" : : "m" (d [i + 5]) : "%xmm5");
        __asm__ __volatile__ ("movss %%xmm5, %0 # memcpy_r8_2_r4.c 23" : "=m" (f [i + 5]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm6 # memcpy_r8_2_r4.c 24" : : "m" (d [i + 6]) : "%xmm6");
        __asm__ __volatile__ ("movss %%xmm6, %0 # memcpy_r8_2_r4.c 25" : "=m" (f [i + 6]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm7 # memcpy_r8_2_r4.c 26" : : "m" (d [i + 7]) : "%xmm7");
        __asm__ __volatile__ ("prefetcht0 %0 # memcpy_r8_2_r4.c 27" : : "m" (d [i + 256 + 8 + 0]) );
        __asm__ __volatile__ ("movss %%xmm7, %0 # memcpy_r8_2_r4.c 28" : "=m" (f [i + 7]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm8 # memcpy_r8_2_r4.c 29" : : "m" (d [i + 8]) : "%xmm8");
        __asm__ __volatile__ ("movss %%xmm8, %0 # memcpy_r8_2_r4.c 30" : "=m" (f [i + 8]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm9 # memcpy_r8_2_r4.c 31" : : "m" (d [i + 9]) : "%xmm9");
        __asm__ __volatile__ ("movss %%xmm9, %0 # memcpy_r8_2_r4.c 32" : "=m" (f [i + 9]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm10 # memcpy_r8_2_r4.c 33" : : "m" (d [i + 10]) : "%xmm10");
        __asm__ __volatile__ ("movss %%xmm10, %0 # memcpy_r8_2_r4.c 34" : "=m" (f [i + 10]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm11 # memcpy_r8_2_r4.c 35" : : "m" (d [i + 11]) : "%xmm11");
        __asm__ __volatile__ ("movss %%xmm11, %0 # memcpy_r8_2_r4.c 36" : "=m" (f [i + 11]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm12 # memcpy_r8_2_r4.c 37" : : "m" (d [i + 12]) : "%xmm12");
        __asm__ __volatile__ ("movss %%xmm12, %0 # memcpy_r8_2_r4.c 38" : "=m" (f [i + 12]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm13 # memcpy_r8_2_r4.c 39" : : "m" (d [i + 13]) : "%xmm13");
        __asm__ __volatile__ ("movss %%xmm13, %0 # memcpy_r8_2_r4.c 40" : "=m" (f [i + 13]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm14 # memcpy_r8_2_r4.c 41" : : "m" (d [i + 14]) : "%xmm14");
        __asm__ __volatile__ ("movss %%xmm14, %0 # memcpy_r8_2_r4.c 42" : "=m" (f [i + 14]) : : "memory");
        __asm__ __volatile__ ("cvtsd2ss %0, %%xmm15 # memcpy_r8_2_r4.c 43" : : "m" (d [i + 15]) : "%xmm15");
        __asm__ __volatile__ ("movss %%xmm15, %0 # memcpy_r8_2_r4.c 44" : "=m" (f [i + 15]) : : "memory");
    }
}
335
336
337 // Debug routine
338 countBufs(int nbufs)
339 {
340 int nInUse, nFree;
341 bufHdr_t *bufPtr;
342 static int target = -1;
343
344 if (-1 == target) {
345 ASSERT(-1 != nbufs);
346 target = nbufs;
347 }
348
349 bufPtr = freeTileBufs_ptr;
350 for (nFree = 0; bufPtr != NULL; bufPtr = bufPtr->next) nFree += 1;
351
352 bufPtr = inUseTileBufs_ptr;
353 for (nInUse = 0; bufPtr != NULL; bufPtr = bufPtr->next) nInUse += 1;
354
355 if (nInUse + nFree != target) {
356 fprintf(stderr, "Rank %d: bad number of buffs: free %d, inUse %d, should be %d\n",
357 ioIntracommRank, nFree, nInUse, target);
358 }
359 ASSERT ((nInUse + nFree) == target);
360 }
361
362
// Read exactly "nbytes" bytes from "fd" into "p", retrying short reads.
// Returns the byte count actually read (less than nbytes only if EOF was
// reached first), or the negative result of read() on error.
long int readn(int fd, void *p, long int nbytes)
{
    char *cursor = (char *) p;
    long int remaining = nbytes;

    while (remaining > 0) {
        long int got = read(fd, cursor, remaining);
        if (got < 0)
            return got;        // error
        if (got == 0)
            break;             // EOF
        cursor += got;
        remaining -= got;
    }
    return nbytes - remaining;
}
383
384
// Write exactly "nbytes" bytes from "p" to "fd", retrying short writes
// and EINTR interruptions. Returns nbytes on success, or the
// non-positive result of write() on a non-EINTR failure.
ssize_t writen(int fd, void *p, size_t nbytes)
{
    char *cursor = (char *) p;
    size_t remaining = nbytes;

    while (remaining > 0) {
        ssize_t put = write(fd, cursor, remaining);
        if (put <= 0) {
            if (errno == EINTR)
                continue;          // interrupted: just retry (POSIX, not SVr4)
            return put;            // real error
        }
        cursor += put;
        remaining -= put;
    }
    return nbytes - remaining;
}
405
406
407 void write_pickup_meta(FILE *fp, int gcmIter, int pickup)
408 {
409 int i;
410 int ndims = 2;
411 int nrecords,nfields;
412 char**f;
413 char*fld1[] = {"Uvel","Vvel","Theta","Salt","GuNm1","GvNm1","EtaN","dEtaHdt","EtaH"};
414 char*fld2[] = {"siTICES","siAREA","siHEFF","siHSNOW","siHSALT","siUICE","siVICE"};
415 // for now, just list the fields here. When the whole field specification apparatus
416 // is cleaned up, pull the names out of the epochstyle definition or whatever
417
418 ASSERT(1==pickup||2==pickup);
419
420 if (1==pickup){
421 nrecords = 6*NUM_Z+3;
422 nfields = sizeof(fld1)/sizeof(char*);
423 f = fld1;
424 }
425 else if (2==pickup){
426 nrecords = MULTDIM+6;
427 nfields = sizeof(fld2)/sizeof(char*);
428 f = fld2;
429 }
430
431 fprintf(fp," nDims = [ %3d ];\n",ndims);
432 fprintf(fp," dimList = [\n");
433 fprintf(fp," %10lu,%10d,%10lu,\n",NUM_X,1,NUM_X);
434 fprintf(fp," %10ld,%10d,%10ld\n",NUM_Y,1,NUM_Y);
435 fprintf(fp," ];\n");
436 fprintf(fp," dataprec = [ 'float64' ];\n");
437 fprintf(fp," nrecords = [ %5d ];\n",nrecords);
438 fprintf(fp," timeStepNumber = [ %10d ];\n",gcmIter);
439 fprintf(fp," timeInterval = [ %19.12E ];\n",0.0); // what should this be?
440 fprintf(fp," nFlds = [ %4d ];\n",nfields);
441 fprintf(fp," fldList = {\n");
442 for (i=0;i<nfields;++i)
443 fprintf(fp," '%-8s'",f[i]);
444 fprintf(fp,"\n };\n");
445 }
446
447
448
// Staging buffer for the reassembled global field slabs this rank writes.
// Was [NUM_X*NUM_Y*NUM_Z], but only needs to be myNumZSlabs deep;
// grown on demand in processSlabSection.
double *outBuf=NULL;
size_t outBufSize=0;   // current allocation size of outBuf, in bytes
451
452
453 void
454 do_write(int io_epoch, fieldInfoThisEpoch_t *whichField, int myFirstZ, int myNumZ, int gcmIter)
455 {
456 if (0==myNumZ) return;
457
458 int pickup = whichField->pickup;
459
460 ///////////////////////////////
461 // swap here, if necessary
462 // int i,j;
463
464 //i = NUM_X*NUM_Y*myNumZ;
465 //mds_byteswapr8_(&i,outBuf);
466
467 // mds_byteswapr8 expects an integer count, which is gonna overflow someday
468 // can't redefine to long without affecting a bunch of other calls
469 // so do a slab at a time here, to delay the inevitable
470 // i = NUM_X*NUM_Y;
471 //for (j=0;j<myNumZ;++j)
472 // mds_byteswapr8_(&i,&outBuf[i*j]);
473
474 // gnu builtin evidently honored by intel compilers
475
476 if (pickup) {
477 uint64_t *alias = (uint64_t*)outBuf;
478 size_t i;
479 for (i=0;i<NUM_X*NUM_Y*myNumZ;++i)
480 alias[i] = __builtin_bswap64(alias[i]);
481 }
482 else {
483 uint32_t *alias = (uint32_t*)outBuf;
484 size_t i;
485 for (i=0;i<NUM_X*NUM_Y*myNumZ;++i)
486 alias[i] = __builtin_bswap32(alias[i]);
487 }
488
489 // end of swappiness
490 //////////////////////////////////
491
492 char s[1024];
493 //sprintf(s,"henze_%d_%d_%c.dat",io_epoch,gcmIter,whichField->dataFieldID);
494
495 sprintf(s,whichField->filenameTemplate,gcmIter,"data");
496
497 int fd = open(s,O_CREAT|O_WRONLY,S_IRWXU|S_IRGRP);
498 ASSERT(fd!=-1);
499
500 size_t nbytes;
501
502 if (pickup) {
503 lseek(fd,whichField->offset,SEEK_SET);
504 lseek(fd,myFirstZ*NUM_X*NUM_Y*datumSize,SEEK_CUR);
505 nbytes = NUM_X*NUM_Y*myNumZ*datumSize;
506 }
507 else {
508 lseek(fd,myFirstZ*NUM_X*NUM_Y*sizeof(float),SEEK_CUR);
509 nbytes = NUM_X*NUM_Y*myNumZ*sizeof(float);
510 }
511
512 ssize_t bwrit = writen(fd,outBuf,nbytes);
513
514 if (-1==bwrit) perror("Henze : file write problem : ");
515
516 FPRINTF(stderr,"Wrote %d of %d bytes (%d -> %d) \n",bwrit,nbytes,myFirstZ,myNumZ);
517
518 // ASSERT(nbytes == bwrit);
519
520 if (nbytes!=bwrit)
521 fprintf(stderr,"WROTE %ld /%ld\n",bwrit,nbytes);
522
523 close(fd);
524
525
526 return;
527 }
528
529
530
// Per-tile placement of a tile's rows within the global field slab.
typedef struct {
    long int off;    // element offset of the tile's first point within a z-level
    long int skip;   // element stride between successive tile rows in the global slab
} tile_layout_t;

// Indexed by tileID (1-based -- see processSlabSection).
// NOTE(review): populated outside this chunk -- confirm construction.
tile_layout_t *offsetTable;
537
// Copy one received tile-slab (myNumZSlabs z-levels of tile "tileID") out
// of the message buffer "data" into its place in the global output slab
// "outBuf", stripping the tile's ghost cells. For pickup output the copy
// stays float64; otherwise each value is converted to float32 on the fly.
void
processSlabSection(
    fieldInfoThisEpoch_t *whichField,
    int tileID,
    void *data,
    int myNumZSlabs)
{
    int z;

    int pickup = whichField->pickup;

    // tileIDs are 1-based
    ASSERT ((tileID > 0) && (tileID <= TOTAL_NUM_TILES));

    // Grow (or first-allocate) outBuf to hold myNumZSlabs full z-levels.
    if (myNumZSlabs * twoDFieldSizeInBytes > outBufSize){

        free(outBuf);

        outBufSize = myNumZSlabs * twoDFieldSizeInBytes;

        outBuf = malloc(outBufSize);
        ASSERT(outBuf);

        memset(outBuf,0,outBufSize);
    }

    for (z=0;z<myNumZSlabs;++z){

        // Destination offsets are in *elements* (double or float).
        // TILE_X*TILE_Y*TOTAL_NUM_TILES == NUM_X*NUM_Y, one global z-level.
        off_t zoffset = z*TILE_X*TILE_Y*TOTAL_NUM_TILES;
        off_t hoffset = offsetTable[tileID].off;
        off_t skipdst = offsetTable[tileID].skip;

        // Element size differs by output kind, so pick the pointer type
        // before applying the element offsets.
        void *dst;
        if (pickup)
            dst = outBuf + zoffset + hoffset;
        else
            dst = (float*)outBuf + zoffset + hoffset;

        // Source offsets skip over the ghost-cell border of the tile.
        // NOTE(review): hoff adds YGHOSTS (not XGHOSTS) as the x-offset
        // into the first interior row -- presumably OLx == OLy here; confirm.
        off_t zoff = z*(TILE_X+2*XGHOSTS)*(TILE_Y+2*YGHOSTS);
        off_t hoff = YGHOSTS*(TILE_X+2*XGHOSTS) + YGHOSTS;
        double *src = (double*)data + zoff + hoff;

        off_t skipsrc = TILE_X+2*XGHOSTS;

        long long n = TILE_X;

        // Copy row by row: straight memcpy for float64 pickup output,
        // converting copy (double -> float) for normal dumps.
        int y;
        if (pickup)
            for (y=0;y<TILE_Y;++y)
                memcpy((double*)dst + y * skipdst, src + y * skipsrc, TILE_X*datumSize);
        else
            for (y=0;y<TILE_Y;++y)
                memcpy_r8_2_r4((float*)dst + y * skipdst, src + y * skipsrc, &n);
    }

    return;
}
595
596
597
598 // Allocate some buffers to receive tile-data from the compute ranks
599 void
600 allocateTileBufs(int numTileBufs, int maxIntracommSize)
601 {
602 int i, j;
603 for (i = 0; i < numTileBufs; ++i) {
604
605 bufHdr_t *newBuf = malloc(sizeof(bufHdr_t));
606 ASSERT(NULL != newBuf);
607
608 newBuf->payload = malloc(tileOneZLevelSizeInBytes * NUM_Z);
609 ASSERT(NULL != newBuf->payload);
610
611 newBuf->requests = malloc(maxIntracommSize * sizeof(MPI_Request));
612 ASSERT(NULL != newBuf->requests);
613
614 // Init some values
615 newBuf->requestsArraySize = maxIntracommSize;
616 for (j = 0; j < maxIntracommSize; ++j) {
617 newBuf->requests[j] = MPI_REQUEST_NULL;
618 }
619
620 // Put the buf into the free list
621 newBuf->state = bufState_Free;
622 newBuf->next = freeTileBufs_ptr;
623 freeTileBufs_ptr = newBuf;
624 }
625 }
626
627
628
629 bufHdr_t *
630 getFreeBuf()
631 {
632 int j;
633 bufHdr_t *rtnValue = freeTileBufs_ptr;
634
635 if (NULL != rtnValue) {
636 ASSERT(bufState_Free == rtnValue->state);
637 freeTileBufs_ptr = rtnValue->next;
638 rtnValue->next = NULL;
639
640 // Paranoia. This should already be the case
641 for (j = 0; j < rtnValue->requestsArraySize; ++j) {
642 rtnValue->requests[j] = MPI_REQUEST_NULL;
643 }
644 }
645 return rtnValue;
646 }
647
648
649 /////////////////////////////////////////////////////////////////
650
651
// Probe for, and if possible receive, one complete data tile sent by a
// compute rank on "dataIntercomm". Non-blocking overall: returns NULL if
// no message is pending or no free buffer is available; otherwise returns
// the filled buffer and stores the tile id through *tileID.
// Message tags encode (tileID << numCheckBits) | epoch check bits.
bufHdr_t *
tryToReceiveDataTile(
    MPI_Comm dataIntercomm,
    int epochID,
    size_t expectedMsgSize,
    int *tileID)
{
    bufHdr_t *bufHdr;
    int pending, i, count;
    MPI_Status mpiStatus;

    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, dataIntercomm,
               &pending, &mpiStatus);

    // If no data are pending, or if we can't get a buffer to
    // put the pending data into, return NULL
    if (!pending) return NULL;
    if (((bufHdr = getFreeBuf()) == NULL)) {
        FPRINTF(stderr,"tile %d(%d) pending, but no buf to recv it\n",
                mpiStatus.MPI_TAG, mpiStatus.MPI_TAG >> numCheckBits);
        return NULL;
    }

    // Do sanity checks on the pending message: full expected size, and
    // the tag's low bits must match the current epoch.
    MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
    ASSERT(count == expectedMsgSize);
    ASSERT((mpiStatus.MPI_TAG & bitMask) == (epochID & bitMask));

    // Recieve the data we saw in the iprobe.
    // Source and tag are taken from the probe status so we receive exactly
    // that message.
    MPI_Recv(bufHdr->payload, expectedMsgSize, MPI_BYTE,
             mpiStatus.MPI_SOURCE, mpiStatus.MPI_TAG,
             dataIntercomm, &mpiStatus);
    bufHdr->state = bufState_InUse;

    // Return values
    *tileID = mpiStatus.MPI_TAG >> numCheckBits;


    FPRINTF(stderr, "recv tile %d(%d) from compute rank %d\n",
            mpiStatus.MPI_TAG, *tileID, mpiStatus.MPI_SOURCE);

    return bufHdr;
}
695
696
697
// Probe for, and if possible receive, one redistributed z-slab piece from
// a fellow i/o rank on "intracomm". Non-blocking overall: returns -1 if
// no message is pending, else receives into "buf" and returns the tileID
// (which is carried as the message tag -- see redistributeZSlabs).
int
tryToReceiveZSlab(
    void *buf,
    long int expectedMsgSize,
    MPI_Comm intracomm)
{
    MPI_Status mpiStatus;
    int pending, count;

    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, intracomm, &pending, &mpiStatus);
    if (!pending) return -1;

    // Every slab piece for this rank must be exactly the expected size.
    MPI_Get_count(&mpiStatus, MPI_BYTE, &count);
    ASSERT(count == expectedMsgSize);

    // Receive exactly the probed message (same source and tag).
    MPI_Recv(buf, count, MPI_BYTE, mpiStatus.MPI_SOURCE,
             mpiStatus.MPI_TAG, intracomm, &mpiStatus);

    FPRINTF(stderr, "recv slab %d from rank %d\n",
            mpiStatus.MPI_TAG, mpiStatus.MPI_SOURCE);

    // return the tileID
    return mpiStatus.MPI_TAG;
}
722
723
// Slice a received tile (all z-levels) into chunks of "zSlabsPer" levels
// and Isend each chunk to the i/o rank responsible for writing it
// (fieldInfo->chunkWriters). The tileID travels as the message tag.
// The Isend requests are stored in bufHdr->requests so the caller can
// later test for completion before recycling the buffer.
void
redistributeZSlabs(
    bufHdr_t *bufHdr,
    int tileID,
    int zSlabsPer,
    fieldInfoThisEpoch_t *fieldInfo)
{
    int thisFieldNumZ = fieldInfo->zDepth;
    long int tileSizeInBytes = tileOneZLevelSizeInBytes * thisFieldNumZ;
    long int fullPieceSize = zSlabsPer * tileOneZLevelSizeInBytes;
    long int offset = 0;

    MPI_Comm intracomm = fieldInfo->ioRanksIntracomm;

    // Note that the MPI interface definition requires that this arg
    // (i.e. "pieceSize") be of type "int". So check to be sure we
    // haven't overflowed the size.
    int pieceSize = (int)fullPieceSize;
    if (pieceSize != fullPieceSize) {
        fprintf(stderr, "ERROR: pieceSize:%d, fullPieceSize:%ld\n",
                pieceSize, fullPieceSize);
        fprintf(stderr, "ERROR: tileOneZLevelSizeInBytes:%ld, tileSizeInBytes:%ld\n",
                tileOneZLevelSizeInBytes, tileSizeInBytes);
        fprintf(stderr, "ERROR: thisFieldNumZ:%d, zSlabsPer:%d\n",
                thisFieldNumZ , zSlabsPer);
    }
    ASSERT (pieceSize == fullPieceSize);
    ASSERT (pieceSize > 0);


    // Send each piece (aka chunk) to the rank that is responsible
    // for writing it.
    // NOTE(review): requests are indexed by recvRank, so this assumes each
    // rank writes at most one chunk of this tile -- a second chunk to the
    // same rank would overwrite (leak) the first request handle. Confirm
    // against how chunkWriters is built.
    int curPiece = 0;
    while ((offset + pieceSize) <= tileSizeInBytes) {
        int recvRank = fieldInfo->chunkWriters[curPiece];
        MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
                  tileID, intracomm, &(bufHdr->requests[recvRank]));
        ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);

        offset += pieceSize;
        curPiece += 1;
    }

    // There might be one last odd-sized piece (N.B.: odd-sized in Z;
    // the X and Y sizes of a tile are always the same).
    if (offset < tileSizeInBytes) {
        ASSERT (pieceSize >= tileSizeInBytes - offset);
        pieceSize = tileSizeInBytes - offset;
        ASSERT(pieceSize % tileOneZLevelSizeInBytes == 0);

        int recvRank = fieldInfo->chunkWriters[curPiece];
        MPI_Isend(bufHdr->payload + offset, pieceSize, MPI_BYTE, recvRank,
                  tileID, intracomm, &(bufHdr->requests[recvRank]));
        ASSERT(MPI_REQUEST_NULL != bufHdr->requests[recvRank]);
        curPiece += 1;
    }

    // Every chunk of this tile must have been dispatched.
    ASSERT (curPiece == fieldInfo->nChunks);
}
783
784
785
786 /////////////////////////////////////////////////////////////////
787 // This is called by the i/o ranks
788 void
789 doNewEpoch(int epochID, int epochStyleIndex, int gcmIter)
790 {
791 // In an i/o epoch, the i/o ranks are partitioned into groups,
792 // each group dealing with one field. The ranks within a group:
793 // (1) receive data tiles from the compute ranks
794 // (2) slice the received data tiles into slabs in the 'z'
795 // dimension, and redistribute the tile-slabs among the group.
796 // (3) receive redistributed tile-slabs, and reconstruct a complete
797 // field-slab for the whole field.
798 // (4) Write the completed field-slab to disk.
799
800 fieldInfoThisEpoch_t *fieldInfo;
801 int intracommRank, intracommSize;
802
803 int zSlabsPer;
804 int myNumZSlabs, myFirstZSlab, myLastZSlab, myNumSlabPiecesToRecv;
805
806 int ii, numTilesRecvd, numSlabPiecesRecvd;
807
808
809 // Find the descriptor for my assigned field for this epoch style.
810 // It is the one whose dataIntercomm is not null
811 fieldInfo = epochStyles[epochStyleIndex];
812 while (MPI_COMM_NULL == fieldInfo->dataIntercomm) ++fieldInfo;
813 ASSERT('\0' != fieldInfo->dataFieldID);
814
815
816 MPI_Comm_rank(fieldInfo->ioRanksIntracomm, &intracommRank);
817 MPI_Comm_size(fieldInfo->ioRanksIntracomm, &intracommSize);
818
819 zSlabsPer = divCeil(fieldInfo->zDepth, intracommSize);
820
821 // Figure out if this rank writes z-slabs, and if so, which ones
822 myNumZSlabs = 0;
823 myFirstZSlab = 0;
824 myLastZSlab = -1;
825 for (ii = 0; ii < fieldInfo->nChunks; ++ii) {
826 if (fieldInfo->chunkWriters[ii] == intracommRank) {
827 myFirstZSlab = ii * zSlabsPer;
828 // The last chunk might be odd sized
829 myNumZSlabs = ((ii + 1) < fieldInfo->nChunks) ?
830 zSlabsPer : fieldInfo->zDepth - myFirstZSlab;
831 myLastZSlab = myFirstZSlab + myNumZSlabs - 1;
832
833 break;
834 }
835 }
836 if (myNumZSlabs > 0) {
837 ASSERT ((myFirstZSlab >= 0) && (myFirstZSlab < fieldInfo->zDepth));
838 ASSERT ((myLastZSlab >= 0) && (myLastZSlab < fieldInfo->zDepth));
839 ASSERT (myLastZSlab >= myFirstZSlab);
840 }
841
842
843 // If we were not assigned any z-slabs, we don't get any redistributed
844 // tile-slabs. If we were assigned one or more slabs, we get
845 // redistributed tile-slabs for every tile.
846 myNumSlabPiecesToRecv = (0 == myNumZSlabs) ? 0 : NUM_WET_TILES;
847
848
849 numTilesRecvd = 0;
850 numSlabPiecesRecvd = 0;
851
852 ////////////////////////////////////////////////////////////////////
853 // Main loop. Handle tiles from the current epoch
854 for(;;) {
855 bufHdr_t *bufHdr;
856
857 ////////////////////////////////////////////////////////////////
858 // Check for tiles from the computational tasks
859 while (numTilesRecvd < fieldInfo->tileCount) {
860 int tileID = -1;
861 size_t msgSize = tileOneZLevelSizeInBytes * fieldInfo->zDepth;
862 bufHdr = tryToReceiveDataTile(fieldInfo->dataIntercomm, epochID,
863 msgSize, &tileID);
864 if (NULL == bufHdr) break; // No tile was received
865
866 numTilesRecvd += 1;
867 redistributeZSlabs(bufHdr, tileID, zSlabsPer, fieldInfo);
868
869 // Add the bufHdr to the "in use" list
870 bufHdr->next = inUseTileBufs_ptr;
871 inUseTileBufs_ptr = bufHdr;
872 }
873
874
875 ////////////////////////////////////////////////////////////////
876 // Check for tile-slabs redistributed from the i/o processes
877 while (numSlabPiecesRecvd < myNumSlabPiecesToRecv) {
878 long int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
879 char data[msgSize];
880
881 int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
882 if (tileID < 0) break; // No slab was received
883
884 numSlabPiecesRecvd += 1;
885 processSlabSection(fieldInfo, tileID, data, myNumZSlabs);
886
887 // Can do the write here, or at the end of the epoch.
888 // Probably want to make it asynchronous (waiting for
889 // completion at the barrier at the start of the next epoch).
890 //if (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) {
891 // ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
892 // do_write(io_epoch,whichField,myFirstZSlab,myNumZSlabs);
893 //}
894 }
895
896
897 ////////////////////////////////////////////////////////////////
898 // Sanity check for non-writers
899 if (0 == myNumSlabPiecesToRecv) {
900
901 long int msgSize = tileOneZLevelSizeInBytes * myNumZSlabs;
902 char data[msgSize];
903
904 // Check that no one has tried to re-distribute a slab to us.
905 int tileID = tryToReceiveZSlab(data, msgSize, fieldInfo->ioRanksIntracomm);
906 ASSERT (tileID < 0);
907 }
908
909
910 ////////////////////////////////////////////////////////////////
911 // Check if we can release any of the inUse buffers (i.e. the
912 // isends we used to redistribute those z-slabs have completed).
913 // We do this by detaching the inUse list, then examining each
914 // element in the list, putting each element either into the
915 // free list or back into the inUse list, as appropriate.
916 bufHdr = inUseTileBufs_ptr;
917 inUseTileBufs_ptr = NULL;
918 while (NULL != bufHdr) {
919 int count = 0;
920 int completions[intracommSize];
921 MPI_Status statuses[intracommSize];
922
923 // Acquire the "next" bufHdr now, before we change anything.
924 bufHdr_t *nextBufHeader = bufHdr->next;
925
926 // Check to see if the Isend requests have completed for this buf
927 MPI_Testsome(intracommSize, bufHdr->requests,
928 &count, completions, statuses);
929
930 if (MPI_UNDEFINED == count) {
931 // A return of UNDEFINED means that none of the requests
932 // is still outstanding, i.e. they have all completed.
933 // Put the buf on the free list.
934 bufHdr->state = bufState_Free;
935 bufHdr->next = freeTileBufs_ptr;
936 freeTileBufs_ptr = bufHdr;
937 FPRINTF(stderr,"Rank %d freed a tile buffer\n", intracommRank);
938 } else {
939 // At least one request is still outstanding.
940 // Put the buf back on the inUse list.
941 bufHdr->next = inUseTileBufs_ptr;
942 inUseTileBufs_ptr = bufHdr;
943 }
944
945 // Countinue with the next buffer
946 bufHdr = nextBufHeader;
947 }
948
949
950 ////////////////////////////////////////////////////////////////
951 // Check if the epoch is complete
952 if ((numTilesRecvd >= fieldInfo->tileCount) &&
953 (numSlabPiecesRecvd >= myNumSlabPiecesToRecv) &&
954 (NULL == inUseTileBufs_ptr))
955 {
956 ASSERT(numTilesRecvd == fieldInfo->tileCount);
957 ASSERT(numSlabPiecesRecvd == myNumSlabPiecesToRecv);
958
959 //fprintf(stderr,"rank %d %d %d %d\n",intracommRank,numTilesRecvd,numSlabPiecesRecvd,myNumZSlabs);
960
961 // Ok, wrap up the current i/o epoch
962 // Probably want to make this asynchronous (waiting for
963 // completion at the start of the next epoch).
964 do_write(epochID, fieldInfo, myFirstZSlab, myNumZSlabs, gcmIter);
965
966 // The epoch is complete
967 return;
968 }
969 }
970
971 }
972
973
974
975
976 // Main routine for the i/o tasks. Callers DO NOT RETURN from
977 // this routine; they MPI_Finalize and exit.
978 void
979 ioRankMain (void)
980 {
981 // This is one of a group of i/o processes that will be receiving tiles
982 // from the computational processes. This same process is also
983 // responsible for compositing slices of tiles together into one or
984 // more complete slabs, and then writing those slabs out to disk.
985 //
986 // When a tile is received, it is cut up into slices, and each slice is
987 // sent to the appropriate compositing task that is dealing with those
988 // "z" level(s) (possibly including ourselves). These are tagged with
989 // the tile-id. When we *receive* such a message (from ourselves, or
990 // from any other process in this group), we unpack the data (stripping
991 // the ghost cells), and put them into the slab we are assembling. Once
992 // a slab is fully assembled, it is written to disk.
993
994 int i, ierr, count, numTileBufs;
995 MPI_Status status;
996 int currentEpochID = 0;
997 int maxTileCount = 0, max3DTileCount = 0, maxIntracommSize = 0;
998
999 int numComputeRanks, epochStyleIndex;
1000 MPI_Comm_remote_size(globalIntercomm, &numComputeRanks);
1001
1002
1003 int *xoff = malloc(TOTAL_NUM_TILES*sizeof(int));
1004 ASSERT(xoff);
1005 int *yoff = malloc(TOTAL_NUM_TILES*sizeof(int));
1006 ASSERT(yoff);
1007 int *xskip = malloc(TOTAL_NUM_TILES*sizeof(int));
1008 ASSERT(xskip);
1009
1010 offsetTable = malloc((TOTAL_NUM_TILES+1)*sizeof(tile_layout_t));
1011 ASSERT(offsetTable);
1012
1013 ierr = MPI_Bcast(xoff,TOTAL_NUM_TILES,MPI_INT,0,globalIntercomm);
1014 ierr = MPI_Bcast(yoff,TOTAL_NUM_TILES,MPI_INT,0,globalIntercomm);
1015 ierr = MPI_Bcast(xskip,TOTAL_NUM_TILES,MPI_INT,0,globalIntercomm);
1016
1017 for (i=0;i<TOTAL_NUM_TILES;++i){
1018 offsetTable[i+1].off = NUM_X*(yoff[i]-1) + xoff[i] - 1;
1019 offsetTable[i+1].skip = xskip[i];
1020 }
1021
1022 free(xoff);
1023 free(yoff);
1024 free(xskip);
1025
1026 ///////////////////////////////////////////////////////////////
1027
1028
1029 // At this point, the multitude of various inter-communicators have
1030 // been created and stored in the various epoch "style" arrays.
1031 // The next step is to have the computational tasks "register" the
1032 // data tiles they will send. In this version of the code, we only
1033 // track and store *counts*, not the actual tileIDs.
1034 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1035 fieldInfoThisEpoch_t *p = NULL;
1036 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1037 int thisIntracommSize;
1038
1039 // Find the field we were assigned for this epoch style
1040 int curFieldIndex = -1;
1041 while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
1042 p = &(thisEpochStyle[curFieldIndex]);
1043 if (MPI_COMM_NULL != p->registrationIntercomm) break;
1044 }
1045 ASSERT(NULL != p);
1046 ASSERT('\0' != p->dataFieldID);
1047 MPI_Comm_size(p->ioRanksIntracomm, &thisIntracommSize);
1048 if (thisIntracommSize > maxIntracommSize) {
1049 maxIntracommSize = thisIntracommSize;
1050 }
1051
1052
1053 // Receive a message from *each* computational rank telling us
1054 // a count of how many tiles that rank will send during epochs
1055 // of this style. Note that some of these counts may be zero
1056 for (i = 0; i < numComputeRanks; ++i) {
1057 MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG,
1058 p->registrationIntercomm, &status);
1059 p->tileCount += status.MPI_TAG;
1060 }
1061 if (p->tileCount > maxTileCount) maxTileCount = p->tileCount;
1062 if (p->zDepth > 1) {
1063 if (p->tileCount > max3DTileCount) max3DTileCount = p->tileCount;
1064 }
1065
1066 // Sanity check
1067 MPI_Allreduce (&(p->tileCount), &count, 1,
1068 MPI_INT, MPI_SUM, p->ioRanksIntracomm);
1069 ASSERT(count == NUM_WET_TILES);
1070 }
1071
1072
1073 // In some as-of-yet-undetermined fashion, we decide how many
1074 // buffers to allocate to hold tiles received from the computational
1075 // tasks. The number of such buffers is more-or-less arbitrary (the
1076 // code should be able to function with any number greater than zero).
1077 // More buffers means more parallelism, but uses more space.
1078 // Note that maxTileCount is an upper bound on the number of buffers
1079 // that we can usefully use. Note also that if we are assigned a 2D
1080 // field, we might be assigned to recieve a very large number of tiles
1081 // for that field in that epoch style.
1082
1083 // Hack - for now, just pick a value for numTileBufs
1084 numTileBufs = (max3DTileCount > 0) ? max3DTileCount : maxTileCount;
1085 if (numTileBufs < 4) numTileBufs = 4;
1086 if (numTileBufs > 25) numTileBufs = 25;
1087
1088 allocateTileBufs(numTileBufs, maxIntracommSize);
1089 countBufs(numTileBufs);
1090
1091
1092 ////////////////////////////////////////////////////////////////////
1093 // Main loop.
1094 ////////////////////////////////////////////////////////////////////
1095
1096 for (;;) {
1097 int cmd[4];
1098
1099 // Make sure all the i/o ranks have completed processing the prior
1100 // cmd (i.e. the prior i/o epoch is complete).
1101 MPI_Barrier(ioIntracomm);
1102
1103 if (0 == ioIntracommRank) {
1104 fprintf(stderr, "I/O ranks waiting for new epoch at time %f\n",MPI_Wtime());
1105 MPI_Send(NULL, 0, MPI_BYTE, 0, cmd_epochComplete, globalIntercomm);
1106
1107 MPI_Recv(cmd, 4, MPI_INT, 0, 0, globalIntercomm, MPI_STATUS_IGNORE);
1108 fprintf(stderr, "I/O ranks begining new epoch: %d, gcmIter = %d, at time %f\n",
1109 cmd[1], cmd[3], MPI_Wtime());
1110
1111 // before we start a new epoch, have i/o rank 0:
1112 // determine output filenames for this epoch
1113 // clean up any extant files with same names
1114 // write .meta files
1115
1116 if (cmd_exit != cmd[0]){
1117
1118 fprintf(stderr,"new epoch: epoch %d, style %d, gcmIter %d\n", cmd[1],cmd[2],cmd[3]);
1119
1120 int epochStyle = cmd[2];
1121 int gcmIter = cmd[3];
1122
1123 fieldInfoThisEpoch_t *fieldInfo;
1124 fieldInfo = epochStyles[epochStyle];
1125 char s[1024];
1126 int res;
1127 FILE *fp;
1128
1129 if (fieldInfo->pickup==0){ // for non-pickups, need to loop over individual fields
1130 char f;
1131 while (f = fieldInfo->dataFieldID){
1132 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1133 fprintf(stderr,"%s\n",s);
1134 res = unlink(s);
1135 if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1136
1137 // skip writing meta files for non-pickup fields
1138 /*
1139 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1140 fp = fopen(s,"w+");
1141 fclose(fp);
1142 */
1143
1144 ++fieldInfo;
1145
1146 }
1147 }
1148
1149 else { // single pickup or pickup_seaice file
1150
1151 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"data");
1152 fprintf(stderr,"%s\n",s);
1153 res = unlink(s);
1154 if (-1==res && ENOENT!=errno) fprintf(stderr,"unable to rm %s\n",s);
1155
1156 sprintf(s,fieldInfo->filenameTemplate,gcmIter,"meta");
1157 fp = fopen(s,"w+");
1158 write_pickup_meta(fp, gcmIter, fieldInfo->pickup);
1159 fclose(fp);
1160
1161 }
1162 }
1163 }
1164 MPI_Bcast(cmd, 4, MPI_INT, 0, ioIntracomm);
1165
1166 if (0 == ioIntracommRank)
1167 fprintf(stderr,"i/o handshake completed %d %d %f\n",cmd[1],cmd[3],MPI_Wtime());
1168
1169 switch (cmd[0]) {
1170
1171 case cmd_exit:
1172 // Don't bother trying to disconnect and free the
1173 // plethora of communicators; just exit.
1174 FPRINTF(stderr,"Received shutdown message\n");
1175 MPI_Finalize();
1176 exit(0);
1177 break;
1178
1179 case cmd_newEpoch:
1180 if ((currentEpochID + 1) != cmd[1]) {
1181 fprintf(stderr, "ERROR: Missing i/o epoch? was %d, "
1182 "but is now %d ??\n", currentEpochID, cmd[1]);
1183 sleep(1); // Give the message a chance to propagate
1184 abort();
1185 }
1186 currentEpochID = cmd[1];
1187
1188 memset(outBuf,0,outBufSize); // zero the outBuf, so dry tiles are well defined
1189
1190 doNewEpoch(cmd[1], cmd[2], cmd[3]);
1191 break;
1192
1193 default:
1194 fprintf(stderr, "Unexpected epoch command: %d %d %d %d\n",
1195 cmd[0], cmd[1], cmd[2], cmd[3]);
1196 sleep(1);
1197 abort();
1198 break;
1199 }
1200
1201 }
1202
1203 }
1204
1205
1206
1207 ///////////////////////////////////////////////////////////////////////////////////
1208
1209 int
1210 findField(const char c)
1211 {
1212 int i;
1213 for (i = 0; i < numAllFields; ++i) {
1214 if (c == fieldDepths[i].dataFieldID) return i;
1215 }
1216
1217 // Give the error message a chance to propagate before exiting.
1218 fprintf(stderr, "ERROR: Field not found: '%c'\n", c);
1219 sleep(1);
1220 abort();
1221 }
1222
1223
1224
// Given a number of ranks and a set of fields we will want to output,
// figure out how to distribute the ranks among the fields.
// We attempt to distribute according to three separate criteria:
//   1) Try to make the number of ranks assigned to a field be
//      proportional to the number of bytes in that field.
//   2) Try to even out the total number of MPI messages that are
//      sent to any given i/o node.
//   3) Try to even out the amount of output being written from
//      any given i/o node.
//
// Arguments:
//   numComputeRanks - number of compute (tile-sending) ranks
//   numIORanks      - number of i/o ranks to apportion; must be an
//                     exact multiple of numRanksPerNode (asserted)
//   numFields       - number of fields in this epoch style
//   fields          - the field descriptors; fields[*].chunkWriters
//                     and fields[*].nChunks are *written* here
//   assignments     - out: per-i/o-rank field index (length numIORanks)
//   nToWrite        - out: per-i/o-*node* count of z-levels to write
//                     (only the first numIORanks/numRanksPerNode
//                     entries are used)

void
computeIORankAssigments(
    int numComputeRanks,
    int numIORanks,
    int numFields,
    fieldInfoThisEpoch_t *fields,
    int assignments[],
    int nToWrite[])
{
    int i,j,k, iteration, sum, count;

    int numIONodes = numIORanks / numRanksPerNode;
    int numIORanksThisField[numFields], nRanks[numFields];
    long int bytesPerIORankThisField[numFields];
    int expectedMessagesPerRankThisField[numFields];
    int isWriter[numIORanks];
    long int fieldSizes[numFields];

    // Per-node bookkeeping for the placement phase.
    struct ioNodeInfo_t {
        int expectedNumMessagesThisNode;
        int numCoresAssigned;
        int *assignedFieldThisCore;   // field index per core, -1 = unassigned
    } ioNodeInfo[numIONodes];

    // Since numRanksPerNode might be dynamically determined, we have
    // to dynamically allocate the assignedFieldThisCore arrays.
    for (i = 0; i < numIONodes; ++i) {
        ioNodeInfo[i].assignedFieldThisCore = malloc(sizeof(int)*numRanksPerNode);
        ASSERT(NULL != ioNodeInfo[i].assignedFieldThisCore);
    }

    ASSERT((numIONodes*numRanksPerNode) == numIORanks);

    memset (assignments, 0, numIORanks*sizeof(int));
    memset (isWriter, 0, numIORanks*sizeof(int));


    //////////////////////////////////////////////////////////////
    // Compute the size for each field in this epoch style
    for (i = 0; i < numFields; ++i) {
        ASSERT((1 == fields[i].zDepth) || (NUM_Z == fields[i].zDepth) || (MULTDIM == fields[i].zDepth));
        fieldSizes[i] = twoDFieldSizeInBytes * fields[i].zDepth;
    }


    /////////////////////////////////////////////////////////////////
    // Phase 1: Distribute the number of available i/o ranks among
    // the fields, proportionally based on field size.

    // First, assign one rank per field
    ASSERT(numIORanks >= numFields);
    for (i = 0; i < numFields; ++i) {
        numIORanksThisField[i] = 1;
        bytesPerIORankThisField[i] = fieldSizes[i];
    }

    // Now apportion any extra ranks.
    // (N.B. "i" deliberately continues from numFields here.)
    for (; i < numIORanks; ++i) {

        // Find the field 'k' with the most bytesPerIORank
        k = 0;
        for (j = 1; j < numFields; ++j) {
            if (bytesPerIORankThisField[j] > bytesPerIORankThisField[k]) {
                k = j;
            }
        }

        // Assign an i/o rank to that field
        numIORanksThisField[k] += 1;
        bytesPerIORankThisField[k] = fieldSizes[k] / numIORanksThisField[k];
    }

    /////////////////////////////////////////////////////////////////
    // At this point, the number of i/o ranks should be apportioned
    // among the fields.  Check we didn't screw up the count.
    for (sum = 0, i = 0; i < numFields; ++i) {
        sum += numIORanksThisField[i];
    }
    ASSERT(sum == numIORanks);

    // Make a copy of numIORanksThisField (the copy is consumed
    // as ranks get placed onto nodes in Phase 2).
    memcpy (nRanks, numIORanksThisField, numFields*sizeof(int));



    //////////////////////////////////////////////////////////////////
    // Phase 2: try to even out the number of messages per node.
    // The *number* of i/o ranks assigned to a field is based on the
    // field size.  But the *placement* of those ranks is based on
    // the expected number of messages the process will receive.
    // [In particular, if we have several fields each with only one
    // assigned i/o rank, we do not want to place them all on the
    // same node if we don't have to.]  This traffic-load balance
    // strategy is an approximation at best since the messages may
    // not all be the same size (e.g. 2D fields vs. 3D fields), so
    // "number of messages" is not the same thing as "traffic".
    // But it should suffice.

    // Init a couple of things
    for (i = 0; i < numFields; ++i) {
        expectedMessagesPerRankThisField[i] =
            numComputeRanks / numIORanksThisField[i];
    }
    for (i = 0; i < numIONodes; ++i) {
        ioNodeInfo[i].expectedNumMessagesThisNode = 0;
        ioNodeInfo[i].numCoresAssigned = 0;
        for (j = 0; j < numRanksPerNode; ++j) {
            ioNodeInfo[i].assignedFieldThisCore[j] = -1;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Select the i/o node with the smallest expectedNumMessages, and
    // assign it a rank from the field with the highest remaining
    // expectedMessagesPerRank.  Repeat until everything is assigned.
    // (Yes, this could be a lot faster, but isn't worth the trouble.)

    for (count = 0; count < numIORanks; ++count) {

        // Make an initial choice of node (first one with a free core)
        for (i = 0; i < numIONodes; ++i) {
            if (ioNodeInfo[i].numCoresAssigned < numRanksPerNode) break;
        }
        j = i;
        ASSERT(j < numIONodes);

        // Search for a better choice (fewest expected messages)
        for (++i; i < numIONodes; ++i) {
            if (ioNodeInfo[i].numCoresAssigned >= numRanksPerNode) continue;
            if (ioNodeInfo[i].expectedNumMessagesThisNode <
                ioNodeInfo[j].expectedNumMessagesThisNode)
            {
                j = i;
            }
        }


        // Make an initial choice of field (first with ranks left to place)
        for (i = 0; i < numFields; ++i) {
            if (nRanks[i] > 0) break;
        }
        k = i;
        ASSERT(k < numFields);

        // Search for a better choice (most messages per rank)
        for (++i; i < numFields; ++i) {
            if (nRanks[i] <= 0) continue;
            if (expectedMessagesPerRankThisField[i] >
                expectedMessagesPerRankThisField[k])
            {
                k = i;
            }
        }

        // Put one rank from the chosen field onto the chosen node
        ioNodeInfo[j].expectedNumMessagesThisNode += expectedMessagesPerRankThisField[k];
        ioNodeInfo[j].assignedFieldThisCore[ioNodeInfo[j].numCoresAssigned] = k;
        ioNodeInfo[j].numCoresAssigned += 1;
        nRanks[k] -= 1;
    }

    // Sanity check - all ranks were assigned to a core
    // NOTE(review): these two checks start at index 1, skipping
    // element 0.  Element 0 is implied correct by the totals checked
    // above, but starting at 0 would be clearer — confirm intentional.
    for (i = 1; i < numFields; ++i) {
        ASSERT(0 == nRanks[i]);
    }
    // Sanity check - all cores were assigned a rank
    for (i = 1; i < numIONodes; ++i) {
        ASSERT(numRanksPerNode == ioNodeInfo[i].numCoresAssigned);
    }


    //////////////////////////////////////////////////////////////////
    // Phase 3: Select which ranks will be assigned to write out the
    // fields.  We attempt to balance the amount that each node is
    // assigned to write out.  Since Phase 1 and Phase 2 have already
    // fixed a number of things, our options for balancing things
    // is somewhat restricted.


    // Count how many nodes each field is distributed across
    int numIONodesThisField[numFields];
    for (i = 0; i < numFields; ++i) {
        numIONodesThisField[i] = 0;
        for (j = 0; j < numIONodes; ++j) {
            for (k = 0; k < numRanksPerNode; ++k) {
                if (ioNodeInfo[j].assignedFieldThisCore[k] == i) {
                    numIONodesThisField[i] += 1;
                    break;
                }
            }
        }
        ASSERT (numIONodesThisField[i] > 0);
        ASSERT (numIONodesThisField[i] <= numIONodes);
    }


    // Init a per-node running count of z-levels-to-write
    memset (nToWrite, 0, numIONodes*sizeof(int));


    // Iterate through all the fields, although we will select
    // which field to operate on (i.e. curField) non-sequentially.
    for (iteration = 0; iteration < numFields; ++iteration) {

        // Pick the field distributed across the fewest number of nodes, on
        // the theory that this field will have the least flexibility.
        int curField = 0;
        for (j = 1; j < numFields; ++j) {
            if (numIONodesThisField[j] < numIONodesThisField[curField]) {
                curField = j;
            }
        }
        ASSERT (numIONodesThisField[curField] <= numIONodes);

        // Now that we have chosen a field, identify any i/o nodes
        // that have rank(s) assigned to that field.
        int nAssigned[numIONodes];
        for (i = 0; i < numIONodes; ++i) {
            nAssigned[i] = 0;
            for (j = 0; j < numRanksPerNode; ++j) {
                if (ioNodeInfo[i].assignedFieldThisCore[j] == curField) {
                    nAssigned[i] += 1;
                }
            }
        }


        // We do the writes in units of entire z-levels.  If a rank is a
        // writer, it is assigned to write a chunk of one or more z-levels.
        int chunkSize = divCeil (fields[curField].zDepth, numIORanksThisField[curField]);
        int nChunks = divCeil (fields[curField].zDepth, chunkSize);
        int curChunk;

        // Designate the same number of ranks to be writers as there are chunks.
        // NOTE(review): chunkWriters is allocated here and never freed in
        // this routine — ownership appears to pass to the field descriptor.
        fields[curField].chunkWriters = malloc (nChunks*sizeof(int));
        ASSERT (fields[curField].chunkWriters != NULL);
        fields[curField].nChunks = nChunks;
        int nAvailable[numIONodes];
        memcpy (nAvailable, nAssigned, numIONodes*sizeof(int));
        for (curChunk = 0; curChunk < nChunks; ++curChunk) {

            // Note that the last chunk might be an odd size (if chunkSize
            // does not evenly divide zDepth).
            if ((curChunk + 1) == nChunks) {
                chunkSize = fields[curField].zDepth - curChunk*chunkSize;
            }

            // Pick a node that has at least one rank assigned to curField
            // that has not already been designated as a writer.
            // There must still be at least one, or we would have exited
            // the loop already.
            int curNode;
            for (curNode = 0; curNode < numIONodes; ++curNode) {
                if (nAvailable[curNode] > 0) break;
            }
            ASSERT (curNode < numIONodes);

            // curNode is a candidate node; try to find an acceptable
            // candidate node with a smaller nToWrite
            for (i = curNode + 1; i < numIONodes; ++i) {
                if ((nAvailable[i] > 0) && (nToWrite[i] < nToWrite[curNode])) {
                    curNode = i;
                }
            }

            // Find an appropriate rank on the chosen node
            for (j = 0; j < numRanksPerNode; ++j) {
                if ( (ioNodeInfo[curNode].assignedFieldThisCore[j] == curField) &&
                     (!isWriter[curNode*numRanksPerNode + j]) )
                {
                    break;
                }
            }
            // Double-check that we found an appropriate rank.
            ASSERT (j < numRanksPerNode);


            // We've picked a rank to be the writer of the current chunk.
            // Now we need to figure out which rank this will wind up being
            // in the "ioRanksIntracomm" for this field (when that intracomm
            // eventually gets created), so we can set "chunkWriters".
            // The intracomm rank equals the number of curField ranks on
            // earlier nodes, plus those earlier on this node.
            int eventualCommRank = 0;
            for (i = 0; i < curNode; ++i) eventualCommRank += nAssigned[i];
            for (i = 0; i < j; ++i) {
                if (ioNodeInfo[curNode].assignedFieldThisCore[i] == curField) {
                    eventualCommRank += 1;
                }
            }


            // Update the info for this choice of node+rank
            fields[curField].chunkWriters[curChunk] = eventualCommRank;
            isWriter[curNode*numRanksPerNode + j] = 1;
            nAvailable[curNode] -= 1;
            nToWrite[curNode] += chunkSize;

        }

        // We've finished with this curField; mark it so that we don't
        // re-select this same curField the next time through the loop.
        numIONodesThisField[curField] = numIONodes + 1;
    }


    //////////////////////////////////////////////////////////////////
    // Return the computed assignments
    // N.B. We are also returning info in "fields[*].chunkWriters"
    for (i = 0; i < numIONodes; ++i) {
        for (j = 0; j < numRanksPerNode; ++j) {
            assignments[i*numRanksPerNode + j] =
                ioNodeInfo[i].assignedFieldThisCore[j];
        }
    }


    // Clean up
    for (i = 0; i < numIONodes; ++i) {
        free(ioNodeInfo[i].assignedFieldThisCore);
    }

}
1556
1557 //////////////////////////////////////////////////////////////////////////////////
1558
1559
1560 int
1561 isIORank(int commRank, int totalNumNodes, int numIONodes)
1562 {
1563 // Figure out if this rank is on a node that will do i/o.
1564 // Note that the i/o nodes are distributed throughout the
1565 // task, not clustered together.
1566 int thisRankNode = commRank / numRanksPerNode;
1567
1568 int ioNodeStride = totalNumNodes / numIONodes;
1569 int extra = totalNumNodes % numIONodes;
1570
1571 int inflectionPoint = (ioNodeStride+1)*extra;
1572 if (thisRankNode <= inflectionPoint) {
1573 return (thisRankNode % (ioNodeStride+1)) == 0;
1574 } else {
1575 return ((thisRankNode - inflectionPoint) % ioNodeStride) == 0;
1576 }
1577
1578 }
1579
1580
1581
1582 // Get the number of MPI ranks that are running on one node.
1583 // We assume that the MPI ranks are launched in sequence, filling one
1584 // node before going to the next, and that each node has the same
1585 // number of ranks (except possibly the last node, which is allowed
1586 // to be short).
int
getNumRanksPerNode (MPI_Comm comm)
{
    // Determine how many MPI ranks share a node by counting, on rank 0,
    // how many consecutive entries of the gathered hostname table match
    // rank 0's own hostname, then broadcasting that count to everyone.
    // Relies on ranks being launched in node-filling order (see the
    // comment above this function).
    char myHostname[HOST_NAME_MAX+1];
    int status, count;          // count is set on rank 0, filled in
                                // on other ranks by the MPI_Bcast below
    int commSize, commRank;

    MPI_Comm_size (comm, &commSize);
    MPI_Comm_rank (comm, &commRank);

    status = gethostname (myHostname, HOST_NAME_MAX);
    myHostname[HOST_NAME_MAX] = '\0';   // gethostname may not NUL-terminate
    ASSERT (0 == status);

    if (0 == commRank) {
        int nBlocks, ii, jj;
        // allHostnames was gathered earlier (in f1) on rank 0 only.
        assert (allHostnames != NULL);

        // We assume these are already in-order and so don't
        // need to be sorted.

        // Count how many ranks have the same hostname as rank 0
        for (count = 0;
             (count < commSize) &&
                 (strcmp(&(allHostnames[count][0]), myHostname) == 0);
             ++count);
        ASSERT (count > 0);

        // Check that the count is consistent for each block of hostnames
        // (except possibly the last block, which might not be full).
        // nBlocks is rounded down, so a trailing partial block is
        // deliberately left unchecked.
        nBlocks = commSize / count;

        for (jj = 1; jj < nBlocks; ++jj) {
            char *p = &(allHostnames[jj*count][0]);
            for (ii = 0; ii < count; ++ii) {
                char *q = &(allHostnames[jj*count + ii][0]);
                ASSERT (strcmp (p, q) == 0);
            }
        }
    }

    // Share rank 0's answer with every rank in the communicator.
    MPI_Bcast (&count, 1, MPI_INT, 0, comm);
    return count;
}
1631
1632
1633 ////////////////////////////////////////////////////////////////////////
1634 ////////////////////////////////////////////////////////////////////////
1635 // Routines called by the mitGCM code
1636
1637
1638 ////////////////////////////////////////
1639 // Various "one time" initializations
1640 void
1641 f1(
1642 MPI_Comm parentComm,
1643 int numComputeRanks,
1644 int numTiles,
1645 MPI_Comm *rtnComputeComm)
1646 {
1647 // Note that we ignore the argument "numTiles". This value used to
1648 // be needed, but now we get the information directly from "SIZE.h"
1649
1650 MPI_Comm newIntracomm, newIntercomm, dupParentComm;
1651 int newIntracommRank, thisIsIORank;
1652 int parentSize, parentRank;
1653 int numIONodes, numIORanks;
1654 int mpiIsInitialized, *intPtr, tagUBexists;
1655 int i, j, nF, compRoot, fieldIndex, epochStyleIndex;
1656 int totalNumNodes, numComputeNodes, newIntracommSize;
1657
1658
1659 // Init globals
1660
1661 // Info about the parent communicator
1662 MPI_Initialized(&mpiIsInitialized);
1663 ASSERT(mpiIsInitialized);
1664 MPI_Comm_size(parentComm, &parentSize);
1665 MPI_Comm_rank(parentComm, &parentRank);
1666
1667 // Find the max MPI "tag" value
1668 MPI_Attr_get(MPI_COMM_WORLD, MPI_TAG_UB, &intPtr, &tagUBexists);
1669 ASSERT(tagUBexists);
1670 maxTagValue = *intPtr;
1671
1672 // Gather all the hostnames to rank zero
1673 char myHostname[HOST_NAME_MAX+1];
1674 int status;
1675 status = gethostname (myHostname, HOST_NAME_MAX);
1676 myHostname[HOST_NAME_MAX] = '\0';
1677 ASSERT (0 == status);
1678 if (parentRank != 0) {
1679 // send my hostname to rank 0
1680 MPI_Gather (myHostname, HOST_NAME_MAX+1, MPI_CHAR,
1681 NULL, 0, MPI_CHAR, 0, parentComm);
1682 }
1683 else {
1684 allHostnames = malloc (parentSize*(HOST_NAME_MAX+1));
1685 assert (allHostnames != NULL);
1686
1687 // Collect the hostnames from all the ranks
1688 MPI_Gather (myHostname, HOST_NAME_MAX+1, MPI_CHAR,
1689 allHostnames, HOST_NAME_MAX+1, MPI_CHAR,
1690 0, parentComm);
1691 }
1692
1693 if (numRanksPerNode <= 0) {
1694 // Might also want to check for an env var (or something)
1695 // N.B.: getNumRanksPerNode uses an MPI collective on the communicator
1696 // and needs the global "allHostnames" to already be set in rank 0.
1697 numRanksPerNode = getNumRanksPerNode(parentComm);
1698 }
1699
1700 // Fill in the zDepth field of the fieldInfoThisEpoch_t descriptors
1701 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1702 fieldInfoThisEpoch_t *f, *thisEpochStyle = epochStyles[epochStyleIndex];
1703
1704 int curFieldIndex = -1;
1705 while ((f = &(thisEpochStyle[++curFieldIndex]))->dataFieldID != '\0') {
1706 i = findField(f->dataFieldID);
1707 if (-1 != f->zDepth) {
1708 if (f->zDepth != fieldDepths[i].numZ) {
1709 fprintf(stderr, "Inconsistent z-depth for field '%c': "
1710 "fieldDepths[%d] and epoch style %d\n",
1711 f->dataFieldID, i, epochStyleIndex);
1712 }
1713 }
1714 f->zDepth = fieldDepths[i].numZ;
1715 }
1716 }
1717
1718
1719
1720 // Figure out how many nodes we can use for i/o.
1721 // To make things (e.g. memory calculations) simpler, we want
1722 // a node to have either all compute ranks, or all i/o ranks.
1723 // If numComputeRanks does not evenly divide numRanksPerNode, we have
1724 // to round up in favor of the compute side.
1725
1726 totalNumNodes = divCeil(parentSize, numRanksPerNode);
1727 numComputeNodes = divCeil(numComputeRanks, numRanksPerNode);
1728 numIONodes = (parentSize - numComputeRanks) / numRanksPerNode;
1729 ASSERT(numIONodes > 0);
1730 ASSERT(numIONodes <= (totalNumNodes - numComputeNodes));
1731 numIORanks = numIONodes * numRanksPerNode;
1732
1733
1734 // Print out the hostnames, identifying which ones are i/o nodes
1735
1736 if (0 == parentRank) {
1737 printf ("\n%d total nodes, %d i/o, %d compute\n",
1738 totalNumNodes, numIONodes, numComputeNodes);
1739 for (i = 0; i < parentSize; i += numRanksPerNode) {
1740 if (isIORank (i, totalNumNodes, numIONodes)) {
1741 printf ("\n(%s)", &(allHostnames[i][0]));
1742 } else {
1743 printf (" %s", &(allHostnames[i][0]));
1744 }
1745 }
1746 printf ("\n\n");
1747 }
1748 fflush (stdout);
1749
1750
1751
1752 // It is surprisingly easy to launch the job with the wrong number
1753 // of ranks, particularly if the number of compute ranks is not a
1754 // multiple of numRanksPerNode (the tendency is to just launch one
1755 // rank per core for all available cores). So we introduce a third
1756 // option for a rank: besides being an i/o rank or a compute rank,
1757 // it might instead be an "excess" rank, in which case it just
1758 // calls MPI_Finalize and exits.
1759
1760 typedef enum { isCompute, isIO, isExcess } rankType;
1761 rankType thisRanksType;
1762
1763 if (isIORank(parentRank, totalNumNodes, numIONodes)) {
1764 thisRanksType = isIO;
1765 } else if (parentRank >= numComputeRanks + numIORanks) {
1766 thisRanksType = isExcess;
1767 } else {
1768 thisRanksType = isCompute;
1769 }
1770
1771 // Split the parent communicator into parts
1772 MPI_Comm_split(parentComm, thisRanksType, parentRank, &newIntracomm);
1773 MPI_Comm_rank(newIntracomm, &newIntracommRank);
1774
1775 // Excess ranks disappear
1776 // N.B. "parentComm" (and parentSize) still includes these ranks, so
1777 // we cannot do MPI *collectives* using parentComm after this point,
1778 // although the communicator can still be used for other things.
1779 if (isExcess == thisRanksType) {
1780 MPI_Finalize();
1781 exit(0);
1782 }
1783
1784 // Sanity check
1785 MPI_Comm_size(newIntracomm, &newIntracommSize);
1786 if (isIO == thisRanksType) {
1787 ASSERT(newIntracommSize == numIORanks);
1788 } else {
1789 ASSERT(newIntracommSize == numComputeRanks);
1790 }
1791
1792
1793 // Create a special intercomm from the i/o and compute parts
1794 if (isIO == thisRanksType) {
1795 // Set globals
1796 ioIntracomm = newIntracomm;
1797 MPI_Comm_rank(ioIntracomm, &ioIntracommRank);
1798
1799 // Find the lowest computation rank
1800 for (i = 0; i < parentSize; ++i) {
1801 if (!isIORank(i, totalNumNodes, numIONodes)) break;
1802 }
1803 } else { // isCompute
1804 // Set globals
1805 computeIntracomm = newIntracomm;
1806 MPI_Comm_rank(computeIntracomm, &computeIntracommRank);
1807
1808 // Find the lowest IO rank
1809 for (i = 0; i < parentSize; ++i) {
1810 if (isIORank(i, totalNumNodes, numIONodes)) break;
1811 }
1812 }
1813 MPI_Intercomm_create(newIntracomm, 0, parentComm, i, 0, &globalIntercomm);
1814
1815
1816
1817 ///////////////////////////////////////////////////////////////////
1818 // For each different i/o epoch style, split the i/o ranks among
1819 // the fields, and create an inter-communicator for each split.
1820
1821 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
1822 if (0 == parentRank) {
1823 printf ("Set up epoch style %d\n", epochStyleIndex);
1824 }
1825
1826 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
1827 int fieldAssignments[numIORanks];
1828 int rankAssignments[numComputeRanks + numIORanks];
1829
1830 // Count the number of fields in this epoch style
1831 for (nF = 0; thisEpochStyle[nF].dataFieldID != '\0'; ++nF) ;
1832
1833 // Decide how to apportion the i/o ranks among the fields.
1834 // (Currently, this call also sets the "chunkWriters" array.)
1835 for (i=0; i < numIORanks; ++i) fieldAssignments[i] = -1;
1836 int nToWrite[numIORanks/numRanksPerNode];
1837 computeIORankAssigments(numComputeRanks, numIORanks, nF,
1838 thisEpochStyle, fieldAssignments, nToWrite);
1839 // Sanity check
1840 for (i=0; i < numIORanks; ++i) {
1841 ASSERT((fieldAssignments[i] >= 0) && (fieldAssignments[i] < nF));
1842 }
1843
1844 // Embed the i/o rank assignments into an array holding
1845 // the assignments for *all* the ranks (i/o and compute).
1846 // Rank assignment of "-1" means "compute".
1847 j = 0;
1848 for (i = 0; i < numComputeRanks + numIORanks; ++i) {
1849 rankAssignments[i] = isIORank(i, totalNumNodes, numIONodes) ?
1850 fieldAssignments[j++] : -1;
1851 }
1852 // Sanity check. Check the assignment for this rank.
1853 if (isIO == thisRanksType) {
1854 ASSERT(fieldAssignments[newIntracommRank] == rankAssignments[parentRank]);
1855 } else {
1856 ASSERT(-1 == rankAssignments[parentRank]);
1857 }
1858 ASSERT(j == numIORanks);
1859
1860
1861 if (0 == parentRank) {
1862 printf ("Create communicators for epoch style %d\n", epochStyleIndex);
1863 }
1864 fflush (stdout);
1865
1866
1867 // Find the lowest rank assigned to computation; use it as
1868 // the "root" for the upcoming intercomm creates.
1869 for (compRoot = 0; rankAssignments[compRoot] != -1; ++compRoot);
1870 // Sanity check
1871 if ((isCompute == thisRanksType) && (0 == newIntracommRank)) ASSERT(compRoot == parentRank);
1872
1873 // If this is an I/O rank, split the newIntracomm (which, for
1874 // the i/o ranks, is a communicator among all the i/o ranks)
1875 // into the pieces as assigned.
1876
1877 if (isIO == thisRanksType) {
1878 MPI_Comm fieldIntracomm;
1879 int myField = rankAssignments[parentRank];
1880 ASSERT(myField >= 0);
1881
1882 MPI_Comm_split(newIntracomm, myField, parentRank, &fieldIntracomm);
1883 thisEpochStyle[myField].ioRanksIntracomm = fieldIntracomm;
1884
1885 // Now create an inter-communicator between the computational
1886 // ranks, and each of the newly split off field communicators.
1887 for (i = 0; i < nF; ++i) {
1888
1889 // Do one field at a time to avoid clashes on parentComm
1890 MPI_Barrier(newIntracomm);
1891
1892 if (myField != i) continue;
1893
1894 // Create the intercomm for this field for this epoch style
1895 MPI_Intercomm_create(fieldIntracomm, 0, parentComm,
1896 compRoot, i, &(thisEpochStyle[myField].dataIntercomm));
1897
1898 // ... and dup a separate one for tile registrations
1899 MPI_Comm_dup(thisEpochStyle[myField].dataIntercomm,
1900 &(thisEpochStyle[myField].registrationIntercomm));
1901
1902
1903 // Sanity check: make sure the chunkWriters array looks ok.
1904 {
1905 int ii, jj, commSize, nChunks = thisEpochStyle[myField].nChunks;
1906 ASSERT (nChunks > 0);
1907 ASSERT (thisEpochStyle[myField].chunkWriters != NULL);
1908 MPI_Comm_size (thisEpochStyle[myField].dataIntercomm, &commSize);
1909 for (ii = 0; ii < nChunks; ++ii) {
1910 ASSERT (thisEpochStyle[myField].chunkWriters[ii] >= 0);
1911 ASSERT (thisEpochStyle[myField].chunkWriters[ii] < commSize);
1912 for (jj = ii+1; jj < nChunks; ++jj) {
1913 ASSERT (thisEpochStyle[myField].chunkWriters[ii] !=
1914 thisEpochStyle[myField].chunkWriters[jj]);
1915 }
1916 }
1917 }
1918 }
1919 }
1920 else {
1921 // This is a computational rank; create the intercommunicators
1922 // to the various split off separate field communicators.
1923
1924 for (fieldIndex = 0; fieldIndex < nF; ++fieldIndex) {
1925
1926 // Find the remote "root" process for this field
1927 int fieldRoot = -1;
1928 while (rankAssignments[++fieldRoot] != fieldIndex);
1929
1930 // Create the intercomm for this field for this epoch style
1931 MPI_Intercomm_create(newIntracomm, 0, parentComm, fieldRoot,
1932 fieldIndex, &(thisEpochStyle[fieldIndex].dataIntercomm));
1933
1934 // ... and dup a separate one for tile registrations
1935 MPI_Comm_dup(thisEpochStyle[fieldIndex].dataIntercomm,
1936 &(thisEpochStyle[fieldIndex].registrationIntercomm));
1937 }
1938 }
1939
1940
1941 // Print a "map" of the core assignments. Compute nodes are indicated
1942 // by a "#" for the whole node, i/o nodes have the individual cores
1943 // within the node broken out and their field assignment printed.
1944 // If the core is writing to the disk, the field assignment is printed
1945 // in parentheses.
1946 if (0 == parentRank) {
1947 int fieldIOCounts[nF];
1948 memset (fieldIOCounts, 0, nF*sizeof(int));
1949
1950 printf ("Writer assignments, epoch style %d\n", epochStyleIndex);
1951 for (i = 0; i < nF; ++i) {
1952 fieldInfoThisEpoch_t *p = &(thisEpochStyle[i]);
1953 int chunkSize = divCeil(p->zDepth,p->nChunks);
1954 printf ("\n");
1955 printf ( "field %2d ('%c'): %1d levels, %1d writers, %1d"
1956 " levels per writer (last writer = %d levels)\n",
1957 i, p->dataFieldID, p->zDepth, p->nChunks,
1958 chunkSize, p->zDepth - ((p->nChunks - 1)*chunkSize));
1959 for (j = 0; j < thisEpochStyle[i].nChunks; ++j) {
1960 printf (" %1d", thisEpochStyle[i].chunkWriters[j]);
1961 }
1962 printf ("\n");
1963 }
1964 printf ("\n");
1965
1966 printf("\ncpu assignments, epoch style %d\n", epochStyleIndex);
1967 int whichIONode = -1;
1968 for (i = 0; i < numComputeNodes + numIONodes ; ++i) {
1969
1970 if (rankAssignments[i*numRanksPerNode] >= 0) {
1971
1972 // i/o node
1973 ++whichIONode;
1974 printf("\n%s (%d Z, %ld bytes):",
1975 &(allHostnames[i*numRanksPerNode][0]),
1976 nToWrite[whichIONode],
1977 nToWrite[whichIONode]*twoDFieldSizeInBytes);
1978
1979 for (j = 0; j < numRanksPerNode; ++j) {
1980
1981 int assignedField = rankAssignments[i*numRanksPerNode + j];
1982 int k, commRank, nChunks, isWriter;
1983 nChunks = thisEpochStyle[assignedField].nChunks;
1984 commRank = fieldIOCounts[assignedField];
1985
1986 isWriter = 0;
1987 for (k = 0; k < nChunks; ++k) {
1988 if (thisEpochStyle[assignedField].chunkWriters[k] == commRank) {
1989 isWriter = 1;
1990 break;
1991 }
1992 }
1993 printf(isWriter ? " (%1d)" : " %1d ", assignedField);
1994
1995 fieldIOCounts[assignedField] += 1;
1996 }
1997 printf("\n");
1998
1999 } else {
2000
2001 // compute node
2002 for (j = 0; j < numRanksPerNode; ++j) {
2003 if ((i*numRanksPerNode + j) >= (numComputeRanks + numIORanks)) break;
2004 ASSERT(-1 == rankAssignments[i*numRanksPerNode + j]);
2005 }
2006 printf(" #");
2007 for (; j < numRanksPerNode; ++j) { // "excess" ranks (if any)
2008 printf("X");
2009 }
2010 }
2011 printf(" ");
2012 }
2013 printf("\n\n");
2014 }
2015
2016
2017
2018 } // epoch style loop
2019
2020 // Note: only non-null in rank 0, but it's ok to "free" NULL
2021 free(allHostnames);
2022
2023
2024 // I/O processes split off and start receiving data
2025 // NOTE: the I/O processes DO NOT RETURN from this call
2026 if (isIO == thisRanksType) ioRankMain();
2027
2028
2029 // The "compute" processes now return with their new intra-communicator.
2030 *rtnComputeComm = newIntracomm;
2031
2032 // but first, call mpiio initialization routine!
2033 initSizesAndTypes();
2034
2035 fflush (stdout);
2036 return;
2037 }
2038
2039
2040
2041
2042 // "Register" the tile-id(s) that this process will be sending
2043 void
2044 f2(int tileID)
2045 {
2046 int i, epochStyleIndex;
2047
2048 static int count = 0;
2049
2050 // This code assumes that the same tileID will apply to all the
2051 // fields. Note that we use the tileID as a tag, so we require it
2052 // be an int with a legal tag value, but we do *NOT* actually use
2053 // the tag *value* for anything (e.g. it is NOT used as an index
2054 // into an array). It is treated as an opaque handle that is
2055 // passed from the compute task(s) to the compositing routines.
2056 // Those two end-points likely assign meaning to the tileID (i.e.
2057 // use it to identify where the tile belongs within the domain),
2058 // but that is their business.
2059 //
2060 // A tileID of -1 signifies the end of registration.
2061
2062 ASSERT(computeIntracommRank >= 0);
2063
2064 if (tileID >= 0) {
2065 // In this version of the code, in addition to the tileID, we also
2066 // multiplex in the low order bit(s) of the epoch number as an
2067 // error check. So the tile value must be small enough to allow that.
2068 ASSERT(((tileID<<numCheckBits) + ((1<<numCheckBits)-1)) < maxTagValue);
2069
2070 // In this version of the code, we do not send the actual tileID's
2071 // to the i/o processes during the registration procedure. We only
2072 // send a count of the number of tiles that we will send.
2073 ++count;
2074 return;
2075 }
2076
2077
2078 // We get here when we are called with a negative tileID, signifying
2079 // the end of registration. We now need to figure out and inform
2080 // *each* i/o process in *each* field just how many tiles we will be
2081 // sending them in *each* epoch style.
2082
2083 for (epochStyleIndex = 0; epochStyleIndex < numEpochStyles; ++epochStyleIndex) {
2084 fieldInfoThisEpoch_t *thisEpochStyle = epochStyles[epochStyleIndex];
2085
2086 int curFieldIndex = -1;
2087 while ('\0' != thisEpochStyle[++curFieldIndex].dataFieldID) {
2088 fieldInfoThisEpoch_t *thisField = &(thisEpochStyle[curFieldIndex]);
2089 int numRemoteRanks, *tileCounts, remainder;
2090 MPI_Comm_remote_size(thisField->dataIntercomm, &numRemoteRanks);
2091
2092 tileCounts = alloca(numRemoteRanks * sizeof(int));
2093
2094 memset(tileCounts,0,numRemoteRanks * sizeof(int));
2095
2096 // Distribute the tiles among the i/o ranks.
2097 for (i = 0; i < numRemoteRanks; ++i) {
2098 tileCounts[i] = count / numRemoteRanks;
2099 }
2100 // Deal with any remainder
2101 remainder = count - ((count / numRemoteRanks) * numRemoteRanks);
2102 for (i = 0; i < remainder; ++i) {
2103 int target = (computeIntracommRank + i) % numRemoteRanks;
2104 tileCounts[target] += 1;
2105 }
2106
2107 // Communicate these counts to the i/o processes for this
2108 // field. Note that we send a message to each process,
2109 // even if the count is zero.
2110 for (i = 0; i < numRemoteRanks; ++i) {
2111 MPI_Send(NULL, 0, MPI_BYTE, i, tileCounts[i],
2112 thisField->registrationIntercomm);
2113 }
2114
2115 } // field loop
2116 } // epoch-style loop
2117
2118 }
2119
2120
2121
2122
// ID of the i/o epoch currently in progress. Monotonically increasing;
// beginNewEpoch() asserts it advances by exactly one each call.
int currentEpochID = 0;
// Index into epochStyles[] selecting the field set for the current epoch
// (set by beginNewEpoch, read by f3's field lookup).
int currentEpochStyle = 0;
2125
// Begin a new i/o epoch: handshake with the i/o ranks so that no compute
// rank starts sending tiles for the new epoch until the i/o side has
// finished processing the previous one.
//
// newEpochID - must be exactly currentEpochID + 1; epochs may not be
//              skipped or repeated (aborts otherwise).
// gcmIter    - model iteration number, forwarded to the i/o ranks.
// epochStyle - index into epochStyles[] for this epoch's field set.
//
// NOTE: the order of operations below (Recv the ack, then Send the new
// epoch command, bracketed by barriers) is load-bearing; do not reorder.
void
beginNewEpoch(int newEpochID, int gcmIter, int epochStyle)
{
    fieldInfoThisEpoch_t *p;

    // Epochs must arrive strictly in sequence.
    if (newEpochID != (currentEpochID + 1)) {
        fprintf(stderr, "ERROR: Missing i/o epoch? was %d, is now %d ??\n",
                currentEpochID, newEpochID);
        sleep(1); // Give the message a chance to propagate
        abort();
    }



    // not necessary for correctness, but for better timings
    MPI_Barrier(computeIntracomm);
    if (0 == computeIntracommRank)
        fprintf(stderr,"compute ready to emit %d %d %f\n",newEpochID,gcmIter,MPI_Wtime());



    ////////////////////////////////////////////////////////////////////////
    // Need to be sure the i/o tasks are done processing the previous epoch
    // before any compute tasks start sending tiles from a new epoch.

    if (0 == computeIntracommRank) {
        int cmd[4] = { cmd_newEpoch, newEpochID, epochStyle, gcmIter };

        // Wait to get an ack that the i/o tasks are done with the prior epoch.
        MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
                 globalIntercomm, MPI_STATUS_IGNORE);

        // Tell the i/o ranks about the new epoch.
        // (Just tell rank 0; it will bcast to the others)
        MPI_Send(cmd, 4, MPI_INT, 0, 0, globalIntercomm);
    }

    // Compute ranks wait here until rank 0 gets the ack from the i/o ranks
    MPI_Barrier(computeIntracomm);

    if (0 == computeIntracommRank)
        fprintf(stderr,"compute handshake completed %d %d %f\n",newEpochID,gcmIter,MPI_Wtime());


    currentEpochID = newEpochID;
    currentEpochStyle = epochStyle;

    // Reset the tileCount (used by f3) for every field of the new epoch
    // style, so f3's round-robin receiver selection starts fresh.
    for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
        p->tileCount = 0;
    }
}
2178
2179
2180 void
2181 f3(char dataFieldID, int tileID, int epochID, void *data)
2182 {
2183 int whichField, receiver, tag;
2184
2185 static char priorDataFieldID = '\0';
2186 static int priorEpoch = -1;
2187 static int remoteCommSize;
2188 static fieldInfoThisEpoch_t *p;
2189
2190 int flag=0;
2191
2192 // Check that this global has been set
2193 ASSERT(computeIntracommRank >= 0);
2194
2195
2196 // Figure out some info about this dataFieldID. It is
2197 // likely to be another tile from the same field as last time
2198 // we were called, in which case we can reuse the "static" values
2199 // retained from the prior call. If not, we need to look it up.
2200
2201 if ((dataFieldID != priorDataFieldID) || (epochID != priorEpoch)) {
2202
2203 // It's not the same; we need to look it up.
2204
2205 for (p = epochStyles[currentEpochStyle]; p->dataFieldID != '\0'; ++p) {
2206 if (p->dataFieldID == dataFieldID) break;
2207 }
2208 // Make sure we found a valid field
2209 ASSERT(p->dataIntercomm != MPI_COMM_NULL);
2210
2211 MPI_Comm_remote_size(p->dataIntercomm, &remoteCommSize);
2212
2213 flag=1;
2214
2215 }
2216
2217 ASSERT(p->dataFieldID == dataFieldID);
2218
2219 receiver = (computeIntracommRank + p->tileCount) % remoteCommSize;
2220 tag = (tileID << numCheckBits) | (epochID & bitMask);
2221
2222 FPRINTF(stderr,"Rank %d sends field '%c', tile %d, to i/o task %d with tag %d(%d)\n",
2223 computeIntracommRank, dataFieldID, tileID, receiver, tag, tag >> numCheckBits);
2224
2225 MPI_Send(data, tileOneZLevelSizeInBytes * p->zDepth,
2226 MPI_BYTE, receiver, tag, p->dataIntercomm);
2227
2228 p->tileCount += 1;
2229 priorDataFieldID = dataFieldID;
2230 priorEpoch = epochID;
2231 }
2232
2233
2234
2235 void
2236 f4(int epochID)
2237 {
2238 int i;
2239 ASSERT(computeIntracommRank >= 0);
2240
2241 if (0 == computeIntracommRank) {
2242 int cmd[3] = { cmd_exit, -1, -1 };
2243
2244 // Recv the ack that the prior i/o epoch is complete
2245 MPI_Recv(NULL, 0, MPI_BYTE, 0, cmd_epochComplete,
2246 globalIntercomm, MPI_STATUS_IGNORE);
2247
2248 // Tell the i/o ranks to exit
2249 // Just tell rank 0; it will bcast to the others
2250 MPI_Send(cmd, 3, MPI_INT, 0, 0, globalIntercomm);
2251 }
2252
2253 }
2254
2255
2256
2257 void asyncio_tile_arrays_(int *xoff, int *yoff, int *xskip)
2258 {
2259 int rank, ierr, rootProc;
2260
2261 MPI_Comm_rank(globalIntercomm,&rank);
2262 rootProc = (0 == rank) ? MPI_ROOT : MPI_PROC_NULL;
2263
2264 ierr = MPI_Bcast (xoff, TOTAL_NUM_TILES, MPI_INT, rootProc, globalIntercomm);
2265 ierr = MPI_Bcast (yoff, TOTAL_NUM_TILES, MPI_INT, rootProc, globalIntercomm);
2266 ierr = MPI_Bcast (xskip, TOTAL_NUM_TILES, MPI_INT, rootProc, globalIntercomm);
2267 }
2268
2269
2270
2271
2272
2273 //////////////////////////////////////////////////////////////////////
2274 // Fortran interface
2275
2276 void
2277 bron_f1(
2278 MPI_Fint *ptr_parentComm,
2279 int *ptr_numComputeRanks,
2280 int *ptr_numTiles,
2281 MPI_Fint *rtnComputeComm)
2282 {
2283 // Convert the Fortran handle into a C handle
2284 MPI_Comm newComm, parentComm = MPI_Comm_f2c(*ptr_parentComm);
2285
2286 f1(parentComm, *ptr_numComputeRanks, *ptr_numTiles, &newComm);
2287
2288 // Convert the C handle into a Fortran handle
2289 *rtnComputeComm = MPI_Comm_c2f(newComm);
2290 }
2291
2292
2293
// Fortran-callable wrapper: dereference the argument and forward to f2.
void
bron_f2(int *ptr_tileID)
{
    int tileID = *ptr_tileID;
    f2(tileID);
}
2299
2300
// Fortran-callable wrapper: dereference the arguments and forward
// to beginNewEpoch.
void
beginnewepoch_(int *newEpochID, int *gcmIter, int *epochStyle)
{
    int epochID = *newEpochID;
    int iter = *gcmIter;
    int style = *epochStyle;
    beginNewEpoch(epochID, iter, style);
}
2306
2307
// Fortran-callable wrapper: dereference the scalar arguments and
// forward to f3 (the data buffer is passed through untouched).
void
bron_f3(char *ptr_dataFieldID, int *ptr_tileID, int *ptr_epochID, void *data)
{
    char fieldID = *ptr_dataFieldID;
    int tileID = *ptr_tileID;
    int epochID = *ptr_epochID;
    f3(fieldID, tileID, epochID, data);
}
2313
2314
2315
// Fortran-callable wrapper: dereference the argument and forward to f4.
void
bron_f4(int *ptr_epochID)
{
    int epochID = *ptr_epochID;
    f4(epochID);
}
2321
2322

  ViewVC Help
Powered by ViewVC 1.1.22