Annotation of sys/dev/raidframe/rf_engine.c, Revision 1.1.1.1
1.1 nbrk 1: /* $OpenBSD: rf_engine.c,v 1.15 2003/04/27 11:22:54 ho Exp $ */
2: /* $NetBSD: rf_engine.c,v 1.10 2000/08/20 16:51:03 thorpej Exp $ */
3:
4: /*
5: * Copyright (c) 1995 Carnegie-Mellon University.
6: * All rights reserved.
7: *
8: * Author: William V. Courtright II, Mark Holland, Rachad Youssef
9: *
10: * Permission to use, copy, modify and distribute this software and
11: * its documentation is hereby granted, provided that both the copyright
12: * notice and this permission notice appear in all copies of the
13: * software, derivative works or modified versions, and any portions
14: * thereof, and that both notices appear in supporting documentation.
15: *
16: * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
17: * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
18: * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
19: *
20: * Carnegie Mellon requests users of this software to return to
21: *
22: * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
23: * School of Computer Science
24: * Carnegie Mellon University
25: * Pittsburgh PA 15213-3890
26: *
27: * any improvements or extensions that they make and grant Carnegie the
28: * rights to redistribute these changes.
29: */
30:
31: /****************************************************************************
32: * *
33: * engine.c -- Code for DAG execution engine. *
34: * *
35: * Modified to work as follows (holland): *
36: * A user-thread calls into DispatchDAG, which fires off the nodes that *
37: * are direct successors to the header node. DispatchDAG then returns, *
38: * and the rest of the I/O continues asynchronously. As each node *
39: * completes, the node execution function calls FinishNode(). FinishNode *
40: * scans the list of successors to the node and increments the antecedent *
41: * counts. Each node that becomes enabled is placed on a central node *
42: * queue. A dedicated dag-execution thread grabs nodes off of this *
43: * queue and fires them. *
44: * *
45: * NULL nodes are never fired. *
46: * *
47: * Terminator nodes are never fired, but rather cause the callback *
48: * associated with the DAG to be invoked. *
49: * *
50: * If a node fails, the dag either rolls forward to the completion or *
51: * rolls back, undoing previously-completed nodes and fails atomically. *
52: * The direction of recovery is determined by the location of the failed *
53: * node in the graph. If the failure occurred before the commit node in *
54: * the graph, backward recovery is used. Otherwise, forward recovery is *
55: * used. *
56: * *
57: ****************************************************************************/
58:
59: #include "rf_threadstuff.h"
60:
61: #include <sys/errno.h>
62:
63: #include "rf_dag.h"
64: #include "rf_engine.h"
65: #include "rf_etimer.h"
66: #include "rf_general.h"
67: #include "rf_dagutils.h"
68: #include "rf_shutdown.h"
69: #include "rf_raid.h"
70:
/* Functions defined in this file. */
int rf_BranchDone(RF_DagNode_t *);
int rf_NodeReady(RF_DagNode_t *);
void rf_FireNode(RF_DagNode_t *);
void rf_FireNodeArray(int, RF_DagNode_t **);
void rf_FireNodeList(RF_DagNode_t *);
void rf_PropagateResults(RF_DagNode_t *, int);
void rf_ProcessNode(RF_DagNode_t *, int);

/* Body of the per-array DAG engine kernel thread. */
void rf_DAGExecutionThread(RF_ThreadArg_t);
#ifdef RAID_AUTOCONFIG
/*
 * Base value used by rf_DAGExecutionThread_pre(), which sets lastpid to
 * RF_ENGINE_PID + raidid - 1 while creating the engine thread
 * (presumably to give each unit's engine thread a predictable pid).
 */
#define RF_ENGINE_PID 10
void rf_DAGExecutionThread_pre(RF_ThreadArg_t);
/* Presumably the kernel's last-assigned pid; temporarily overridden above. */
extern pid_t lastpid;
#endif /* RAID_AUTOCONFIG */
/*
 * Per-RAID-unit hook cookies, indexed by raidid; allocated with numraid
 * entries by rf_ConfigureEngine().  A slot holds a startup-hook cookie
 * (RAID_AUTOCONFIG, before init exists) or the engine thread's
 * shutdown-hook cookie.
 */
