Fix early data discard

Fix issue in clip generator: when running very fast offline raw data being discarded before clips are generated. Changed threading model slightly and increased data keep time by 2x the thread jitter to try to avoid this.
This commit is contained in:
Douglas Gillespie 2024-06-11 15:55:14 +01:00
parent 74759cb576
commit 45cc88ac67
4 changed files with 64 additions and 15 deletions

View File

@ -257,12 +257,13 @@ public class PamRawDataBlock extends AcousticDataBlock<RawDataUnit> {
synchronized public RawDataUnit[] getAvailableSamples(long startMillis, long durationMillis, int channelMap) throws RawDataUnavailableException {
RawDataUnit firstUnit = getFirstUnit();
if (firstUnit == null) {
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED, startMillis, (int) durationMillis);
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED, 0,0, startMillis, (int) durationMillis);
}
long firstMillis = firstUnit.getTimeMilliseconds();
long firstSamples = firstUnit.getStartSample();
RawDataUnit lastUnit = getLastUnit();
long lastMillis = lastUnit.getEndTimeInMilliseconds();
long lastSample = lastUnit.getStartSample()+lastUnit.getSampleDuration();
long firstAvailableMillis = Math.max(firstMillis, startMillis);
@ -272,7 +273,8 @@ public class PamRawDataBlock extends AcousticDataBlock<RawDataUnit> {
double[][] data = getSamplesForMillis(firstAvailableMillis, lastAvailableMillis-firstAvailableMillis, channelMap);
if (data == null) {
// this shouldn't happen. If an exception wasn't thrown from getSamples... then data should no tb enull
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED, startMillis, (int) durationMillis);
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED,
firstSamples, lastSample, startMillis, (int) durationMillis);
}
RawDataUnit[] dataUnits = new RawDataUnit[data.length];
for (int i = 0; i < data.length; i++) {
@ -298,7 +300,7 @@ public class PamRawDataBlock extends AcousticDataBlock<RawDataUnit> {
synchronized public double[][] getSamplesForMillis(long startMillis, long durationMillis, int channelMap) throws RawDataUnavailableException {
RawDataUnit firstUnit = getFirstUnit();
if (firstUnit == null) {
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED, startMillis, (int) durationMillis);
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED, 0, 0, startMillis, (int) durationMillis);
}
long firstMillis = firstUnit.getTimeMilliseconds();
long firstSamples = firstUnit.getStartSample();
@ -317,23 +319,28 @@ public class PamRawDataBlock extends AcousticDataBlock<RawDataUnit> {
// run a few tests ...
int chanOverlap = channelMap & getChannelMap();
if (chanOverlap != channelMap) {
throw new RawDataUnavailableException(this, RawDataUnavailableException.INVALID_CHANNEL_LIST, startSample, duration);
throw new RawDataUnavailableException(this, RawDataUnavailableException.INVALID_CHANNEL_LIST, 0,0,startSample, duration);
}
if (duration < 0) {
throw new RawDataUnavailableException(this, RawDataUnavailableException.NEGATIVE_DURATION, startSample, duration);
throw new RawDataUnavailableException(this, RawDataUnavailableException.NEGATIVE_DURATION,0,0, startSample, duration);
}
RawDataUnit dataUnit = getFirstUnit();
if (dataUnit == null) {
return null;
}
if (dataUnit.getStartSample() > startSample) {
RawDataUnit lastUnit = getLastUnit();
long firstSample = dataUnit.getStartSample();
long lastSample = lastUnit.getStartSample()+lastUnit.getSampleDuration();
if (firstSample > startSample) {
// System.out.println("Earliest start sample : " + dataUnit.getStartSample());
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_ALREADY_DISCARDED, startSample, duration);
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_ALREADY_DISCARDED,
firstSample, lastSample, startSample, duration);
}
dataUnit = getLastUnit();
if (hasLastSample(dataUnit, startSample+duration, channelMap) == false) {
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED, startSample, duration);
throw new RawDataUnavailableException(this, RawDataUnavailableException.DATA_NOT_ARRIVED,
firstSample, lastSample, startSample, duration);
}
int nChan = PamUtils.getNumChannels(channelMap);

View File

