Functions for better restarting after buffer overflow.

This commit is contained in:
Douglas Gillespie 2024-02-15 16:48:33 +00:00
parent 9609546602
commit d2263bbba5
14 changed files with 306 additions and 53 deletions

View File

@ -211,10 +211,10 @@ public class AcquisitionProcess extends PamProcess implements DataInputStore {
if (systemPrepared == false) return; if (systemPrepared == false) return;
newDataQueue.clearList(); // clear this first to make sure nothing new comes in.
// before starting, clear all old data // before starting, clear all old data
rawDataBlock.clearAll(); rawDataBlock.clearAll();
newDataQueue.clearList();
sampleRateErrorFilter.prepareFilter(); sampleRateErrorFilter.prepareFilter();
totalExtraSamples = 0; totalExtraSamples = 0;
@ -302,6 +302,7 @@ public class AcquisitionProcess extends PamProcess implements DataInputStore {
// called by PamController. // called by PamController.
// stop the running system - not the selected system since // stop the running system - not the selected system since
// this may have changed // this may have changed
restartTimer.stop(); restartTimer.stop();
// stallCheckTimer.stop(); // stallCheckTimer.stop();
pamStop(""); pamStop("");
@ -471,6 +472,9 @@ public class AcquisitionProcess extends PamProcess implements DataInputStore {
* have been emptied and processing has stopped * have been emptied and processing has stopped
*/ */
protected void pamHasStopped() { protected void pamHasStopped() {
newDataQueue.clearList(); // clear this first to make sure nothing new comes in.
if (runningSystem == null){ if (runningSystem == null){
runningSystem = acquisitionControl.findDaqSystem(null); runningSystem = acquisitionControl.findDaqSystem(null);
} }
@ -650,9 +654,10 @@ public class AcquisitionProcess extends PamProcess implements DataInputStore {
restartTimer.stop(); restartTimer.stop();
PamController.getInstance().pamStop(); PamController.getInstance().restartPamguard();
// PamController.getInstance().pamStop();
PamController.getInstance().pamStart(false); //
// PamController.getInstance().pamStart(false);
} }

View File

@ -128,6 +128,10 @@ public class PamController implements PamControllerInterface, PamSettings {
public static final int PAM_COMPLETE = 6; public static final int PAM_COMPLETE = 6;
public static final int PAM_MAPMAKING = 7; public static final int PAM_MAPMAKING = 7;
public static final int PAM_OFFLINETASK = 8; public static final int PAM_OFFLINETASK = 8;
public static final int BUTTON_START = 1;
public static final int BUTTON_STOP = 2;
private volatile int lastStartStopButton = 0;
// status' for RunMode = RUN_PAMVIEW // status' for RunMode = RUN_PAMVIEW
public static final int PAM_LOADINGDATA = 2; public static final int PAM_LOADINGDATA = 2;
@ -240,6 +244,9 @@ public class PamController implements PamControllerInterface, PamSettings {
private Thread statusCheckThread; private Thread statusCheckThread;
private WaitDetectorThread detectorEndThread; private WaitDetectorThread detectorEndThread;
private boolean firstDataLoadComplete; private boolean firstDataLoadComplete;
// keep a track of the total number of times PAMGuard is started for debug purposes.
private int nStarts;
private RestartRunnable restartRunnable;
private PamController(int runMode, Object object) { private PamController(int runMode, Object object) {
@ -1045,8 +1052,45 @@ public class PamController implements PamControllerInterface, PamSettings {
*/ */
public void restartPamguard() { public void restartPamguard() {
pamStop(); pamStop();
startLater();
/*
* launch a restart thread, that won't do ANYTHING until
* PAMGuard is really idle and buffers are cleared. Can only
* have one of these at a time !
*/
if (restartRunnable != null) {
System.out.println("Warning !!!! PAMGuard is already trying to restart!");
return;
}
restartRunnable = new RestartRunnable();
Thread restartThread = new Thread(restartRunnable, "RestartPAMGuard Thread");
restartThread.run();
} }
private class RestartRunnable implements Runnable {
@Override
public void run() {
long t1 = System.currentTimeMillis();
while (getPamStatus() != PAM_IDLE) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
long t2 = System.currentTimeMillis();
restartRunnable = null;
System.out.printf("PAMGuard safe to restart after %d milliseconds\n", t2-t1);
startLater(false);
}
}
/** /**
* calls pamStart using the SwingUtilities * calls pamStart using the SwingUtilities
* invokeLater command to start PAMGAURD * invokeLater command to start PAMGAURD
@ -1078,7 +1122,13 @@ public class PamController implements PamControllerInterface, PamSettings {
@Override @Override
public void run() { public void run() {
pamStart(saveSettings); /*
* do a final check that the stop button hasn't been pressed - can arrive a bit
* late if the system was continually restarting.
*/
if (lastStartStopButton != BUTTON_STOP) {
pamStart(saveSettings);
}
} }
} }
@ -1103,6 +1153,26 @@ public class PamController implements PamControllerInterface, PamSettings {
} }
} }
/**
* Called from the start button. A little book keeping
* to distinguish this from automatic starts / restarts
* @return true if started.
*/
@Override
public boolean manualStart() {
lastStartStopButton = BUTTON_START;
return pamStart();
}
/**
* Called from the stop button. A little book keeping
* to distinguish this from automatic starts / restarts
*/
@Override
public void manualStop() {
lastStartStopButton = BUTTON_STOP;
pamStop();
}
/** /**
* Start PAMGUARD. This function also gets called from the * Start PAMGUARD. This function also gets called from the
@ -1193,6 +1263,12 @@ public class PamController implements PamControllerInterface, PamSettings {
saveSettings(PamCalendar.getSessionStartTime()); saveSettings(PamCalendar.getSessionStartTime());
} }
if (++nStarts > 1) {
// do this here - all processses should have reset buffers to start again by now.
String msg = String.format("Starting PAMGuard go %d", nStarts);
dumpBufferStatus(msg, false);
}
StorageOptions.getInstance().setBlockOptions(); StorageOptions.getInstance().setBlockOptions();
t1 = System.currentTimeMillis(); t1 = System.currentTimeMillis();
@ -1254,6 +1330,7 @@ public class PamController implements PamControllerInterface, PamSettings {
} }
} }
dumpBufferStatus("In stopping", false);
/* /*
* now launch another thread to wait for everything to have stopped, but * now launch another thread to wait for everything to have stopped, but
* leave this function so that AWT is released and graphics can update, the * leave this function so that AWT is released and graphics can update, the
@ -1281,9 +1358,11 @@ public class PamController implements PamControllerInterface, PamSettings {
long t2 = System.currentTimeMillis(); long t2 = System.currentTimeMillis();
if (t2 - t1 > 5000) { if (t2 - t1 > 5000) {
System.out.printf("Stopping, but stuck in loop for CheckRunStatus for %3.1fs\n", (double) (t2-t1)/1000.); System.out.printf("Stopping, but stuck in loop for CheckRunStatus for %3.1fs\n", (double) (t2-t1)/1000.);
dumpBufferStatus("Stopping stuck in loop", false);
break; // crap out anyway.
} }
try { try {
Thread.sleep(10); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -1294,19 +1373,41 @@ public class PamController implements PamControllerInterface, PamSettings {
} }
/**
* Look in every data block, particularly threaded ones, and dump
* the buffer status. This will have to go via PamProcess so that
* additional information can be added from any processes that
* hold additional data in other internal buffers.
* @param message Message to print prior to dumping buffers for debug.
* @param sayEmpties dump info even if a buffer is empty (otherwise, only ones that have stuff still)
*/
public void dumpBufferStatus(String message, boolean sayEmpties) {
System.out.println("**** Dumping process buffer status: " + message);
ArrayList<PamControlledUnit> pamControlledUnits = pamConfiguration.getPamControlledUnits();
for (PamControlledUnit aUnit : pamControlledUnits) {
int numProcesses = aUnit.getNumPamProcesses();
for (int i=0; i<numProcesses; i++) {
PamProcess aProcess = aUnit.getPamProcess(i);
aProcess.dumpBufferStatus(message, sayEmpties);
}
}
System.out.println("**** End of process buffer dump: " + message);
}
/** /**
* Called once the detectors have actually stopped and puts a few finalising * Called once the detectors have actually stopped and puts a few finalising
* functions into the AWT thread. * functions into the AWT thread.
*/ */
private void finishStopping() { private void finishStopping() {
detectorEndThread = null; detectorEndThread = null;
SwingUtilities.invokeLater(new Runnable() { // this was never getting invoked for some reason.
// SwingUtilities.invokeLater(new Runnable() {
@Override //
public void run() { // @Override
// public void run() {
pamStopped(); pamStopped();
} // }
}); // });
} }
@ -1320,6 +1421,8 @@ public class PamController implements PamControllerInterface, PamSettings {
* it is necessary to make sure that all internal datablock * it is necessary to make sure that all internal datablock
* buffers have had time to empty. * buffers have had time to empty.
*/ */
System.out.println("Arrived in PamStopped() in thread " + Thread.currentThread().toString());
ArrayList<PamControlledUnit> pamControlledUnits = pamConfiguration.getPamControlledUnits(); ArrayList<PamControlledUnit> pamControlledUnits = pamConfiguration.getPamControlledUnits();
if (PamModel.getPamModel().isMultiThread()) { if (PamModel.getPamModel().isMultiThread()) {
@ -1328,6 +1431,7 @@ public class PamController implements PamControllerInterface, PamSettings {
} }
} }
setPamStatus(PAM_IDLE); setPamStatus(PAM_IDLE);
dumpBufferStatus("In pamStopped, now idle", true);
// wait here until the status has changed to Pam_Idle, so that we know // wait here until the status has changed to Pam_Idle, so that we know
// that we've really finished processing all data // that we've really finished processing all data
@ -2054,6 +2158,7 @@ public class PamController implements PamControllerInterface, PamSettings {
statusWarning.setWarningMessage(warningMessage); statusWarning.setWarningMessage(warningMessage);
statusWarning.setWarnignLevel(1); statusWarning.setWarnignLevel(1);
warningSystem.addWarning(statusWarning); warningSystem.addWarning(statusWarning);
// System.out.println(warningMessage);
} }
} }

