LDMX Software
framework::Process Class Reference

Class which represents the process under execution. More...

#include <Process.h>

Public Member Functions

 Process (const framework::config::Parameters &configuration)
 Class constructor.
 
 ~Process ()
 Class Destructor.
 
const std::string & getPassName () const
 Get the processing pass label.
 
int getRunNumber () const
 Get the current run number or the run number to be used when initiating new events from the job.
 
const ldmx::EventHeader * getEventHeader () const
 Get the pointer to the current event header, if defined.
 
const ldmx::RunHeader * getRunHeader () const
 Get the pointer to the current run header, if defined.
 
Conditions & getConditions ()
 Get a reference to the conditions system.
 
int getLogFrequency () const
 Get the frequency with which the event information is printed.
 
void run ()
 Run the process.
 
void requestFinish ()
 Request that the processing finish with this event.
 
TDirectory * makeHistoDirectory (const std::string &dirName)
 Construct a TDirectory* for the given module.
 
TDirectory * openHistoFile ()
 Open a ROOT TFile to write histograms and TTrees.
 
StorageControl & getStorageController ()
 Access the storage control unit for this process.
 
void setEventHeader (ldmx::EventHeader *h)
 Set the pointer to the current event header, used only for tests.
 

Private Member Functions

bool process (int n, int n_tries, Event &event) const
 Process the input event through the sequence of processors.
 
void newRun (ldmx::RunHeader &header)
 Run through the processors and let them know that we are starting a new run.
 
void onFileOpen (EventFile &file) const
 File is being opened.
 
void onFileClose (EventFile &file) const
 File is being closed.
 
 enableLogging ("Process")
 Turn on logging for our process.
 

Private Attributes

framework::config::Parameters config_
 The parameters used to configure this class.
 
std::string pass_name_
 Processing pass name.
 
int eventLimit_
 Limit on events to process.
 
int minEvents_
 When reading a file in, what's the first event to read.
 
int totalEvents_
 Number of events we'd like to produce independently of the number of tries it would take.
 
int logFrequency_
 The frequency with which event info is printed.
 
int maxTries_
 Maximum number of attempts to make before giving up on an event.
 
bool skipCorruptedInputFiles_
 allow the Process to skip input files that are corrupted
 
StorageControl storageController_
 Storage controller.
 
std::vector< EventProcessor * > sequence_
 Ordered list of EventProcessors to execute.
 
Conditions conditions_
 Set of ConditionsProviders.
 
std::vector< std::string > inputFiles_
 List of input files to process.
 
std::vector< std::string > outputFiles_
 List of output file names.
 
int compressionSetting_
 Compression setting to pass to output files.
 
std::vector< std::string > dropKeepRules_
 Set of drop/keep rules.
 
int runForGeneration_ {1}
 Run number to use if generating events.
 
std::string histoFilename_
 Filename for histograms and other user products.
 
const ldmx::EventHeader * eventHeader_ {0}
 Pointer to the current EventHeader, used for Conditions information.
 
ldmx::RunHeader * runHeader_ {0}
 Pointer to the current RunHeader, used for Conditions information.
 
TFile * histoTFile_ {0}
 TFile for histograms and other user products.
 
performance::Tracker * performance_ {0}
 Class with callbacks to track performance measurements of the software.
 

Detailed Description

Class which represents the process under execution.

Definition at line 36 of file Process.h.

Constructor & Destructor Documentation

◆ Process()

framework::Process::Process ( const framework::config::Parameters & configuration)

Class constructor.

Parameters
configuration — Parameters to configure process with

Definition at line 23 of file Process.cxx.