void **rf_hook_cookies;
/* Number of RAID units (sizes rf_hook_cookies). */
extern int numraid;
87:
/*
 * DO_INIT(listp, raidPtr): create raidPtr's node-queue mutex and condition
 * with the rf_create_managed_* helpers, passing the shutdown list _l_
 * (presumably so they are released at shutdown).
 * NOTE: on failure this macro executes `return (_rc)' from the *calling*
 * function, so it may only be used in a function returning an int error.
 */
#define DO_INIT(_l_,_r_) \
do { \
	int _rc; \
	_rc = rf_create_managed_mutex(_l_, &(_r_)->node_queue_mutex); \
	if (_rc) { \
		return(_rc); \
	} \
	_rc = rf_create_managed_cond(_l_, &(_r_)->node_queue_cond); \
	if (_rc) { \
		return(_rc); \
	} \
} while (0)
100:
/*
 * Synchronization primitives for this file. DO_WAIT should be enclosed
 * in a while loop.
 *
 * DO_LOCK and DO_UNLOCK use a variable named `ks' that the calling
 * function must declare as an int: DO_LOCK saves the previous priority
 * level returned by splbio() in ks and then takes the node-queue mutex;
 * DO_UNLOCK releases the mutex and restores the level with splx(ks).
 *
 * NOTE(review): DO_WAIT and DO_SIGNAL pass (_r_)->node_queue as the
 * condition argument rather than the node_queue_cond created by DO_INIT.
 * Waiter and signaller agree, so this works, but node_queue_cond does not
 * appear to be used in this file.
 */

/*
 * XXX Is this spl-ing really necessary ?
 */
#define DO_LOCK(_r_) \
do { \
	ks = splbio(); \
	RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
} while (0)

#define DO_UNLOCK(_r_) \
do { \
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
	splx(ks); \
} while (0)

#define DO_WAIT(_r_) \
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

/* XXX RF_SIGNAL_COND? */
#define DO_SIGNAL(_r_) \
	RF_BROADCAST_COND((_r_)->node_queue)