View File

@ -458,6 +458,17 @@ public interface PamControllerInterface {
* Close all modules and free up resources. * Close all modules and free up resources.
*/ */
public void pamClose(); public void pamClose();
/**
* Start function called from button to do a bit of extra book keeping
* @return
*/
public boolean manualStart();
/**
* Stop function called from button to do a bit of extra book keeping
*/
public void manualStop();
//public void controllerAddFileMenuItem(); //public void controllerAddFileMenuItem();

View File

@ -1268,7 +1268,7 @@ public class PamGui extends PamView implements WindowListener, PamSettings {
class menuPamStart implements ActionListener { class menuPamStart implements ActionListener {
public void actionPerformed(ActionEvent ev){ public void actionPerformed(ActionEvent ev){
pamControllerInterface.pamStart(); pamControllerInterface.manualStart();
} }
} }
@ -1285,7 +1285,7 @@ public class PamGui extends PamView implements WindowListener, PamSettings {
class menuPamStop implements ActionListener { class menuPamStop implements ActionListener {
public void actionPerformed(ActionEvent ev){ public void actionPerformed(ActionEvent ev){
pamControllerInterface.pamStop(); pamControllerInterface.manualStop();
// enableLoggingMenu(); // enableLoggingMenu();
} }
} }

View File

@ -4286,4 +4286,23 @@ public class PamDataBlock<Tunit extends PamDataUnit> extends PamObservable {
inputEl.setAttribute("Channels", String.format("0x%X", getChannelMap())); inputEl.setAttribute("Channels", String.format("0x%X", getChannelMap()));
return inputEl; return inputEl;
} }
/**
* Look in every data block, particularly threaded ones, and dump
* the buffer status. This will have to go via PamProcess so that
* additional information can be added from any processes that
* hold additional data in other internal buffers.
* @param message Message to print prior to dumping buffers for debug.
* @param sayEmpties dump info even if a buffer is empty (otherwise, only ones that have stuff still)
*/
public void dumpBufferStatus(String message, boolean sayEmpties) {
int nObs = countObservers();
for (int i = 0; i < nObs; i++) {
PamObserver obs = getPamObserver(i);
if (obs instanceof ThreadedObserver) {
ThreadedObserver tObs = (ThreadedObserver) obs;
tObs.dumpBufferStatus(message, sayEmpties);
}
}
}
} }

