My Project
Loading...
Searching...
No Matches
shared.cc
Go to the documentation of this file.
1#include "threadconf.h"
2#include <iostream>
3#include "kernel/mod2.h"
4#include "Singular/ipid.h"
5#include "Singular/ipshell.h"
7#include "Singular/lists.h"
8#include "Singular/blackbox.h"
9#include "Singular/feOpt.h"
11#include <cstring>
12#include <string>
13#include <errno.h>
14#include <stdio.h>
15#include <vector>
16#include <map>
17#include <iterator>
18#include <queue>
19#include <assert.h>
20#include "thread.h"
21#include "lintree.h"
22
23#include "singthreads.h"
24
25using namespace std;
26
27#ifdef ENABLE_THREADS
28extern char *global_argv0;
29#endif
30
32
33namespace LibThread {
34
35#ifdef ENABLE_THREADS
36const int have_threads = 1;
37#else
38const int have_threads = 0;
39#endif
40
41class Command {
42private:
43 const char *name;
44 const char *error;
47 int argc;
48public:
49 Command(const char *n, leftv r, leftv a)
50 {
51 name = n;
52 result = r;
53 error = NULL;
54 argc = 0;
55 for (leftv t = a; t != NULL; t = t->next) {
56 argc++;
57 }
58 args = (leftv *) omAlloc0(sizeof(leftv) * argc);
59 int i = 0;
60 for (leftv t = a; t != NULL; t = t->next) {
61 args[i++] = t;
62 }
63 result->rtyp = NONE;
64 result->data = NULL;
65 }
67 omFree(args);
68 }
69 void check_argc(int n) {
70 if (error) return;
71 if (argc != n) error = "wrong number of arguments";
72 }
73 void check_argc(int lo, int hi) {
74 if (error) return;
75 if (argc < lo || argc > hi) error = "wrong number of arguments";
76 }
77 void check_argc_min(int n) {
78 if (error) return;
79 if (argc < n) error = "wrong number of arguments";
80 }
81 void check_arg(int i, int type, const char *err) {
82 if (error) return;
83 if (args[i]->Typ() != type) error = err;
84 }
85 void check_init(int i, const char *err) {
86 if (error) return;
87 leftv arg = args[i];
88 if (arg->Data() == NULL || *(void **)(arg->Data()) == NULL)
89 error = err;
90 }
91 void check_arg(int i, int type, int type2, const char *err) {
92 if (error) return;
93 if (args[i]->Typ() != type && args[i]->Typ() != type2) error = err;
94 }
95 int argtype(int i) {
96 return args[i]->Typ();
97 }
98 int nargs() {
99 return argc;
100 }
101 void *arg(int i) {
102 return args[i]->Data();
103 }
104 template <typename T>
105 T *shared_arg(int i) {
106 return *(T **)(arg(i));
107 }
108 long int_arg(int i) {
109 return (long)(args[i]->Data());
110 }
111 void report(const char *err) {
112 error = err;
113 }
114 // intentionally not bool, so we can also do
115 // q = p + test_arg(p, type);
116 int test_arg(int i, int type) {
117 if (i >= argc) return 0;
118 return args[i]->Typ() == type;
119 }
120 void set_result(long n) {
122 result->data = (char *)n;
123 }
124 void set_result(const char *s) {
126 result->data = omStrDup(s);
127 }
128 void set_result(int type, void *p) {
129 result->rtyp = type;
130 result->data = (char *) p;
131 }
132 void set_result(int type, long n) {
133 result->rtyp = type;
134 result->data = (char *) n;
135 }
136 void no_result() {
137 result->rtyp = NONE;
138 }
139 bool ok() {
140 return error == NULL;
141 }
143 if (error) {
144 Werror("%s: %s", name, error);
145 }
146 return error != NULL;
147 }
148 BOOLEAN abort(const char *err) {
149 report(err);
150 return status();
151 }
152};
153
155private:
158 int type;
159 std::string name;
160public:
162 virtual ~SharedObject() { }
163 void set_type(int type_init) { type = type_init; }
164 int get_type() { return type; }
165 void set_name(std::string &name_init) { name = name_init; }
166 void set_name(const char *s) {
167 name = std::string(s);
168 }
169 std::string &get_name() { return name; }
170 void incref(int by = 1) {
171 lock.lock();
172 refcount += 1;
173 lock.unlock();
174 }
175 long decref() {
176 int result;
177 lock.lock();
178 result = --refcount;
179 lock.unlock();
180 return result;
181 }
182 long getref() {
183 return refcount;
184 }
185 virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2) {
186 return TRUE;
187 }
188 virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
189 return TRUE;
190 }
191};
192
194 obj->incref();
195}
196
198 if (obj->decref() == 0) {
199 // delete obj;
200 }
201}
202
203typedef std::map<std::string, SharedObject *> SharedObjectTable;
204
205class Region : public SharedObject {
206private:
208public:
211 virtual ~Region() { }
212 Lock *get_lock() { return &region_lock; }
213 void lock() {
214 if (!region_lock.is_locked())
216 }
217 void unlock() {
220 }
221 int is_locked() {
222 return region_lock.is_locked();
223 }
224};
225
232
245
248
250 Lock *lock, int type, string &name, SharedConstructor scons)
251{
252 int was_locked = lock->is_locked();
254 if (!was_locked)
255 lock->lock();
256 if (table.count(name)) {
257 result = table[name];
258 if (result->get_type() != type)
259 result = NULL;
260 } else {
261 result = scons();
262 result->set_type(type);
263 result->set_name(name);
264 table.insert(pair<string,SharedObject *>(name, result));
265 }
266 if (!was_locked)
267 lock->unlock();
268 return result;
269}
270
272 Lock *lock, string &name)
273{
274 int was_locked = lock->is_locked();
276 if (!was_locked)
277 lock->lock();
278 if (table.count(name)) {
279 result = table[name];
280 }
281 if (!was_locked)
282 lock->unlock();
283 return result;
284}
285
287private:
290protected:
291 int tx_begin() {
292 if (!region)
293 lock->lock();
294 else {
295 if (!lock->is_locked()) {
296 return 0;
297 }
298 }
299 return 1;
300 }
301 void tx_end() {
302 if (!region)
303 lock->unlock();
304 }
305public:
308 }
309 void set_region(Region *region_init) {
310 region = region_init;
311 if (region_init) {
312 lock = region_init->get_lock();
313 } else {
314 lock = new Lock();
315 }
316 }
317 virtual ~Transactional() { if (!region && lock) delete lock; }
318};
319
320class TxTable: public Transactional {
321private:
322 std::map<string, string> entries;
323public:
325 virtual ~TxTable() { }
326 int put(string &key, string &value) {
327 int result = 0;
328 if (!tx_begin()) return -1;
329 if (entries.count(key)) {
330 entries[key] = value;
331 } else {
332 entries.insert(pair<string, string>(key, value));
333 result = 1;
334 }
335 tx_end();
336 return result;
337 }
338 int get(string &key, string &value) {
339 int result = 0;
340 if (!tx_begin()) return -1;
341 if (entries.count(key)) {
342 value = entries[key];
343 result = 1;
344 }
345 tx_end();
346 return result;
347 }
348 int check(string &key) {
349 int result;
350 if (!tx_begin()) return -1;
351 result = entries.count(key);
352 tx_end();
353 return result;
354 }
355};
356
357class TxList: public Transactional {
358private:
359 vector<string> entries;
360public:
362 virtual ~TxList() { }
363 int put(size_t index, string &value) {
364 int result = -1;
365 if (!tx_begin()) return -1;
366 if (index >= 1 && index <= entries.size()) {
367 entries[index-1] = value;
368 result = 1;
369 } else {
370 entries.resize(index+1);
371 entries[index-1] = value;
372 result = 0;
373 }
374 tx_end();
375 return result;
376 }
377 int get(size_t index, string &value) {
378 int result = 0;
379 if (!tx_begin()) return -1;
380 if (index >= 1 && index <= entries.size()) {
381 result = (entries[index-1].size() != 0);
382 if (result)
383 value = entries[index-1];
384 }
385 tx_end();
386 return result;
387 }
388 long size() {
389 long result;
390 if (!tx_begin()) return -1;
391 result = (long) entries.size();
392 tx_end();
393 return result;
394 }
395};
396
398private:
399 queue<string> q;
402public:
404 virtual ~SingularChannel() { }
405 void send(string item) {
406 lock.lock();
407 q.push(item);
408 cond.signal();
409 lock.unlock();
410 }
411 string receive() {
412 lock.lock();
413 while (q.empty()) {
414 cond.wait();
415 }
416 string result = q.front();
417 q.pop();
418 if (!q.empty())
419 cond.signal();
420 lock.unlock();
421 return result;
422 }
423 long count() {
424 lock.lock();
425 long result = q.size();
426 lock.unlock();
427 return result;
428 }
429};
430
432private:
433 string value;
434 int init;
437public:
439 virtual ~SingularSyncVar() { }
440 void acquire() {
441 lock.lock();
442 }
443 void release() {
444 lock.unlock();
445 }
446 void wait_init() {
447 while (!init)
448 cond.wait();
449 }
451 if (value.size() == 0) return NULL;
453 }
454 void update(leftv val) {
456 init = 1;
457 cond.broadcast();
458 }
459 int write(string item) {
460 int result = 0;
461 lock.lock();
462 if (!init) {
463 value = item;
464 init = 1;
465 cond.broadcast();
466 result = 1;
467 }
468 lock.unlock();
469 return result;
470 }
471 string read() {
472 lock.lock();
473 while (!init)
474 cond.wait();
475 string result = value;
476 lock.unlock();
477 return result;
478 }
479 int check() {
480 lock.lock();
481 int result = init;
482 lock.unlock();
483 return result;
484 }
485};
486
487void *shared_init(blackbox *b) {
488 return omAlloc0(sizeof(SharedObject *));
489}
490
492 acquireShared(obj);
493 void *result = omAlloc0(sizeof(SharedObject *));
494 *(SharedObject **)result = obj;
495 return result;
496}
497
498void shared_destroy(blackbox *b, void *d) {
499 SharedObject *obj = *(SharedObject **)d;
500 if (obj) {
502 *(SharedObject **)d = NULL;
503 }
504}
505
506void rlock_destroy(blackbox *b, void *d) {
507 SharedObject *obj = *(SharedObject **)d;
508 ((Region *) obj)->unlock();
509 if (obj) {
511 *(SharedObject **)d = NULL;
512 }
513}
514
515void *shared_copy(blackbox *b, void *d) {
516 SharedObject *obj = *(SharedObject **)d;
517 void *result = shared_init(b);
518 *(SharedObject **)result = obj;
519 if (obj)
520 acquireShared(obj);
521 return result;
522}
523
525 if (r->Typ() == l->Typ()) {
526 if (l->rtyp == IDHDL) {
527 omFree(IDDATA((idhdl)l->data));
528 IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
529 } else {
530 leftv ll=l->LData();
531 if (ll==NULL)
532 {
533 return TRUE; // out of array bounds or similiar
534 }
535 if (ll->data) {
537 omFree(ll->data);
538 }
539 ll->data = shared_copy(NULL,r->Data());
540 }
541 } else {
542 Werror("assign %s(%d) = %s(%d)",
543 Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
544 return TRUE;
545 }
546 return FALSE;
547}
548
550 if (r->Typ() == l->Typ()) {
551 if (l->rtyp == IDHDL) {
552 omFree(IDDATA((idhdl)l->data));
553 IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
554 } else {
555 leftv ll=l->LData();
556 if (ll==NULL)
557 {
558 return TRUE; // out of array bounds or similiar
559 }
560 rlock_destroy(NULL, ll->data);
561 omFree(ll->data);
562 ll->data = shared_copy(NULL,r->Data());
563 }
564 } else {
565 Werror("assign %s(%d) = %s(%d)",
566 Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
567 return TRUE;
568 }
569 return FALSE;
570}
571
572
574 int lt = l->Typ();
575 int rt = r->Typ();
576 if (lt != DEF_CMD && lt != rt) {
577 const char *rn=Tok2Cmdname(rt);
578 const char *ln=Tok2Cmdname(lt);
579 Werror("cannot assign %s (%d) to %s (%d)\n", rn, rt, ln, lt);
580 return TRUE;
581 }
582 return FALSE;
583}
584
586 SharedObject *obj = *(SharedObject **)a1->Data();
587 return obj->op2(op, res, a1, a2);
588}
589
591 SharedObject *obj = *(SharedObject **)a1->Data();
592 return obj->op3(op, res, a1, a2, a3);
593}
594
595char *shared_string(blackbox *b, void *d) {
596 char buf[80];
597 SharedObject *obj = *(SharedObject **)d;
598 if (!obj)
599 return omStrDup("<uninitialized shared object>");
600 int type = obj->get_type();
601 string &name = obj->get_name();
602 const char *type_name = "unknown";
603 if (type == type_channel)
604 type_name = "channel";
605 else if (type == type_atomic_table)
606 type_name = "atomic_table";
607 else if (type == type_shared_table)
608 type_name = "shared_table";
609 else if (type == type_atomic_list)
610 type_name = "atomic_list";
611 else if (type == type_shared_list)
612 type_name = "shared_list";
613 else if (type == type_syncvar)
614 type_name = "syncvar";
615 else if (type == type_region)
616 type_name = "region";
617 else if (type == type_regionlock)
618 type_name = "regionlock";
619 else if (type == type_thread) {
620 sprintf(buf, "<thread #%s>", name.c_str());
621 return omStrDup(buf);
622 }
623 else if (type == type_threadpool) {
624 if (name.size() > 0) {
625 name_lock.lock();
626 sprintf(buf, "<threadpool \"%.40s\"@%p>", name.c_str(), obj);
628 } else
629 sprintf(buf, "<threadpool @%p>", obj);
630 return omStrDup(buf);
631 }
632 else if (type == type_job) {
633 if (name.size() > 0) {
634 name_lock.lock();
635 sprintf(buf, "<job \"%.40s\"@%p>", name.c_str(), obj);
637 } else
638 sprintf(buf, "<job @%p>", obj);
639 return omStrDup(buf);
640 }
641 else if (type == type_trigger) {
642 if (name.size() > 0) {
643 name_lock.lock();
644 sprintf(buf, "<trigger \"%.40s\"@%p>", name.c_str(), obj);
646 } else
647 sprintf(buf, "<trigger @%p>", obj);
648 return omStrDup(buf);
649 } else {
650 sprintf(buf, "<unknown type %d>", type);
651 return omStrDup(buf);
652 }
653 sprintf(buf, "<%s \"%.40s\">", type_name, name.c_str());
654 return omStrDup(buf);
655}
656
657char *rlock_string(blackbox *b, void *d) {
658 char buf[80];
659 SharedObject *obj = *(SharedObject **)d;
660 if (!obj)
661 return omStrDup("<uninitialized region lock>");
662 sprintf(buf, "<region lock \"%.40s\">", obj->get_name().c_str());
663 return omStrDup(buf);
664}
665
666void report(const char *fmt, const char *name) {
667 char buf[80];
668 sprintf(buf, fmt, name);
669 WerrorS(buf);
670}
671
672int wrong_num_args(const char *name, leftv arg, int n) {
673 for (int i=1; i<=n; i++) {
674 if (!arg) {
675 report("%s: too few arguments", name);
676 return TRUE;
677 }
678 arg = arg->next;
679 }
680 if (arg) {
681 report("%s: too many arguments", name);
682 return TRUE;
683 }
684 return FALSE;
685}
686
687int not_a_uri(const char *name, leftv arg) {
688 if (arg->Typ() != STRING_CMD) {
689 report("%s: not a valid URI", name);
690 return TRUE;
691 }
692 return FALSE;
693}
694
695int not_a_region(const char *name, leftv arg) {
696 if (arg->Typ() != type_region || !arg->Data()) {
697 report("%s: not a region", name);
698 return TRUE;
699 }
700 return FALSE;
701}
702
703
704char *str(leftv arg) {
705 return (char *)(arg->Data());
706}
707
709 return new TxTable();
710}
711
713 return new TxList();
714}
715
717 return new SingularChannel();
718}
719
721 return new SingularSyncVar();
722}
723
725 return new Region();
726}
727
728static void appendArg(vector<leftv> &argv, string &s) {
729 if (s.size() == 0) return;
731 if (val->Typ() == NONE) {
732 omFreeBin(val, sleftv_bin);
733 return;
734 }
735 argv.push_back(val);
736}
737
738static void appendArg(vector<leftv> &argv, leftv arg) {
739 argv.push_back(arg);
740}
741
742static void appendArgCopy(vector<leftv> &argv, leftv arg) {
744 val->Copy(arg);
745 argv.push_back(val);
746}
747
748
750 const char *procname, const vector<leftv> &argv)
751{
752 leftv procnode = (leftv) omAlloc0Bin(sleftv_bin);
753 procnode->name = omStrDup(procname);
754 procnode->req_packhdl = basePack;
755 int error = procnode->Eval();
756 if (error) {
757 Werror("procedure \"%s\" not found", procname);
758 omFreeBin(procnode, sleftv_bin);
759 return TRUE;
760 }
761 memset(&result, 0, sizeof(result));
762 leftv *tail = &procnode->next;
763 for (unsigned i = 0; i < argv.size(); i++) {
764 *tail = argv[i];
765 tail = &(*tail)->next;
766 }
767 *tail = NULL;
768 error = iiExprArithM(&result, procnode, '(');
769 procnode->CleanUp();
770 omFreeBin(procnode, sleftv_bin);
771 if (error) {
772 Werror("procedure call of \"%s\" failed", procname);
773 return TRUE;
774 }
775 return FALSE;
776}
777
779 if (wrong_num_args("makeAtomicTable", arg, 1))
780 return TRUE;
781 if (not_a_uri("makeAtomicTable", arg))
782 return TRUE;
783 string uri = str(arg);
786 ((TxTable *) obj)->set_region(NULL);
788 result->data = new_shared(obj);
789 return FALSE;
790}
791
793 if (wrong_num_args("makeAtomicList", arg, 1))
794 return TRUE;
795 if (not_a_uri("makeAtomicList", arg))
796 return TRUE;
797 string uri = str(arg);
800 ((TxList *) obj)->set_region(NULL);
801 result->rtyp = type_atomic_list;
802 result->data = new_shared(obj);
803 return FALSE;
804}
805
807 if (wrong_num_args("makeSharedTable", arg, 2))
808 return TRUE;
809 if (not_a_region("makeSharedTable", arg))
810 return TRUE;
811 if (not_a_uri("makeSharedTable", arg->next))
812 return TRUE;
813 Region *region = *(Region **) arg->Data();
814 fflush(stdout);
815 string s = str(arg->next);
818 ((TxTable *) obj)->set_region(region);
820 result->data = new_shared(obj);
821 return FALSE;
822}
823
825 if (wrong_num_args("makeSharedList", arg, 2))
826 return TRUE;
827 if (not_a_region("makeSharedList", arg))
828 return TRUE;
829 if (not_a_uri("makeSharedList", arg->next))
830 return TRUE;
831 Region *region = *(Region **) arg->Data();
832 string s = str(arg->next);
834 region->get_lock(), type_shared_list, s, consList);
835 ((TxList *) obj)->set_region(region);
836 result->rtyp = type_shared_list;
837 result->data = new_shared(obj);
838 return FALSE;
839}
840
842 if (wrong_num_args("makeChannel", arg, 1))
843 return TRUE;
844 if (not_a_uri("makeChannel", arg))
845 return TRUE;
846 string uri = str(arg);
849 result->rtyp = type_channel;
850 result->data = new_shared(obj);
851 return FALSE;
852}
853
855 if (wrong_num_args("makeSyncVar", arg, 1))
856 return TRUE;
857 if (not_a_uri("makeSyncVar", arg))
858 return TRUE;
859 string uri = str(arg);
862 result->rtyp = type_syncvar;
863 result->data = new_shared(obj);
864 return FALSE;
865}
866
868 if (wrong_num_args("makeRegion", arg, 1))
869 return TRUE;
870 if (not_a_uri("makeRegion", arg))
871 return TRUE;
872 string uri = str(arg);
875 result->rtyp = type_region;
876 result->data = new_shared(obj);
877 return FALSE;
878}
879
881 if (wrong_num_args("findSharedObject", arg, 1))
882 return TRUE;
883 if (not_a_uri("findSharedObject", arg))
884 return TRUE;
885 string uri = str(arg);
887 &global_objects_lock, uri);
888 result->rtyp = INT_CMD;
889 result->data = (char *)(long)(obj != NULL);
890 return FALSE;
891}
892
894 if (wrong_num_args("findSharedObject", arg, 1))
895 return TRUE;
896 if (not_a_uri("findSharedObject", arg))
897 return TRUE;
898 string uri = str(arg);
900 &global_objects_lock, uri);
901 int type = obj ? obj->get_type() : -1;
902 const char *type_name = "undefined";
903 if (type == type_channel)
904 type_name = "channel";
905 else if (type == type_atomic_table)
906 type_name = "atomic_table";
907 else if (type == type_shared_table)
908 type_name = "shared_table";
909 else if (type == type_atomic_list)
910 type_name = "atomic_list";
911 else if (type == type_shared_list)
912 type_name = "shared_list";
913 else if (type == type_syncvar)
914 type_name = "syncvar";
915 else if (type == type_region)
916 type_name = "region";
917 else if (type == type_regionlock)
918 type_name = "regionlock";
919 result->rtyp = STRING_CMD;
920 result->data = (char *)(omStrDup(type_name));
921 return FALSE;
922}
923
925 if (wrong_num_args("bindSharedObject", arg, 1))
926 return TRUE;
927 if (not_a_uri("bindSharedObject", arg))
928 return TRUE;
929 string uri = str(arg);
931 &global_objects_lock, uri);
932 if (!obj) {
933 WerrorS("bindSharedObject: cannot find object");
934 return TRUE;
935 }
936 result->rtyp = obj->get_type();
937 result->data = new_shared(obj);
938 return FALSE;
939}
940
942 if (wrong_num_args("getTable", arg, 2))
943 return TRUE;
944 if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
945 WerrorS("getTable: not a valid table");
946 return TRUE;
947 }
948 if (arg->next->Typ() != STRING_CMD) {
949 WerrorS("getTable: not a valid table key");
950 return TRUE;
951 }
952 TxTable *table = *(TxTable **) arg->Data();
953 if (!table) {
954 WerrorS("getTable: table has not been initialized");
955 return TRUE;
956 }
957 string key = (char *)(arg->next->Data());
958 string value;
959 int success = table->get(key, value);
960 if (success < 0) {
961 WerrorS("getTable: region not acquired");
962 return TRUE;
963 }
964 if (success == 0) {
965 WerrorS("getTable: key not found");
966 return TRUE;
967 }
968 leftv tmp = LinTree::from_string(value);
969 result->rtyp = tmp->Typ();
970 result->data = tmp->Data();
971 return FALSE;
972}
973
975 if (wrong_num_args("inTable", arg, 2))
976 return TRUE;
977 if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
978 WerrorS("inTable: not a valid table");
979 return TRUE;
980 }
981 if (arg->next->Typ() != STRING_CMD) {
982 WerrorS("inTable: not a valid table key");
983 return TRUE;
984 }
985 TxTable *table = *(TxTable **) arg->Data();
986 if (!table) {
987 WerrorS("inTable: table has not been initialized");
988 return TRUE;
989 }
990 string key = (char *)(arg->next->Data());
991 int success = table->check(key);
992 if (success < 0) {
993 WerrorS("inTable: region not acquired");
994 return TRUE;
995 }
996 result->rtyp = INT_CMD;
997 result->data = (char *)(long)(success);
998 return FALSE;
999}
1000
1002 if (wrong_num_args("putTable", arg, 3))
1003 return TRUE;
1004 if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
1005 WerrorS("putTable: not a valid table");
1006 return TRUE;
1007 }
1008 if (arg->next->Typ() != STRING_CMD) {
1009 WerrorS("putTable: not a valid table key");
1010 return TRUE;
1011 }
1012 TxTable *table = *(TxTable **) arg->Data();
1013 if (!table) {
1014 WerrorS("putTable: table has not been initialized");
1015 return TRUE;
1016 }
1017 string key = (char *)(arg->next->Data());
1018 string value = LinTree::to_string(arg->next->next);
1019 int success = table->put(key, value);
1020 if (success < 0) {
1021 WerrorS("putTable: region not acquired");
1022 return TRUE;
1023 }
1024 result->rtyp = NONE;
1025 return FALSE;
1026}
1027
1029 if (wrong_num_args("getList", arg, 2))
1030 return TRUE;
1031 if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1032 WerrorS("getList: not a valid list (atomic or shared)");
1033 return TRUE;
1034 }
1035 if (arg->next->Typ() != INT_CMD) {
1036 WerrorS("getList: index must be an integer");
1037 return TRUE;
1038 }
1039 TxList *list = *(TxList **) arg->Data();
1040 if (!list) {
1041 WerrorS("getList: list has not been initialized");
1042 return TRUE;
1043 }
1044 long index = (long)(arg->next->Data());
1045 string value;
1046 int success = list->get(index, value);
1047 if (success < 0) {
1048 WerrorS("getList: region not acquired");
1049 return TRUE;
1050 }
1051 if (success == 0) {
1052 WerrorS("getList: no value at position");
1053 return TRUE;
1054 }
1055 leftv tmp = LinTree::from_string(value);
1056 result->rtyp = tmp->Typ();
1057 result->data = tmp->Data();
1058 return FALSE;
1059}
1060
1062 if (wrong_num_args("putList", arg, 3))
1063 return TRUE;
1064 if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1065 WerrorS("putList: not a valid list (shared or atomic)");
1066 return TRUE;
1067 }
1068 if (arg->next->Typ() != INT_CMD) {
1069 WerrorS("putList: index must be an integer");
1070 return TRUE;
1071 }
1072 TxList *list = *(TxList **) arg->Data();
1073 if (!list) {
1074 WerrorS("putList: list has not been initialized");
1075 return TRUE;
1076 }
1077 long index = (long)(arg->next->Data());
1078 string value = LinTree::to_string(arg->next->next);
1079 int success = list->put(index, value);
1080 if (success < 0) {
1081 WerrorS("putList: region not acquired");
1082 return TRUE;
1083 }
1084 result->rtyp = NONE;
1085 return FALSE;
1086}
1087
1089 if (wrong_num_args("lockRegion", arg, 1))
1090 return TRUE;
1091 if (not_a_region("lockRegion", arg))
1092 return TRUE;
1093 Region *region = *(Region **)arg->Data();
1094 if (region->is_locked()) {
1095 WerrorS("lockRegion: region is already locked");
1096 return TRUE;
1097 }
1098 region->lock();
1099 result->rtyp = NONE;
1100 return FALSE;
1101}
1102
1104 if (wrong_num_args("lockRegion", arg, 1))
1105 return TRUE;
1106 if (not_a_region("lockRegion", arg))
1107 return TRUE;
1108 Region *region = *(Region **)arg->Data();
1109 if (region->is_locked()) {
1110 WerrorS("lockRegion: region is already locked");
1111 return TRUE;
1112 }
1113 region->lock();
1114 result->rtyp = type_regionlock;
1115 result->data = new_shared(region);
1116 return FALSE;
1117}
1118
1119
1121 if (wrong_num_args("unlockRegion", arg, 1))
1122 return TRUE;
1123 if (not_a_region("unlockRegion", arg))
1124 return TRUE;
1125 Region *region = *(Region **)arg->Data();
1126 if (!region->is_locked()) {
1127 WerrorS("unlockRegion: region is not locked");
1128 return TRUE;
1129 }
1130 region->unlock();
1131 result->rtyp = NONE;
1132 return FALSE;
1133}
1134
1136 if (wrong_num_args("sendChannel", arg, 2))
1137 return TRUE;
1138 if (arg->Typ() != type_channel) {
1139 WerrorS("sendChannel: argument is not a channel");
1140 return TRUE;
1141 }
1142 SingularChannel *channel = *(SingularChannel **)arg->Data();
1143 if (!channel) {
1144 WerrorS("sendChannel: channel has not been initialized");
1145 return TRUE;
1146 }
1147 channel->send(LinTree::to_string(arg->next));
1148 result->rtyp = NONE;
1149 return FALSE;
1150}
1151
1153 if (wrong_num_args("receiveChannel", arg, 1))
1154 return TRUE;
1155 if (arg->Typ() != type_channel) {
1156 WerrorS("receiveChannel: argument is not a channel");
1157 return TRUE;
1158 }
1159 SingularChannel *channel = *(SingularChannel **)arg->Data();
1160 if (!channel) {
1161 WerrorS("receiveChannel: channel has not been initialized");
1162 return TRUE;
1163 }
1164 string item = channel->receive();
1165 leftv val = LinTree::from_string(item);
1166 result->rtyp = val->Typ();
1167 result->data = val->Data();
1168 return FALSE;
1169}
1170
1172 if (wrong_num_args("statChannel", arg, 1))
1173 return TRUE;
1174 if (arg->Typ() != type_channel) {
1175 WerrorS("statChannel: argument is not a channel");
1176 return TRUE;
1177 }
1178 SingularChannel *channel = *(SingularChannel **)arg->Data();
1179 if (!channel) {
1180 WerrorS("receiveChannel: channel has not been initialized");
1181 return TRUE;
1182 }
1183 long n = channel->count();
1184 result->rtyp = INT_CMD;
1185 result->data = (char *)n;
1186 return FALSE;
1187}
1188
1190 if (wrong_num_args("writeSyncVar", arg, 2))
1191 return TRUE;
1192 if (arg->Typ() != type_syncvar) {
1193 WerrorS("writeSyncVar: argument is not a syncvar");
1194 return TRUE;
1195 }
1196 SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1197 if (!syncvar) {
1198 WerrorS("writeSyncVar: syncvar has not been initialized");
1199 return TRUE;
1200 }
1201 if (!syncvar->write(LinTree::to_string(arg->next))) {
1202 WerrorS("writeSyncVar: variable already has a value");
1203 return TRUE;
1204 }
1205 result->rtyp = NONE;
1206 return FALSE;
1207}
1208
1210 Command cmd("updateSyncVar", result, arg);
1211 cmd.check_argc_min(2);
1212 cmd.check_arg(0, type_syncvar, "first argument must be a syncvar");
1213 cmd.check_init(0, "syncvar has not been initialized");
1214 cmd.check_arg(1, STRING_CMD, "second argument must be a string");
1215 if (cmd.ok()) {
1216 SingularSyncVar *syncvar = cmd.shared_arg<SingularSyncVar>(0);
1217 char *procname = (char *) cmd.arg(1);
1218 arg = arg->next->next;
1219 syncvar->acquire();
1220 syncvar->wait_init();
1221 vector<leftv> argv;
1222 appendArg(argv, syncvar->get());
1223 while (arg) {
1224 appendArgCopy(argv, arg);
1225 arg = arg->next;
1226 }
1227 int error = executeProc(*result, procname, argv);
1228 if (!error) {
1229 syncvar->update(result);
1230 }
1231 syncvar->release();
1232 return error;
1233 }
1234 return cmd.status();
1235}
1236
1237
1239 if (wrong_num_args("readSyncVar", arg, 1))
1240 return TRUE;
1241 if (arg->Typ() != type_syncvar) {
1242 WerrorS("readSyncVar: argument is not a syncvar");
1243 return TRUE;
1244 }
1245 SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1246 if (!syncvar) {
1247 WerrorS("readSyncVar: syncvar has not been initialized");
1248 return TRUE;
1249 }
1250 string item = syncvar->read();
1251 leftv val = LinTree::from_string(item);
1252 result->rtyp = val->Typ();
1253 result->data = val->Data();
1254 return FALSE;
1255}
1256
1258 if (wrong_num_args("statSyncVar", arg, 1))
1259 return TRUE;
1260 if (arg->Typ() != type_syncvar) {
1261 WerrorS("statSyncVar: argument is not a syncvar");
1262 return TRUE;
1263 }
1264 SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1265 if (!syncvar) {
1266 WerrorS("statSyncVar: syncvar has not been initialized");
1267 return TRUE;
1268 }
1269 int init = syncvar->check();
1270 result->rtyp = INT_CMD;
1271 result->data = (char *)(long) init;
1272 return FALSE;
1273}
1274
1276 SharedObject *obj = *(SharedObject **)(val->Data());
1277 acquireShared(obj);
1278 lintree.put(obj);
1279}
1280
1282 int type = lintree.get_prev<int>();
1283 SharedObject *obj = lintree.get<SharedObject *>();
1285 result->rtyp = type;
1286 result->data = (void *)new_shared(obj);
1287 return result;
1288}
1289
1290void ref_shared(LinTree::LinTree &lintree, int by) {
1291 SharedObject *obj = lintree.get<SharedObject *>();
1292 while (by > 0) {
1293 obj->incref();
1294 by--;
1295 }
1296 while (by < 0) {
1297 obj->decref();
1298 by++;
1299 }
1300}
1301
1302void installShared(int type) {
1304}
1305
1306void makeSharedType(int &type, const char *name) {
1307 if (type != 0) return;
1308 blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1309 b->blackbox_Init = shared_init;
1310 b->blackbox_destroy = shared_destroy;
1311 b->blackbox_Copy = shared_copy;
1312 b->blackbox_String = shared_string;
1313 b->blackbox_Assign = shared_assign;
1314 b->blackbox_CheckAssign = shared_check_assign;
1315 // b->blackbox_Op2 = shared_op2;
1316 // b->blackbox_Op3 = shared_op3;
1317 type = setBlackboxStuff(b, name);
1318 installShared(type);
1319}
1320
1321void makeRegionlockType(int &type, const char *name) {
1322 if (type != 0) return;
1323 blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1324 b->blackbox_Init = shared_init;
1325 b->blackbox_destroy = rlock_destroy;
1326 b->blackbox_Copy = shared_copy;
1327 b->blackbox_String = shared_string;
1328 b->blackbox_Assign = rlock_assign;
1329 b->blackbox_CheckAssign = shared_check_assign;
1330 type = setBlackboxStuff(b, name);
1331 installShared(type);
1332}
1333
1334#define MAX_THREADS 128
1335
1337public:
1341 void *(*thread_func)(ThreadState *, void *);
1342 void *arg, *result;
1343 pthread_t id;
1344 pthread_t parent;
1348 queue<string> to_thread;
1349 queue<string> from_thread;
1351 to_thread(), from_thread() {
1352 active = false;
1353 running = false;
1354 index = -1;
1355 }
1357 // We do nothing here. This is to prevent the condition
1358 // variable destructor from firing upon program exit,
1359 // which would invoke undefined behavior if the thread
1360 // is still running.
1361 }
1362};
1363
1365
1367
1368void setOption(int ch) {
1369 int index = feGetOptIndex(ch);
1370 feSetOptValue((feOptIndex) index, (int) 1);
1371}
1372
1374 master_lock.lock();
1377#ifdef ENABLE_THREADS
1379 siInit(global_argv0);
1380#endif
1381 setOption('q');
1382 // setOption('b');
1383}
1384
1385void *thread_main(void *arg) {
1386 ThreadState *ts = (ThreadState *)arg;
1387 thread_init();
1388 return ts->thread_func(ts, ts->arg);
1389}
1390
1391void *interpreter_thread(ThreadState *ts, void *arg) {
1392 ts->lock.lock();
1393 for (;;) {
1394 bool eval = false;
1395 while (ts->to_thread.empty())
1396 ts->to_cond.wait();
1397 /* TODO */
1398 string expr = ts->to_thread.front();
1399 switch (expr[0]) {
1400 case '\0': case 'q':
1401 ts->lock.unlock();
1402 return NULL;
1403 case 'x':
1404 eval = false;
1405 break;
1406 case 'e':
1407 eval = true;
1408 break;
1409 }
1410 ts->to_thread.pop();
1411 expr = ts->to_thread.front();
1412 /* this will implicitly eval commands */
1413 leftv val = LinTree::from_string(expr);
1414 expr = LinTree::to_string(val);
1415 ts->to_thread.pop();
1416 if (eval)
1417 ts->from_thread.push(expr);
1418 ts->from_cond.signal();
1419 }
1420 ts->lock.unlock();
1421 return NULL;
1422}
1423
1425private:
1427public:
1428 InterpreterThread(ThreadState *ts_init) : SharedObject(), ts(ts_init) { }
1432 ts = NULL;
1433 }
1434};
1435
1436static ThreadState *newThread(void *(*thread_func)(ThreadState *, void *),
1437 void *arg, const char **error) {
1438 ThreadState *ts = NULL;
1439 if (error) *error = NULL;
1440 thread_lock.lock();
1441 for (int i=0; i<MAX_THREADS; i++) {
1442 if (!thread_state[i].active) {
1443 ts = thread_state + i;
1444 ts->index = i;
1445 ts->parent = pthread_self();
1446 ts->active = true;
1447 ts->running = true;
1448 ts->to_thread = queue<string>();
1449 ts->from_thread = queue<string>();
1450 ts->thread_func = thread_func;
1451 ts->arg = arg;
1452 ts->result = NULL;
1453 if (pthread_create(&ts->id, NULL, thread_main, ts)<0) {
1454 if (error)
1455 *error = "createThread: internal error: failed to create thread";
1456 goto fail;
1457 }
1458 goto exit;
1459 }
1460 }
1461 if (error) *error = "createThread: too many threads";
1462 fail:
1463 ts = NULL;
1464 exit:
1466 return ts;
1467}
1468
1469ThreadState *createThread(void *(*thread_func)(ThreadState *, void *),
1470 void *arg) {
1471 return newThread(thread_func, arg, NULL);
1472}
1473
1475 void *result;
1476 pthread_join(ts->id, NULL);
1477 result = ts->result;
1478 thread_lock.lock();
1479 ts->running = false;
1480 ts->active = false;
1482 return result;
1483}
1484
1487 if (*error) return NULL;
1488 InterpreterThread *thread = new InterpreterThread(ts);
1489 char buf[10];
1490 sprintf(buf, "%d", ts->index);
1491 string name(buf);
1492 thread->set_name(name);
1493 thread->set_type(type_thread);
1494 return thread;
1495}
1496
1498 Command cmd("createThread", result, arg);
1499 cmd.check_argc(0);
1500 const char *error;
1501 if (!have_threads)
1502 cmd.report("thread support not available");
1503 if (!cmd.ok()) return cmd.status();
1505 if (error) {
1506 return cmd.abort(error);
1507 }
1508 cmd.set_result(type_thread, new_shared(thread));
1509 return cmd.status();
1510}
1511
1513 ThreadState *ts = thread->getThreadState();
1514 if (ts && ts->parent != pthread_self()) {
1515 return false;
1516 }
1517 ts->lock.lock();
1518 string quit("q");
1519 ts->to_thread.push(quit);
1520 ts->to_cond.signal();
1521 ts->lock.unlock();
1522 pthread_join(ts->id, NULL);
1523 thread_lock.lock();
1524 ts->running = false;
1525 ts->active = false;
1526 thread->clearThreadState();
1528 return true;
1529}
1530
1532 if (wrong_num_args("joinThread", arg, 1))
1533 return TRUE;
1534 if (arg->Typ() != type_thread) {
1535 WerrorS("joinThread: argument is not a thread");
1536 return TRUE;
1537 }
1538 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
1539 if (!joinInterpreterThread(thread)) {
1540 WerrorS("joinThread: can only be called from parent thread");
1541 return TRUE;
1542 }
1543 return FALSE;
1544}
1545
1546class ThreadPool;
1547class Trigger;
1548
1549class Job : public SharedObject {
1550public:
1552 long prio;
1553 size_t id;
1555 vector<Job *> deps;
1556 vector<Job *> notify;
1557 vector<Trigger *> triggers;
1558 vector<string> args;
1559 string result; // lintree-encoded
1560 void *data;
1561 bool fast;
1562 bool done;
1568 result(), args(), notify(), triggers(), prio(0)
1569 { set_type(type_job); }
1570 ~Job();
1571 void addDep(Job *job) {
1572 deps.push_back(job);
1573 }
1574 void addDep(vector<Job *> &jobs);
1575 void addDep(long ndeps, Job **jobs);
1576 void addNotify(vector<Job *> &jobs);
1577 void addNotify(Job *job);
1578 virtual bool ready();
1579 virtual void execute() = 0;
1580 void run();
1581};
1582
1584 bool operator()(const Job* lhs, const Job* rhs) {
1585 if (lhs->fast < rhs->fast) {
1586 return true;
1587 }
1588 if (lhs->prio < rhs->prio) {
1589 return true;
1590 }
1591 if (lhs->prio == rhs->prio) {
1592 return lhs->id > rhs->id;
1593 }
1594 return false;
1595 }
1596};
1597
1598class Trigger : public Job {
1599public:
1600 virtual bool accept(leftv arg) = 0;
1601 virtual void activate(leftv arg) = 0;
1603};
1604
1606 vector<Job *>::iterator it;
1607 for (it = deps.begin(); it != deps.end(); it++) {
1608 if (!(*it)->done) return false;
1609 }
1610 return true;
1611}
1612
1614 vector<Job *>::iterator it;
1615 for (it = deps.begin(); it != deps.end(); it++) {
1616 releaseShared(*it);
1617 }
1618}
1619
1620typedef queue<Job *> JobQueue;
1621
1622class Scheduler;
1623
1627 int num;
1628};
1629
1632
1633class ThreadPool : public SharedObject {
1634public:
1637 ThreadPool(Scheduler *sched, int n);
1638 ThreadPool(int n);
1639 ~ThreadPool();
1640 ThreadState *getThread(int i);
1641 void shutdown(bool wait);
1642 void addThread(ThreadState *thread);
1643 void attachJob(Job *job);
1644 void detachJob(Job *job);
1645 void queueJob(Job *job);
1646 void broadcastJob(Job *job);
1647 void cancelDeps(Job * job);
1648 void cancelJob(Job *job);
1649 void waitJob(Job *job);
1650 void clearThreadState();
1651};
1652
1653
1654class Scheduler : public SharedObject {
1655private:
1657 size_t jobid;
1663 vector<ThreadState *> threads;
1664 vector<ThreadPool *> thread_owners;
1665 priority_queue<Job *, vector<Job *>, JobCompare> global_queue;
1666 vector<JobQueue *> thread_queues;
1667 vector<Job *> pending;
1670 friend class Job;
1671public:
1673 Scheduler(int n) :
1675 single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1676 lock(true), cond(&lock), response(&lock),
1678 maxconcurrency(n), running(0)
1679 {
1680 thread_queues.push_back(new JobQueue());
1681 }
1683 maxconcurrency = n;
1684 }
1686 return maxconcurrency;
1687 }
1689 int n;
1690 for (unsigned i = 0; i <thread_owners.size(); i++) {
1691 if (thread_owners[i] == pool)
1692 n++;
1693 }
1694 return n;
1695 }
1696 virtual ~Scheduler() {
1697 for (unsigned i = 0; i < thread_queues.size(); i++) {
1698 JobQueue *q = thread_queues[i];
1699 while (!q->empty()) {
1700 Job *job = q->front();
1701 q->pop();
1702 releaseShared(job);
1703 }
1704 }
1705 thread_queues.clear();
1706 threads.clear();
1707 }
1708 ThreadState *getThread(int i) { return threads[i]; }
1709 void shutdown(bool wait) {
1710 if (single_threaded) {
1711 SchedInfo *info = new SchedInfo();
1712 info->num = 0;
1713 info->scheduler = this;
1714 acquireShared(this);
1715 info->job = NULL;
1717 return;
1718 }
1719 lock.lock();
1720 if (wait) {
1721 while (!global_queue.empty()) {
1722 response.wait();
1723 }
1724 }
1725 shutting_down = true;
1726 while (shutdown_counter < nthreads) {
1727 cond.broadcast();
1728 response.wait();
1729 }
1730 lock.unlock();
1731 for (unsigned i = 0; i <threads.size(); i++) {
1733 }
1734 }
1735 void addThread(ThreadPool *owner, ThreadState *thread) {
1736 lock.lock();
1737 thread_owners.push_back(owner);
1738 threads.push_back(thread);
1739 thread_queues.push_back(new JobQueue());
1740 lock.unlock();
1741 }
1742 void attachJob(ThreadPool *pool, Job *job) {
1743 lock.lock();
1744 job->pool = pool;
1745 job->id = jobid++;
1746 acquireShared(job);
1747 if (job->ready()) {
1748 global_queue.push(job);
1749 cond.signal();
1750 }
1751 else if (job->pending_index < 0) {
1752 job->pool = pool;
1753 job->pending_index = pending.size();
1754 pending.push_back(job);
1755 }
1756 lock.unlock();
1757 }
1758 void detachJob(Job *job) {
1759 lock.lock();
1760 long i = job->pending_index;
1761 job->pending_index = -1;
1762 if (i >= 0) {
1763 job = pending.back();
1764 pending.resize(pending.size()-1);
1765 pending[i] = job;
1766 job->pending_index = i;
1767 }
1768 lock.unlock();
1769 }
1770 void queueJob(Job *job) {
1771 lock.lock();
1772 global_queue.push(job);
1773 cond.signal();
1774 lock.unlock();
1775 }
1776 void broadcastJob(ThreadPool *pool, Job *job) {
1777 lock.lock();
1778 for (unsigned i = 0; i <thread_queues.size(); i++) {
1779 if (thread_owners[i] == pool) {
1780 acquireShared(job);
1781 thread_queues[i]->push(job);
1782 }
1783 }
1784 lock.unlock();
1785 }
1786 void cancelDeps(Job * job) {
1787 vector<Job *> &notify = job->notify;
1788 for (unsigned i = 0; i <notify.size(); i++) {
1789 Job *next = notify[i];
1790 if (!next->cancelled) {
1791 cancelJob(next);
1792 }
1793 }
1794 }
1795 void cancelJob(Job *job) {
1796 lock.lock();
1797 if (!job->cancelled) {
1798 job->cancelled = true;
1799 if (!job->running && !job->done) {
1800 job->done = true;
1801 cancelDeps(job);
1802 }
1803 }
1804 lock.unlock();
1805 }
1806 void waitJob(Job *job) {
1807 if (single_threaded) {
1808 SchedInfo *info = new SchedInfo();
1809 info->num = 0;
1810 info->scheduler = this;
1811 acquireShared(this);
1812 info->job = job;
1814 } else {
1815 lock.lock();
1816 for (;;) {
1817 if (job->done || job->cancelled) {
1818 break;
1819 }
1820 response.wait();
1821 }
1822 response.signal(); // forward signal
1823 lock.unlock();
1824 }
1825 }
1827 threads.clear();
1828 }
1829 static void notifyDeps(Scheduler *scheduler, Job *job) {
1830 vector<Job *> &notify = job->notify;
1831 job->incref(notify.size());
1832 for (unsigned i = 0; i <notify.size(); i++) {
1833 Job *next = notify[i];
1834 if (!next->queued && next->ready() && !next->cancelled) {
1835 next->queued = true;
1836 scheduler->queueJob(next);
1837 }
1838 }
1839 vector<Trigger *> &triggers = job->triggers;
1840 leftv arg = NULL;
1841 if (triggers.size() > 0 && job->result.size() > 0)
1842 arg = LinTree::from_string(job->result);
1843 for (unsigned i = 0; i < triggers.size(); i++) {
1844 Trigger *trigger = triggers[i];
1845 if (trigger->accept(arg)) {
1846 trigger->activate(arg);
1847 if (trigger->ready())
1848 scheduler->queueJob(trigger);
1849 }
1850 }
1851 if (arg) {
1852 arg->CleanUp();
1853 omFreeBin(arg, sleftv_bin);
1854 }
1855 }
1856 static void *main(ThreadState *ts, void *arg) {
1857 SchedInfo *info = (SchedInfo *) arg;
1858 Scheduler *scheduler = info->scheduler;
1859 ThreadPool *oldThreadPool = currentThreadPoolRef;
1860 // TODO: set current thread pool
1861 // currentThreadPoolRef = pool;
1862 Lock &lock = scheduler->lock;
1863 ConditionVariable &cond = scheduler->cond;
1864 ConditionVariable &response = scheduler->response;
1865 JobQueue *my_queue = scheduler->thread_queues[info->num];
1866 if (!scheduler->single_threaded)
1867 thread_init();
1868 lock.lock();
1869 for (;;) {
1870 if (info->job && info->job->done)
1871 break;
1872 if (scheduler->shutting_down) {
1873 scheduler->shutdown_counter++;
1874 scheduler->response.signal();
1875 break;
1876 }
1877 if (!my_queue->empty()) {
1878 Job *job = my_queue->front();
1879 my_queue->pop();
1880 if (!scheduler->global_queue.empty())
1881 cond.signal();
1882 currentJobRef = job;
1883 job->run();
1885 notifyDeps(scheduler, job);
1886 releaseShared(job);
1887 scheduler->response.signal();
1888 continue;
1889 } else if (!scheduler->global_queue.empty()) {
1890 Job *job = scheduler->global_queue.top();
1891 scheduler->global_queue.pop();
1892 if (!scheduler->global_queue.empty())
1893 cond.signal();
1894 currentJobRef = job;
1895 job->run();
1897 notifyDeps(scheduler, job);
1898 releaseShared(job);
1899 scheduler->response.signal();
1900 continue;
1901 } else {
1902 if (scheduler->single_threaded) {
1903 break;
1904 }
1905 cond.wait();
1906 }
1907 }
1908 // TODO: correct current thread pool
1909 // releaseShared(currentThreadPoolRef);
1910 currentThreadPoolRef = oldThreadPool;
1911 scheduler->lock.unlock();
1912 delete info;
1913 return NULL;
1914 }
1915};
1916
1917ThreadPool::ThreadPool(int n) : SharedObject(), nthreads(n) {
1918 scheduler = new Scheduler(n);
1920}
1921ThreadPool::ThreadPool(Scheduler *sched, int n) : SharedObject(), nthreads(n) {
1922 scheduler = sched;
1923 acquireShared(sched);
1924}
1927}
1931 scheduler->addThread(this, thread);
1932}
1934 scheduler->attachJob(this, job);
1935}
1937 scheduler->detachJob(job);
1938}
1940 scheduler->queueJob(job);
1941}
1943 scheduler->broadcastJob(this, job);
1944}
1946 scheduler->cancelDeps(job);
1947}
1949 scheduler->cancelJob(job);
1950}
1952 scheduler->waitJob(job);
1953}
1956}
1957
1958void Job::addDep(vector<Job *> &jobs) {
1959 deps.insert(deps.end(), jobs.begin(), jobs.end());
1960}
1961
1962void Job::addDep(long ndeps, Job **jobs) {
1963 for (long i = 0; i < ndeps; i++) {
1964 deps.push_back(jobs[i]);
1965 }
1966}
1967
1968void Job::addNotify(vector<Job *> &jobs) {
1969 notify.insert(notify.end(), jobs.begin(), jobs.end());
1970 if (done) {
1972 }
1973}
1974
1976 notify.push_back(job);
1977 if (done) {
1979 }
1980}
1981
1982void Job::run() {
1983 if (!cancelled) {
1984 running = true;
1987 execute();
1989 pool->scheduler->lock.lock();
1990 running = false;
1991 }
1992 done = true;
1993}
1994
1995class AccTrigger : public Trigger {
1996private:
1997 long count;
1998public:
1999 AccTrigger(long count_init): Trigger(), count(count_init) {
2000 }
2001 virtual bool ready() {
2002 if (!Trigger::ready()) return false;
2003 return args.size() >= count;
2004 }
2005 virtual bool accept(leftv arg) {
2006 return true;
2007 }
2008 virtual void activate(leftv arg) {
2009 while (arg != NULL && !ready()) {
2010 args.push_back(LinTree::to_string(arg));
2011 if (ready()) {
2012 return;
2013 }
2014 arg = arg->next;
2015 }
2016 }
2017 virtual void execute() {
2019 l->Init(args.size());
2020 for (unsigned i = 0; i < args.size(); i++) {
2022 memcpy(&l->m[i], val, sizeof(*val));
2023 omFreeBin(val, sleftv_bin);
2024 }
2025 sleftv val;
2026 memset(&val, 0, sizeof(val));
2027 val.rtyp = LIST_CMD;
2028 val.data = l;
2029 result = LinTree::to_string(&val);
2030 // val.CleanUp();
2031 }
2032};
2033
2034class CountTrigger : public Trigger {
2035private:
2036 long count;
2037public:
2038 CountTrigger(long count_init): Trigger(), count(count_init) {
2039 }
2040 virtual bool ready() {
2041 if (!Trigger::ready()) return false;
2042 return count <= 0;
2043 }
2044 virtual bool accept(leftv arg) {
2045 return arg == NULL;
2046 }
2047 virtual void activate(leftv arg) {
2048 if (!ready()) {
2049 count--;
2050 }
2051 }
2052 virtual void execute() {
2053 // do nothing
2054 }
2055};
2056
2057class SetTrigger : public Trigger {
2058private:
2059 vector<bool> set;
2060 long count;
2061public:
2062 SetTrigger(long count_init) : Trigger(), count(0),
2063 set(count_init) {
2064 }
2065 virtual bool ready() {
2066 if (!Trigger::ready()) return false;
2067 return count == set.size();
2068 }
2069 virtual bool accept(leftv arg) {
2070 return arg->Typ() == INT_CMD;
2071 }
2072 virtual void activate(leftv arg) {
2073 if (!ready()) {
2074 long value = (long) arg->Data();
2075 if (value < 0 || value >= count) return;
2076 if (set[value]) return;
2077 set[value] = true;
2078 count++;
2079 }
2080 }
2081 virtual void execute() {
2082 // do nothing
2083 }
2084};
2085
2086
2087class ProcTrigger : public Trigger {
2088private:
2089 string procname;
2091public:
2092 ProcTrigger(const char *p) : Trigger(), procname(p), success(false) {
2093 }
2094 virtual bool ready() {
2095 if (!Trigger::ready()) return false;
2096 return success;
2097 }
2098 virtual bool accept(leftv arg) {
2099 return TRUE;
2100 }
2101 virtual void activate(leftv arg) {
2102 if (!ready()) {
2104 vector<leftv> argv;
2105 for (unsigned i = 0; i < args.size(); i++) {
2106 appendArg(argv, args[i]);
2107 }
2108 int error = false;
2109 while (arg) {
2110 appendArgCopy(argv, arg);
2111 arg = arg->next;
2112 }
2113 sleftv val;
2114 if (!error)
2115 error = executeProc(val, procname.c_str(), argv);
2116 if (!error) {
2117 if (val.Typ() == NONE || (val.Typ() == INT_CMD &&
2118 (long) val.Data()))
2119 {
2120 success = true;
2121 }
2122 val.CleanUp();
2123 }
2124 pool->scheduler->lock.lock();
2125 }
2126 }
2127 virtual void execute() {
2128 // do nothing
2129 }
2130};
2131
2133 long n;
2134 Command cmd("createThreadPool", result, arg);
2135 cmd.check_argc(1, 2);
2136 cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2137 if (cmd.ok()) {
2138 n = (long) cmd.arg(0);
2139 if (n < 0) cmd.report("number of threads must be non-negative");
2140 else if (n >= 256) cmd.report("number of threads too large");
2141 if (!have_threads && n != 0)
2142 cmd.report("in single-threaded mode, number of threads must be zero");
2143 }
2144 if (cmd.ok()) {
2145 ThreadPool *pool = new ThreadPool((int) n);
2147 for (int i = 0; i <n; i++) {
2148 const char *error;
2149 SchedInfo *info = new SchedInfo();
2150 info->scheduler = pool->scheduler;
2151 acquireShared(pool->scheduler);
2152 info->job = NULL;
2153 info->num = i;
2155 if (!thread) {
2156 // TODO: clean up bad pool
2157 return cmd.abort(error);
2158 }
2159 pool->addThread(thread);
2160 }
2162 }
2163 return cmd.status();
2164}
2165
2167 Command cmd("createThreadPoolSet", result, arg);
2168 cmd.check_argc(2);
2169 cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2170 cmd.check_arg(1, LIST_CMD, "second argument must be a list of integers");
2171 lists l;
2172 int n;
2173 if (cmd.ok()) {
2174 l = (lists) (cmd.arg(1));
2175 n = lSize(l)+1;
2176 if (n == 0)
2177 return cmd.abort("second argument must not be empty");
2178 for (int i = 0; i < n; i++) {
2179 if (l->m[i].Typ() != INT_CMD)
2180 return cmd.abort("second argument must be a list of integers");
2181 }
2182 }
2183 lists pools = (lists) omAlloc0Bin(slists_bin);
2184 pools->Init(n);
2185 if (cmd.ok()) {
2186 long s = 0;
2187 for (int i = 0; i < n; i++) {
2188 s += (long) (l->m[i].Data());
2189 }
2190 Scheduler *sched = new Scheduler((int)s);
2191 sched->set_maxconcurrency(cmd.int_arg(0));
2192 for (int i = 0; i < n; i++) {
2193 long m = (long) (l->m[i].Data());
2194 ThreadPool *pool = new ThreadPool(sched, (int) m);
2196 for (int j = 0; j < m; j++) {
2197 const char *error;
2198 SchedInfo *info = new SchedInfo();
2199 info->scheduler = pool->scheduler;
2200 acquireShared(pool->scheduler);
2201 info->job = NULL;
2202 info->num = i;
2204 if (!thread) {
2205 // TODO: clean up bad pool
2206 return cmd.abort(error);
2207 }
2208 pool->addThread(thread);
2209 }
2210 pools->m[i].rtyp = type_threadpool;
2211 pools->m[i].data = new_shared(pool);
2212 }
2213 cmd.set_result(LIST_CMD, pools);
2214 }
2215 return cmd.status();
2216}
2217
2218ThreadPool *createThreadPool(int nthreads, int prioThreads = 0) {
2219 ThreadPool *pool = new ThreadPool((int) nthreads);
2221 for (int i = 0; i <nthreads; i++) {
2222 const char *error;
2223 SchedInfo *info = new SchedInfo();
2224 info->scheduler = pool->scheduler;
2225 acquireShared(pool);
2226 info->job = NULL;
2227 info->num = i;
2229 if (!thread) {
2230 return NULL;
2231 }
2232 pool->addThread(thread);
2233 }
2234 return pool;
2235}
2236
// Forwards `pool` to releaseShared().
2237void release(ThreadPool *pool) {
2238 releaseShared(pool);
2239}
2240
// Forwards `pool` to acquireShared().
2241void retain(ThreadPool *pool) {
2242 acquireShared(pool);
2243}
2244
2246 return currentThreadPoolRef;
2247}
2248
2250 Command cmd("getThreadPoolWorkers", result, arg);
2251 cmd.check_argc(1);
2252 cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2253 cmd.check_init(0, "threadpool not initialized");
2254 int r = 0;
2255 if (cmd.ok()) {
2256 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2257 Scheduler *sched = pool->scheduler;
2258 sched->lock.lock();
2259 r = sched->threadpool_size(pool);
2260 sched->lock.unlock();
2261 cmd.set_result(INT_CMD, r);
2262 }
2263 return cmd.status();
2264}
2265
2267 Command cmd("setThreadPoolWorkers", result, arg);
2268 cmd.check_argc(2);
2269 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2270 cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2271 cmd.check_init(0, "threadpool not initialized");
2272 if (cmd.ok()) {
2273 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2274 Scheduler *sched = pool->scheduler;
2275 // TODO: count/add threads
2276 cmd.no_result();
2277 }
2278 return cmd.status();
2279}
2280
2282 Command cmd("getThreadPoolConcurrency", result, arg);
2283 cmd.check_argc(1);
2284 cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2285 cmd.check_init(0, "threadpool not initialized");
2286 if (cmd.ok()) {
2287 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2288 Scheduler *sched = pool->scheduler;
2289 sched->lock.lock();
2290 cmd.set_result(INT_CMD, sched->get_maxconcurrency());
2291 sched->lock.unlock();
2292 }
2293 return cmd.status();
2294}
2295
2297 Command cmd("setThreadPoolWorkers", result, arg);
2298 cmd.check_argc(2);
2299 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2300 cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2301 cmd.check_init(0, "threadpool not initialized");
2302 if (cmd.ok()) {
2303 ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2304 Scheduler *sched = pool->scheduler;
2305 sched->lock.lock();
2306 sched->set_maxconcurrency(cmd.int_arg(1));
2307 sched->lock.unlock();
2308 cmd.no_result();
2309 }
2310 return cmd.status();
2311}
2312
2314 Command cmd("closeThreadPool", result, arg);
2315 cmd.check_argc(1, 2);
2316 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2317 cmd.check_init(0, "threadpool not initialized");
2318 if (cmd.nargs() > 1)
2319 cmd.check_arg(1, INT_CMD, "optional argument must be an integer");
2320 if (cmd.ok()) {
2321 ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2322 bool wait = cmd.nargs() == 2 ? (cmd.int_arg(1) != 0) : 1;
2323 pool->shutdown(wait);
2324 cmd.no_result();
2325 }
2326 return cmd.status();
2327}
2328
2330 pool->shutdown(wait);
2331}
2332
2333
2335 Command cmd("currentThreadPool", result, arg);
2336 cmd.check_argc(0);
2338 if (pool) {
2340 } else {
2341 cmd.report("no current threadpool");
2342 }
2343 return cmd.status();
2344}
2345
2347 Command cmd("setCurrentThreadPool", result, arg);
2348 cmd.check_argc(1);
2349 cmd.check_init(0, "threadpool not initialized");
2350 if (cmd.ok()) {
2351 ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2352 acquireShared(pool);
2355 currentThreadPoolRef = pool;
2356 }
2357 return cmd.status();
2358}
2359
2360class EvalJob : public Job {
2361public:
2362 EvalJob() : Job() { }
2363 virtual void execute() {
2365 result = (LinTree::to_string(val));
2366 val->CleanUp();
2367 omFreeBin(val, sleftv_bin);
2368 }
2369};
2370
2371class ExecJob : public Job {
2372public:
2373 ExecJob() : Job() { }
2374 virtual void execute() {
2376 val->CleanUp();
2377 omFreeBin(val, sleftv_bin);
2378 }
2379};
2380
2381class ProcJob : public Job {
2382 string procname;
2383public:
2384 ProcJob(const char *procname_init) : Job(),
2385 procname(procname_init) {
2386 set_name(procname_init);
2387 }
2388 virtual void execute() {
2389 vector<leftv> argv;
2390 for (unsigned i = 0; i <args.size(); i++) {
2391 appendArg(argv, args[i]);
2392 }
2393 for (unsigned i = 0; i < deps.size(); i++) {
2394 appendArg(argv, deps[i]->result);
2395 }
2396 sleftv val;
2397 int error = executeProc(val, procname.c_str(), argv);
2398 if (!error) {
2399 result = (LinTree::to_string(&val));
2400 val.CleanUp();
2401 }
2402 }
2403};
2404
2405class KernelJob : public Job {
2406private:
2407 void (*cfunc)(leftv result, leftv arg);
2408public:
2409 KernelJob(void (*func)(leftv result, leftv arg)) : cfunc(func) { }
2410 virtual void execute() {
2411 vector<leftv> argv;
2412 for (unsigned i = 0; i <args.size(); i++) {
2413 appendArg(argv, args[i]);
2414 }
2415 for (unsigned i = 0; i < deps.size(); i++) {
2416 appendArg(argv, deps[i]->result);
2417 }
2418 sleftv val;
2419 memset(&val, 0, sizeof(val));
2420 if (argv.size() > 0) {
2421 leftv *tail = &argv[0]->next;
2422 for (unsigned i = 1; i < argv.size(); i++) {
2423 *tail = argv[i];
2424 tail = &(*tail)->next;
2425 }
2426 *tail = NULL;
2427 }
2428 cfunc(&val, argv[0]);
2429 result = (LinTree::to_string(&val));
2430 val.CleanUp();
2431 }
2432};
2433
2434class RawKernelJob : public Job {
2435private:
2436 void (*cfunc)(long ndeps, Job **deps);
2437public:
2438 RawKernelJob(void (*func)(long ndeps, Job **deps)) : cfunc(func) { }
2439 virtual void execute() {
2440 long ndeps = deps.size();
2441 Job **jobs = (Job **) omAlloc0(sizeof(Job *) * ndeps);
2442 for (long i = 0; i < ndeps; i++)
2443 jobs[i] = deps[i];
2444 cfunc(ndeps, jobs);
2445 omFree(jobs);
2446 }
2447};
2448
2450 Command cmd("createJob", result, arg);
2451 cmd.check_argc_min(1);
2453 "job name must be a string or quote expression");
2454 if (cmd.ok()) {
2455 if (cmd.test_arg(0, STRING_CMD)) {
2456 ProcJob *job = new ProcJob((char *)(cmd.arg(0)));
2457 for (leftv a = arg->next; a != NULL; a = a->next) {
2458 job->args.push_back(LinTree::to_string(a));
2459 }
2460 cmd.set_result(type_job, new_shared(job));
2461 } else {
2462 cmd.check_argc(1);
2463 Job *job = new EvalJob();
2464 job->args.push_back(LinTree::to_string(arg));
2465 cmd.set_result(type_job, new_shared(job));
2466 }
2467 }
2468 return cmd.status();
2469}
2470
2471Job *createJob(void (*func)(leftv result, leftv arg)) {
2472 KernelJob *job = new KernelJob(func);
2473 return job;
2474}
2475
2476Job *createJob(void (*func)(long ndeps, Job **deps)) {
2477 RawKernelJob *job = new RawKernelJob(func);
2478 return job;
2479}
2480
2481Job *startJob(ThreadPool *pool, Job *job, leftv arg) {
2482 if (job->pool) return NULL;
2483 while (arg) {
2484 job->args.push_back(LinTree::to_string(arg));
2485 arg = arg->next;
2486 }
2487 pool->attachJob(job);
2488 return job;
2489}
2490
2492 return startJob(pool, job, NULL);
2493}
2494
2495// Job *scheduleJob(ThreadPool *pool, Job *job, long ndeps, Job **deps) {
2496// if (job->pool) return NULL;
2497// pool->scheduler->lock.lock();
2498// bool cancelled = false;
2499// job->addDep(ndeps, deps);
2500// for (long i = 0; i < ndeps; i++) {
2501// deps[i]->addNotify(job);
2502// cancelled |= deps[i]->cancelled;
2503// }
2504// if (cancelled) {
2505// job->pool = pool;
2506// pool->cancelJob(job);
2507// }
2508// else
2509// pool->attachJob(job);
2510// pool->scheduler->lock.unlock();
2511// return FIXME: missing/unclear what this is supposed to be
2512// }
2513
2514void cancelJob(Job *job) {
2515 ThreadPool *pool = job->pool;
2516 if (pool) pool->cancelJob(job);
2517}
2518
2520 return currentJobRef;
2521}
2522
2524 Command cmd("startJob", result, arg);
2525 cmd.check_argc_min(1);
2526 int has_pool = cmd.test_arg(0, type_threadpool);
2527 cmd.check_argc_min(1+has_pool);
2528 if (has_pool)
2529 cmd.check_init(0, "threadpool not initialized");
2530 int has_prio = cmd.test_arg(has_pool, INT_CMD);
2531 long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2532 int first_arg = has_pool + has_prio;
2533 cmd.check_arg(first_arg, type_job, STRING_CMD,
2534 "job argument must be a job or string");
2535 if (cmd.ok() && cmd.argtype(first_arg) == type_job)
2536 cmd.check_init(first_arg, "job not initialized");
2537 if (!cmd.ok()) return cmd.status();
2538 ThreadPool *pool;
2539 if (has_pool)
2540 pool = cmd.shared_arg<ThreadPool>(0);
2541 else {
2543 return cmd.abort("no current threadpool defined");
2544 pool = currentThreadPoolRef;
2545 }
2546 Job *job;
2547 if (cmd.argtype(first_arg) == type_job)
2548 job = *(Job **)(cmd.arg(first_arg));
2549 else
2550 job = new ProcJob((char *)(cmd.arg(first_arg)));
2551 leftv a = arg->next;
2552 if (has_pool) a = a->next;
2553 if (has_prio) a = a->next;
2554 for (; a != NULL; a = a->next) {
2555 job->args.push_back(LinTree::to_string(a));
2556 }
2557 if (job->pool)
2558 return cmd.abort("job has already been scheduled");
2559 job->prio = prio;
2560 pool->attachJob(job);
2561 cmd.set_result(type_job, new_shared(job));
2562 return cmd.status();
2563}
2564
2566 Command cmd("waitJob", result, arg);
2567 cmd.check_argc(1);
2568 cmd.check_arg(0, type_job, "argument must be a job");
2569 cmd.check_init(0, "job not initialized");
2570 if (cmd.ok()) {
2571 Job *job = *(Job **)(cmd.arg(0));
2572 ThreadPool *pool = job->pool;
2573 if (!pool) {
2574 return cmd.abort("job has not yet been started or scheduled");
2575 }
2576 pool->waitJob(job);
2577 if (job->cancelled) {
2578 return cmd.abort("job has been cancelled");
2579 }
2580 if (job->result.size() == 0)
2581 cmd.no_result();
2582 else {
2584 cmd.set_result(res->Typ(), res->Data());
2585 }
2586 }
2587 return cmd.status();
2588}
2589
2590void waitJob(Job *job) {
2591 assert(job->pool != NULL);
2592 job->pool->waitJob(job);
2593}
2594
2596 Command cmd("cancelJob", result, arg);
2597 cmd.check_argc(1);
2598 cmd.check_arg(0, type_job, "argument must be a job");
2599 cmd.check_init(0, "job not initialized");
2600 if (cmd.ok()) {
2601 Job *job = cmd.shared_arg<Job>(0);
2602 ThreadPool *pool = job->pool;
2603 if (!pool) {
2604 return cmd.abort("job has not yet been started or scheduled");
2605 }
2606 pool->cancelJob(job);
2607 cmd.no_result();
2608 }
2609 return cmd.status();
2610}
2611
2613 Job *job;
2614 Command cmd("jobCancelled", result, arg);
2615 cmd.check_argc(0, 1);
2616 if (cmd.nargs() == 1) {
2617 cmd.check_arg(0, type_job, "argument must be a job");
2618 cmd.check_init(0, "job not initialized");
2619 job = cmd.shared_arg<Job>(0);
2620 } else {
2621 job = currentJobRef;
2622 if (!job)
2623 cmd.report("no current job");
2624 }
2625 if (cmd.ok()) {
2626 ThreadPool *pool = job->pool;
2627 if (!pool) {
2628 return cmd.abort("job has not yet been started or scheduled");
2629 }
2630 pool->scheduler->lock.lock();
2631 cmd.set_result((long) job->cancelled);
2632 pool->scheduler->lock.unlock();
2633 }
2634 return cmd.status();
2635}
2636
2638 ThreadPool *pool = job->pool;
2639 if (pool) pool->scheduler->lock.lock();
2640 bool result = job->cancelled;
2641 if (pool) pool->scheduler->lock.unlock();
2642 return result;
2643}
2644
2647}
2648
2649void setJobData(Job *job, void *data) {
2650 ThreadPool *pool = job->pool;
2651 if (pool) pool->scheduler->lock.lock();
2652 job->data = data;
2653 if (pool) pool->scheduler->lock.unlock();
2654}
2655
2656
2657void *getJobData(Job *job) {
2658 ThreadPool *pool = job->pool;
2659 if (pool) pool->scheduler->lock.lock();
2660 void *result = job->data;
2661 if (pool) pool->scheduler->lock.unlock();
2662 return result;
2663}
2664
2665void addJobArgs(Job *job, leftv arg) {
2666 ThreadPool *pool = job->pool;
2667 if (pool) pool->scheduler->lock.lock();
2668 while (arg) {
2669 job->args.push_back(LinTree::to_string(arg));
2670 arg = arg->next;
2671 }
2672 if (pool) pool->scheduler->lock.unlock();
2673}
2674
2676 ThreadPool *pool = job->pool;
2677 if (pool) pool->scheduler->lock.lock();
2679 if (pool) pool->scheduler->lock.unlock();
2680 return result;
2681}
2682
// Not implemented yet: ignores `job` and always returns an empty string.
2683const char *getJobName(Job *job) {
2684 // TODO
2685 return "";
2686}
2687
// Not implemented yet: both arguments are ignored and nothing is changed.
2688void setJobName(Job *job, const char *name) {
2689 // TODO
2690}
2691
2693 Command cmd("createTrigger", result, arg);
2694 cmd.check_argc_min(1);
2695 int has_pool = cmd.test_arg(0, type_threadpool);
2696 ThreadPool *pool;
2697 if (has_pool) {
2698 cmd.check_init(0, "threadpool not initialized");
2699 pool = cmd.shared_arg<ThreadPool>(0);
2700 } else {
2701 pool = currentThreadPoolRef;
2702 if (!pool)
2703 return cmd.abort("no default threadpool");
2704 }
2705 cmd.check_argc(has_pool + 2);
2706 cmd.check_arg(has_pool + 0, STRING_CMD, "trigger subtype must be a string");
2707 const char *kind = (const char *)(cmd.arg(has_pool + 0));
2708 if (0 == strcmp(kind, "proc")) {
2709 cmd.check_arg(has_pool + 1, STRING_CMD, "proc trigger argument must be a string");
2710 } else {
2711 cmd.check_arg(has_pool + 1, INT_CMD, "trigger argument must be an integer");
2712 }
2713 if (cmd.ok()) {
2714 Trigger *trigger;
2715 long n = (long) (cmd.arg(has_pool + 1));
2716 if (n < 0)
2717 return cmd.abort("trigger argument must be a non-negative integer");
2718 if (0 == strcmp(kind, "acc")) {
2719 trigger = new AccTrigger(n);
2720 } else if (0 == strcmp(kind, "count")) {
2721 trigger = new CountTrigger(n);
2722 } else if (0 == strcmp(kind, "set")) {
2723 trigger = new SetTrigger(n);
2724 } else if (0 == strcmp(kind, "proc")) {
2725 trigger = new ProcTrigger((const char *) cmd.arg(has_pool + 1));
2726 } else {
2727 return cmd.abort("unknown trigger subtype");
2728 }
2729 pool->attachJob(trigger);
2730 cmd.set_result(type_trigger, new_shared(trigger));
2731 }
2732 return cmd.status();
2733}
2734
2736 Command cmd("updateTrigger", result, arg);
2737 cmd.check_argc_min(1);
2738 cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2739 cmd.check_init(0, "trigger not initialized");
2740 if (cmd.ok()) {
2741 Trigger *trigger = cmd.shared_arg<Trigger>(0);
2742 trigger->pool->scheduler->lock.lock();
2743 if (!trigger->accept(arg->next))
2744 cmd.report("incompatible argument type(s) for this trigger");
2745 else {
2746 trigger->activate(arg->next);
2747 if (trigger->ready()) {
2748 trigger->run();
2749 Scheduler::notifyDeps(trigger->pool->scheduler, trigger);
2750 }
2751 }
2752 trigger->pool->scheduler->lock.unlock();
2753 }
2754 return cmd.status();
2755}
2756
2758 Command cmd("chainTrigger", result, arg);
2759 cmd.check_argc(2);
2760 cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2762 "second argument must be a trigger or job");
2763 cmd.check_init(0, "trigger not initialized");
2764 cmd.check_init(1, "trigger/job not initialized");
2765 if (cmd.ok()) {
2766 Trigger *trigger = cmd.shared_arg<Trigger>(0);
2767 Job *job = cmd.shared_arg<Job>(1);
2768 if (trigger->pool != job->pool)
2769 return cmd.abort("arguments use different threadpools");
2770 ThreadPool *pool = trigger->pool;
2771 pool->scheduler->lock.lock();
2772 job->triggers.push_back(trigger);
2773 pool->scheduler->lock.unlock();
2774 }
2775 return cmd.status();
2776}
2777
2779 Command cmd("testTrigger", result, arg);
2780 cmd.check_argc(1);
2781 cmd.check_arg(0, type_trigger, "argument must be a trigger");
2782 cmd.check_init(0, "trigger not initialized");
2783 if (cmd.ok()) {
2784 Trigger *trigger = cmd.shared_arg<Trigger>(0);
2785 ThreadPool *pool = trigger->pool;
2786 pool->scheduler->lock.lock();
2787 cmd.set_result((long)trigger->ready());
2788 pool->scheduler->lock.unlock();
2789 }
2790 return cmd.status();
2791}
2792
2793
2795 vector<Job *> jobs;
2796 vector<Job *> deps;
2797 Command cmd("scheduleJob", result, arg);
2798 cmd.check_argc_min(1);
2799 int has_pool = cmd.test_arg(0, type_threadpool);
2800 if (has_pool) {
2801 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2802 cmd.check_init(0, "threadpool not initialized");
2803 }
2804 cmd.check_argc_min(has_pool+1);
2805 int has_prio = cmd.test_arg(has_pool, INT_CMD);
2806 ThreadPool *pool;
2807 if (has_pool)
2808 pool = cmd.shared_arg<ThreadPool>(0);
2809 else {
2811 return cmd.abort("no current threadpool defined");
2812 pool = currentThreadPoolRef;
2813 }
2814 long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2815 int first_arg = has_pool + has_prio;
2816 if (cmd.test_arg(first_arg, type_job)) {
2817 jobs.push_back(*(Job **)(cmd.arg(first_arg)));
2818 } else if (cmd.test_arg(first_arg, STRING_CMD)) {
2819 jobs.push_back(new ProcJob((char *)(cmd.arg(first_arg))));
2820 } else if (cmd.test_arg(first_arg, LIST_CMD)) {
2821 lists l = (lists) (cmd.arg(first_arg));
2822 int n = lSize(l);
2823 for (int i = 0; i < n; i++) {
2824 if (l->m[i].Typ() != type_job)
2825 return cmd.abort("job argument must be a job, string, or list of jobs");
2826 }
2827 for (int i = 0; i < n; i++) {
2828 Job *job = *(Job **) (l->m[i].Data());
2829 if (!job)
2830 return cmd.abort("job not initialized");
2831 jobs.push_back(job);
2832 }
2833 } else {
2834 return cmd.abort("job argument must be a job, string, or list of jobs");
2835 }
2836 bool error = false;
2837 leftv a = arg->next;
2838 if (has_pool) a = a->next;
2839 if (has_prio) a = a->next;
2840 for (; !error && a; a = a->next) {
2841 if (a->Typ() == type_job || a->Typ() == type_trigger) {
2842 deps.push_back(*(Job **)(a->Data()));
2843 } else if (a->Typ() == LIST_CMD) {
2844 lists l = (lists) a->Data();
2845 int n = lSize(l);
2846 for (int i = 0; i < n; i++) {
2847 if (l->m[i].Typ() == type_job || l->m[i].Typ() == type_trigger) {
2848 deps.push_back(*(Job **)(l->m[i].Data()));
2849 } else {
2850 error = true;
2851 break;
2852 }
2853 }
2854 }
2855 }
2856 if (error) {
2857 return cmd.abort("illegal dependency");
2858 }
2859 for (unsigned i = 0; i < jobs.size(); i++) {
2860 Job *job = jobs[i];
2861 if (job->pool) {
2862 return cmd.abort("job has already been scheduled");
2863 }
2864 job->prio = prio;
2865 }
2866 for (unsigned i = 0; i < deps.size(); i++) {
2867 Job *job = deps[i];
2868 if (!job->pool) {
2869 return cmd.abort("dependency has not yet been scheduled");
2870 }
2871 if (job->pool != pool) {
2872 return cmd.abort("dependency has been scheduled on a different threadpool");
2873 }
2874 }
2875 pool->scheduler->lock.lock();
2876 bool cancelled = false;
2877 for (unsigned i = 0; i < jobs.size(); i++) {
2878 jobs[i]->addDep(deps);
2879 }
2880 for (unsigned i = 0; i < deps.size(); i++) {
2881 deps[i]->addNotify(jobs);
2882 cancelled |= deps[i]->cancelled;
2883 }
2884 for (unsigned i = 0; i < jobs.size(); i++) {
2885 if (cancelled) {
2886 jobs[i]->pool = pool;
2887 pool->cancelJob(jobs[i]);
2888 }
2889 else
2890 pool->attachJob(jobs[i]);
2891 }
2892 pool->scheduler->lock.unlock();
2893 if (jobs.size() > 0)
2894 cmd.set_result(type_job, new_shared(jobs[0]));
2895 return cmd.status();
2896}
2897
2899 Command cmd("currentJob", result, arg);
2900 cmd.check_argc(0);
2901 Job *job = currentJobRef;
2902 if (job) {
2903 cmd.set_result(type_job, new_shared(job));
2904 } else {
2905 cmd.report("no current job");
2906 }
2907 return cmd.status();
2908}
2909
2910
2912 if (wrong_num_args("threadID", arg, 0))
2913 return TRUE;
2914 result->rtyp = INT_CMD;
2915 result->data = (char *)thread_id;
2916 return FALSE;
2917}
2918
2920 if (wrong_num_args("mainThread", arg, 0))
2921 return TRUE;
2922 result->rtyp = INT_CMD;
2923 result->data = (char *)(long)(thread_id == 0L);
2924 return FALSE;
2925}
2926
2928 if (wrong_num_args("threadEval", arg, 2))
2929 return TRUE;
2930 if (arg->Typ() != type_thread) {
2931 WerrorS("threadEval: argument is not a thread");
2932 return TRUE;
2933 }
2934 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2935 string expr = LinTree::to_string(arg->next);
2936 ThreadState *ts = thread->getThreadState();
2937 if (ts && ts->parent != pthread_self()) {
2938 WerrorS("threadEval: can only be called from parent thread");
2939 return TRUE;
2940 }
2941 if (ts) ts->lock.lock();
2942 if (!ts || !ts->running || !ts->active) {
2943 WerrorS("threadEval: thread is no longer running");
2944 if (ts) ts->lock.unlock();
2945 return TRUE;
2946 }
2947 ts->to_thread.push("e");
2948 ts->to_thread.push(expr);
2949 ts->to_cond.signal();
2950 ts->lock.unlock();
2951 result->rtyp = NONE;
2952 return FALSE;
2953}
2954
2956 if (wrong_num_args("threadExec", arg, 2))
2957 return TRUE;
2958 if (arg->Typ() != type_thread) {
2959 WerrorS("threadExec: argument is not a thread");
2960 return TRUE;
2961 }
2962 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2963 string expr = LinTree::to_string(arg->next);
2964 ThreadState *ts = thread->getThreadState();
2965 if (ts && ts->parent != pthread_self()) {
2966 WerrorS("threadExec: can only be called from parent thread");
2967 return TRUE;
2968 }
2969 if (ts) ts->lock.lock();
2970 if (!ts || !ts->running || !ts->active) {
2971 WerrorS("threadExec: thread is no longer running");
2972 if (ts) ts->lock.unlock();
2973 return TRUE;
2974 }
2975 ts->to_thread.push("x");
2976 ts->to_thread.push(expr);
2977 ts->to_cond.signal();
2978 ts->lock.unlock();
2979 result->rtyp = NONE;
2980 return FALSE;
2981}
2982
2984 Command cmd("threadPoolExec", result, arg);
2985 ThreadPool *pool;
2986 cmd.check_argc(1, 2);
2987 int has_pool = cmd.nargs() == 2;
2988 if (has_pool) {
2989 cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2990 cmd.check_init(0, "threadpool not initialized");
2991 pool = cmd.shared_arg<ThreadPool>(0);
2992 } else {
2993 pool = currentThreadPoolRef;
2994 if (!pool)
2995 return cmd.abort("no current threadpool");
2996 }
2997 if (cmd.ok()) {
2998 string expr = LinTree::to_string(has_pool ? arg->next : arg);
2999 Job* job = new ExecJob();
3000 job->args.push_back(expr);
3001 job->pool = pool;
3002 pool->broadcastJob(job);
3003 }
3004 return cmd.status();
3005}
3006
3008 if (wrong_num_args("threadResult", arg, 1))
3009 return TRUE;
3010 if (arg->Typ() != type_thread) {
3011 WerrorS("threadResult: argument is not a thread");
3012 return TRUE;
3013 }
3014 InterpreterThread *thread = *(InterpreterThread **)arg->Data();
3015 ThreadState *ts = thread->getThreadState();
3016 if (ts && ts->parent != pthread_self()) {
3017 WerrorS("threadResult: can only be called from parent thread");
3018 return TRUE;
3019 }
3020 if (ts) ts->lock.lock();
3021 if (!ts || !ts->running || !ts->active) {
3022 WerrorS("threadResult: thread is no longer running");
3023 if (ts) ts->lock.unlock();
3024 return TRUE;
3025 }
3026 while (ts->from_thread.empty()) {
3027 ts->from_cond.wait();
3028 }
3029 string expr = ts->from_thread.front();
3030 ts->from_thread.pop();
3031 ts->lock.unlock();
3032 leftv val = LinTree::from_string(expr);
3033 result->rtyp = val->Typ();
3034 result->data = val->Data();
3035 return FALSE;
3036}
3037
3039 Command cmd("setSharedName", result, arg);
3040 cmd.check_argc(2);
3041 int type = cmd.argtype(0);
3042 cmd.check_init(0, "first argument is not initialized");
3043 if (type != type_job && type != type_trigger && type != type_threadpool) {
3044 cmd.report("first argument must be a job, trigger, or threadpool");
3045 }
3046 cmd.check_arg(1, STRING_CMD, "second argument must be a string");
3047 if (cmd.ok()) {
3048 SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3049 name_lock.lock();
3050 obj->set_name((char *) cmd.arg(1));
3051 name_lock.unlock();
3052 }
3053 return cmd.status();
3054}
3055
3057 Command cmd("getSharedName", result, arg);
3058 cmd.check_argc(1);
3059 int type = cmd.argtype(0);
3060 cmd.check_init(0, "first argument is not initialized");
3061 if (type != type_job && type != type_trigger && type != type_threadpool) {
3062 cmd.report("first argument must be a job, trigger, or threadpool");
3063 }
3064 if (cmd.ok()) {
3065 SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3066 name_lock.lock();
3067 cmd.set_result(obj->get_name().c_str());
3068 name_lock.unlock();
3069 }
3070 return cmd.status();
3071}
3072
3073}
3074
3075using namespace LibThread;
3076
3077
// Module entry point for the "systhreads" module.
// Registers the shared-object types (via makeSharedType /
// makeRegionlockType) and every interpreter-visible procedure of this
// library (via fn->iiAddCproc) under the library name of the current
// package, calls LinTree::init(), and returns MAX_TOK.
//
// NOTE(review): this listing is missing original lines 3084 and 3162 (see
// the notes inside the body). The code below is reproduced unchanged.
3078extern "C" int SI_MOD_INIT(systhreads)(SModulFunctions *fn)
3079{
3080 const char *libname = currPack->libname;
// Fall back to an empty library name when the package has none.
3081 if (!libname) libname = "";
3082 master_lock.lock();
3083 if (!thread_state)
// NOTE(review): original line 3084 -- the statement controlled by the `if`
// above -- is not present in this listing; as shown, the `if` would guard
// the first makeSharedType() call instead. Restore it from upstream.
3085 makeSharedType(type_atomic_table, "atomic_table");
3086 makeSharedType(type_atomic_list, "atomic_list");
3087 makeSharedType(type_shared_table, "shared_table");
3088 makeSharedType(type_shared_list, "shared_list");
3089 makeSharedType(type_channel, "channel");
3090 makeSharedType(type_syncvar, "syncvar");
3091 makeSharedType(type_region, "region");
3092 makeSharedType(type_thread, "thread");
3093 makeSharedType(type_threadpool, "threadpool");
3094 makeSharedType(type_job, "job");
3095 makeSharedType(type_trigger, "trigger");
3096 makeRegionlockType(type_regionlock, "regionlock");
3097
// Tables, lists, regions, channels and sync variables.
3098 fn->iiAddCproc(libname, "putTable", FALSE, putTable);
3099 fn->iiAddCproc(libname, "getTable", FALSE, getTable);
3100 fn->iiAddCproc(libname, "inTable", FALSE, inTable);
3101 fn->iiAddCproc(libname, "putList", FALSE, putList);
3102 fn->iiAddCproc(libname, "getList", FALSE, getList);
3103 fn->iiAddCproc(libname, "lockRegion", FALSE, lockRegion);
3104 fn->iiAddCproc(libname, "regionLock", FALSE, regionLock);
3105 fn->iiAddCproc(libname, "unlockRegion", FALSE, unlockRegion);
3106 fn->iiAddCproc(libname, "sendChannel", FALSE, sendChannel);
3107 fn->iiAddCproc(libname, "receiveChannel", FALSE, receiveChannel);
3108 fn->iiAddCproc(libname, "statChannel", FALSE, statChannel);
3109 fn->iiAddCproc(libname, "writeSyncVar", FALSE, writeSyncVar);
3110 fn->iiAddCproc(libname, "updateSyncVar", FALSE, updateSyncVar);
3111 fn->iiAddCproc(libname, "readSyncVar", FALSE, readSyncVar);
3112 fn->iiAddCproc(libname, "statSyncVar", FALSE, statSyncVar);
3113
// Constructors and lookup of shared objects.
3114 fn->iiAddCproc(libname, "makeAtomicTable", FALSE, makeAtomicTable);
3115 fn->iiAddCproc(libname, "makeAtomicList", FALSE, makeAtomicList);
3116 fn->iiAddCproc(libname, "makeSharedTable", FALSE, makeSharedTable);
3117 fn->iiAddCproc(libname, "makeSharedList", FALSE, makeSharedList);
3118 fn->iiAddCproc(libname, "makeChannel", FALSE, makeChannel);
3119 fn->iiAddCproc(libname, "makeSyncVar", FALSE, makeSyncVar);
3120 fn->iiAddCproc(libname, "makeRegion", FALSE, makeRegion);
3121 fn->iiAddCproc(libname, "findSharedObject", FALSE, findSharedObject);
3122 fn->iiAddCproc(libname, "bindSharedObject", FALSE, bindSharedObject);
3123 fn->iiAddCproc(libname, "typeSharedObject", FALSE, typeSharedObject);
3124
// Threads, threadpools, jobs and triggers.
3125 fn->iiAddCproc(libname, "createThread", FALSE, createThread);
3126 fn->iiAddCproc(libname, "joinThread", FALSE, joinThread);
3127 fn->iiAddCproc(libname, "createThreadPool", FALSE, createThreadPool);
3128 fn->iiAddCproc(libname, "createThreadPoolSet", FALSE, createThreadPoolSet);
// The two registrations below are compiled out.
3129#if 0
3130 fn->iiAddCproc(libname, "adjoinThreadPool", FALSE, adjoinThreadPool);
3131 fn->iiAddCproc(libname, "getAdjoinedThreadPools", FALSE, getAdjoinedThreadPools);
3132#endif
3133 fn->iiAddCproc(libname, "closeThreadPool", FALSE, closeThreadPool);
3134 fn->iiAddCproc(libname, "getThreadPoolWorkers", FALSE, getThreadPoolWorkers);
3135 fn->iiAddCproc(libname, "setThreadPoolWorkers", FALSE, setThreadPoolWorkers);
3136 fn->iiAddCproc(libname, "getThreadPoolConcurrency", FALSE, getThreadPoolConcurrency);
3137 fn->iiAddCproc(libname, "setThreadPoolConcurrency", FALSE, setThreadPoolConcurrency);
3138 fn->iiAddCproc(libname, "currentThreadPool", FALSE, currentThreadPool);
3139 fn->iiAddCproc(libname, "setCurrentThreadPool", FALSE, setCurrentThreadPool);
3140 fn->iiAddCproc(libname, "threadPoolExec", FALSE, threadPoolExec);
3141 fn->iiAddCproc(libname, "threadID", FALSE, threadID);
3142 fn->iiAddCproc(libname, "mainThread", FALSE, mainThread);
3143 fn->iiAddCproc(libname, "threadEval", FALSE, threadEval);
3144 fn->iiAddCproc(libname, "threadExec", FALSE, threadExec);
3145 fn->iiAddCproc(libname, "threadResult", FALSE, threadResult);
3146 fn->iiAddCproc(libname, "createJob", FALSE, createJob);
3147 fn->iiAddCproc(libname, "currentJob", FALSE, currentJob);
3148 fn->iiAddCproc(libname, "setSharedName", FALSE, setSharedName);
3149 fn->iiAddCproc(libname, "getSharedName", FALSE, getSharedName);
3150 fn->iiAddCproc(libname, "startJob", FALSE, startJob);
3151 fn->iiAddCproc(libname, "waitJob", FALSE, waitJob);
3152 fn->iiAddCproc(libname, "cancelJob", FALSE, cancelJob);
3153 fn->iiAddCproc(libname, "jobCancelled", FALSE, jobCancelled);
// "scheduleJob" and "scheduleJobs" are two names for the same procedure.
3154 fn->iiAddCproc(libname, "scheduleJob", FALSE, scheduleJob);
3155 fn->iiAddCproc(libname, "scheduleJobs", FALSE, scheduleJob);
3156 fn->iiAddCproc(libname, "createTrigger", FALSE, createTrigger);
3157 fn->iiAddCproc(libname, "updateTrigger", FALSE, updateTrigger);
3158 fn->iiAddCproc(libname, "testTrigger", FALSE, testTrigger);
3159 fn->iiAddCproc(libname, "chainTrigger", FALSE, chainTrigger);
3160
3161 LinTree::init();
// NOTE(review): original line 3162 is not present in this listing, and no
// master_lock.unlock() matching the lock() above is visible -- presumably
// it was on that line; confirm against upstream.
3163
3164 return MAX_TOK;
3165}
int BOOLEAN
Definition: auxiliary.h:87
#define TRUE
Definition: auxiliary.h:100
#define FALSE
Definition: auxiliary.h:96
int setBlackboxStuff(blackbox *bb, const char *n)
define a new type
Definition: blackbox.cc:142
int l
Definition: cfEzgcd.cc:100
int m
Definition: cfEzgcd.cc:128
int i
Definition: cfEzgcd.cc:132
int p
Definition: cfModGcd.cc:4078
return false
Definition: cfModGcd.cc:84
CanonicalForm b
Definition: cfModGcd.cc:4103
void wait()
Definition: thread.h:88
void broadcast()
Definition: thread.h:103
void signal()
Definition: thread.h:97
AccTrigger(long count_init)
Definition: shared.cc:1999
virtual void execute()
Definition: shared.cc:2017
virtual void activate(leftv arg)
Definition: shared.cc:2008
virtual bool accept(leftv arg)
Definition: shared.cc:2005
virtual bool ready()
Definition: shared.cc:2001
void set_result(int type, long n)
Definition: shared.cc:132
int test_arg(int i, int type)
Definition: shared.cc:116
void check_init(int i, const char *err)
Definition: shared.cc:85
void report(const char *err)
Definition: shared.cc:111
BOOLEAN abort(const char *err)
Definition: shared.cc:148
void no_result()
Definition: shared.cc:136
long int_arg(int i)
Definition: shared.cc:108
BOOLEAN status()
Definition: shared.cc:142
void check_arg(int i, int type, int type2, const char *err)
Definition: shared.cc:91
leftv * args
Definition: shared.cc:46
void check_arg(int i, int type, const char *err)
Definition: shared.cc:81
const char * error
Definition: shared.cc:44
void * arg(int i)
Definition: shared.cc:101
void set_result(long n)
Definition: shared.cc:120
const char * name
Definition: shared.cc:43
void check_argc_min(int n)
Definition: shared.cc:77
void set_result(int type, void *p)
Definition: shared.cc:128
int argtype(int i)
Definition: shared.cc:95
void set_result(const char *s)
Definition: shared.cc:124
T * shared_arg(int i)
Definition: shared.cc:105
void check_argc(int n)
Definition: shared.cc:69
void check_argc(int lo, int hi)
Definition: shared.cc:73
Command(const char *n, leftv r, leftv a)
Definition: shared.cc:49
CountTrigger(long count_init)
Definition: shared.cc:2038
virtual void execute()
Definition: shared.cc:2052
virtual void activate(leftv arg)
Definition: shared.cc:2047
virtual bool accept(leftv arg)
Definition: shared.cc:2044
virtual bool ready()
Definition: shared.cc:2040
virtual void execute()
Definition: shared.cc:2363
virtual void execute()
Definition: shared.cc:2374
ThreadState * getThreadState()
Definition: shared.cc:1430
InterpreterThread(ThreadState *ts_init)
Definition: shared.cc:1428
vector< string > args
Definition: shared.cc:1558
vector< Job * > deps
Definition: shared.cc:1555
bool cancelled
Definition: shared.cc:1565
void run()
Definition: shared.cc:1982
ThreadPool * pool
Definition: shared.cc:1551
void * data
Definition: shared.cc:1560
string result
Definition: shared.cc:1559
long pending_index
Definition: shared.cc:1554
virtual bool ready()
Definition: shared.cc:1605
void addDep(Job *job)
Definition: shared.cc:1571
virtual void execute()=0
void addNotify(vector< Job * > &jobs)
Definition: shared.cc:1968
vector< Job * > notify
Definition: shared.cc:1556
vector< Trigger * > triggers
Definition: shared.cc:1557
KernelJob(void(*func)(leftv result, leftv arg))
Definition: shared.cc:2409
void(* cfunc)(leftv result, leftv arg)
Definition: shared.cc:2407
virtual void execute()
Definition: shared.cc:2410
ProcJob(const char *procname_init)
Definition: shared.cc:2384
virtual void execute()
Definition: shared.cc:2388
virtual bool accept(leftv arg)
Definition: shared.cc:2098
virtual void execute()
Definition: shared.cc:2127
ProcTrigger(const char *p)
Definition: shared.cc:2092
virtual void activate(leftv arg)
Definition: shared.cc:2101
virtual bool ready()
Definition: shared.cc:2094
virtual void execute()
Definition: shared.cc:2439
void(* cfunc)(long ndeps, Job **deps)
Definition: shared.cc:2436
RawKernelJob(void(*func)(long ndeps, Job **deps))
Definition: shared.cc:2438
SharedObjectTable objects
Definition: shared.cc:209
Lock * get_lock()
Definition: shared.cc:212
virtual ~Region()
Definition: shared.cc:211
virtual ~Scheduler()
Definition: shared.cc:1696
vector< ThreadPool * > thread_owners
Definition: shared.cc:1664
vector< JobQueue * > thread_queues
Definition: shared.cc:1666
void addThread(ThreadPool *owner, ThreadState *thread)
Definition: shared.cc:1735
ConditionVariable response
Definition: shared.cc:1669
void cancelDeps(Job *job)
Definition: shared.cc:1786
priority_queue< Job *, vector< Job * >, JobCompare > global_queue
Definition: shared.cc:1665
ConditionVariable cond
Definition: shared.cc:1668
ThreadState * getThread(int i)
Definition: shared.cc:1708
void set_maxconcurrency(int n)
Definition: shared.cc:1682
void detachJob(Job *job)
Definition: shared.cc:1758
void broadcastJob(ThreadPool *pool, Job *job)
Definition: shared.cc:1776
vector< ThreadState * > threads
Definition: shared.cc:1663
void cancelJob(Job *job)
Definition: shared.cc:1795
void waitJob(Job *job)
Definition: shared.cc:1806
int get_maxconcurrency()
Definition: shared.cc:1685
static void notifyDeps(Scheduler *scheduler, Job *job)
Definition: shared.cc:1829
void clearThreadState()
Definition: shared.cc:1826
void queueJob(Job *job)
Definition: shared.cc:1770
static void * main(ThreadState *ts, void *arg)
Definition: shared.cc:1856
void shutdown(bool wait)
Definition: shared.cc:1709
void attachJob(ThreadPool *pool, Job *job)
Definition: shared.cc:1742
vector< Job * > pending
Definition: shared.cc:1667
int threadpool_size(ThreadPool *pool)
Definition: shared.cc:1688
virtual void execute()
Definition: shared.cc:2081
virtual void activate(leftv arg)
Definition: shared.cc:2072
SetTrigger(long count_init)
Definition: shared.cc:2062
virtual bool ready()
Definition: shared.cc:2065
vector< bool > set
Definition: shared.cc:2059
virtual bool accept(leftv arg)
Definition: shared.cc:2069
std::string name
Definition: shared.cc:159
virtual ~SharedObject()
Definition: shared.cc:162
void set_name(std::string &name_init)
Definition: shared.cc:165
virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3)
Definition: shared.cc:188
void set_name(const char *s)
Definition: shared.cc:166
void incref(int by=1)
Definition: shared.cc:170
virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2)
Definition: shared.cc:185
void set_type(int type_init)
Definition: shared.cc:163
std::string & get_name()
Definition: shared.cc:169
ConditionVariable cond
Definition: shared.cc:401
queue< string > q
Definition: shared.cc:399
void send(string item)
Definition: shared.cc:405
virtual ~SingularChannel()
Definition: shared.cc:404
int write(string item)
Definition: shared.cc:459
virtual ~SingularSyncVar()
Definition: shared.cc:439
ConditionVariable cond
Definition: shared.cc:436
void update(leftv val)
Definition: shared.cc:454
void attachJob(Job *job)
Definition: shared.cc:1933
void waitJob(Job *job)
Definition: shared.cc:1951
void queueJob(Job *job)
Definition: shared.cc:1939
void broadcastJob(Job *job)
Definition: shared.cc:1942
ThreadState * getThread(int i)
Definition: shared.cc:1928
void cancelJob(Job *job)
Definition: shared.cc:1948
Scheduler * scheduler
Definition: shared.cc:1635
ThreadPool(Scheduler *sched, int n)
Definition: shared.cc:1921
void detachJob(Job *job)
Definition: shared.cc:1936
void cancelDeps(Job *job)
Definition: shared.cc:1945
void shutdown(bool wait)
Definition: shared.cc:1929
void addThread(ThreadState *thread)
Definition: shared.cc:1930
void *(* thread_func)(ThreadState *, void *)
Definition: shared.cc:1341
ConditionVariable to_cond
Definition: shared.cc:1346
queue< string > from_thread
Definition: shared.cc:1349
ConditionVariable from_cond
Definition: shared.cc:1347
queue< string > to_thread
Definition: shared.cc:1348
virtual ~Transactional()
Definition: shared.cc:317
void set_region(Region *region_init)
Definition: shared.cc:309
virtual void activate(leftv arg)=0
virtual bool accept(leftv arg)=0
virtual ~TxList()
Definition: shared.cc:362
int put(size_t index, string &value)
Definition: shared.cc:363
vector< string > entries
Definition: shared.cc:359
int get(size_t index, string &value)
Definition: shared.cc:377
int put(string &key, string &value)
Definition: shared.cc:326
std::map< string, string > entries
Definition: shared.cc:322
int get(string &key, string &value)
Definition: shared.cc:338
int check(string &key)
Definition: shared.cc:348
virtual ~TxTable()
Definition: shared.cc:325
void put(T data)
Definition: lintree.h:61
Definition: thread.h:17
bool is_locked()
Definition: thread.h:68
void lock()
Definition: thread.h:46
void unlock()
Definition: thread.h:57
Definition: idrec.h:35
Class used for (list of) interpreter objects.
Definition: subexpr.h:83
int Typ()
Definition: subexpr.cc:1019
const char * name
Definition: subexpr.h:87
package req_packhdl
Definition: subexpr.h:106
int rtyp
Definition: subexpr.h:91
void * Data()
Definition: subexpr.cc:1162
leftv next
Definition: subexpr.h:86
int Eval()
Definition: subexpr.cc:1945
void Copy(leftv e)
Definition: subexpr.cc:685
void * data
Definition: subexpr.h:88
void CleanUp(ring r=currRing)
Definition: subexpr.cc:348
Definition: lists.h:24
sleftv * m
Definition: lists.h:46
INLINE_THIS void Init(int l=0)
return result
Definition: facAbsBiFact.cc:75
const CanonicalForm int s
Definition: facAbsFact.cc:51
CanonicalForm res
Definition: facAbsFact.cc:60
CFList & eval
Definition: facFactorize.cc:47
int j
Definition: facHensel.cc:110
void WerrorS(const char *s)
Definition: feFopen.cc:24
feOptIndex
Definition: feOptGen.h:15
feOptIndex feGetOptIndex(const char *name)
Definition: feOpt.cc:104
const char * feSetOptValue(feOptIndex opt, char *optarg)
Definition: feOpt.cc:154
const char * Tok2Cmdname(int tok)
Definition: gentable.cc:140
#define STATIC_VAR
Definition: globaldefs.h:7
#define VAR
Definition: globaldefs.h:5
BOOLEAN iiExprArithM(leftv res, leftv a, int op)
Definition: iparith.cc:9414
VAR package basePack
Definition: ipid.cc:58
VAR package currPack
Definition: ipid.cc:57
EXTERN_VAR omBin sleftv_bin
Definition: ipid.h:145
#define IDDATA(a)
Definition: ipid.h:126
STATIC_VAR jList * T
Definition: janet.cc:30
ListNode * next
Definition: janet.h:31
#define info
Definition: libparse.cc:1256
void siInit(char *)
Definition: misc_ip.cc:1370
VAR omBin slists_bin
Definition: lists.cc:23
int lSize(lists L)
Definition: lists.cc:25
#define error(a)
Definition: mpr_numeric.cc:966
slists * lists
Definition: mpr_numeric.h:146
BOOLEAN readSyncVar(leftv result, leftv arg)
Definition: shared.cc:1238
char * str(leftv arg)
Definition: shared.cc:704
static BOOLEAN getThreadPoolWorkers(leftv result, leftv arg)
Definition: shared.cc:2249
void retain(Job *job)
BOOLEAN getTable(leftv result, leftv arg)
Definition: shared.cc:941
int type_thread
Definition: shared.cc:241
static BOOLEAN getThreadPoolConcurrency(leftv result, leftv arg)
Definition: shared.cc:2281
int type_region
Definition: shared.cc:233
ThreadState * createThread(void *(*thread_func)(ThreadState *, void *), void *arg)
Definition: shared.cc:1469
Job * getCurrentJob()
Definition: shared.cc:2519
SharedObject * consSyncVar()
Definition: shared.cc:720
int not_a_uri(const char *name, leftv arg)
Definition: shared.cc:687
static BOOLEAN setThreadPoolConcurrency(leftv result, leftv arg)
Definition: shared.cc:2296
static BOOLEAN scheduleJob(leftv result, leftv arg)
Definition: shared.cc:2794
BOOLEAN makeSyncVar(leftv result, leftv arg)
Definition: shared.cc:854
BOOLEAN shared_check_assign(blackbox *b, leftv l, leftv r)
Definition: shared.cc:573
BOOLEAN makeSharedList(leftv result, leftv arg)
Definition: shared.cc:824
BOOLEAN shared_assign(leftv l, leftv r)
Definition: shared.cc:524
void * new_shared(SharedObject *obj)
Definition: shared.cc:491
void ref_shared(LinTree::LinTree &lintree, int by)
Definition: shared.cc:1290
BOOLEAN getList(leftv result, leftv arg)
Definition: shared.cc:1028
Lock global_objects_lock
Definition: shared.cc:226
int type_atomic_list
Definition: shared.cc:239
static void appendArgCopy(vector< leftv > &argv, leftv arg)
Definition: shared.cc:742
void * shared_init(blackbox *b)
Definition: shared.cc:487
BOOLEAN statChannel(leftv result, leftv arg)
Definition: shared.cc:1171
BOOLEAN threadEval(leftv result, leftv arg)
Definition: shared.cc:2927
void * shared_copy(blackbox *b, void *d)
Definition: shared.cc:515
SharedObject * SharedObjectPtr
Definition: shared.cc:246
bool getJobCancelled()
Definition: shared.cc:2645
BOOLEAN unlockRegion(leftv result, leftv arg)
Definition: shared.cc:1120
BOOLEAN sendChannel(leftv result, leftv arg)
Definition: shared.cc:1135
void * joinThread(ThreadState *ts)
Definition: shared.cc:1474
Job * startJob(ThreadPool *pool, Job *job, leftv arg)
Definition: shared.cc:2481
static ThreadState * newThread(void *(*thread_func)(ThreadState *, void *), void *arg, const char **error)
Definition: shared.cc:1436
BOOLEAN setSharedName(leftv result, leftv arg)
Definition: shared.cc:3038
static void appendArg(vector< leftv > &argv, string &s)
Definition: shared.cc:728
SharedObject * makeSharedObject(SharedObjectTable &table, Lock *lock, int type, string &name, SharedConstructor scons)
Definition: shared.cc:249
BOOLEAN writeSyncVar(leftv result, leftv arg)
Definition: shared.cc:1189
std::map< std::string, SharedObject * > SharedObjectTable
Definition: shared.cc:203
BOOLEAN makeAtomicTable(leftv result, leftv arg)
Definition: shared.cc:778
BOOLEAN bindSharedObject(leftv result, leftv arg)
Definition: shared.cc:924
void report(const char *fmt, const char *name)
Definition: shared.cc:666
char * shared_string(blackbox *b, void *d)
Definition: shared.cc:595
BOOLEAN lockRegion(leftv result, leftv arg)
Definition: shared.cc:1088
BOOLEAN currentJob(leftv result, leftv arg)
Definition: shared.cc:2898
long thread_counter
Definition: shared.cc:231
int wrong_num_args(const char *name, leftv arg, int n)
Definition: shared.cc:672
ThreadState * thread_state
Definition: shared.cc:1366
static InterpreterThread * createInterpreterThread(const char **error)
Definition: shared.cc:1485
int type_threadpool
Definition: shared.cc:242
Lock name_lock(true)
int type_job
Definition: shared.cc:243
static BOOLEAN testTrigger(leftv result, leftv arg)
Definition: shared.cc:2778
ThreadPool * createThreadPool(int threads, int prioThreads=0)
Definition: shared.cc:2218
BOOLEAN threadExec(leftv result, leftv arg)
Definition: shared.cc:2955
BOOLEAN rlock_assign(leftv l, leftv r)
Definition: shared.cc:549
const int have_threads
Definition: shared.cc:38
void setJobData(Job *job, void *data)
Definition: shared.cc:2649
BOOLEAN putTable(leftv result, leftv arg)
Definition: shared.cc:1001
BOOLEAN makeAtomicList(leftv result, leftv arg)
Definition: shared.cc:792
ThreadPool * getCurrentThreadPool()
Definition: shared.cc:2245
STATIC_VAR Job * currentJobRef
Definition: shared.cc:1631
static BOOLEAN createTrigger(leftv result, leftv arg)
Definition: shared.cc:2692
int not_a_region(const char *name, leftv arg)
Definition: shared.cc:695
BOOLEAN currentThreadPool(leftv result, leftv arg)
Definition: shared.cc:2334
void addJobArgs(Job *job, leftv arg)
Definition: shared.cc:2665
void rlock_destroy(blackbox *b, void *d)
Definition: shared.cc:506
int type_atomic_table
Definition: shared.cc:237
void * interpreter_thread(ThreadState *ts, void *arg)
Definition: shared.cc:1391
static BOOLEAN setThreadPoolWorkers(leftv result, leftv arg)
Definition: shared.cc:2266
int type_syncvar
Definition: shared.cc:236
static BOOLEAN updateTrigger(leftv result, leftv arg)
Definition: shared.cc:2735
BOOLEAN getSharedName(leftv result, leftv arg)
Definition: shared.cc:3056
SharedObjectTable global_objects
Definition: shared.cc:227
SharedObject * consList()
Definition: shared.cc:712
BOOLEAN receiveChannel(leftv result, leftv arg)
Definition: shared.cc:1152
BOOLEAN mainThread(leftv result, leftv arg)
Definition: shared.cc:2919
void releaseShared(SharedObject *obj)
Definition: shared.cc:197
void * getJobData(Job *job)
Definition: shared.cc:2657
void closeThreadPool(ThreadPool *pool, bool wait)
Definition: shared.cc:2329
int type_shared_table
Definition: shared.cc:238
SharedObject * consChannel()
Definition: shared.cc:716
int type_channel
Definition: shared.cc:235
BOOLEAN threadPoolExec(leftv result, leftv arg)
Definition: shared.cc:2983
int type_shared_list
Definition: shared.cc:240
BOOLEAN setCurrentThreadPool(leftv result, leftv arg)
Definition: shared.cc:2346
BOOLEAN regionLock(leftv result, leftv arg)
Definition: shared.cc:1103
BOOLEAN threadID(leftv result, leftv arg)
Definition: shared.cc:2911
void makeRegionlockType(int &type, const char *name)
Definition: shared.cc:1321
char * rlock_string(blackbox *b, void *d)
Definition: shared.cc:657
void encode_shared(LinTree::LinTree &lintree, leftv val)
Definition: shared.cc:1275
BOOLEAN makeSharedTable(leftv result, leftv arg)
Definition: shared.cc:806
BOOLEAN typeSharedObject(leftv result, leftv arg)
Definition: shared.cc:893
Job * createJob(void(*func)(leftv result, leftv arg))
Definition: shared.cc:2471
void release(Job *job)
VAR long thread_id
Definition: shared.cc:230
STATIC_VAR ThreadPool * currentThreadPoolRef
Definition: shared.cc:1630
Scheduler * scheduler
Definition: shared.cc:1625
BOOLEAN statSyncVar(leftv result, leftv arg)
Definition: shared.cc:1257
void setOption(int ch)
Definition: shared.cc:1368
queue< Job * > JobQueue
Definition: shared.cc:1620
BOOLEAN shared_op2(int op, leftv res, leftv a1, leftv a2)
Definition: shared.cc:585
Lock master_lock(true)
SharedObject * findSharedObject(SharedObjectTable &table, Lock *lock, string &name)
Definition: shared.cc:271
SharedObjectPtr(* SharedConstructor)()
Definition: shared.cc:247
int type_regionlock
Definition: shared.cc:234
int type_trigger
Definition: shared.cc:244
BOOLEAN makeChannel(leftv result, leftv arg)
Definition: shared.cc:841
BOOLEAN shared_op3(int op, leftv res, leftv a1, leftv a2, leftv a3)
Definition: shared.cc:590
void * thread_main(void *arg)
Definition: shared.cc:1385
const char * getJobName()
void acquireShared(SharedObject *obj)
Definition: shared.cc:193
void thread_init()
Definition: shared.cc:1373
static BOOLEAN jobCancelled(leftv result, leftv arg)
Definition: shared.cc:2612
SharedObject * consTable()
Definition: shared.cc:708
static BOOLEAN chainTrigger(leftv result, leftv arg)
Definition: shared.cc:2757
BOOLEAN makeRegion(leftv result, leftv arg)
Definition: shared.cc:867
BOOLEAN putList(leftv result, leftv arg)
Definition: shared.cc:1061
static bool joinInterpreterThread(InterpreterThread *thread)
Definition: shared.cc:1512
void setJobName(const char *)
static BOOLEAN createThreadPoolSet(leftv result, leftv arg)
Definition: shared.cc:2166
void makeSharedType(int &type, const char *name)
Definition: shared.cc:1306
void shared_destroy(blackbox *b, void *d)
Definition: shared.cc:498
static BOOLEAN executeProc(sleftv &result, const char *procname, const vector< leftv > &argv)
Definition: shared.cc:749
SharedObject * consRegion()
Definition: shared.cc:724
BOOLEAN updateSyncVar(leftv result, leftv arg)
Definition: shared.cc:1209
BOOLEAN inTable(leftv result, leftv arg)
Definition: shared.cc:974
leftv getJobResult(Job *job)
Definition: shared.cc:2675
Lock thread_lock
Definition: shared.cc:1364
void installShared(int type)
Definition: shared.cc:1302
BOOLEAN threadResult(leftv result, leftv arg)
Definition: shared.cc:3007
void cancelJob(Job *job)
Definition: shared.cc:2514
void waitJob(Job *job)
Definition: shared.cc:2590
leftv decode_shared(LinTree::LinTree &lintree)
Definition: shared.cc:1281
void init()
Definition: lintree.cc:864
std::string to_string(leftv val)
Definition: lintree.cc:843
void install(int typ, LinTreeEncodeFunc enc, LinTreeDecodeFunc dec, LinTreeRefFunc ref)
Definition: lintree.cc:51
leftv from_string(std::string &str)
Definition: lintree.cc:854
#define omStrDup(s)
Definition: omAllocDecl.h:263
#define omAlloc0Bin(bin)
Definition: omAllocDecl.h:206
#define omFree(addr)
Definition: omAllocDecl.h:261
#define omAlloc0(size)
Definition: omAllocDecl.h:211
#define omFreeBin(addr, bin)
Definition: omAllocDecl.h:259
#define NULL
Definition: omList.c:12
static int index(p_Length length, p_Ord ord)
Definition: p_Procs_Impl.h:592
void Werror(const char *fmt,...)
Definition: reporter.cc:189
#define MAX_THREADS
Definition: shared.cc:1334
void pSingular_initialize_thread()
int SI_MOD_INIT() systhreads(SModulFunctions *fn)
Definition: shared.cc:3078
int status int void * buf
Definition: si_signals.h:59
wait
Definition: si_signals.h:51
bool operator()(const Job *lhs, const Job *rhs)
Definition: shared.cc:1584
sleftv * leftv
Definition: structs.h:57
#define assert(A)
Definition: svd_si.h:3
int name
New type name for int.
Definition: templateForC.h:21
#define IDHDL
Definition: tok.h:31
@ LIST_CMD
Definition: tok.h:118
@ DEF_CMD
Definition: tok.h:58
@ STRING_CMD
Definition: tok.h:185
@ INT_CMD
Definition: tok.h:96
@ MAX_TOK
Definition: tok.h:218
#define NONE
Definition: tok.h:221
#define COMMAND
Definition: tok.h:29