128: void rf_ShutdownEngine(void *);
129:
130: void
131: rf_ShutdownEngine(void *arg)
132: {
133: RF_Raid_t *raidPtr;
134:
135: raidPtr = (RF_Raid_t *) arg;
136: raidPtr->shutdown_engine = 1;
137: DO_SIGNAL(raidPtr);
138: }
139:
140: int
141: rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
142: RF_Config_t *cfgPtr)
143: {
144: int rc;
145: char raidname[16];
146:
147: DO_INIT(listp, raidPtr);
148:
149: raidPtr->node_queue = NULL;
150: raidPtr->dags_in_flight = 0;
151:
152: rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg);
153: if (rc)
154: return (rc);
155:
156: /*
157: * We create the execution thread only once per system boot. No need
158: * to check return code b/c the kernel panics if it can't create the
159: * thread.
160: */
161: if (rf_engineDebug) {
162: printf("raid%d: %s engine thread\n", raidPtr->raidid,
163: (initproc)?"Starting":"Creating");
164: }
165: if (rf_hook_cookies == NULL) {
166: rf_hook_cookies =
167: malloc(numraid * sizeof(void *),
168: M_RAIDFRAME, M_NOWAIT);
169: if (rf_hook_cookies == NULL)
170: return (ENOMEM);
171: bzero(rf_hook_cookies, numraid * sizeof(void *));
172: }
173: #ifdef RAID_AUTOCONFIG
174: if (initproc == NULL) {
175: rf_hook_cookies[raidPtr->raidid] =
176: startuphook_establish(rf_DAGExecutionThread_pre,
177: raidPtr);
178: } else {
179: #endif /* RAID_AUTOCONFIG */
180: snprintf(&raidname[0], 16, "raid%d", raidPtr->raidid);
181: if (RF_CREATE_THREAD(raidPtr->engine_thread,
182: rf_DAGExecutionThread, raidPtr, &raidname[0])) {
183: RF_ERRORMSG("RAIDFRAME: Unable to start engine"
184: " thread\n");
185: return (ENOMEM);
186: }
187: if (rf_engineDebug) {
188: printf("raid%d: Engine thread started\n",
189: raidPtr->raidid);
190: }
191: RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
192: #ifdef RAID_AUTOCONFIG
193: }
194: #endif
195: /* XXX Something is missing here... */
196: #ifdef debug
197: printf("Skipping the WAIT_START !!!\n");
198: #endif
199: /* Engine thread is now running and waiting for work. */
200: if (rf_engineDebug) {
201: printf("raid%d: Engine thread running and waiting for events\n",
202: raidPtr->raidid);
203: }
204: rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
205: if (rc) {
206: RF_ERRORMSG3("Unable to add to shutdown list file %s line %d"
207: " rc=%d\n", __FILE__, __LINE__, rc);
208: rf_ShutdownEngine(NULL);
209: }
210: return (rc);
211: }
212:
213: int
214: rf_BranchDone(RF_DagNode_t *node)
215: {
216: int i;
217:
218: /*
219: * Return true if forward execution is completed for a node and it's
220: * succedents.
221: */
222: switch (node->status) {
223: case rf_wait:
224: /* Should never be called in this state. */
225: RF_PANIC();
226: break;
227: case rf_fired:
228: /* Node is currently executing, so we're not done. */
229: return (RF_FALSE);
230: case rf_good:
231: /* For each succedent. */
232: for (i = 0; i < node->numSuccedents; i++)
233: /* Recursively check branch. */
234: if (!rf_BranchDone(node->succedents[i]))
235: return RF_FALSE;
236:
237: return RF_TRUE; /*
238: * Node and all succedent branches aren't in
239: * fired state.
240: */
241: break;
242: case rf_bad:
243: /* Succedents can't fire. */
244: return (RF_TRUE);
245: case rf_recover:
246: /* Should never be called in this state. */
247: RF_PANIC();
248: break;
249: case rf_undone:
250: case rf_panic:
251: /* XXX Need to fix this case. */
252: /* For now, assume that we're done. */
253: return (RF_TRUE);
254: break;
255: default:
256: /* Illegal node status. */
257: RF_PANIC();
258: break;
259: }
260: }
261:
262: int
263: rf_NodeReady(RF_DagNode_t *node)
264: {
265: int ready;
266:
267: switch (node->dagHdr->status) {
268: case rf_enable:
269: case rf_rollForward:
270: if ((node->status == rf_wait) &&
271: (node->numAntecedents == node->numAntDone))
272: ready = RF_TRUE;
273: else
274: ready = RF_FALSE;
275: break;
276: case rf_rollBackward:
277: RF_ASSERT(node->numSuccDone <= node->numSuccedents);
278: RF_ASSERT(node->numSuccFired <= node->numSuccedents);
279: RF_ASSERT(node->numSuccFired <= node->numSuccDone);
280: if ((node->status == rf_good) &&
281: (node->numSuccDone == node->numSuccedents))
282: ready = RF_TRUE;
283: else
284: ready = RF_FALSE;
285: break;
286: default:
287: printf("Execution engine found illegal DAG status"
288: " in rf_NodeReady\n");
289: RF_PANIC();
290: break;
291: }
292:
293: return (ready);
294: }
295:
296:
297: /*
298: * User context and dag-exec-thread context:
299: * Fire a node. The node's status field determines which function, do or undo,
300: * to be fired.
301: * This routine assumes that the node's status field has alread been set to
302: * "fired" or "recover" to indicate the direction of execution.
303: */
304: void
305: rf_FireNode(RF_DagNode_t *node)
306: {
307: switch (node->status) {
308: case rf_fired:
309: /* Fire the do function of a node. */
310: if (rf_engineDebug>1) {
311: printf("raid%d: Firing node 0x%lx (%s)\n",
312: node->dagHdr->raidPtr->raidid,
313: (unsigned long) node, node->name);
314: }
315: if (node->flags & RF_DAGNODE_FLAG_YIELD) {
316: #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
317: /* thread_block(); */
318: /* printf("Need to block the thread here...\n"); */
319: /*
320: * XXX thread_block is actually mentioned in
321: * /usr/include/vm/vm_extern.h
322: */
323: #else
324: thread_block();
325: #endif
326: }
327: (*(node->doFunc)) (node);
328: break;
329: case rf_recover:
330: /* Fire the undo function of a node. */
331: if (rf_engineDebug>1) {
332: printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
333: node->dagHdr->raidPtr->raidid,
334: (unsigned long) node, node->name);
335: }
336: if (node->flags & RF_DAGNODE_FLAG_YIELD) {
337: #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
338: /* thread_block(); */
339: /* printf("Need to block the thread here...\n"); */
340: /*
341: * XXX thread_block is actually mentioned in
342: * /usr/include/vm/vm_extern.h
343: */
344: #else
345: thread_block();
346: #endif
347: }
348: (*(node->undoFunc)) (node);
349: break;
350: default:
351: RF_PANIC();
352: break;
353: }
354: }
355:
356:
357: /*
358: * User context:
359: * Attempt to fire each node in a linear array.
360: * The entire list is fired atomically.
361: */
362: void
363: rf_FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
364: {
365: RF_DagStatus_t dstat;
366: RF_DagNode_t *node;
367: int i, j;
368:
369: /* First, mark all nodes which are ready to be fired. */
370: for (i = 0; i < numNodes; i++) {
371: node = nodeList[i];
372: dstat = node->dagHdr->status;
373: RF_ASSERT((node->status == rf_wait) ||
374: (node->status == rf_good));
375: if (rf_NodeReady(node)) {
376: if ((dstat == rf_enable) || (dstat == rf_rollForward)) {
377: RF_ASSERT(node->status == rf_wait);
378: if (node->commitNode)
379: node->dagHdr->numCommits++;
380: node->status = rf_fired;
381: for (j = 0; j < node->numAntecedents; j++)
382: node->antecedents[j]->numSuccFired++;
383: } else {
384: RF_ASSERT(dstat == rf_rollBackward);
385: RF_ASSERT(node->status == rf_good);
386: /* Only one commit node per graph. */
387: RF_ASSERT(node->commitNode == RF_FALSE);
388: node->status = rf_recover;
389: }
390: }
391: }
392: /* Now, fire the nodes. */
393: for (i = 0; i < numNodes; i++) {
394: if ((nodeList[i]->status == rf_fired) ||
395: (nodeList[i]->status == rf_recover))
396: rf_FireNode(nodeList[i]);
397: }
398: }
399:
400:
401: /*
402: * User context:
403: * Attempt to fire each node in a linked list.
404: * The entire list is fired atomically.
405: */
406: void
407: rf_FireNodeList(RF_DagNode_t *nodeList)
408: {
409: RF_DagNode_t *node, *next;
410: RF_DagStatus_t dstat;
411: int j;
412:
413: if (nodeList) {
414: /* First, mark all nodes which are ready to be fired. */
415: for (node = nodeList; node; node = next) {
416: next = node->next;
417: dstat = node->dagHdr->status;
418: RF_ASSERT((node->status == rf_wait) ||
419: (node->status == rf_good));
420: if (rf_NodeReady(node)) {
421: if ((dstat == rf_enable) ||
422: (dstat == rf_rollForward)) {
423: RF_ASSERT(node->status == rf_wait);
424: if (node->commitNode)
425: node->dagHdr->numCommits++;
426: node->status = rf_fired;
427: for (j = 0; j < node->numAntecedents;
428: j++)
429: node->antecedents[j]
430: ->numSuccFired++;
431: } else {
432: RF_ASSERT(dstat == rf_rollBackward);
433: RF_ASSERT(node->status == rf_good);
434: /* Only one commit node per graph. */
435: RF_ASSERT(node->commitNode == RF_FALSE);
436: node->status = rf_recover;
437: }
438: }
439: }
440: /* Now, fire the nodes. */
441: for (node = nodeList; node; node = next) {
442: next = node->next;
443: if ((node->status == rf_fired) ||
444: (node->status == rf_recover))
445: rf_FireNode(node);
446: }
447: }
448: }
449:
450:
/*
 * Interrupt context:
 * For each succedent,
 *	propagate required results from node to succedent.
 *	increment succedent's numAntDone.
 *	place newly-enabled nodes on node queue for firing.
 * When the DAG is rolling backward the roles are reversed: node's
 * antecedents are examined, and those whose fired succedents have all
 * finished are scheduled so that their undo functions can run.
 *
 * How a newly-ready node is scheduled depends on `context': in
 * RF_INTR_CONTEXT it is appended to raidPtr->node_queue and the engine
 * thread is signalled; otherwise it is collected on a local list and
 * fired directly with rf_FireNodeList() after the lock is dropped.
 * Going forward, a node with a failed (rf_bad) true-data antecedent is
 * not fired: it is marked rf_skipped and finished immediately.
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired. Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important: when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked. I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 */
void
rf_PropagateResults(RF_DagNode_t *node, int context)
{
	RF_DagNode_t *s, *a;
	RF_Raid_t *raidPtr;
	int i, ks;	/* ks: priority level saved by DO_LOCK. */
	/* A list of NIL nodes to be finished. */
	RF_DagNode_t *finishlist = NULL;
	/* List of nodes with failed truedata antecedents. */
	RF_DagNode_t *skiplist = NULL;
	RF_DagNode_t *firelist = NULL; /* A list of nodes to be fired. */
	/* qh/q: head/tail of nodes handed to the engine thread. */
	RF_DagNode_t *q = NULL, *qh = NULL, *next;
	int j, skipNode;

	raidPtr = node->dagHdr->raidPtr;

	DO_LOCK(raidPtr);

	/*
	 * Debug - validate fire counts.  Also count this node as one more
	 * finished succedent of each of its antecedents.
	 */
	for (i = 0; i < node->numAntecedents; i++) {
		a = *(node->antecedents + i);
		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
		a->numSuccDone++;
	}

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		for (i = 0; i < node->numSuccedents; i++) {
			s = *(node->succedents + i);
			RF_ASSERT(s->status == rf_wait);
			(s->numAntDone)++;
			/* s becomes enabled when its last antecedent is done. */
			if (s->numAntDone == s->numAntecedents) {
				/* Look for NIL nodes. */
				if (s->doFunc == rf_NullNodeFunc) {
					/*
					 * Don't fire NIL nodes, just process
					 * them.
					 */
					s->next = finishlist;
					finishlist = s;
				} else {
					/*
					 * Look to see if the node is to be
					 * skipped.
					 */
					skipNode = RF_FALSE;
					for (j = 0; j < s->numAntecedents; j++)
						if ((s->antType[j] ==
						    rf_trueData) &&
						    (s->antecedents[j]->status
						    == rf_bad))
							skipNode = RF_TRUE;
					if (skipNode) {
						/*
						 * This node has one or more
						 * failed true data
						 * dependencies, so skip it.
						 */
						s->next = skiplist;
						skiplist = s;
					} else {
						/*
						 * Add s to list of nodes (q)
						 * to execute.
						 */
						if (context != RF_INTR_CONTEXT)
						{
							/*
							 * We only have to
							 * enqueue if we're at
							 * intr context.
							 */
							/*
							 * Put node on a list to
							 * be fired after we
							 * unlock.
							 */
							s->next = firelist;
							firelist = s;
						} else {
							/*
							 * Enqueue the node for
							 * the dag exec thread
							 * to fire.  The tail's
							 * next pointer is set
							 * when q is spliced
							 * onto node_queue.
							 */
							RF_ASSERT(rf_NodeReady(s));
							if (q) {
								q->next = s;
								q = s;
							} else {
								qh = q = s;
								qh->next = NULL;
							}
						}
					}
				}
			}
		}

		if (q) {
			/*
			 * Transfer our local list of nodes to the node
			 * queue (prepended ahead of any nodes already
			 * queued).
			 */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);

		/* Skipped nodes: mark rf_skipped and finish without firing. */
		for (; skiplist; skiplist = next) {
			next = skiplist->next;
			skiplist->status = rf_skipped;
			for (i = 0; i < skiplist->numAntecedents; i++) {
				skiplist->antecedents[i]->numSuccFired++;
			}
			if (skiplist->commitNode) {
				skiplist->dagHdr->numCommits++;
			}
			rf_FinishNode(skiplist, context);
		}
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them. */
			next = finishlist->next;
			finishlist->status = rf_good;
			for (i = 0; i < finishlist->numAntecedents; i++) {
				finishlist->antecedents[i]->numSuccFired++;
			}
			if (finishlist->commitNode)
				finishlist->dagHdr->numCommits++;
			/*
			 * Okay, here we're calling rf_FinishNode() on nodes
			 * that have the null function as their work proc.
			 * Such a node could be the terminal node in a DAG.
			 * If so, it will cause the DAG to complete, which will
			 * in turn free memory used by the DAG, which includes
			 * the node in question.
			 * Thus, we must avoid referencing the node at all
			 * after calling rf_FinishNode() on it.
			 */
			/* Recursive call. */
			rf_FinishNode(finishlist, context);
		}
		/* Fire all nodes in firelist. */
		rf_FireNodeList(firelist);
		break;

	case rf_rollBackward:
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			/* a may be undone once all fired succedents finish. */
			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/*
					 * Don't fire NIL nodes, just process
					 * them.
					 */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/*
						 * We only have to enqueue if
						 * we're at intr context.
						 */
						/*
						 * Put node on a list to
						 * be fired after we
						 * unlock.
						 */
						a->next = firelist;
						firelist = a;
					} else {
						/*
						 * Enqueue the node for
						 * the dag exec thread
						 * to fire.
						 */
						RF_ASSERT(rf_NodeReady(a));
						if (q) {
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/*
			 * Transfer our local list of nodes to the node
			 * queue.
			 */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them. */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
			 * Okay, here we're calling rf_FinishNode() on nodes
			 * that have the null function as their work proc.
			 * Such a node could be the first node in a DAG.
			 * If so, it will cause the DAG to complete, which will
			 * in turn free memory used by the DAG, which includes
			 * the node in question.
			 * Thus, we must avoid referencing the node at all
			 * after calling rf_FinishNode() on it.
			 */
			rf_FinishNode(finishlist, context);
			/* Recursive call. */
		}
		/* Fire all nodes in firelist. */
		rf_FireNodeList(firelist);

		break;
	default:
		/* NOTE: the node-queue lock is still held on this path. */
		printf("Engine found illegal DAG status in"
		    " rf_PropagateResults()\n");
		RF_PANIC();
		break;
	}
}
701:
702:
703: /*
704: * Process a fired node which has completed.
705: */
706: void
707: rf_ProcessNode(RF_DagNode_t *node, int context)
708: {
709: RF_Raid_t *raidPtr;
710:
711: raidPtr = node->dagHdr->raidPtr;
712:
713: switch (node->status) {
714: case rf_good:
715: /* Normal case, don't need to do anything. */
716: break;
717: case rf_bad:
718: if ((node->dagHdr->numCommits > 0) ||
719: (node->dagHdr->numCommitNodes == 0)) {
720: /* Crossed commit barrier. */
721: node->dagHdr->status = rf_rollForward;
722: if (rf_engineDebug || 1) {
723: printf("raid%d: node (%s) returned fail,"
724: " rolling forward\n", raidPtr->raidid,
725: node->name);
726: }
727: } else {
728: /* Never reached commit barrier. */
729: node->dagHdr->status = rf_rollBackward;
730: if (rf_engineDebug || 1) {
731: printf("raid%d: node (%s) returned fail,"
732: " rolling backward\n", raidPtr->raidid,
733: node->name);
734: }
735: }
736: break;
737: case rf_undone:
738: /* Normal rollBackward case, don't need to do anything. */
739: break;
740: case rf_panic:
741: /* An undo node failed !!! */
742: printf("UNDO of a node failed !!!/n");
743: break;
744: default:
745: printf("node finished execution with an illegal status !!!\n");
746: RF_PANIC();
747: break;
748: }
749:
750: /*
751: * Enqueue node's succedents (antecedents if rollBackward) for
752: * execution.
753: */
754: rf_PropagateResults(node, context);
755: }
756:
757:
758: /*
759: * User context or dag-exec-thread context:
760: * This is the first step in post-processing a newly-completed node.
761: * This routine is called by each node execution function to mark the node
762: * as complete and fire off any successors that have been enabled.
763: */
764: int
765: rf_FinishNode(RF_DagNode_t *node, int context)
766: {
767: /* As far as I can tell, retcode is not used -wvcii. */
768: int retcode = RF_FALSE;
769: node->dagHdr->numNodesCompleted++;
770: rf_ProcessNode(node, context);
771:
772: return (retcode);
773: }
774:
775:
776: /*
777: * User context:
778: * Submit dag for execution, return non-zero if we have to wait for completion.
779: * If and only if we return non-zero, we'll cause cbFunc to get invoked with
780: * cbArg when the DAG has completed.
781: *
782: * For now we always return 1. If the DAG does not cause any I/O, then the
783: * callback may get invoked before DispatchDAG returns. There's code in state
784: * 5 of ContinueRaidAccess to handle this.
785: *
786: * All we do here is fire the direct successors of the header node. The DAG
787: * execution thread does the rest of the dag processing.
788: */
789: int
790: rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), void *cbArg)
791: {
792: RF_Raid_t *raidPtr;
793:
794: raidPtr = dag->raidPtr;
795: if (dag->tracerec) {
796: RF_ETIMER_START(dag->tracerec->timer);
797: }
798: if (rf_engineDebug || rf_validateDAGDebug) {
799: if (rf_ValidateDAG(dag))
800: RF_PANIC();
801: }
802: if (rf_engineDebug>1) {
803: printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
804: }
805: raidPtr->dags_in_flight++; /*
806: * Debug only: blow off proper
807: * locking.
808: */
809: dag->cbFunc = cbFunc;
810: dag->cbArg = cbArg;
811: dag->numNodesCompleted = 0;
812: dag->status = rf_enable;
813: rf_FireNodeArray(dag->numSuccedents, dag->succedents);
814: return (1);
815: }
816:
817:
818: /*
819: * Dedicated kernel thread:
820: * The thread that handles all DAG node firing.
821: * To minimize locking and unlocking, we grab a copy of the entire node queue
822: * and then set the node queue to NULL before doing any firing of nodes.
823: * This way we only have to release the lock once. Of course, it's probably
824: * rare that there's more than one node in the queue at any one time, but it
825: * sometimes happens.
826: *
827: * In the kernel, this thread runs at spl0 and is not swappable. I copied these
828: * characteristics from the aio_completion_thread.
829: */
830:
#ifdef RAID_AUTOCONFIG
/*
 * Startup hook (RAID_AUTOCONFIG only), established by rf_ConfigureEngine()
 * when the engine thread could not be created yet.  Creates the engine
 * thread for the RAID unit passed in `arg'.
 *
 * lastpid is temporarily set to RF_ENGINE_PID + raidid - 1 around thread
 * creation (presumably so the new thread receives a predictable pid) and
 * is restored afterwards on both success and failure.
 */
