LSST Applications 30.0.7,g0e76e35be5+e8e946ae08,g19811a7679+138f7293ba,g199a45376c+5e234f8357,g1fd858c14a+2f48dbc4c4,g262e1987ae+fb36cac54d,g29ae962dfc+d9108a0941,g2c21b0017a+4f59a27f16,g31e44d4a5c+b0138be388,g33ac35c1f1+28b9f72785,g35bb328faa+b0138be388,g40c9b15c53+823ad735c1,g47891489e3+bcc48a0b46,g53246c7159+b0138be388,g64539dfbff+e8e946ae08,g67b6fd64d1+bcc48a0b46,g74acd417e5+422380537a,g76965917b2+a5ca99c4d9,g786e29fd12+796b79145d,g7aefaa3e3d+dc0c200193,g86b635cae8+734fe384f0,g87389fa792+d8b5378923,g89139ef638+bcc48a0b46,g8bbb235e95+3f4f7f9447,g8ea07a8fe4+78a4c88802,g9290983e33+ffdc83c6f7,g92c671f44c+e8e946ae08,gaa753fd333+03f406da14,gbf99507273+b0138be388,gc49b57b85e+8df26ee1f0,gca7fc764a6+bcc48a0b46,gd7ef33dd92+bcc48a0b46,gdab6d2f7ff+422380537a,ge1c02a5578+b0138be388,ge410e46f29+bcc48a0b46,ge80df9fc40+e6db5413d1,geaed405ab2+1de65a85c6,gf5dcc679e7+35a0ce2edd,gf5f1c85443+e8e946ae08
LSST Data Management Base Package
Loading...
Searching...
No Matches
ssobject.py
Go to the documentation of this file.
1import pandas as pd
2import numpy as np
3from functools import partial
4from . import photfit
5from . import util
6from . import schema
7from .moid import MOIDSolver, earth_orbit
8import argparse
9import sys
10
11# The only columns we need from DiaSource.
12
13DIA_COLUMNS = [
14 "diaSourceId", "midpointMjdTai", "ra", "dec", "extendedness",
15 "band", "psfFlux", "psfFluxErr"
16]
17DIA_DTYPES = [int, float, float, float, float, str, float, float]
18
19
20def nJy_to_mag(f_njy):
21 """
22 Convert flux density in nanoJanskys (nJy) to AB magnitude.
23
24 Parameters
25 ----------
26 f_njy : float or array-like
27 Flux density in nanoJanskys.
28
29 Returns
30 -------
31 float or array-like
32 AB magnitude corresponding to the input flux density.
33 """
34 return 31.4 - 2.5 * np.log10(f_njy)
35
36
37def nJy_err_to_mag_err(f_njy, f_err_njy):
38 """
39 Convert flux error in nanoJanskys to magnitude error.
40
41 Parameters
42 ----------
43 f_njy : float
44 Flux in nanoJanskys.
45 f_err_njy : float
46 Flux error in nanoJanskys.
47
48 Returns
49 -------
50 float
51 Magnitude error.
52 """
53 return 1.085736 * (f_err_njy / f_njy)
54
55
57 row, sss, fixedG12=None, magSigmaFloor=0.0, nSigmaClip=None,
58):
59 # just verify we didn't screw up something
60 assert sss["ssObjectId"].nunique() == 1
61
62 # Metadata columns
63 row["ssObjectId"] = sss["ssObjectId"].iloc[0]
64 row["firstObservationMjdTai"] = sss["dia_midpointMjdTai"].min()
65
66 if "discoverySubmissionDate" in row.dtype.names: # DP2 does not have this field
67 # FIXME: here I arbitrarily guess we discover everything 7 days
68 # after first obsv. we should really pull this out of the obs_sbn tbl.
69 row["discoverySubmissionDate"] = row["firstObservationMjdTai"] + 7.0
70 row["arc"] = np.ptp(sss["dia_midpointMjdTai"])
71 row["designation"] = sss["designation"].iloc[0]
72
73 # observation counts
74 row["nObs"] = len(sss)
75
76 # per band entries
77 for band in "ugrizy":
78 df = sss[sss["dia_band"] == band]
79
80 # set defaults for this band (equivalents of NULL)
81 row[f"{band}_Chi2"] = np.nan
82 row[f"{band}_G12"] = np.nan
83 row[f"{band}_G12Err"] = np.nan
84 row[f"{band}_H"] = np.nan
85 row[f"{band}_H_{band}_G12_Cov"] = np.nan
86 row[f"{band}_HErr"] = np.nan
87 row[f"{band}_nObsUsed"] = 0
88 row[f"{band}_phaseAngleMin"] = np.nan
89 row[f"{band}_phaseAngleMax"] = np.nan
90
91 nBandObs = len(df)
92 row[f"{band}_nObs"] = nBandObs
93 if nBandObs > 0:
94 paMin, paMax = df["phaseAngle"].min(), df["phaseAngle"].max()
95 row[f"{band}_phaseAngleMin"] = paMin
96 row[f"{band}_phaseAngleMax"] = paMax
97
98 if nBandObs > 1:
99 # do the absmag/slope fits, if there are at least two
100 # data points
101 H, G12, sigmaH, sigmaG12, covHG12, chi2dof, nobsv = photfit.fitHG12(
102 df["dia_psfMag"], df["dia_psfMagErr"],
103 df["phaseAngle"], df["topoRange"], df["helioRange"],
104 fixedG12=fixedG12, magSigmaFloor=magSigmaFloor,
105 nSigmaClip=nSigmaClip,
106 )
107 nDof = nBandObs - (1 if fixedG12 is not None else 2)
108 # print(provID, band, H, G12, sigmaH, sigmaG12, covHG12,
109 # chi2dof, nobsv)
110
111 # mark if the fit failed
112 if np.isnan(G12):
113 row[f"{band}_slope_fit_failed"] = True
114 # FIXME: if fitting fails, we should revert to simple
115 # estimation of H using a fiducial G12 value, storing
116 # that G12 as well.
117
118 row[f"{band}_Chi2"] = chi2dof * nDof
119 row[f"{band}_G12"] = G12
120 row[f"{band}_G12Err"] = sigmaG12
121 row[f"{band}_H"] = H
122 row[f"{band}_H_{band}_G12_Cov"] = covHG12
123 row[f"{band}_HErr"] = sigmaH
124 row[f"{band}_nObsUsed"] = nobsv
125
126 # Extendedness
127 row["extendednessMin"] = sss["dia_extendedness"].min()
128 row["extendednessMax"] = sss["dia_extendedness"].max()
129 row["extendednessMedian"] = sss["dia_extendedness"].median()
130
131
133 sss, dia, mpcorb, fixedG12=None, magSigmaFloor=0.0,
134 nSigmaClip=None,
135):
136 """
137 Compute solar system object properties by joining and processing
138 SSSource, DiaSource, and MPC orbit data.
139
140 This function takes a pre-grouped SSSource table, joins it with
141 DiaSource data, computes per-object quantities, and calculates
142 additional orbital parameters like Tisserand J and Minimum Orbit
143 Intersection Distance (MOID) with Earth for matching objects.
144
145 Parameters
146 ----------
147 sss : pandas.DataFrame
148 SSSource table, pre-grouped by 'ssObjectId'. Must be sorted by
149 'ssObjectId' for correct grouping. Contains columns like
150 'ssObjectId', 'diaSourceId', etc.
151 dia : pandas.DataFrame
152 DiaSource table with columns prefixed as 'dia_' in the join.
153 Must include 'dia_diaSourceId', 'dia_psfFlux', 'dia_psfFluxErr',
154 etc.
155 mpcorb : pandas.DataFrame
156 MPC orbit data with columns like
157 'unpacked_primary_provisional_designation', 'q', 'e', 'i',
158 'node', 'argperi'.
159
160 Returns
161 -------
162 numpy.ndarray
163 Array of ssObject records with dtype schema.ssObjectDtype,
164 containing computed properties for each unique ssObjectId,
165 including magnitudes, orbital elements, Tisserand J, and
166 MOID-related values.
167
168 Raises
169 ------
170 AssertionError
171 If 'sss' is not pre-grouped by 'ssObjectId', or if DiaSources
172 are missing after join.
173
174 Notes
175 -----
176 - The function assumes 'sss' is large and avoids internal
177 sorting/copying for efficiency.
178 - Tisserand J and MOID are computed only for objects matching
179 designations in 'mpcorb'.
180 - MOID computation uses a MOIDSolver for each matched object.
181 """
182
183 # assert that sss is pre-grouped by ssObjectId
184 assert util.values_grouped(sss["ssObjectId"]), (
185 "SSSource table must be pre-grouped by ssObjectId. "
186 "An easy way to do this is to sort by ssObjectId before calling compute_ssobject(). "
187 "The grouping is required for correct per-object computations, and since SSSource is "
188 "typically large and we want to avoid copies, it's not done internally."
189 )
190
191 # Join the DiaSource parts we're interested in to our SSSource table
192 num = len(sss)
193 dia_tmp = dia[DIA_COLUMNS].add_prefix("dia_") # FIXME: does this cause unnececessary copy?
194 # FIXME: The diaSourceId should really be uint64. But Felis doesn't speak
195 # uint64, but only knows about int64. Yet the pipeline produces uint64
196 # diaSourceId in the dia_source dataset. So we have to cast here to int64
197 # to make the join work (otherwise pyarrow tries to cast to float64, and
198 # the whole thing gloriously explodes).
199 dia_tmp["dia_diaSourceId"] = dia_tmp["dia_diaSourceId"].astype("int64[pyarrow]")
200 sss = sss.merge(dia_tmp, left_on="diaSourceId", right_on="dia_diaSourceId", how="inner")
201 assert num == len(sss), f"{num - len(sss)} DiaSources found missing."
202 del sss["dia_diaSourceId"]
203 del dia_tmp
204
205 # add magnitude columns
206 sss["dia_psfMag"] = nJy_to_mag(sss["dia_psfFlux"])
207 sss["dia_psfMagErr"] = nJy_err_to_mag_err(sss["dia_psfFlux"], sss["dia_psfFluxErr"])
208
209 # Pre-create the empty array
210 totalNumObjects = np.unique(sss["ssObjectId"]).size
211 obj = np.zeros(totalNumObjects, dtype=schema.SSObjectDtype)
212
213 # compute per-object quantities
214 callback = partial(
215 compute_ssobject_entry, fixedG12=fixedG12,
216 magSigmaFloor=magSigmaFloor, nSigmaClip=nSigmaClip,
217 )
218 util.group_by([sss], "ssObjectId", callback, out=obj)
219
220 #
221 # compute columns that can be efficiently computed in a vector fashon
222 #
223 # Tisserand J
224
225 if mpcorb is not None:
226 # inner join by provisional designation. We allow for some objects to be
227 # missing from mpcorb (this should not happen often, but it did in DP1).
228 # FIXME: at some point require that no objects are missing. I _think_ that
229 # shouldn't happen in normal operations.
230 oidx, midx = util.argjoin(
231 obj["designation"].astype("U"),
232 mpcorb["unpacked_primary_provisional_designation"].to_numpy().astype("U"),
233 )
234 assert np.all(
235 mpcorb["unpacked_primary_provisional_designation"].take(midx)
236 == obj["designation"][oidx].astype("U")
237 )
238 q, e, i, node, argperi, epoch_mjd = util.unpack(
239 mpcorb["q e i node argperi epoch_mjd".split()].take(midx)
240 )
241 a = q / (1.0 - e)
242 obj["tisserand_J"][oidx] = util.tisserand_jupiter(a, e, i)
243
244 # MOID computation
245 solver = MOIDSolver()
246 for i, el_obj in enumerate(zip(a, e, i, node, argperi)):
247 earth = earth_orbit(epoch_mjd[i])
248 (moid, deltaV, eclon, trueEarth, trueObject) = solver.compute(earth, el_obj)
249 row = obj[oidx[i]]
250 row["MOIDEarth"] = moid
251 row["MOIDEarthDeltaV"] = deltaV
252 row["MOIDEarthEclipticLongitude"] = eclon
253 row["MOIDEarthTrueAnomaly"] = trueEarth
254 row["MOIDEarthTrueAnomalyObject"] = trueObject
255
256 return obj
257
258
259def main():
260 """
261 CLI entry point for building SSObject table from SSSource,
262 DiaSource, and MPC orbit data.
263 """
264 parser = argparse.ArgumentParser(
265 description="Build SSObject table from SSSource, DiaSource, and MPC orbit Parquet files",
266 formatter_class=argparse.RawDescriptionHelpFormatter,
267 epilog="""
268Examples:
269 ssp-build-ssobject sssource.parquet dia_sources.parquet mpc_orbits.parquet --output ssobject.parquet
270 """,
271 )
272
273 parser.add_argument("sssource_parquet", help="Path to SSSource Parquet file")
274 parser.add_argument("diasource_parquet", help="Path to DiaSource Parquet file")
275 parser.add_argument("mpcorb_parquet", help="Path to MPC orbits Parquet file")
276 parser.add_argument("--output", "-o", required=True, help="Path to output SSObject Parquet file")
277 parser.add_argument(
278 "--reraise",
279 action="store_true",
280 help="Re-raise exceptions instead of exiting gracefully (for debugging)",
281 )
282
283 args = parser.parse_args()
284
285 try:
286 # Load SSSource
287 print(f"Loading SSSource from {args.sssource_parquet}...")
288 sss = pd.read_parquet(args.sssource_parquet, engine="pyarrow", dtype_backend="pyarrow").reset_index(
289 drop=True
290 )
291 num = len(sss)
292 print(f"Loaded {num:,} SSSource rows")
293
294 # Load DiaSource with required columns
295 dia_columns = [
296 "diaSourceId",
297 "midpointMjdTai",
298 "ra",
299 "dec",
300 "extendedness",
301 "band",
302 "psfFlux",
303 "psfFluxErr",
304 ]
305 print(f"Loading DiaSource from {args.diasource_parquet}...")
306 dia = pd.read_parquet(
307 args.diasource_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=dia_columns
308 ).reset_index(drop=True)
309 print(f"Loaded {len(dia):,} DiaSource rows")
310
311 # Ensure diaSourceId is uint64
312 assert np.all(dia["diaSourceId"] >= 0)
313 dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")
314
315 # Load MPC orbits
316 mpcorb_columns = [
317 "unpacked_primary_provisional_designation",
318 "a",
319 "q",
320 "e",
321 "i",
322 "node",
323 "argperi",
324 "peri_time",
325 "mean_anomaly",
326 "epoch_mjd",
327 "h",
328 "g",
329 ]
330 print(f"Loading MPC orbits from {args.mpcorb_parquet}...")
331 mpcorb = pd.read_parquet(
332 args.mpcorb_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=mpcorb_columns
333 ).reset_index(drop=True)
334 print(f"Loaded {len(mpcorb):,} MPC orbit rows")
335
336 # Compute SSObject
337 print("Computing SSObject data...")
338 obj = compute_ssobject(sss, dia, mpcorb)
339
340 # Save result
341 print(f"Saving {len(obj):,} SSObject rows to {args.output}...")
342 util.struct_to_parquet(obj, args.output)
343
344 print(f"Success! Created SSObject with {len(obj):,} objects")
345 print(f"Row size: {obj.dtype.itemsize:,} bytes, Total size: {obj.nbytes:,} bytes")
346
347 except Exception as e:
348 print(f"Error: {e}", file=sys.stderr)
349 if args.reraise:
350 raise
351 sys.exit(1)
352
353
354if __name__ == "__main__":
355 input_dir = "./analysis/inputs"
356 output_dir = "./analysis/outputs"
357
358 #
359 # Loads
360 #
361
362 # load SSObject
363 sss = pd.read_parquet(
364 f"{output_dir}/sssource.parquet", engine="pyarrow", dtype_backend="pyarrow"
365 ).reset_index(drop=True)
366 num = len(sss)
367
368 # load corresponding DiaSource
369 dia = pd.read_parquet(
370 f"{input_dir}/dia_sources.parquet", engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
371 ).reset_index(drop=True)
372
373 # FIXME: I'm not sure why the datatype is int and not uint here.
374 # Investigate upstream...
375 assert np.all(dia["diaSourceId"] >= 0)
376 dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")
377
378 # Load mpcorb
379 mpcorb = pd.read_parquet(
380 f"{input_dir}/mpc_orbits.parquet",
381 engine="pyarrow",
382 dtype_backend="pyarrow",
383 columns=[
384 "unpacked_primary_provisional_designation",
385 "a",
386 "q",
387 "e",
388 "i",
389 "node",
390 "argperi",
391 "peri_time",
392 "mean_anomaly",
393 "epoch_mjd",
394 "h",
395 "g",
396 ],
397 ).reset_index(drop=True)
398
399 #
400 # Business logic
401 #
402 obj = compute_ssobject(sss, dia, mpcorb)
403
404 #
405 # Save
406 #
407 util.struct_to_parquet(obj, f"{output_dir}/ssobject.parquet")
408
409 print(f"row_length={obj.dtype.itemsize:,} bytes, rows={len(obj):,}, {obj.nbytes:,} bytes total")
compute_ssobject(sss, dia, mpcorb, fixedG12=None, magSigmaFloor=0.0, nSigmaClip=None)
Definition ssobject.py:135
compute_ssobject_entry(row, sss, fixedG12=None, magSigmaFloor=0.0, nSigmaClip=None)
Definition ssobject.py:58
nJy_err_to_mag_err(f_njy, f_err_njy)
Definition ssobject.py:37