24 : conditions_{*this} {
25 config_ = configuration;
26
27 pass_name_ = configuration.getParameter<std::string>("passName", "");
28 histoFilename_ = configuration.getParameter<std::string>("histogramFile", "");
29
30 maxTries_ = configuration.getParameter<int>("maxTriesPerEvent", 1);
31 eventLimit_ = configuration.getParameter<int>("maxEvents", -1);
32 minEvents_ = configuration.getParameter<int>("minEvents", -1);
33 totalEvents_ = configuration.getParameter<int>("totalEvents", -1);
34 logFrequency_ = configuration.getParameter<int>("logFrequency", -1);
36 configuration.getParameter<int>("compressionSetting", 9);
38 configuration.getParameter<bool>("skipCorruptedInputFiles", false);
39
41 configuration.getParameter<std::vector<std::string>>("inputFiles", {});
43 configuration.getParameter<std::vector<std::string>>("outputFiles", {});
45 configuration.getParameter<std::vector<std::string>>("keep", {});
46
47 eventHeader_ = 0;
48
49 // set up the logging for this run
50 logging::open(
51 configuration.getParameter<framework::config::Parameters>("logger", {}));
52
53 auto run{configuration.getParameter<int>("run", -1)};
54 if (run > 0) runForGeneration_ = run;
55
56 auto libs{
57 configuration.getParameter<std::vector<std::string>>("libraries", {})};
58 std::for_each(libs.begin(), libs.end(), [](auto &lib) {
59 PluginFactory::getInstance().loadLibrary(lib);
60 });
61
63 configuration.getParameter<bool>("skimDefaultIsKeep", true));
64 auto skimRules{
65 configuration.getParameter<std::vector<std::string>>("skimRules", {})};
66 for (size_t i = 0; i < skimRules.size(); i += 2) {
67 storageController_.addRule(skimRules[i], skimRules[i + 1]);
68 }
69
70 auto sequence{
71 configuration.getParameter<std::vector<framework::config::Parameters>>(
72 "sequence", {})};
73 if (sequence.empty() &&
74 configuration.getParameter<bool>("testingMode", false)) {
75 EXCEPTION_RAISE(
76 "NoSeq",
77 "No sequence has been defined. What should I be doing?\nUse "
78 "p.sequence to tell me what processors to run.");
79 }
80 for (auto proc : sequence) {
81 auto className{proc.getParameter<std::string>("className")};
82 auto instanceName{proc.getParameter<std::string>("instanceName")};
84 className, instanceName, *this);
85 if (ep == 0) {
86 EXCEPTION_RAISE(
87 "UnableToCreate",
88 "Unable to create instance '" + instanceName + "' of class '" +
89 className +
90 "'. Did you load the library that this class is apart of?");
91 }
92 auto histograms{
93 proc.getParameter<std::vector<framework::config::Parameters>>(
94 "histograms", {})};
95 if (!histograms.empty()) {
96 ep->getHistoDirectory();
97 ep->createHistograms(histograms);
98 }
99 ep->configure(proc);
100 sequence_.push_back(ep);
101 }
102
103 auto conditionsObjectProviders{
104 configuration.getParameter<std::vector<framework::config::Parameters>>(
105 "conditionsObjectProviders", {})};
106 for (auto cop : conditionsObjectProviders) {
107 auto className{cop.getParameter<std::string>("className")};
108 auto objectName{cop.getParameter<std::string>("objectName")};
109 auto tagName{cop.getParameter<std::string>("tagName")};
110
111 conditions_.createConditionsObjectProvider(className, objectName, tagName,
112 cop);
113 }
114
115 bool logPerformance =
116 configuration.getParameter<bool>("logPerformance", false);
117 if (logPerformance) {
118 std::vector<std::string> names{sequence_.size()};
119 for (std::size_t i{0}; i < sequence_.size(); i++) {
120 names[i] = sequence_[i]->getName();
121 }
123 new performance::Tracker(makeHistoDirectory("performance"), names);
124 }
125}
void createConditionsObjectProvider(const std::string &classname, const std::string &instancename, const std::string &tagname, const framework::config::Parameters &params)
Create a ConditionsObjectProvider given the information.
virtual void configure(framework::config::Parameters &parameters)
Callback for the EventProcessor to configure itself from the given set of parameters.
EventProcessor * createEventProcessor(const std::string &classname, const std::string &moduleInstanceName, Process &process)
Make an event processor.
static PluginFactory & getInstance()
Get the factory instance.
std::vector< EventProcessor * > sequence_
Ordered list of EventProcessors to execute.
Definition Process.h:182
std::vector< std::string > outputFiles_
List of output file names.
Definition Process.h:192
bool skipCorruptedInputFiles_
allow the Process to skip input files that are corrupted
Definition Process.h:176
std::vector< std::string > dropKeepRules_
Set of drop/keep rules.
Definition Process.h:204
void run()
Run the process.
Definition Process.cxx:141
std::string pass_name_
Processing pass name.
Definition Process.h:154
std::string histoFilename_
Filename for histograms and other user products.
Definition Process.h:210
StorageControl storageController_
Storage controller.
Definition Process.h:179
int totalEvents_
Number of events we'd like to produce independently of the number of tries it would take.
Definition Process.h:165
int maxTries_
Maximum number of attempts to make before giving up on an event.
Definition Process.h:171
std::vector< std::string > inputFiles_
List of input files to process.
Definition Process.h:189
int minEvents_
When reading a file in, what's the first event to read.
Definition Process.h:160
TDirectory * makeHistoDirectory(const std::string &dirName)
Construct a TDirectory* for the given module.
Definition Process.cxx:429
performance::Tracker * performance_
Class with callbacks to track performance measurements of the software.
Definition Process.h:222
const ldmx::EventHeader * eventHeader_
Pointer to the current EventHeader, used for Conditions information.
Definition Process.h:213
int compressionSetting_
Compression setting to pass to output files.
Definition Process.h:201
int logFrequency_
The frequency with which event info is printed.
Definition Process.h:168
int runForGeneration_
Run number to use if generating events.
Definition Process.h:207
Conditions conditions_
Set of ConditionsProviders.
Definition Process.h:185
int eventLimit_
Limit on events to process.
Definition Process.h:157
framework::config::Parameters config_
The parameters used to configure this class.
Definition Process.h:151
void setDefaultKeep(bool keep)
Set the default state.
void addRule(const std::string &processor_pat, const std::string &purpose_pat)
Add a listening rule.
Class encapsulating parameters for configuring a processor.
Definition Parameters.h:29