@ -21,6 +21,10 @@ public class RawDataUnavailableException extends Exception {
private long startSample;
private int duration;
private long availableStart;
private long availableEnd;
/**
* @return the dataCause
*/
@ -34,10 +38,12 @@ public class RawDataUnavailableException extends Exception {
* @param startSample
* @param cause
*/
public RawDataUnavailableException(PamRawDataBlock rawDataBlock, int dataCause, long startSample, int duration) {
public RawDataUnavailableException(PamRawDataBlock rawDataBlock, int dataCause, long availStart, long availEnd, long startSample, int duration) {
super();
this.rawDataBlock = rawDataBlock;
this.dataCause = dataCause;
this.availableStart = availStart;
this.availableEnd = availEnd;
this.startSample = startSample;
this.duration = duration;
}
@ -55,8 +61,8 @@ public class RawDataUnavailableException extends Exception {
public String getMessage() {
switch (dataCause) {
case DATA_ALREADY_DISCARDED:
return String.format("Samples %d length %d requested from %s have already been discarded", startSample, duration,
rawDataBlock.getDataName());
return String.format("Samples %d length %d requested from %s have already been discarded. %d to %d available", startSample, duration,
rawDataBlock.getDataName(), availableStart, availableEnd);
case DATA_NOT_ARRIVED:
return String.format("Samples %d length %d requested from %s have not yet arrived",
startSample, duration, rawDataBlock.getDataName());
@ -70,6 +76,20 @@ public class RawDataUnavailableException extends Exception {
}
return super.getMessage();
}
/**
* @return the availableStart
*/
public long getAvailableStart() {
return availableStart;
}
/**
* @return the availableEnd
*/
public long getAvailableEnd() {
return availableEnd;
}
}

View File

@ -8,6 +8,7 @@ import Acquisition.AcquisitionControl;
import Acquisition.AcquisitionProcess;
import Acquisition.DaqSystem;
import PamController.PamController;
import PamModel.PamModel;
import PamUtils.PamCalendar;
import PamguardMVC.debug.Debug;
@ -130,6 +131,7 @@ public class ThreadedObserver implements PamObserver {
}
}
}
h += PamModel.getPamModel().getPamModelSettings().getThreadingJitterMillis()*2;
return h;
}

View File

@ -140,10 +140,13 @@ public class ClipProcess extends SpectrogramMarkProcess {
clipErr = clipRequest.clipBlockProcess.processClipRequest(clipRequest);
switch (clipErr) {
case 0: // no error - clip should have been created.
li.remove();
break;
case RawDataUnavailableException.DATA_ALREADY_DISCARDED:
case RawDataUnavailableException.INVALID_CHANNEL_LIST:
// System.out.println("Clip error : " + clipErr);
// System.out.println("Clip error : " + clipErr);
li.remove();
break;
case RawDataUnavailableException.DATA_NOT_ARRIVED:
continue; // hopefully, will get this next time !
}
@ -230,6 +233,17 @@ public class ClipProcess extends SpectrogramMarkProcess {
}
minH = Math.max(minH, clipBlockProcesses[i].getRequiredDataHistory(o, arg));
}
ClipRequest firstClip = null;
synchronized(clipRequestSynch) {
if (clipRequestQueue.size() > 0) {
firstClip = clipRequestQueue.get(0);
}
}
if (firstClip != null) {
minH += firstClip.dataUnit.getDurationInMilliseconds();
}
minH += Math.max(3000, 192000/(long)getSampleRate());
if (specMouseDown) {
minH = Math.max(minH, masterClockTime-specMouseDowntime);
@ -453,8 +467,7 @@ public class ClipProcess extends SpectrogramMarkProcess {
this.dataBlock = dataBlock;
this.clipGenSetting = clipGenSetting;
clipBudgetMaker = new StandardClipBudgetMaker(this);
dataBlock.addObserver(this, true);
dataBlock.addObserver(this, false);
if (rawDataBlock != null) {
int chanMap = decideChannelMap(rawDataBlock.getChannelMap());
@ -499,6 +512,7 @@ public class ClipProcess extends SpectrogramMarkProcess {
rawData = rawDataBlock.getSamples(rawStart, (int) (rawEnd-rawStart), channelMap);
}
catch (RawDataUnavailableException e) {
System.out.println(e.getMessage());
return e.getDataCause();
}
if (rawData==null) {
@ -583,9 +597,15 @@ public class ClipProcess extends SpectrogramMarkProcess {
public PamObserver getObserverObject() {
return clipProcess.getObserverObject();
}
@Override
public long getRequiredDataHistory(PamObservable o, Object arg) {
return (long) ((clipGenSetting.preSeconds+clipGenSetting.postSeconds) * 1000.);
long h = (long) ((clipGenSetting.preSeconds+clipGenSetting.postSeconds) * 1000.);
// if (dataBlock != null) {
// can't do this since dataBlock is observing this, so will wrap.
// h += dataBlock.getRequiredHistory();
// }
return h;
}