Taskflow  3.2.0-Master-Branch
Loading...
Searching...
No Matches
observer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "task.hpp"
4#include "worker.hpp"
5
11namespace tf {
12
13// ----------------------------------------------------------------------------
14// timeline data structure
15// ----------------------------------------------------------------------------
16
21
25struct Segment {
26
27 std::string name;
28 TaskType type;
29
32
33 //template <typename Archiver>
34 //auto save(Archiver& ar) const {
35 // return ar(name, type, beg, end);
36 //}
37
38 //template <typename Archiver>
39 //auto load(Archiver& ar) {
40 // return ar(name, type, beg, end);
41 //}
42
43 Segment() = default;
44
45 Segment(
47 ) : name {n}, type {t}, beg {b}, end {e} {
48 }
49
50 auto span() const {
51 return end-beg;
52 }
53};
54
58struct Timeline {
59
60 size_t uid;
61
62 observer_stamp_t origin;
64
65 Timeline() = default;
66
67 Timeline(const Timeline& rhs) = delete;
68 Timeline(Timeline&& rhs) = default;
69
70 Timeline& operator = (const Timeline& rhs) = delete;
71 Timeline& operator = (Timeline&& rhs) = default;
72
73 //template <typename Archiver>
74 //auto save(Archiver& ar) const {
75 // return ar(uid, origin, segments);
76 //}
77
78 //template <typename Archiver>
79 //auto load(Archiver& ar) {
80 // return ar(uid, origin, segments);
81 //}
82};
83
87struct ProfileData {
88
89 std::vector<Timeline> timelines;
90
91 ProfileData() = default;
92
93 ProfileData(const ProfileData& rhs) = delete;
94 ProfileData(ProfileData&& rhs) = default;
95
96 ProfileData& operator = (const ProfileData& rhs) = delete;
97 ProfileData& operator = (ProfileData&&) = default;
98
99 //template <typename Archiver>
100 //auto save(Archiver& ar) const {
101 // return ar(timelines);
102 //}
103
104 //template <typename Archiver>
105 //auto load(Archiver& ar) {
106 // return ar(timelines);
107 //}
108};
109
110// ----------------------------------------------------------------------------
111// observer interface
112// ----------------------------------------------------------------------------
113
170
171 friend class Executor;
172
173 public:
174
178 virtual ~ObserverInterface() = default;
179
184 virtual void set_up(size_t num_workers) = 0;
185
191 virtual void on_entry(WorkerView w, TaskView task_view) = 0;
192
198 virtual void on_exit(WorkerView w, TaskView task_view) = 0;
199};
200
201// ----------------------------------------------------------------------------
202// ChromeObserver definition
203// ----------------------------------------------------------------------------
204
232
233 friend class Executor;
234
235 // data structure to record each task execution
236 struct Segment {
237
238 std::string name;
239
242
243 Segment(
244 const std::string& n,
247 );
248 };
249
250 // data structure to store the entire execution timeline
251 struct Timeline {
252 observer_stamp_t origin;
255 };
256
257 public:
258
263 void dump(std::ostream& ostream) const;
264
268 inline std::string dump() const;
269
273 inline void clear();
274
278 inline size_t num_tasks() const;
279
280 private:
281
282 inline void set_up(size_t num_workers) override final;
283 inline void on_entry(WorkerView w, TaskView task_view) override final;
284 inline void on_exit(WorkerView w, TaskView task_view) override final;
285
286 Timeline _timeline;
287};
288
289// constructor
290inline ChromeObserver::Segment::Segment(
292) :
293 name {n}, beg {b}, end {e} {
294}
295
296// Procedure: set_up
297inline void ChromeObserver::set_up(size_t num_workers) {
298 _timeline.segments.resize(num_workers);
299 _timeline.stacks.resize(num_workers);
300
301 for(size_t w=0; w<num_workers; ++w) {
302 _timeline.segments[w].reserve(32);
303 }
304
305 _timeline.origin = observer_stamp_t::clock::now();
306}
307
308// Procedure: on_entry
309inline void ChromeObserver::on_entry(WorkerView wv, TaskView) {
310 _timeline.stacks[wv.id()].push(observer_stamp_t::clock::now());
311}
312
313// Procedure: on_exit
314inline void ChromeObserver::on_exit(WorkerView wv, TaskView tv) {
315
316 size_t w = wv.id();
317
318 assert(!_timeline.stacks[w].empty());
319
320 auto beg = _timeline.stacks[w].top();
321 _timeline.stacks[w].pop();
322
323 _timeline.segments[w].emplace_back(
324 tv.name(), beg, observer_stamp_t::clock::now()
325 );
326}
327
328// Function: clear
330 for(size_t w=0; w<_timeline.segments.size(); ++w) {
331 _timeline.segments[w].clear();
332 while(!_timeline.stacks[w].empty()) {
333 _timeline.stacks[w].pop();
334 }
335 }
336}
337
338// Procedure: dump
339inline void ChromeObserver::dump(std::ostream& os) const {
340
341 size_t first;
342
343 for(first = 0; first<_timeline.segments.size(); ++first) {
344 if(_timeline.segments[first].size() > 0) {
345 break;
346 }
347 }
348
349 os << '[';
350
351 for(size_t w=first; w<_timeline.segments.size(); w++) {
352
353 if(w != first && _timeline.segments[w].size() > 0) {
354 os << ',';
355 }
356
357 for(size_t i=0; i<_timeline.segments[w].size(); i++) {
358
359 os << '{'
360 << "\"cat\":\"ChromeObserver\",";
361
362 // name field
363 os << "\"name\":\"";
364 if(_timeline.segments[w][i].name.empty()) {
365 os << w << '_' << i;
366 }
367 else {
368 os << _timeline.segments[w][i].name;
369 }
370 os << "\",";
371
372 // segment field
373 os << "\"ph\":\"X\","
374 << "\"pid\":1,"
375 << "\"tid\":" << w << ','
376 << "\"ts\":" << std::chrono::duration_cast<std::chrono::microseconds>(
377 _timeline.segments[w][i].beg - _timeline.origin
378 ).count() << ','
379 << "\"dur\":" << std::chrono::duration_cast<std::chrono::microseconds>(
380 _timeline.segments[w][i].end - _timeline.segments[w][i].beg
381 ).count();
382
383 if(i != _timeline.segments[w].size() - 1) {
384 os << "},";
385 }
386 else {
387 os << '}';
388 }
389 }
390 }
391 os << "]\n";
392}
393
394// Function: dump
397 dump(oss);
398 return oss.str();
399}
400
401// Function: num_tasks
402inline size_t ChromeObserver::num_tasks() const {
403 return std::accumulate(
404 _timeline.segments.begin(), _timeline.segments.end(), size_t{0},
405 [](size_t sum, const auto& exe){
406 return sum + exe.size();
407 }
408 );
409}
410
411// ----------------------------------------------------------------------------
412// TFProfObserver definition
413// ----------------------------------------------------------------------------
414
448
449 friend class Executor;
450 friend class TFProfManager;
451
452 public:
453
458 void dump(std::ostream& ostream) const;
459
463 std::string dump() const;
464
468 void clear();
469
473 size_t num_tasks() const;
474
475 private:
476
477 Timeline _timeline;
478
480
481 inline void set_up(size_t num_workers) override final;
482 inline void on_entry(WorkerView, TaskView) override final;
483 inline void on_exit(WorkerView, TaskView) override final;
484};
485
486// Procedure: set_up
487inline void TFProfObserver::set_up(size_t num_workers) {
488 _timeline.uid = unique_id<size_t>();
489 _timeline.origin = observer_stamp_t::clock::now();
490 _timeline.segments.resize(num_workers);
491 _stacks.resize(num_workers);
492}
493
494// Procedure: on_entry
495inline void TFProfObserver::on_entry(WorkerView wv, TaskView) {
496 _stacks[wv.id()].push(observer_stamp_t::clock::now());
497}
498
499// Procedure: on_exit
500inline void TFProfObserver::on_exit(WorkerView wv, TaskView tv) {
501
502 size_t w = wv.id();
503
504 assert(!_stacks[w].empty());
505
506 if(_stacks[w].size() > _timeline.segments[w].size()) {
507 _timeline.segments[w].resize(_stacks[w].size());
508 }
509
510 auto beg = _stacks[w].top();
511 _stacks[w].pop();
512
513 _timeline.segments[w][_stacks[w].size()].emplace_back(
514 tv.name(), tv.type(), beg, observer_stamp_t::clock::now()
515 );
516}
517
518// Function: clear
520 for(size_t w=0; w<_timeline.segments.size(); ++w) {
521 for(size_t l=0; l<_timeline.segments[w].size(); ++l) {
522 _timeline.segments[w][l].clear();
523 }
524 while(!_stacks[w].empty()) {
525 _stacks[w].pop();
526 }
527 }
528}
529
530// Procedure: dump
531inline void TFProfObserver::dump(std::ostream& os) const {
532
533 size_t first;
534
535 for(first = 0; first<_timeline.segments.size(); ++first) {
536 if(_timeline.segments[first].size() > 0) {
537 break;
538 }
539 }
540
541 // not timeline data to dump
542 if(first == _timeline.segments.size()) {
543 os << "{}\n";
544 return;
545 }
546
547 os << "{\"executor\":\"" << _timeline.uid << "\",\"data\":[";
548
549 bool comma = false;
550
551 for(size_t w=first; w<_timeline.segments.size(); w++) {
552 for(size_t l=0; l<_timeline.segments[w].size(); l++) {
553
554 if(_timeline.segments[w][l].empty()) {
555 continue;
556 }
557
558 if(comma) {
559 os << ',';
560 }
561 else {
562 comma = true;
563 }
564
565 os << "{\"worker\":" << w << ",\"level\":" << l << ",\"data\":[";
566 for(size_t i=0; i<_timeline.segments[w][l].size(); ++i) {
567
568 const auto& s = _timeline.segments[w][l][i];
569
570 if(i) os << ',';
571
572 // span
573 os << "{\"span\":["
574 << std::chrono::duration_cast<std::chrono::microseconds>(
575 s.beg - _timeline.origin
576 ).count() << ","
577 << std::chrono::duration_cast<std::chrono::microseconds>(
578 s.end - _timeline.origin
579 ).count() << "],";
580
581 // name
582 os << "\"name\":\"";
583 if(s.name.empty()) {
584 os << w << '_' << i;
585 }
586 else {
587 os << s.name;
588 }
589 os << "\",";
590
591 // category "type": "Condition Task",
592 os << "\"type\":\"" << to_string(s.type) << "\"";
593
594 os << "}";
595 }
596 os << "]}";
597 }
598 }
599
600 os << "]}\n";
601}
602
603// Function: dump
606 dump(oss);
607 return oss.str();
608}
609
610// Function: num_tasks
611inline size_t TFProfObserver::num_tasks() const {
612 return std::accumulate(
613 _timeline.segments.begin(), _timeline.segments.end(), size_t{0},
614 [](size_t sum, const auto& exe){
615 return sum + exe.size();
616 }
617 );
618}
619
620// ----------------------------------------------------------------------------
621// TFProfManager
622// ----------------------------------------------------------------------------
623
627class TFProfManager {
628
629 friend class Executor;
630
631 public:
632
633 ~TFProfManager();
634
635 TFProfManager(const TFProfManager&) = delete;
636 TFProfManager& operator=(const TFProfManager&) = delete;
637
638 static TFProfManager& get();
639
640 void dump(std::ostream& ostream) const;
641
642 private:
643
644 const std::string _fpath;
645
646 std::mutex _mutex;
648
649 TFProfManager();
650
651 void _manage(std::shared_ptr<TFProfObserver> observer);
652};
653
654// constructor
655inline TFProfManager::TFProfManager() :
656 _fpath {get_env(TF_ENABLE_PROFILER)} {
657
658}
659
660// Procedure: manage
661inline void TFProfManager::_manage(std::shared_ptr<TFProfObserver> observer) {
662 std::lock_guard lock(_mutex);
663 _observers.push_back(std::move(observer));
664}
665
666// Procedure: dump
667inline void TFProfManager::dump(std::ostream& os) const {
668 for(size_t i=0; i<_observers.size(); ++i) {
669 if(i) os << ',';
670 _observers[i]->dump(os);
671 }
672}
673
674// Destructor
675inline TFProfManager::~TFProfManager() {
676 std::ofstream ofs(_fpath);
677 if(ofs) {
679 //if(_fpath.rfind(".tfp") != std::string::npos) {
680 // ProfileData data;
681 // data.timelines.reserve(_observers.size());
682 // for(size_t i=0; i<_observers.size(); ++i) {
683 // data.timelines.push_back(std::move(_observers[i]->_timeline));
684 // }
685 // Serializer<std::ofstream> serializer(ofs);
686 // serializer(data);
687 //}
689 //else {
690 // ofs << "[\n";
691 // for(size_t i=0; i<_observers.size(); ++i) {
692 // if(i) ofs << ',';
693 // _observers[i]->dump(ofs);
694 // }
695 // ofs << "]\n";
696 //}
697
698 ofs << "[\n";
699 for(size_t i=0; i<_observers.size(); ++i) {
700 if(i) ofs << ',';
701 _observers[i]->dump(ofs);
702 }
703 ofs << "]\n";
704 }
705}
706
707// Function: get
708inline TFProfManager& TFProfManager::get() {
709 static TFProfManager mgr;
710 return mgr;
711}
712
713// ----------------------------------------------------------------------------
714// Identifier for Each Built-in Observer
715// ----------------------------------------------------------------------------
716
722enum class ObserverType : int {
723 TFPROF = 0,
724 CHROME,
726};
727
731inline const char* to_string(ObserverType type) {
732 switch(type) {
733 case ObserverType::TFPROF: return "tfprof";
734 case ObserverType::CHROME: return "chrome";
735 default: return "undefined";
736 }
737}
738
739
740} // end of namespace tf -----------------------------------------------------
741
742
T accumulate(T... args)
class to create an observer based on Chrome tracing format
Definition observer.hpp:231
void clear()
clears the timeline data
Definition observer.hpp:329
std::string dump() const
dumps the timelines into a Chrome Tracing format
Definition observer.hpp:395
size_t num_tasks() const
queries the number of tasks observed
Definition observer.hpp:402
class to create an executor for running a taskflow graph
Definition executor.hpp:50
class to derive an executor observer
Definition observer.hpp:169
virtual void on_entry(WorkerView w, TaskView task_view)=0
method to call before a worker thread executes a closure
virtual void set_up(size_t num_workers)=0
constructor-like method to call when the executor observer is fully created
virtual void on_exit(WorkerView w, TaskView task_view)=0
method to call after a worker thread executed a closure
virtual ~ObserverInterface()=default
virtual destructor
class to create an observer based on the built-in taskflow profiler format
Definition observer.hpp:447
std::string dump() const
dumps the timelines into a JSON string
Definition observer.hpp:604
void clear()
clears the timeline data
Definition observer.hpp:519
size_t num_tasks() const
queries the number of tasks observed
Definition observer.hpp:611
class to access task information from the observer interface
Definition task.hpp:638
class to create an immutable view of a worker in an executor
Definition worker.hpp:78
T lock(T... args)
T move(T... args)
taskflow namespace
Definition small_vector.hpp:27
TaskType
enumeration of all task types
Definition task.hpp:21
@ UNDEFINED
undefined task type (for internal use only)
const char * to_string(TaskType type)
convert a task type to a human-readable string
Definition task.hpp:81
ObserverType
enumeration of all observer types
Definition observer.hpp:722
std::chrono::time_point< std::chrono::steady_clock > observer_stamp_t
default time point type of observers
Definition observer.hpp:20
task include file
worker include file