References framework::StorageControl::addRule(), compressionSetting_, conditions_, config_, framework::EventProcessor::configure(), framework::Conditions::createConditionsObjectProvider(), framework::PluginFactory::createEventProcessor(), framework::EventProcessor::createHistograms(), dropKeepRules_, eventHeader_, eventLimit_, framework::EventProcessor::getHistoDirectory(), framework::PluginFactory::getInstance(), histoFilename_, inputFiles_, logFrequency_, makeHistoDirectory(), maxTries_, minEvents_, outputFiles_, pass_name_, performance_, run(), runForGeneration_, sequence_, framework::StorageControl::setDefaultKeep(), skipCorruptedInputFiles_, storageController_, and totalEvents_.

◆ ~Process()

framework::Process::~Process ( )

Class Destructor.

Cleans up sequence of EventProcessors. These processors were created by ConfigurePython and should be deleted.

Definition at line 127 of file Process.cxx.

127 {
128 // need to delete the performance object so that it is
129 // written before we close the histogram file below
130 if (performance_) delete performance_;
131 for (EventProcessor *ep : sequence_) {
132 delete ep;
133 }
134 if (histoTFile_) {
135 histoTFile_->Write();
136 delete histoTFile_;
137 histoTFile_ = 0;
138 }
139}
TFile * histoTFile_
TFile for histograms and other user products.
Definition Process.h:219

References histoTFile_, performance_, and sequence_.

Member Function Documentation

◆ getConditions()

Conditions & framework::Process::getConditions ( )
inline

Get a reference to the conditions system.

Definition at line 78 of file Process.h.

78{ return conditions_; }

References conditions_.

Referenced by framework::EventProcessor::getConditions(), and framework::ConditionsObjectProvider::requestParentCondition().

◆ getEventHeader()

const ldmx::EventHeader * framework::Process::getEventHeader ( ) const
inline

Get the pointer to the current event header, if defined.

Definition at line 68 of file Process.h.

68{ return eventHeader_; }

References eventHeader_.

Referenced by framework::Conditions::getConditionPtr(), and framework::EventProcessor::getEventHeader().

◆ getLogFrequency()

int framework::Process::getLogFrequency ( ) const
inline

Get the frequency with which the event information is printed.

Returns
integer log frequency (negative if turned off)

Definition at line 84 of file Process.h.

84{ return logFrequency_; }

References logFrequency_.

Referenced by framework::EventProcessor::getLogFrequency().

◆ getPassName()

const std::string & framework::Process::getPassName ( ) const
inline

Get the processing pass label.

Returns
The processing pass label.

Definition at line 56 of file Process.h.

56{ return pass_name_; }

References pass_name_.

Referenced by framework::RandomNumberSeedService::onNewRun().

◆ getRunHeader()

const ldmx::RunHeader * framework::Process::getRunHeader ( ) const
inline

Get the pointer to the current run header, if defined.

Definition at line 73 of file Process.h.

73{ return runHeader_; }
ldmx::RunHeader * runHeader_
Pointer to the current RunHeader, used for Conditions information.
Definition Process.h:216

References runHeader_.

◆ getRunNumber()

int framework::Process::getRunNumber ( ) const

Get the current run number or the run number to be used when initiating new events from the job.

Returns
int Run number

Definition at line 425 of file Process.cxx.

425 {
427}
int getRun() const
Return the run number.
Definition EventHeader.h:84

References eventHeader_, ldmx::EventHeader::getRun(), and runForGeneration_.

Referenced by framework::EventProcessor::getRunNumber().

◆ getStorageController()

StorageControl & framework::Process::getStorageController ( )
inline

Access the storage control unit for this process.

Definition at line 109 of file Process.h.

109{ return storageController_; }

References storageController_.

Referenced by framework::EventProcessor::setStorageHint().

◆ makeHistoDirectory()

TDirectory * framework::Process::makeHistoDirectory ( const std::string & dirName)

Construct a TDirectory* for the given module.

Definition at line 429 of file Process.cxx.

429 {
430 auto owner{openHistoFile()};
431 TDirectory *child = owner->mkdir((char *)dirName.c_str());
432 if (child) child->cd();
433 return child;
434}
TDirectory * openHistoFile()
Open a ROOT TFile to write histograms and TTrees.
Definition Process.cxx:436

References openHistoFile().

Referenced by framework::EventProcessor::getHistoDirectory(), and Process().

◆ newRun()

void framework::Process::newRun ( ldmx::RunHeader & header)
private

Run through the processors and let them know that we are starting a new run.

Parameters
[in] header — RunHeader for the new run

Definition at line 457 of file Process.cxx.

