LSST Applications 30.0.7,g0e76e35be5+e8e946ae08,g19811a7679+138f7293ba,g199a45376c+5e234f8357,g1fd858c14a+2f48dbc4c4,g262e1987ae+fb36cac54d,g29ae962dfc+d9108a0941,g2c21b0017a+4f59a27f16,g31e44d4a5c+b0138be388,g33ac35c1f1+28b9f72785,g35bb328faa+b0138be388,g40c9b15c53+823ad735c1,g47891489e3+bcc48a0b46,g53246c7159+b0138be388,g64539dfbff+e8e946ae08,g67b6fd64d1+bcc48a0b46,g74acd417e5+422380537a,g76965917b2+a5ca99c4d9,g786e29fd12+796b79145d,g7aefaa3e3d+dc0c200193,g86b635cae8+734fe384f0,g87389fa792+d8b5378923,g89139ef638+bcc48a0b46,g8bbb235e95+3f4f7f9447,g8ea07a8fe4+78a4c88802,g9290983e33+ffdc83c6f7,g92c671f44c+e8e946ae08,gaa753fd333+03f406da14,gbf99507273+b0138be388,gc49b57b85e+8df26ee1f0,gca7fc764a6+bcc48a0b46,gd7ef33dd92+bcc48a0b46,gdab6d2f7ff+422380537a,ge1c02a5578+b0138be388,ge410e46f29+bcc48a0b46,ge80df9fc40+e6db5413d1,geaed405ab2+1de65a85c6,gf5dcc679e7+35a0ce2edd,gf5f1c85443+e8e946ae08
LSST Data Management Base Package
Loading...
Searching...
No Matches
felis.py
Go to the documentation of this file.
1import numpy as np
2from typing import Mapping, Any
3from textwrap import wrap
4
5import argparse
6import sys
7import yaml
8
# Tables processed when the command line does not name any
DEFAULT_TABLES = [
    "SSObject",
    "SSSource",
    "mpc_orbits",
    "current_identifications",
    "numbered_identifications",
]
11
12
13# ----------------------------------------------------------------------
14# Helper: timestamp precision → numpy time unit
15# ----------------------------------------------------------------------
def _timestamp_precision_to_unit(prec: int) -> str:
    """
    Pick the numpy datetime64 unit for a Felis timestamp precision.

    ``prec`` is the number of decimal places of seconds to retain.  The
    cut-offs are: ``prec <= 0`` gives ``"s"``, ``prec <= 2`` gives ``"ms"``,
    ``prec <= 5`` gives ``"us"`` and anything larger gives ``"ns"``.
    """
    # NOTE(review): 3 and 6 digits land on the next finer unit ("us" / "ns")
    # even though "ms" / "us" could already hold them -- confirm this is intended.
    # Ordered (inclusive upper bound, unit) pairs; the first match wins.
    for upper_bound, unit in ((0, "s"), (2, "ms"), (5, "us")):
        if prec <= upper_bound:
            return unit
    return "ns"  # finest unit this mapping hands out
30
31
32# ----------------------------------------------------------------------
33# Column → NumPy dtype
34# ----------------------------------------------------------------------
def _felis_column_to_numpy_dtype(col: Mapping[str, Any]) -> tuple[str, Any]:
    """
    Translate one Felis column definition into a ``(name, dtype)`` pair.

    Parameters
    ----------
    col : Mapping[str, Any]
        Column definition.  ``name`` and ``datatype`` are required;
        ``precision`` (timestamps) and ``length`` (strings) are optional.

    Returns
    -------
    tuple[str, Any]
        The column name and a NumPy scalar type or ``np.dtype``.  Datatypes
        that are not recognized map to ``object``.

    Raises
    ------
    ValueError
        If the column has no ``datatype`` or a timestamp column carries a
        non-integer ``precision``.
    """
    name = col["name"]

    dt = col.get("datatype")
    if dt is None:
        raise ValueError(f"Column {name!r} has no datatype")

    dt = dt.lower()  # datatype names are matched case-insensitively

    # ---------- numeric / boolean --------------------------------------------
    scalar_types = {
        "int8": np.int8,
        "int16": np.int16,
        "short": np.int16,
        "int32": np.int32,
        "int": np.int32,
        "int64": np.int64,
        "long": np.int64,
        "bigint": np.int64,
        "uint8": np.uint8,
        "uint16": np.uint16,
        "uint32": np.uint32,
        "uint64": np.uint64,
        "float32": np.float32,
        "float": np.float32,
        "float64": np.float64,
        "double": np.float64,
        "bool": np.bool_,
        "boolean": np.bool_,
    }
    if dt in scalar_types:
        return name, scalar_types[dt]

    # ---------- timestamps ---------------------------------------------------
    if dt == "timestamp":
        prec = col.get("precision", 0)
        if not isinstance(prec, int):
            raise ValueError(f"Timestamp field {name!r} has non-integer precision")
        # The unit must be derived from the precision before it is formatted
        # into the dtype string below.
        unit = _timestamp_precision_to_unit(prec)
        return name, np.dtype(f"datetime64[{unit}]")

    # ---------- strings ------------------------------------------------------
    if dt in ("string", "unicode", "str", "char"):
        length = col.get("length")
        if isinstance(length, int):
            return name, np.dtype(f"U{length}")
        # No usable length given: fall back to the unsized unicode dtype.
        return name, np.dtype("U")

    # ---------- lists / arrays / unknown → object ----------------------------
    return name, object