View File

@ -208,6 +208,14 @@ public class PamObservable {//extends PanelOverlayDraw {
if (System.currentTimeMillis() - startTime > timeOutms) { if (System.currentTimeMillis() - startTime > timeOutms) {
// have taken too long, so return that we've failed. // have taken too long, so return that we've failed.
System.out.println("Wait timeout in threaded observer"); System.out.println("Wait timeout in threaded observer");
// and clear everything that's left.
for (int i = 0; i < pamObservers.size(); i++) {
pamObserver = pamObservers.get(i);
if (pamObserver.getClass() == ThreadedObserver.class) {
threadedObserver = (ThreadedObserver) pamObserver;
threadedObserver.clearEverything();
}
}
return false; return false;
} }
try { try {

View File

@ -767,7 +767,7 @@ abstract public class PamProcess implements PamObserver, ProcessAnnotator {
} }
}); });
private int lastSourceNotificationType; private volatile int lastSourceNotificationType;
private Object lastSourceNotificationObject; private Object lastSourceNotificationObject;
@ -1065,4 +1065,24 @@ abstract public class PamProcess implements PamObserver, ProcessAnnotator {
return lastSourceNotificationObject; return lastSourceNotificationObject;
} }
/**
* Say the status of any buffers, particularly in output buffers of
* data blocks, but can add bespoke info for other internal buffers
* for some processes.
* @param message
* @param sayEmpties include info even if a buffer is empty.
*/
public void dumpBufferStatus(String message, boolean sayEmpties) {
ArrayList<PamDataBlock> outputs = getOutputDataBlocks();
try {
for (PamDataBlock output : outputs) {
output.dumpBufferStatus(message, sayEmpties);
}
}
catch (Exception e) {
System.err.println("Error dumping buffer data from process " + getProcessName());
e.printStackTrace();
}
}
} }