457 {
458 // Producers are allowed to put parameters into
459 // the run header through 'beforeNewRun' method
460
461 // Put the version into the rh string param
462 header.setStringParameter("Pass = " + pass_name_ + ", version",
463 LDMXSW_VERSION);
464 if (performance_) performance_->start(performance::Callback::beforeNewRun, 0);
465 std::size_t i_proc{0};
466 for (auto module : sequence_) {
467 i_proc++;
468 if (dynamic_cast<Producer *>(module)) {
469 if (performance_)
470 performance_->start(performance::Callback::beforeNewRun, i_proc);
471 dynamic_cast<Producer *>(module)->beforeNewRun(header);
472 if (performance_)
473 performance_->stop(performance::Callback::beforeNewRun, i_proc);
474 }
475 }
476 if (performance_) performance_->stop(performance::Callback::beforeNewRun, 0);
477 // now run header has been modified by Producers,
478 // it is valid to read from for everyone else in 'onNewRun'
479 if (performance_) performance_->start(performance::Callback::onNewRun, 0);
480 conditions_.onNewRun(header);
481 i_proc = 0;
482 for (auto module : sequence_) {
483 i_proc++;
484 if (performance_)
485 performance_->start(performance::Callback::onNewRun, i_proc);
486 module->onNewRun(header);
487 if (performance_)
488 performance_->stop(performance::Callback::onNewRun, i_proc);
489 }
490 if (performance_) performance_->stop(performance::Callback::onNewRun, 0);
491 ldmx_log(info) << header;
492}
void onNewRun(ldmx::RunHeader &)
Calls onNewRun for all ConditionsObjectProviders.
void start(Callback cb, std::size_t i_proc)
start the timer for a specific callback and specific processor
Definition Tracker.cxx:90
void stop(Callback cb, std::size_t i_proc)
stop the timer for a specific callback and specific processor
Definition Tracker.cxx:94
void setStringParameter(const std::string &name, std::string value)
Set a string parameter value.
Definition RunHeader.h:222

References conditions_, framework::Conditions::onNewRun(), pass_name_, performance_, sequence_, ldmx::RunHeader::setStringParameter(), framework::performance::Tracker::start(), and framework::performance::Tracker::stop().

Referenced by run().

◆ onFileClose()

void framework::Process::onFileClose ( EventFile & file) const
private

File is being closed.

Definition at line 549 of file Process.cxx.

549 {
550 if (performance_) performance_->start(performance::Callback::onFileClose, 0);
551 std::size_t i_proc{0};
552 for (auto module : sequence_) {
553 i_proc++;
554 if (performance_)
555 performance_->start(performance::Callback::onFileClose, i_proc);
556 module->onFileClose(file);
557 if (performance_)
558 performance_->stop(performance::Callback::onFileClose, i_proc);
559 }
560 if (performance_) performance_->stop(performance::Callback::onFileClose, 0);
561}

References performance_, sequence_, framework::performance::Tracker::start(), and framework::performance::Tracker::stop().

Referenced by run().

◆ onFileOpen()

void framework::Process::onFileOpen ( EventFile & file) const
private

File is being opened.

Definition at line 535 of file Process.cxx.

535 {
536 if (performance_) performance_->start(performance::Callback::onFileOpen, 0);
537 std::size_t i_proc{0};
538 for (auto module : sequence_) {
539 i_proc++;
540 if (performance_)
541 performance_->start(performance::Callback::onFileOpen, i_proc);
542 module->onFileOpen(file);
543 if (performance_)
544 performance_->stop(performance::Callback::onFileOpen, i_proc);
545 }
546 if (performance_) performance_->stop(performance::Callback::onFileOpen, 0);
547}

References performance_, sequence_, framework::performance::Tracker::start(), and framework::performance::Tracker::stop().

Referenced by run().

◆ openHistoFile()

TDirectory * framework::Process::openHistoFile ( )

Open a ROOT TFile to write histograms and TTrees.

Definition at line 436 of file Process.cxx.

436 {
437 TDirectory *owner{nullptr};
438
439 if (histoFilename_.empty()) {
440 // trying to write histograms/ntuples but no file defined
441 EXCEPTION_RAISE(
442 "NoHistFileName",
443 "You did not provide the necessary histogram file name to "
444 "put your histograms (or performance data) in.\n Provide this "
445 "name in the python configuration with 'p.histogramFile = "
446 "\"myHistFile.root\"' where p is the Process object.");
447 } else if (histoTFile_ == nullptr) {
448 histoTFile_ = new TFile(histoFilename_.c_str(), "RECREATE");
449 owner = histoTFile_;
450 } else
451 owner = histoTFile_;
452 owner->cd();
453
454 return owner;
455}

References histoFilename_, and histoTFile_.

Referenced by makeHistoDirectory().

◆ process()

bool framework::Process::process ( int n,
int n_tries,
Event & event ) const
private

Process the input event through the sequence of processors.

The input counters (for events and tries) are only used to print the status.