91
92
93# ----------------------------------------------------------------------
94# Table → NumPy dtype with metadata
95# ----------------------------------------------------------------------
def felis_table_to_numpy_dtype(table: Mapping[str, Any]) -> np.dtype:
    """
    Build a structured NumPy dtype from a Felis table definition (YAML → dict).

    Descriptive text found in the schema is attached as dtype metadata:

        dtype.metadata["description"] = table description
        dtype.metadata["columns"] = {name: "[unit] description"}

    A key is left out when there is nothing to store under it, and no
    metadata is attached at all when both would be absent.

    Raises
    ------
    ValueError
        If the table definition has no ``columns`` entry.
    """
    columns = table.get("columns")
    if columns is None:
        raise ValueError("Table definition has no 'columns' key")

    def _annotation(column):
        # Per-column comment text: the unit (when given) goes in brackets
        # in front of the description.
        unit = column.get("ivoa:unit")
        text = column.get("description")
        if unit is None:
            return text
        return f"[{unit}] {text}" if text else f"[{unit}]"

    # One (name, dtype) pair per column, in schema order
    field_specs = [_felis_column_to_numpy_dtype(column) for column in columns]

    # Only columns that actually have something to say get an entry
    annotations = {}
    for column in columns:
        column_name = column["name"]
        note = _annotation(column)
        if note is not None:
            annotations[column_name] = note

    metadata = {}
    description = table.get("description")
    if description is not None:
        metadata["description"] = description
    if annotations:
        metadata["columns"] = annotations

    if not metadata:
        return np.dtype(field_specs)
    return np.dtype(field_specs, metadata=metadata)
141
142
def _wrap_field_comment(comment: str, width: int) -> list[str]:
    """Greedily wrap *comment* to *width* chars, keeping at most two lines ("..." marks a cut)."""
    segments: list[str] = []
    cur = ""
    for word in comment.split():
        if not cur:
            cur = word
        elif len(cur) + 1 + len(word) <= width:
            cur += " " + word
        else:
            segments.append(cur)
            cur = word
    if cur:
        segments.append(cur)

    # Ellipsize to 2 lines max; make room for the "..." if the second line is full
    if len(segments) > 2:
        segments = segments[:2]
        if len(segments[-1]) + 3 > width:
            segments[-1] = segments[-1][: width - 3].rstrip()
        segments[-1] += "..."
    return segments