void
rf_DAGExecutionThread_pre(RF_ThreadArg_t arg)
{
	RF_Raid_t *raidPtr;
	char raidname[16];
	pid_t oldpid = lastpid;
	int error;

	raidPtr = (RF_Raid_t *) arg;

	if (rf_engineDebug) {
		printf("raid%d: Starting engine thread\n", raidPtr->raidid);
	}

	lastpid = RF_ENGINE_PID + raidPtr->raidid - 1;
	snprintf(raidname, sizeof raidname, "raid%d", raidPtr->raidid);

	error = RF_CREATE_THREAD(raidPtr->engine_thread, rf_DAGExecutionThread,
	    raidPtr, &raidname[0]);

	/*
	 * Restore lastpid unconditionally; previously the failure path
	 * returned early and left the overridden value in place.
	 */
	lastpid = oldpid;

	if (error) {
		RF_ERRORMSG("RAIDFRAME: Unable to start engine thread\n");
		return;
	}

	if (rf_engineDebug) {
		printf("raid%d: Engine thread started\n", raidPtr->raidid);
	}
	RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
}
#endif /* RAID_AUTOCONFIG */
861:
/*
 * Body of the per-array DAG engine kernel thread (arg is the RF_Raid_t).
 *
 * Waits until the thread group is marked created, raises the priority
 * with splbio(), establishes a shutdown hook, and then loops until
 * raidPtr->shutdown_engine is set: it detaches the whole node queue under
 * the lock, drops the lock, runs the DAG callback for every terminal node
 * (no succedents going forward, no antecedents rolling backward), fires
 * the remaining nodes with rf_FireNodeList(), and sleeps on the queue
 * when it is empty.  On exit it removes the shutdown hook, marks the
 * thread group done, restores the priority level and exits.
 */