Parameters
[in] n — counter for number of events processed
[in] n_tries — counter for number of tries on current event
[in,out] event — reference to event we are going to process
Returns
true if event was fully processed (false if aborted)

Definition at line 494 of file Process.cxx.

494 {
495 if ((logFrequency_ != -1) && ((n + 1) % logFrequency_ == 0) && (n_try < 2)) {
496 // only printout event counter if we've enabled log frequency, the event
497 // matches the frequency and we are on the first try
498 TTimeStamp t;
499 ldmx_log(info) << "Processing " << n + 1 << " Run "
500 << event.getEventHeader().getRun() << " Event "
501 << event.getEventHeader().getEventNumber() << " ("
502 << t.AsString("lc") << ")";
503 }
504
505 if (performance_) performance_->start(performance::Callback::process, 0);
506 std::size_t i_proc{0};
507 try {
508 for (auto module : sequence_) {
509 i_proc++;
510 if (performance_)
511 performance_->start(performance::Callback::process, i_proc);
512 if (dynamic_cast<Producer *>(module)) {
513 (dynamic_cast<Producer *>(module))->produce(event);
514 } else if (dynamic_cast<Analyzer *>(module)) {
515 (dynamic_cast<Analyzer *>(module))->analyze(event);
516 }
517 if (performance_)
518 performance_->stop(performance::Callback::process, i_proc);
519 }
520 } catch (AbortEventException &) {
521 if (performance_) {
522 performance_->stop(performance::Callback::process, i_proc);
523 performance_->stop(performance::Callback::process, 0);
524 performance_->end_event(false);
525 }
526 return false;
527 }
528 if (performance_) {
529 performance_->stop(performance::Callback::process, 0);
530 performance_->end_event(true);
531 }
532 return true;
533}
void end_event(bool completed)
inform us that we finished an event (and whether it was completed or not)
Definition Tracker.cxx:98

References framework::performance::Tracker::end_event(), logFrequency_, performance_, sequence_, framework::performance::Tracker::start(), and framework::performance::Tracker::stop().

Referenced by run().

◆ requestFinish()

void framework::Process::requestFinish ( )
inline

Request that the processing finish with this event.

Definition at line 94 of file Process.h.

94{ eventLimit_ = 0; }

References eventLimit_.

◆ run()

void framework::Process::run ( )

Run the process.

Definition at line 141 of file Process.cxx.

