14#include "Framework/EventFile.h"
16#include "Framework/Exception/Exception.h"
17#include "Framework/Logger.h"
18#include "Framework/NtupleManager.h"
19#include "Framework/RunHeader.h"
25volatile std::sig_atomic_t preemption_received_ = 0;
30 : conditions_{*this} {
36 max_tries_ = configuration.
get<
int>(
"max_tries_per_event", 1);
43 configuration.
get<
bool>(
"skip_corrupted_input_files",
false);
45 input_files_ = configuration.
get<std::vector<std::string>>(
"input_files", {});
47 configuration.
get<std::vector<std::string>>(
"output_files", {});
55 auto run{configuration.
get<
int>(
"run", -1)};
58 auto libs{configuration.
get<std::vector<std::string>>(
"libraries", {})};
59 std::set<std::string> libraries_loaded;
60 for (
const auto &lib : libs) {
61 if (libraries_loaded.find(lib) != libraries_loaded.end()) {
65 void *handle = dlopen(lib.c_str(), RTLD_NOW);
66 if (handle ==
nullptr) {
67 EXCEPTION_RAISE(
"LibraryLoadFailure",
68 "Error loading library '" + lib +
"':" + dlerror());
71 libraries_loaded.insert(lib);
75 configuration.
get<
bool>(
"skim_default_is_keep",
true));
77 configuration.
get<std::vector<std::string>>(
"skim_rules", {})};
78 for (
size_t i = 0; i < skim_rules.size(); i += 2) {
82 auto sequence{configuration.
get<std::vector<framework::config::Parameters>>(
84 if (sequence.empty() && configuration.
get<
bool>(
"testing_mode",
false)) {
87 "No sequence has been defined. What should I be doing?\nUse "
88 "p.sequence to tell me what processors to run.");
90 for (
auto proc : sequence) {
91 auto class_name{proc.get<std::string>(
"class_name")};
92 auto instance_name{proc.get<std::string>(
"instance_name")};
94 EventProcessor::Factory::get().make(class_name, instance_name, *
this)};
96 EXCEPTION_RAISE(
"UnableToCreate",
97 "The EventProcessor Factory was unable to create " +
98 instance_name +
" of type " + class_name +
99 ". Did you inherit from framework::Producer or "
100 "framework::Analyzer? "
101 "Did you DECLARE_PRODUCER or DECLARE_ANALYZER in the "
102 "implementation (.cxx) file? "
103 "Did you use the class's full name (including "
104 "namespaces) in the Python configuration class? "
105 "Does the Python configuration class reference the "
106 "correct library it is a part of?");
109 proc.get<std::vector<framework::config::Parameters>>(
"histograms", {})};
110 if (!histograms.empty()) {
111 ep.value()->getHistoDirectory();
112 ep.value()->createHistograms(histograms);
114 ep.value()->configure(proc);
118 auto conditions_object_providers{
119 configuration.
get<std::vector<framework::config::Parameters>>(
120 "conditions_object_providers", {})};
121 for (
auto cop : conditions_object_providers) {
122 auto class_name{cop.get<std::string>(
"class_name")};
123 auto object_name{cop.get<std::string>(
"object_name")};
124 auto tag_name{cop.get<std::string>(
"tag_name")};
129 bool log_performance = configuration.
get<
bool>(
"log_performance",
false);
130 if (log_performance) {
131 std::vector<std::string> names{
sequence_.size()};
132 for (std::size_t i{0}; i <
sequence_.size(); i++) {
159 auto n_events_processed{0};
173 std::size_t i_proc{0};
181 proc->onProcessStart();
192 EXCEPTION_RAISE(
"InvalidConfig",
193 "No input files or output files were given.");
195 ldmx_log(warn) <<
"Several output files given with no input files. "
197 <<
"' will be used.";
222 ldmx_log(warn) <<
"The total_events was set, so max_events and "
223 "max_tries_per_event will be ignored!";
226 while (n_events_processed < event_limit) {
228 if (preemption_received_) {
230 <<
"Preemption signal received, stopping event generation";
246 bool completed =
process(n_events_processed, num_tries, the_event);
251 if (completed) num_tries = 0;
257 n_events_processed++;
266 run_header.
setRunEnd(std::time(
nullptr));
271 if (n_events_processed < total_tries / 10000) {
273 <<
"Less than 1 event out of every 10k events tried was accepted!";
275 <<
"This could be an issue with your filtering and biasing procedure "
276 "since this is incredibly inefficient.";
284 bool single_output =
false;
286 single_output =
true;
289 EXCEPTION_RAISE(
"Process",
290 "Unable to handle case of different number of input and "
291 "output files (other than zero/one ouput file).");
301 ldmx_log(warn) <<
"Input file '" << infilename
302 <<
"' was found to be corrupted. Skipping.";
307 "We should never get here. "
308 "EventFile is corrupted but we aren't skipping corrupted inputs. "
309 "EventFile should be throwing its own exceptions in this case.");
313 ldmx_log(info) <<
"Opening file " << infilename;
322 if (!single_output or ifile == 0) {
331 master_file = out_file;
333 EXCEPTION_RAISE(
"Process",
"Unable to construct output file for " +
342 master_file = out_file;
349 master_file = &in_file;
355 n_events_processed++;
358 bool event_completed =
true;
359 while (!preemption_received_ &&
373 ldmx_log(info) <<
"Got new run header from '"
377 ldmx_log(warn) <<
"Run header for run " << was_run
378 <<
" was not found!";
382 event_completed =
process(n_events_processed, 1, the_event);
387 n_events_processed++;
390 if (preemption_received_) {
391 ldmx_log(fatal) <<
"Preemption signal received, stopping event "
392 "processing and closing files";
395 bool leave_early{
false};
397 ldmx_log(info) <<
"Reached event limit of " <<
event_limit_
403 ldmx_log(warn) <<
"Processing interrupted";
407 ldmx_log(info) <<
"Closing file " << infilename;
413 if (out_file and !single_output) {
441 proc->onProcessEnd();
458 TDirectory *child = owner->mkdir((
char *)dirName.c_str());
459 if (child) child->cd();
464 TDirectory *owner{
nullptr};
470 "You did not provide the necessary histogram file name to "
471 "put your histograms (or performance data) in.\n Provide this "
472 "name in the python configuration with 'p.histogramFile = "
473 "\"myHistFile.root\"' where p is the Process object.");
493 std::size_t i_proc{0};
498 proc->beforeNewRun(header);
512 proc->onNewRun(header);
517 ldmx_log(info) << header;
526 ldmx_log(info) <<
"Processing " << n + 1 <<
" Run "
527 <<
event.getEventHeader().getRun() <<
" Event "
528 <<
event.getEventHeader().getEventNumber() <<
" ("
529 << t.AsString(
"lc") <<
")";
533 std::size_t i_proc{0};
539 proc->process(event);
560 std::size_t i_proc{0};
565 proc->onFileOpen(file);
574 std::size_t i_proc{0};
579 proc->onFileClose(file);
Base classes for all user event processing components to extend.
Class implementing an event buffer system for storing event data.
Class which represents the process under execution.
Specific exception used to abort an event.
void onProcessStart()
Calls onProcessStart for all ConditionsObjectProviders.
void onNewRun(ldmx::RunHeader &)
Calls onNewRun for all ConditionsObjectProviders.
void createConditionsObjectProvider(const std::string &classname, const std::string &instancename, const std::string &tagname, const framework::config::Parameters &params)
Create a ConditionsObjectProvider given the information.
This class manages all ROOT file input/output operations.
void updateParent(EventFile *parent)
Change pointer to different parent file.
const std::string & getFileName()
void addDrop(const std::string &rule)
Add a rule for dropping collections from the output.
void setupEvent(Event *evt)
Set an Event object containing the event data to work with this file.
void writeRunTree()
Write the map of run headers to the file as a TTree of RunHeader.
bool nextEvent(bool storeCurrentEvent=true)
Prepare the next event.
ldmx::RunHeader * getRunHeaderPtr(int runNumber)
Update the RunHeader for a given run, if it exists in the input file.
void writeRunHeader(ldmx::RunHeader &runHeader)
Write the run header into the run map.
bool isCorrupted() const
Check if the file we have is corrupted.
Base class for all event processing components.
Implements an event buffer system for storing event data.
int getEventNumber() const
Get the event number.
void onEndOfFile()
Perform end of file action.
ldmx::EventHeader & getEventHeader()
Get the event header.
const ldmx::EventHeader * getEventHeaderPtr()
Get the event header as a pointer.
void clear()
Reset all of the variables to their limits.
static NtupleManager & getInstance()
void reset()
Reset NtupleManager to blank state.
int log_frequency_
The frequency with which event info is printed.
std::vector< EventProcessor * > sequence_
Ordered list of EventProcessors to execute.
bool skip_corrupted_input_files_
allow the Process to skip input files that are corrupted
std::string histo_filename_
Filename for histograms and other user products.
int max_tries_
Maximum number of attempts to make before giving up on an event.
int run_for_generation_
Run number to use if generating events.
int compression_setting_
Compression setting to pass to output files.
void run()
Run the process.
int event_limit_
Limit on events to process.
std::string pass_name_
Processing pass name.
void newRun(ldmx::RunHeader &header)
Run through the processors and let them know that we are starting a new run.
TFile * histo_t_file_
TFile for histograms and other user products.
TDirectory * openHistoFile()
Open a ROOT TFile to write histograms and TTrees.
int total_events_
Number of events we'd like to produce independently of the number of tries it would take.
~Process()
Class Destructor.
void onFileClose(EventFile &file) const
File is being closed.
std::vector< std::string > drop_keep_rules_
Set of drop/keep rules.
ldmx::RunHeader * run_header_
Pointer to the current RunHeader, used for Conditions information.
TDirectory * makeHistoDirectory(const std::string &dirName)
Construct a TDirectory* for the given module.
performance::Tracker * performance_
Class with callbacks to track performance measurements of the software.
int min_events_
When reading a file in, what's the first event to read.
StorageControl storage_controller_
Storage controller.
std::vector< std::string > output_files_
List of output file names.
std::vector< std::string > input_files_
List of input files to process.
const ldmx::EventHeader * event_header_
Pointer to the current EventHeader, used for Conditions information.
bool process(int n, int n_tries, Event &event) const
Process the input event through the sequence of processors.
void onFileOpen(EventFile &file) const
File is being opened.
Conditions conditions_
Set of ConditionsProviders.
Process(const framework::config::Parameters &configuration)
Class constructor.
int getRunNumber() const
Get the current run number or the run number to be used when initiating new events from the job.
framework::config::Parameters config_
The parameters used to configure this class.
void setDefaultKeep(bool keep)
Set the default state.
bool keepEvent(bool event_completed) const
Determine if the current event should be kept, based on the defined rules.
void addRule(const std::string &processor_pat, const std::string &purpose_pat)
Add a listening rule.
void resetEventState()
Reset the event-by-event state.
Class encapsulating parameters for configuring a processor.
const T & get(const std::string &name) const
Retrieve the parameter of the given name.
All classes in the ldmx-sw project use this namespace.