14#include "Framework/EventFile.h"
16#include "Framework/Exception/Exception.h"
17#include "Framework/Logger.h"
18#include "Framework/NtupleManager.h"
19#include "Framework/RunHeader.h"
25volatile std::sig_atomic_t preemption_received_ = 0;
30 : conditions_{*this} {
43 configuration.
get<
bool>(
"skipCorruptedInputFiles",
false);
45 input_files_ = configuration.
get<std::vector<std::string>>(
"inputFiles", {});
47 configuration.
get<std::vector<std::string>>(
"outputFiles", {});
55 auto run{configuration.
get<
int>(
"run", -1)};
58 auto libs{configuration.
get<std::vector<std::string>>(
"libraries", {})};
59 std::set<std::string> libraries_loaded;
60 for (
const auto &lib : libs) {
61 if (libraries_loaded.find(lib) != libraries_loaded.end()) {
65 void *handle = dlopen(lib.c_str(), RTLD_NOW);
66 if (handle ==
nullptr) {
67 EXCEPTION_RAISE(
"LibraryLoadFailure",
68 "Error loading library '" + lib +
"':" + dlerror());
71 libraries_loaded.insert(lib);
75 configuration.
get<
bool>(
"skimDefaultIsKeep",
true));
76 auto skim_rules{configuration.
get<std::vector<std::string>>(
"skimRules", {})};
77 for (
size_t i = 0; i < skim_rules.size(); i += 2) {
81 auto sequence{configuration.
get<std::vector<framework::config::Parameters>>(
83 if (sequence.empty() && configuration.
get<
bool>(
"testingMode",
false)) {
86 "No sequence has been defined. What should I be doing?\nUse "
87 "p.sequence to tell me what processors to run.");
89 for (
auto proc : sequence) {
90 auto class_name{proc.get<std::string>(
"className")};
91 auto instance_name{proc.get<std::string>(
"instanceName")};
93 EventProcessor::Factory::get().make(class_name, instance_name, *
this)};
95 EXCEPTION_RAISE(
"UnableToCreate",
96 "The EventProcessor Factory was unable to create " +
97 instance_name +
" of type " + class_name +
98 ". Did you inherit from framework::Producer or "
99 "framework::Analyzer? "
100 "Did you DECLARE_PRODUCER or DECLARE_ANALYZER in the "
101 "implementation (.cxx) file? "
102 "Did you use the class's full name (including "
103 "namespaces) in the Python configuration class? "
104 "Does the Python configuration class reference the "
105 "correct library it is a part of?");
108 proc.get<std::vector<framework::config::Parameters>>(
"histograms", {})};
109 if (!histograms.empty()) {
110 ep.value()->getHistoDirectory();
111 ep.value()->createHistograms(histograms);
113 ep.value()->configure(proc);
117 auto conditions_object_providers{
118 configuration.
get<std::vector<framework::config::Parameters>>(
119 "conditionsObjectProviders", {})};
120 for (
auto cop : conditions_object_providers) {
121 auto class_name{cop.get<std::string>(
"className")};
122 auto object_name{cop.get<std::string>(
"objectName")};
123 auto tag_name{cop.get<std::string>(
"tagName")};
128 bool log_performance = configuration.
get<
bool>(
"logPerformance",
false);
129 if (log_performance) {
130 std::vector<std::string> names{
sequence_.size()};
131 for (std::size_t i{0}; i <
sequence_.size(); i++) {
158 auto n_events_processed{0};
172 std::size_t i_proc{0};
180 proc->onProcessStart();
191 EXCEPTION_RAISE(
"InvalidConfig",
192 "No input files or output files were given.");
194 ldmx_log(warn) <<
"Several output files given with no input files. "
196 <<
"' will be used.";
221 ldmx_log(warn) <<
"The totalEvents was set, so maxEvents and "
222 "maxTriesPerEvent will be ignored!";
225 while (n_events_processed < event_limit) {
227 if (preemption_received_) {
229 <<
"Preemption signal received, stopping event generation";
245 bool completed =
process(n_events_processed, num_tries, the_event);
250 if (completed) num_tries = 0;
256 n_events_processed++;
265 run_header.
setRunEnd(std::time(
nullptr));
270 if (n_events_processed < total_tries / 10000) {
272 <<
"Less than 1 event out of every 10k events tried was accepted!";
274 <<
"This could be an issue with your filtering and biasing procedure "
275 "since this is incredibly inefficient.";
283 bool single_output =
false;
285 single_output =
true;
288 EXCEPTION_RAISE(
"Process",
289 "Unable to handle case of different number of input and "
290 "output files (other than zero/one ouput file).");
300 ldmx_log(warn) <<
"Input file '" << infilename
301 <<
"' was found to be corrupted. Skipping.";
306 "We should never get here. "
307 "EventFile is corrupted but we aren't skipping corrupted inputs. "
308 "EventFile should be throwing its own exceptions in this case.");
312 ldmx_log(info) <<
"Opening file " << infilename;
321 if (!single_output or ifile == 0) {
330 master_file = out_file;
332 EXCEPTION_RAISE(
"Process",
"Unable to construct output file for " +
341 master_file = out_file;
348 master_file = &in_file;
354 n_events_processed++;
357 bool event_completed =
true;
358 while (!preemption_received_ &&
372 ldmx_log(info) <<
"Got new run header from '"
376 ldmx_log(warn) <<
"Run header for run " << was_run
377 <<
" was not found!";
381 event_completed =
process(n_events_processed, 1, the_event);
386 n_events_processed++;
389 if (preemption_received_) {
390 ldmx_log(fatal) <<
"Preemption signal received, stopping event "
391 "processing and closing files";
394 bool leave_early{
false};
396 ldmx_log(info) <<
"Reached event limit of " <<
event_limit_
402 ldmx_log(warn) <<
"Processing interrupted";
406 ldmx_log(info) <<
"Closing file " << infilename;
412 if (out_file and !single_output) {
440 proc->onProcessEnd();
457 TDirectory *child = owner->mkdir((
char *)dirName.c_str());
458 if (child) child->cd();
463 TDirectory *owner{
nullptr};
469 "You did not provide the necessary histogram file name to "
470 "put your histograms (or performance data) in.\n Provide this "
471 "name in the python configuration with 'p.histogramFile = "
472 "\"myHistFile.root\"' where p is the Process object.");
492 std::size_t i_proc{0};
497 proc->beforeNewRun(header);
511 proc->onNewRun(header);
516 ldmx_log(info) << header;
525 ldmx_log(info) <<
"Processing " << n + 1 <<
" Run "
526 <<
event.getEventHeader().getRun() <<
" Event "
527 <<
event.getEventHeader().getEventNumber() <<
" ("
528 << t.AsString(
"lc") <<
")";
532 std::size_t i_proc{0};
538 proc->process(event);
559 std::size_t i_proc{0};
564 proc->onFileOpen(file);
573 std::size_t i_proc{0};
578 proc->onFileClose(file);
Base classes for all user event processing components to extend.
Class implementing an event buffer system for storing event data.
Class which represents the process under execution.
Specific exception used to abort an event.
void onProcessStart()
Calls onProcessStart for all ConditionsObjectProviders.
void onNewRun(ldmx::RunHeader &)
Calls onNewRun for all ConditionsObjectProviders.
void createConditionsObjectProvider(const std::string &classname, const std::string &instancename, const std::string &tagname, const framework::config::Parameters &params)
Create a ConditionsObjectProvider given the information.
This class manages all ROOT file input/output operations.
void updateParent(EventFile *parent)
Change pointer to different parent file.
const std::string & getFileName()
void addDrop(const std::string &rule)
Add a rule for dropping collections from the output.
void setupEvent(Event *evt)
Set an Event object containing the event data to work with this file.
void writeRunTree()
Write the map of run headers to the file as a TTree of RunHeader.
bool nextEvent(bool storeCurrentEvent=true)
Prepare the next event.
ldmx::RunHeader * getRunHeaderPtr(int runNumber)
Update the RunHeader for a given run, if it exists in the input file.
void writeRunHeader(ldmx::RunHeader &runHeader)
Write the run header into the run map.
bool isCorrupted() const
Check if the file we have is corrupted.
Base class for all event processing components.
Implements an event buffer system for storing event data.
int getEventNumber() const
Get the event number.
void onEndOfFile()
Perform end of file action.
ldmx::EventHeader & getEventHeader()
Get the event header.
const ldmx::EventHeader * getEventHeaderPtr()
Get the event header as a pointer.
void clear()
Reset all of the variables to their limits.
static NtupleManager & getInstance()
void reset()
Reset NtupleManager to blank state.
int log_frequency_
The frequency with which event info is printed.
std::vector< EventProcessor * > sequence_
Ordered list of EventProcessors to execute.
bool skip_corrupted_input_files_
Allow the Process to skip input files that are corrupted.
std::string histo_filename_
Filename for histograms and other user products.
int max_tries_
Maximum number of attempts to make before giving up on an event.
int run_for_generation_
Run number to use if generating events.
int compression_setting_
Compression setting to pass to output files.
void run()
Run the process.
int event_limit_
Limit on events to process.
std::string pass_name_
Processing pass name.
void newRun(ldmx::RunHeader &header)
Run through the processors and let them know that we are starting a new run.
TFile * histo_t_file_
TFile for histograms and other user products.
TDirectory * openHistoFile()
Open a ROOT TFile to write histograms and TTrees.
int total_events_
Number of events we'd like to produce independently of the number of tries it would take.
~Process()
Class Destructor.
void onFileClose(EventFile &file) const
File is being closed.
std::vector< std::string > drop_keep_rules_
Set of drop/keep rules.
ldmx::RunHeader * run_header_
Pointer to the current RunHeader, used for Conditions information.
TDirectory * makeHistoDirectory(const std::string &dirName)
Construct a TDirectory* for the given module.
performance::Tracker * performance_
Class with callbacks to track performance measurements of the software.
int min_events_
When reading a file in, what's the first event to read.
StorageControl storage_controller_
Storage controller.
std::vector< std::string > output_files_
List of output file names.
std::vector< std::string > input_files_
List of input files to process.
const ldmx::EventHeader * event_header_
Pointer to the current EventHeader, used for Conditions information.
bool process(int n, int n_tries, Event &event) const
Process the input event through the sequence of processors.
void onFileOpen(EventFile &file) const
File is being opened.
Conditions conditions_
Set of ConditionsProviders.
Process(const framework::config::Parameters &configuration)
Class constructor.
int getRunNumber() const
Get the current run number or the run number to be used when initiating new events from the job.
framework::config::Parameters config_
The parameters used to configure this class.
void setDefaultKeep(bool keep)
Set the default state.
bool keepEvent(bool event_completed) const
Determine if the current event should be kept, based on the defined rules.
void addRule(const std::string &processor_pat, const std::string &purpose_pat)
Add a listening rule.
void resetEventState()
Reset the event-by-event state.
Class encapsulating parameters for configuring a processor.
const T & get(const std::string &name) const
Retrieve the parameter of the given name.
All classes in the ldmx-sw project use this namespace.