141 {
143
144 // Counter to keep track of the number of events that have been
145 // procesed
146 auto n_events_processed{0};
147
148 // make sure the ntuple manager is in a blank state
150
151 // event bus for this process
152 Event theEvent(pass_name_);
153 // the EventHeader object is created with the event bus as
154 // one of its members, we obtain a pointer for the header
155 // here so we can share it with the conditions system
156 eventHeader_ = theEvent.getEventHeaderPtr();
157 theEvent.getEventHeader().setRun(runForGeneration_);
158
159 // Start by notifying everyone that modules processing is beginning
160 std::size_t i_proc{0};
161 if (performance_)
162 performance_->start(performance::Callback::onProcessStart, 0);
164 for (auto module : sequence_) {
165 i_proc++;
166 if (performance_)
167 performance_->start(performance::Callback::onProcessStart, i_proc);
168 module->onProcessStart();
169 if (performance_)
170 performance_->stop(performance::Callback::onProcessStart, i_proc);
171 }
172 if (performance_)
173 performance_->stop(performance::Callback::onProcessStart, 0);
174
175 // If we have no input files, but do have an event number, run for
176 // that number of events and generate an output file.
177 if (inputFiles_.empty() && eventLimit_ > 0) {
178 if (outputFiles_.empty()) {
179 EXCEPTION_RAISE("InvalidConfig",
180 "No input files or output files were given.");
181 } else if (outputFiles_.size() > 1) {
182 ldmx_log(warn) << "Several output files given with no input files. "
183 << "Only the first output file '" << outputFiles_.at(0)
184 << "' will be used.";
185 }
186 std::string outputFileName = outputFiles_.at(0);
187
188 // Configure the event file to create an output file with no parent. This
189 // requires setting the parameters isOutputFile and isSingleOutput to true.
190 EventFile outFile(config_, outputFileName, nullptr, true, true, false);
191 onFileOpen(outFile);
192 outFile.setupEvent(&theEvent);
193
194 for (auto rule : dropKeepRules_) outFile.addDrop(rule);
195
197 runHeader.setRunStart(std::time(nullptr)); // set run starting
198 runHeader_ = &runHeader; // give handle to run header to process
199 outFile.writeRunHeader(runHeader); // add run header to file
200
201 newRun(runHeader);
202
203 int totalTries = 0; // total number of tries for entire run
204 int numTries = 0; // number of tries for the current event number
205 int event_limit = eventLimit_;
206 if (totalEvents_ > 0) {
207 // Have a warning at the first event
208 if (numTries == 0)
209 ldmx_log(warn) << "The totalEvents was set, so maxEvents and "
210 "maxTriesPerEvent will be ignored!";
211 event_limit = totalEvents_;
212 }
213 while (n_events_processed < event_limit) {
214 totalTries++;
215 numTries++;
216
217 ldmx::EventHeader &eh = theEvent.getEventHeader();
219 eh.setEventNumber(n_events_processed + 1);
220 eh.setTimestamp(TTimeStamp());
221
222 // reset the storage controller state
224 logging::Formatter::set(theEvent.getEventNumber());
225
226 bool completed = process(n_events_processed, numTries, theEvent);
227
228 outFile.nextEvent(storageController_.keepEvent(completed));
229
230 // reset try counter only on successfully completed events
231 if (completed) numTries = 0;
232
233 // we use modulo here insetad of >= because we want to carry
234 // the number of tries across the number of events processed boundary
235 // totalEvents_ is set let's not exit until that's reached
236 if (completed or (totalEvents_ < 0 and numTries % maxTries_ == 0)) {
237 n_events_processed++; // increment events made
238 NtupleManager::getInstance().fill(); // fill ntuples
239 }
240
242 }
243
244 onFileClose(outFile);
245
246 runHeader.setRunEnd(std::time(nullptr));
247 runHeader.setNumTries(totalTries);
248 outFile.writeRunTree();
249
250 // Give a warning that this filter has very low efficiency
251 if (n_events_processed < totalTries / 10000) { // integer division is okay
252 ldmx_log(warn)
253 << "Less than 1 event out of every 10k events tried was accepted!";
254 ldmx_log(warn)
255 << "This could be an issue with your filtering and biasing procedure "
256 "since this is incredibly inefficient.";
257 }
258
259 } else {
260 // there are input files
261
262 EventFile *outFile(0);
263
264 bool singleOutput = false;
265 if (outputFiles_.size() == 1) {
266 singleOutput = true;
267 } else if (!outputFiles_.empty() and
268 outputFiles_.size() != inputFiles_.size()) {
269 EXCEPTION_RAISE("Process",
270 "Unable to handle case of different number of input and "
271 "output files (other than zero/one ouput file).");
272 }
273
274 // next, loop through the files
275 int ifile = 0;
276 int wasRun = -1;
277 for (auto infilename : inputFiles_) {
278 EventFile inFile(config_, infilename);
279 if (inFile.isCorrupted()) {
281 ldmx_log(warn) << "Input file '" << infilename
282 << "' was found to be corrupted. Skipping.";
283 continue;
284 } else {
285 EXCEPTION_RAISE(
286 "BadCode",
287 "We should never get here. "
288 "EventFile is corrupted but we aren't skipping corrupted inputs. "
289 "EventFile should be throwing its own exceptions in this case.");
290 }
291 }
292
293 ldmx_log(info) << "Opening file " << infilename;
294 onFileOpen(inFile);
295
296 // configure event file that will be iterated over
297 EventFile *masterFile;
298 if (!outputFiles_.empty()) {
299 // setup new output file if either
300 // 1) we are not in single output mode
301 // 2) this is the first input file
302 if (!singleOutput or ifile == 0) {
303 // setup new output file
304 outFile = new EventFile(config_, outputFiles_[ifile], &inFile,
305 singleOutput);
306 ifile++;
307
308 // setup theEvent we will iterate over
309 if (outFile) {
310 outFile->setupEvent(&theEvent);
311 masterFile = outFile;
312 } else {
313 EXCEPTION_RAISE("Process", "Unable to construct output file for " +
314 outputFiles_[ifile]);
315 }
316
317 for (auto rule : dropKeepRules_) outFile->addDrop(rule);
318
319 } else {
320 // all other input files
321 outFile->updateParent(&inFile);
322 masterFile = outFile;
323
324 } // check if in singleOutput mode
325
326 } else {
327 // empty output file list, use inputFile as master file
328 inFile.setupEvent(&theEvent);
329 masterFile = &inFile;
330 }
331
332 // In case we'd like to skip up to the event of minEvents_
333 while (n_events_processed < (minEvents_ - 1) &&
334 masterFile->nextEvent(false)) {
335 n_events_processed++;
336 }
337
338 bool event_completed = true;
339 while (masterFile->nextEvent(
340 storageController_.keepEvent(event_completed)) &&
341 (eventLimit_ < 0 || (n_events_processed) < eventLimit_)) {
342 // clean up for storage control calculation
344 logging::Formatter::set(theEvent.getEventNumber());
345
346 // notify for new run if necessary
347 if (theEvent.getEventHeader().getRun() != wasRun) {
348 wasRun = theEvent.getEventHeader().getRun();
349 ldmx::RunHeader *rh{masterFile->getRunHeaderPtr(wasRun)};
350 if (rh != nullptr) {
351 runHeader_ = rh;
352 ldmx_log(info) << "Got new run header from '"
353 << masterFile->getFileName() << "'";
355 } else {
356 ldmx_log(warn) << "Run header for run " << wasRun
357 << " was not found!";
358 }
359 }
360
361 event_completed = process(n_events_processed, 1, theEvent);
362
363 if (event_completed) NtupleManager::getInstance().fill();
365
366 n_events_processed++;
367 } // loop through events
368
369 bool leave_early{false};
370 if (eventLimit_ > 0 && n_events_processed == eventLimit_) {
371 ldmx_log(info) << "Reached event limit of " << eventLimit_ << " events";
372 leave_early = true;
373 }
374
375 if (eventLimit_ == 0 && n_events_processed > eventLimit_) {
376 ldmx_log(warn) << "Processing interrupted";
377 leave_early = true;
378 }
379
380 ldmx_log(info) << "Closing file " << infilename;
381 onFileClose(inFile);
382
383 // Reset the event in case of multiple input files
384 theEvent.onEndOfFile();
385
386 if (outFile and !singleOutput) {
387 outFile->writeRunTree();
388 delete outFile;
389 outFile = nullptr;
390 }
391
392 if (leave_early) {
393 break;
394 }
395 } // loop through input files
396
397 if (outFile) {
398 // close outFile
399 // outFile would survive to here in single output mode
400 outFile->writeRunTree();
401 delete outFile;
402 outFile = nullptr;
403 }
404
405 } // are there input files? if-else tree
406
407 // finally, notify everyone that we are stopping
408 if (performance_) performance_->start(performance::Callback::onProcessEnd, 0);
409 i_proc = 0;
410 for (auto module : sequence_) {
411 i_proc++;
412 if (performance_)
413 performance_->start(performance::Callback::onProcessEnd, i_proc);
414 module->onProcessEnd();
415 if (performance_)
416 performance_->stop(performance::Callback::onProcessEnd, i_proc);
417 }
418 if (performance_) performance_->stop(performance::Callback::onProcessEnd, 0);
419
420 // we're done so let's close up the logging
421 logging::close();
423}
void onProcessStart()
Calls onProcessStart for all ConditionsObjectProviders.
void clear()
Reset all of the variables to their limits.
static NtupleManager & getInstance()
void reset()
Reset NtupleManager to blank state.
void newRun(ldmx::RunHeader &header)
Run through the processors and let them know that we are starting a new run.
Definition Process.cxx:457
void onFileClose(EventFile &file) const
File is being closed.
Definition Process.cxx:549
bool process(int n, int n_tries, Event &event) const
Process the input event through the sequence of processors.
Definition Process.cxx:494
void onFileOpen(EventFile &file) const
File is being opened.
Definition Process.cxx:535
bool keepEvent(bool event_completed) const
Determine if the current event should be kept, based on the defined rules.
void resetEventState()
Reset the event-by-event state.
static void set(int n)
set the event number in the current Formatter
Definition Logger.cxx:158
void absolute_start()
literally first line of Process::run
Definition Tracker.cxx:86
void absolute_stop()
literally last line of Process::run (if run completes without error)
Definition Tracker.cxx:88
Provides header information about an event, such as event number and timestamp.
Definition EventHeader.h:44
void setEventNumber(int eventNumber)
Set the event number.
void setRun(int run)
Set the run number.
void setTimestamp(const TTimeStamp &timestamp)
Set the timestamp.
Run-specific configuration and data stored in its own output TTree alongside the event TTree in the o...
Definition RunHeader.h:57