def pretty_print_dtype(
    dtype: np.dtype,
    table_name: str,
    target_comment_col: int = 36,
    max_line_length: int = 110,
) -> str:
    """
    Pretty-print a structured NumPy dtype (with Felis-derived metadata)
    as readable Python code:

        # Wrapped table description...
        {table_name}Dtype = np.dtype([
         ('field1', 'i8'),          # comment...
         ('very_long_field', ...),  # comment juts out; the next long field
                                    # is aligned to the same column
        ])

    Parameters
    ----------
    dtype : np.dtype
        Structured dtype with metadata fields:
            metadata["description"] : table-level description (optional)
            metadata["columns"] : {col_name: per-column description}
    table_name : str
        Name used for the assignment, i.e. ``{table_name}Dtype``.
    target_comment_col : int, default=36
        Preferred starting column for comments when the field text ends
        before it.  Longer fields trigger the "juttering group" alignment
        described below.
    max_line_length : int, default=110
        Line-length limit used when wrapping the table description and the
        per-field comments.

    Behavior
    --------
    * The table description, prefixed with the table name, is wrapped into
      ``# `` comment lines placed directly above the dtype assignment.
    * Field comments:
        - If the field text is shorter than target_comment_col, the comment
          starts at target_comment_col and the "juttering group" resets.
        - Otherwise the comment "juts out" one space after the field.
            + The first such field sets the group's jutter column.
            + Following juttering fields use
              max(previous_jut_col, natural_jut_col), which avoids a jagged
              edge.
    * Comment lines are kept shorter than max_line_length, except that a
      comment is always allowed at least 10 characters per line and a single
      word is never split.
    * Per-field comments wrap into at most 2 lines, with "..." if cut.
    * dtype.metadata is NOT emitted; it is only used for comments.

    Returns
    -------
    str
        Pretty Python code string.

    Raises
    ------
    TypeError
        If ``dtype`` is not a structured ``np.dtype``.
    """
    if not isinstance(dtype, np.dtype) or not dtype.fields:
        raise TypeError("Expected a structured numpy.dtype with fields")

    md = dtype.metadata or {}
    table_desc = md.get("description")
    col_descs = md.get("columns", {})

    lines: list[str] = []

    # ---- Table description: "# " + text stays below max_line_length ---------
    if table_desc:
        txt = f"{table_name}: {table_desc}"
        lines.extend(f"# {w}" for w in wrap(txt, width=max_line_length - 3))

    # ---- Begin dtype assignment ---------------------------------------------
    lines.append(f"{table_name}Dtype = np.dtype([")

    # Build base field specs.  Iterate dtype.names rather than dtype.fields:
    # for titled fields, dtype.fields also lists the titles as keys and its
    # values become 3-tuples, which would break a (dtype, offset) unpacking.
    # NOTE(review): the listing this block was restored from collapses runs of
    # whitespace -- confirm the intended indent of the emitted field lines.
    field_entries = []
    for name in dtype.names:
        ftype = dtype.fields[name][0]
        base = f" ({name!r}, {ftype.str!r}),"
        comment = col_descs.get(name)
        if comment:
            comment = " ".join(str(comment).split())  # normalize whitespace
        field_entries.append((base, comment))

    last_jut_comment_col = None  # comment column of the current juttering group

    # ---- Process each field with smoothed jutter alignment ------------------
    for base, comment in field_entries:
        base_len = len(base)
        fits = base_len <= target_comment_col - 1

        if fits:
            # A field that fits always ends the current juttering group.
            last_jut_comment_col = None

        if not comment:
            lines.append(base)
            continue

        if fits:
            comment_col = target_comment_col
        else:
            natural_col = base_len + 1  # one space after the field
            if last_jut_comment_col is None or natural_col > last_jut_comment_col:
                # first juttering field, or one that juts further: move the group
                last_jut_comment_col = natural_col
            comment_col = last_jut_comment_col

        # Room left for comment text once "# " is placed at comment_col
        max_comment_width = max(10, (max_line_length - 1) - (comment_col + 2))
        segments = _wrap_field_comment(comment, max_comment_width)

        # First line carries the field itself, padded out to the comment column
        pad = " " * (comment_col - base_len)
        lines.append(f"{base}{pad}# {segments[0]}")

        # Continuation lines hold only the comment, at the same column
        cont_prefix = " " * comment_col + "# "
        lines.extend(f"{cont_prefix}{seg}" for seg in segments[1:])

    lines.append("])")
    return "\n".join(lines)
296
297
def main():
    """
    Command-line entry point: print NumPy dtype definitions for Felis tables.

    Reads the YAML schema named on the command line, converts each requested
    table (``DEFAULT_TABLES`` when none are given) to a structured dtype and
    writes the generated Python source to standard output.  Exits through
    ``parser.error`` when a requested table is not present in the schema.
    """
    parser = argparse.ArgumentParser(description="Generate NumPy dtypes from Felis YAML schema")
    parser.add_argument("felis_yaml_file", help="Path to the YAML schema file")
    parser.add_argument(
        "table_names",
        nargs="*",
        default=DEFAULT_TABLES,
        help=(f"Names of tables to process (default: {', '.join(DEFAULT_TABLES)})"),
    )
    args = parser.parse_args()

    # Decode explicitly as UTF-8 instead of relying on the locale's default
    # encoding, which differs between platforms.
    with open(args.felis_yaml_file, encoding="utf-8") as fp:
        schema = yaml.safe_load(fp)

    table_schemas = {t["name"]: t for t in schema["tables"]}

    # Validate every requested name before anything is printed, so a typo
    # gives a usage error rather than a KeyError after partial output.
    missing = [name for name in args.table_names if name not in table_schemas]
    if missing:
        parser.error(f"table(s) not found in {args.felis_yaml_file}: {', '.join(missing)}")

    # Print header
    print("# ***** GENERATED FILE, DO NOT EDIT BY HAND *****")
    print("# ruff: noqa: W505")
    print(f"# generated with {' '.join(sys.argv)} # noqa: E501")
    print()
    print("import numpy as np")
    print()

    last = len(args.table_names) - 1
    for i, table in enumerate(args.table_names):
        dtype = felis_table_to_numpy_dtype(table_schemas[table])
        print(pretty_print_dtype(dtype, table))
        if i < last:
            print()  # blank line between consecutive table definitions


if __name__ == "__main__":
    main()
tuple[str, Any] _felis_column_to_numpy_dtype(Mapping[str, Any] col)
Definition felis.py:35
str _timestamp_precision_to_unit(int prec)
Definition felis.py:16
str pretty_print_dtype(np.dtype dtype, str table_name, int target_comment_col=36, int max_line_length=110)
Definition felis.py:148
np.dtype felis_table_to_numpy_dtype(Mapping[str, Any] table)
Definition felis.py:96