void
rf_DAGExecutionThread(RF_ThreadArg_t arg)
{
	RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
	RF_Raid_t *raidPtr;
	int ks;		/* Priority level saved by DO_LOCK. */
	int s;		/* Priority level on entry, restored before exit. */

	raidPtr = (RF_Raid_t *) arg;

	/*
	 * Wait until the creator marks the thread group as created
	 * (presumably done by RF_THREADGROUP_STARTED()).
	 */
	while (!(&raidPtr->engine_tg)->created)
		(void) tsleep((void *)&(&raidPtr->engine_tg)->created, PWAIT,
		    "raidinit", 0);

	if (rf_engineDebug) {
		printf("raid%d: Engine thread is running\n", raidPtr->raidid);
	}
	/* XXX What to put here ? XXX */

	s = splbio();

	RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);

	/*
	 * Store the shutdown-hook cookie in this unit's slot.  Under
	 * RAID_AUTOCONFIG this overwrites any startup-hook cookie that
	 * rf_ConfigureEngine() stored there.
	 */
	rf_hook_cookies[raidPtr->raidid] =
	    shutdownhook_establish(rf_shutdown_hook, (void *)raidPtr);

	DO_LOCK(raidPtr);
	while (!raidPtr->shutdown_engine) {

		while (raidPtr->node_queue != NULL) {
			/*
			 * Take the entire queue at once so the lock is
			 * dropped only once per batch.
			 */
			local_nq = raidPtr->node_queue;
			fire_nq = NULL;
			term_nq = NULL;
			raidPtr->node_queue = NULL;
			DO_UNLOCK(raidPtr);

			/*
			 * First, strip out the terminal nodes.  Nodes are
			 * pushed onto the front of term_nq/fire_nq, so both
			 * lists end up in reverse queue order.
			 */
			while (local_nq) {
				nd = local_nq;
				local_nq = local_nq->next;
				switch (nd->dagHdr->status) {
				case rf_enable:
				case rf_rollForward:
					if (nd->numSuccedents == 0) {
						/*
						 * End of the dag, add to
						 * callback list.
						 */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/*
						 * Not the end, add to the
						 * fire queue.
						 */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				case rf_rollBackward:
					if (nd->numAntecedents == 0) {
						/*
						 * End of the dag, add to the
						 * callback list.
						 */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/*
						 * Not the end, add to the
						 * fire queue.
						 */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				default:
					RF_PANIC();
					break;
				}
			}

			/*
			 * Execute callback of dags which have reached the
			 * terminal node.
			 */
			while (term_nq) {
				nd = term_nq;
				term_nq = term_nq->next;
				nd->next = NULL;
				(nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
				raidPtr->dags_in_flight--; /* Debug only. */
			}

			/* Fire remaining nodes. */
			rf_FireNodeList(fire_nq);

			DO_LOCK(raidPtr);
		}
		/* Sleep until new work arrives or shutdown is requested. */
		while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
			DO_WAIT(raidPtr);
	}
	DO_UNLOCK(raidPtr);

	if (rf_hook_cookies && rf_hook_cookies[raidPtr->raidid] != NULL) {
		shutdownhook_disestablish(rf_hook_cookies[raidPtr->raidid]);
		rf_hook_cookies[raidPtr->raidid] = NULL;
	}

	RF_THREADGROUP_DONE(&raidPtr->engine_tg);

	splx(s);
	kthread_exit(0);
}
CVSweb