References framework::performance::Tracker::absolute_start(), framework::performance::Tracker::absolute_stop(), framework::EventFile::addDrop(), framework::NtupleManager::clear(), conditions_, config_, dropKeepRules_, eventHeader_, eventLimit_, framework::Event::getEventHeader(), framework::Event::getEventHeaderPtr(), framework::Event::getEventNumber(), framework::EventFile::getFileName(), framework::NtupleManager::getInstance(), ldmx::EventHeader::getRun(), framework::EventFile::getRunHeaderPtr(), inputFiles_, framework::EventFile::isCorrupted(), framework::StorageControl::keepEvent(), maxTries_, minEvents_, newRun(), framework::EventFile::nextEvent(), framework::Event::onEndOfFile(), onFileClose(), onFileOpen(), framework::Conditions::onProcessStart(), outputFiles_, pass_name_, performance_, process(), framework::NtupleManager::reset(), framework::StorageControl::resetEventState(), runForGeneration_, runHeader_, sequence_, framework::logging::Formatter::set(), ldmx::EventHeader::setEventNumber(), ldmx::RunHeader::setNumTries(), ldmx::EventHeader::setRun(), ldmx::RunHeader::setRunEnd(), ldmx::RunHeader::setRunStart(), ldmx::EventHeader::setTimestamp(), framework::EventFile::setupEvent(), skipCorruptedInputFiles_, framework::performance::Tracker::start(), framework::performance::Tracker::stop(), storageController_, totalEvents_, framework::EventFile::updateParent(), framework::EventFile::writeRunHeader(), and framework::EventFile::writeRunTree().