View File

@ -145,6 +145,17 @@ public class PamRawDataBlock extends AcousticDataBlock<RawDataUnit> {
} }
} }
/**
* Reset data integrity checking counters.
*/
public void reset() {
prevChannelSample = new long[PamConstants.MAX_CHANNELS];
summaryTotals = new double[PamConstants.MAX_CHANNELS];
summaryTotals2 = new double[PamConstants.MAX_CHANNELS];
summaryMaxVal = new double[PamConstants.MAX_CHANNELS];
summaryCount = new int[PamConstants.MAX_CHANNELS];
}
@Override @Override
public void addPamData(RawDataUnit pamDataUnit) { public void addPamData(RawDataUnit pamDataUnit) {
/* /*

View File

@ -444,6 +444,7 @@ public class ThreadedObserver implements PamObserver {
else { else {
emptyRead = false; emptyRead = false;
int lc=0; int lc=0;
ObservedObject observedObject;
while (!toDoList.isEmpty()) { while (!toDoList.isEmpty()) {
// if (stopFlag) { // if (stopFlag) {
@ -458,11 +459,21 @@ public class ThreadedObserver implements PamObserver {
// get the first object, send it for processing and then remove from the list // get the first object, send it for processing and then remove from the list
ObservedObject observedObject = toDoList.get(0);
performAction(observedObject);
synchronized(synchLock) { synchronized(synchLock) {
toDoList.remove(0); if (toDoList.size() > 0) {
observedObject = toDoList.remove(0);
}
else {
break;
}
} }
// need to do this bit outside of the synch block.
performAction(observedObject);
// synchronized(synchLock) {
// if (toDoList.size() > 0) { // list may have been cleared during a shut down.
// toDoList.remove(0);
// }
// }
} }
} }
} }
@ -525,4 +536,21 @@ public class ThreadedObserver implements PamObserver {
} }
} }
public void clearEverything() {
synchronized (synchLock) {
System.out.printf("Clearing %d objects from todo list in %s\n", toDoList.size(), singleThreadObserver.getObserverName());
toDoList.clear();
}
}
public void dumpBufferStatus(String message, boolean sayEmpties) {
int n = toDoList.size();
if (sayEmpties == false && n == 0) {
return;
}
String name = singleThreadObserver.getObserverName();
System.out.printf("Threaded observer %s has %d objects in queue\n", name, n);
}
} }

View File

@ -415,12 +415,19 @@ public class MHTClickTrainAlgorithm implements ClickTrainAlgorithm, PamSettings
return mhtGUI; return mhtGUI;
} }
Thread previousThread = null;
/** /**
* Update the algorithm * Update the algorithm
* @param flag- flag indicating the update type. * @param flag- flag indicating the update type.
*/ */
public void update(int flag, Object info) { public void update(int flag, Object info) {
if (Thread.currentThread() != previousThread) {
// see flag id constants in ClickTrianControl
System.out.printf("Thread change to %s in MHTClicktrainAlgorithm.update flag %d, object %s\n",
Thread.currentThread().toString(), flag, info);
previousThread = Thread.currentThread();
}
switch (flag) { switch (flag) {
case ClickTrainControl.PROCESSING_START: case ClickTrainControl.PROCESSING_START:
//make sure the kernel is cleared before processing //make sure the kernel is cleared before processing

View File

@ -285,28 +285,34 @@ public class MHTKernel<T> {
BitSet currentBitSet; BitSet currentBitSet;
MHTChi2<T> mhtChi2; MHTChi2<T> mhtChi2;
int index; int index;
for (int i=0; i<possibleTracks.size(); i++) { synchronized(trackSynchronisation) {
for (int i=0; i<possibleTracks.size(); i++) {
currentBitSet=possibleTracks.get(i).trackBitSet; currentBitSet=possibleTracks.get(i).trackBitSet;
//index is the total detection count-1; //index is the total detection count-1;
index=kcount-1; index=kcount-1;
//now add both a true and false for the data unit to be in this possibility. //now add both a true and false for the data unit to be in this possibility.
currentBitSet.set(index, true); currentBitSet.set(index, true);
mhtChi2=possibleTracks.get(i).chi2Track.cloneMHTChi2(); mhtChi2=possibleTracks.get(i).chi2Track.cloneMHTChi2();
newPossibilities.add(new TrackBitSet(currentBitSet, mhtChi2)); newPossibilities.add(new TrackBitSet(currentBitSet, mhtChi2));
//add a coast to the possibility //add a coast to the possibility
currentBitSet=(BitSet) currentBitSet.clone(); currentBitSet=(BitSet) currentBitSet.clone();
currentBitSet.set(index, false); currentBitSet.set(index, false);
//currentBitSet.set(currentBitSet.size(), true); //currentBitSet.set(currentBitSet.size(), true);
//add new chi2 value - need to clone this time. //add new chi2 value - need to clone this time.
mhtChi2=possibleTracks.get(i).chi2Track.cloneMHTChi2(); /*
* This line can throw an error due to poor sunchronisation if
* the list is emptied from a different thread.
*/
mhtChi2=possibleTracks.get(i).chi2Track.cloneMHTChi2();
//added the cloned bitset to not mess up references //added the cloned bitset to not mess up references
newPossibilities.add(new TrackBitSet(currentBitSet, mhtChi2)); newPossibilities.add(new TrackBitSet(currentBitSet, mhtChi2));
}
} }
} }

View File

@ -38,8 +38,10 @@ public class DecimatorProcessW extends PamProcess {
@Override @Override
public void pamStart() { public void pamStart() {
// TODO Auto-generated method stub outputDataBlock.reset();
if (decimatorWorker != null) {
decimatorWorker.reset();
}
} }
@Override @Override

View File

@ -77,6 +77,13 @@ public class DecimatorWorker {
createFilters(); createFilters();
} }
/**
* Reset all counters and output buffers.
*/
public void reset() {
createFilters();
}
/** /**
* Make the decimator filters. If reducing frequency, then the filter * Make the decimator filters. If reducing frequency, then the filter
* is applied before decimation (obviously!) so is set up based on the * is applied before decimation (obviously!) so is set up based on the

View File

@ -124,6 +124,7 @@ public class PamFFTProcess extends PamProcess {
public synchronized void setupFFT() { public synchronized void setupFFT() {
System.out.println("In call to setupFFT in " + getProcessName());
// need to find the existing source data block and remove from observing it. // need to find the existing source data block and remove from observing it.
// then find the new one and subscribe to that instead. // then find the new one and subscribe to that instead.
channelCounts = new int[PamConstants.MAX_CHANNELS]; channelCounts = new int[PamConstants.MAX_CHANNELS];
@ -390,28 +391,32 @@ public class PamFFTProcess extends PamProcess {
} }
} }
} }
TempOutputStore[] oldStores = tempStores;
if (iChan == PamUtils.getHighestChannel(fftParameters.channelMap)) { if (iChan == PamUtils.getHighestChannel(fftParameters.channelMap)) {
// time to empty the stores - assume they all have the same // time to empty the stores - assume they all have the same
// amount of data // amount of data
int[] chanList = PamUtils.getChannelArray(fftParameters.channelMap); int[] chanList = PamUtils.getChannelArray(fftParameters.channelMap);
try { try {
int n = tempStores[iChan].getN(); int n = tempStores[iChan].getN();
for (int iF = 0; iF < n; iF++) { for (int iF = 0; iF < n; iF++) {
for (int iC = 0; iC < chanList.length; iC++) { for (int iC = 0; iC < chanList.length; iC++) {
// pu = tempStores[chanList[iC]].get(iF); // pu = tempStores[chanList[iC]].get(iF);
try { try {
outputData.addPamData(tempStores[chanList[iC]].get(iF)); outputData.addPamData(tempStores[chanList[iC]].get(iF));
}
catch (ArrayIndexOutOfBoundsException e) {
// e.printStackTrace();
System.err.printf("%s.newData: %s Store %s (was %s) iC: %d of %d iF: %d of %d\n",
this.getPamControlledUnit().getUnitName(), e.getMessage(),
tempStores[chanList[iC]], oldStores[chanList[iC]],
iC, chanList.length, iF, n);
}
// outputData.addPamData(null);
} }
catch (ArrayIndexOutOfBoundsException e) {
// e.printStackTrace();
System.err.println("PAMFFTProcess.newData: " + e.getMessage() + " " + this.getPamControlledUnit().getUnitName() + " iC: " + iC + " iF: " + iF);
}
// outputData.addPamData(null);
} }
} for (int iC = 0; iC < chanList.length; iC++) {
for (int iC = 0; iC < chanList.length; iC++) { tempStores[chanList[iC]].clearStore();
tempStores[chanList[iC]].clearStore(); }
}
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@ -539,6 +544,25 @@ public class PamFFTProcess extends PamProcess {
return new ArrayList<Class<? extends PamDataUnit>>(Arrays.asList(RawDataUnit.class)); return new ArrayList<Class<? extends PamDataUnit>>(Arrays.asList(RawDataUnit.class));
} }
@Override
public synchronized void dumpBufferStatus(String message, boolean sayEmpties) {
super.dumpBufferStatus(message, sayEmpties);
int nTemp = 0;
if (tempStores != null) {
nTemp = tempStores.length;
}
for (int i = 0; i < nTemp; i++) {
if (tempStores[i] == null) {
continue;
}
int n = tempStores[i].tempUnits.size();
if (n > 0 || sayEmpties) {
System.out.printf("FFT %s temp store %d has %d datas\n", getProcessName(), i, n);
}
}
}
// @Override // @Override
// public boolean requestOfflineData(PamDataBlock dataBlock, long startMillis, // public boolean requestOfflineData(PamDataBlock dataBlock, long startMillis,
// long endMillis) { // long endMillis) {