Referenced by Process().

◆ setEventHeader()

void framework::Process::setEventHeader ( ldmx::EventHeader * h)
inline

Set the pointer to the current event header, used only for tests.

Definition at line 114 of file Process.h.

114{ eventHeader_ = h; }

References eventHeader_.

Member Data Documentation

◆ compressionSetting_

int framework::Process::compressionSetting_
private

Compression setting to pass to output files.

Look at the documentation for the TFile constructor if you want to learn more details. Essentially, setting = 100*algo + level with algo = 0 being the global default.

Definition at line 201 of file Process.h.

Referenced by Process().

◆ conditions_

Conditions framework::Process::conditions_
private

Set of ConditionsProviders.

Definition at line 185 of file Process.h.

Referenced by getConditions(), newRun(), Process(), and run().

◆ config_

framework::config::Parameters framework::Process::config_
private

The parameters used to configure this class.

Definition at line 151 of file Process.h.

Referenced by Process(), and run().

◆ dropKeepRules_

std::vector<std::string> framework::Process::dropKeepRules_
private

Set of drop/keep rules.

Definition at line 204 of file Process.h.

Referenced by Process(), and run().

◆ eventHeader_

const ldmx::EventHeader* framework::Process::eventHeader_ {0}
private

Pointer to the current EventHeader, used for Conditions information.

Definition at line 213 of file Process.h.

213{0};

Referenced by getEventHeader(), getRunNumber(), Process(), run(), and setEventHeader().

◆ eventLimit_

int framework::Process::eventLimit_
private

Limit on events to process.

Definition at line 157 of file Process.h.

Referenced by Process(), requestFinish(), and run().

◆ histoFilename_

std::string framework::Process::histoFilename_
private

Filename for histograms and other user products.

Definition at line 210 of file Process.h.

Referenced by openHistoFile(), and Process().

◆ histoTFile_

TFile* framework::Process::histoTFile_ {0}
private

TFile for histograms and other user products.

Definition at line 219 of file Process.h.

219{0};

Referenced by openHistoFile(), and ~Process().

◆ inputFiles_

std::vector<std::string> framework::Process::inputFiles_
private

List of input files to process.

May be empty if this Process will generate new events.

Definition at line 189 of file Process.h.

Referenced by Process(), and run().

◆ logFrequency_

int framework::Process::logFrequency_
private

The frequency with which event info is printed.

Definition at line 168 of file Process.h.

Referenced by getLogFrequency(), Process(), and process().

◆ maxTries_

int framework::Process::maxTries_
private

Maximum number of attempts to make before giving up on an event.

Definition at line 171 of file Process.h.

Referenced by Process(), and run().

◆ minEvents_

int framework::Process::minEvents_
private

When reading a file in, what's the first event to read.

Definition at line 160 of file Process.h.

Referenced by Process(), and run().

◆ outputFiles_

std::vector<std::string> framework::Process::outputFiles_
private

List of output file names.

If empty, no output file will be created.

Definition at line 192 of file Process.h.

Referenced by Process(), and run().

◆ pass_name_

std::string framework::Process::pass_name_
private

Processing pass name.

Definition at line 154 of file Process.h.

Referenced by getPassName(), newRun(), Process(), and run().

◆ performance_

performance::Tracker* framework::Process::performance_ {0}
private

class with callbacks to track performance measurements of the software

Definition at line 222 of file Process.h.

222{0};

Referenced by newRun(), onFileClose(), onFileOpen(), Process(), process(), run(), and ~Process().

◆ runForGeneration_

int framework::Process::runForGeneration_ {1}
private

Run number to use if generating events.

Definition at line 207 of file Process.h.

207{1};

Referenced by getRunNumber(), Process(), and run().

◆ runHeader_

ldmx::RunHeader* framework::Process::runHeader_ {0}
private

Pointer to the current RunHeader, used for Conditions information.

Definition at line 216 of file Process.h.

216{0};

Referenced by getRunHeader(), and run().

◆ sequence_

std::vector<EventProcessor *> framework::Process::sequence_
private

Ordered list of EventProcessors to execute.

Definition at line 182 of file Process.h.

Referenced by newRun(), onFileClose(), onFileOpen(), Process(), process(), run(), and ~Process().

◆ skipCorruptedInputFiles_

bool framework::Process::skipCorruptedInputFiles_
private

allow the Process to skip input files that are corrupted

Definition at line 176 of file Process.h.

Referenced by Process(), and run().

◆ storageController_

StorageControl framework::Process::storageController_
private

Storage controller.

Definition at line 179 of file Process.h.

Referenced by getStorageController(), Process(), and run().

◆ totalEvents_

int framework::Process::totalEvents_
private

Number of events we'd like to produce independently of the number of tries it would take.

Be warned about infinite loops!

Definition at line 165 of file Process.h.

Referenced by Process(), and run().


The documentation for this class was generated from the following files: