2025-02-05 10:21:50 -05:00
|
|
|
import json
|
|
|
|
import logging
|
2025-02-24 16:45:50 -05:00
|
|
|
import gzip
|
|
|
|
import pickle
|
2025-02-05 10:21:50 -05:00
|
|
|
from pathlib import Path
|
|
|
|
import click
|
2025-02-24 16:45:50 -05:00
|
|
|
from scripts.data_gov.helpers import fetch_data_gov_packages
|
|
|
|
from datetime import datetime
|
|
|
|
from typing import Dict, Any
|
|
|
|
from tqdm import tqdm
|
|
|
|
import deepdiff
|
|
|
|
import orjson
|
2025-02-05 10:21:50 -05:00
|
|
|
|
|
|
|
# Module-level logger; level/handlers are expected to be configured by the
# host application or the CLI entry point.
logger = logging.getLogger(__name__)
|
|
|
|
|
2025-02-24 16:45:50 -05:00
|
|
|
@click.group()
def cli():
    """Data.gov package management commands."""
|
|
|
|
|
|
|
|
@cli.command()
@click.argument('output_path', type=click.Path(path_type=Path))
@click.option('--rows-per-page', '-r', type=int, default=1000,
              help='Number of results to fetch per page.')
@click.option('--start-date', '-s', type=str, default=None,
              help='Start date for fetching packages in YYYY-MM-DD format.')
@click.option('--log-level', '-l', type=str, default='INFO',
              help='Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).')
def fetch(output_path: Path, rows_per_page: int, log_level: str, start_date: str):
    """Fetch all package data from data.gov API and save to gzipped JSONL file.

    OUTPUT_PATH may be a file path, or an existing directory in which case a
    date-stamped ``data_YYYYMMDD.jsonl.gz`` file is created inside it.
    """
    # BUG FIX: the callback declared a `log_level` parameter with no matching
    # click option, so click called it without that kwarg and every invocation
    # died with a TypeError. The option is now declared above and applied here.
    logging.basicConfig(level=log_level.upper())

    # When given a directory, generate a date-stamped output file name.
    if output_path.is_dir():
        current_date = datetime.now().strftime('%Y%m%d')
        output_path = output_path / f'data_{current_date}.jsonl.gz'

    logger.info(f"Writing to {output_path}")

    # Append mode ('at') so an interrupted fetch can be resumed by re-running
    # with --start-date; one JSON object per line.
    with gzip.open(output_path, 'at') as f:
        for results in fetch_data_gov_packages(rows_per_page=rows_per_page, start_date=start_date):
            for package in results:
                f.write(json.dumps(package) + '\n')
|
|
|
|
|
2025-02-24 16:45:50 -05:00
|
|
|
@cli.command()
@click.argument('file1', type=click.Path(exists=True, path_type=Path))
@click.argument('file2', type=click.Path(exists=True, path_type=Path))
@click.option('--log-level', '-l', type=str, default='INFO',
              help='Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).')
def compare(file1: Path, file2: Path, log_level: str):
    """Compare two gzipped JSONL files by indexing on the 'name' key.

    Writes four gzipped JSONL reports next to FILE2: records added, removed,
    changed (shallow), and changed_deep (DeepDiff of the first 1000 changes).
    """
    # BUG FIX: `log_level` was a parameter with no matching click option, so
    # click invoked the callback without it -> TypeError on every run.
    logging.basicConfig(level=log_level.upper())

    def load_jsonl_index(file_path: Path) -> Dict[str, Any]:
        """Load a {name: record} index, cached as a sibling .pickle file."""
        # A pickle cache sits next to the JSONL file (suffix swapped in).
        pickle_path = file_path.with_suffix('.pickle')
        if pickle_path.exists():
            logger.info(f"Loading cached index from {pickle_path}")
            # NOTE(review): the pickle cache is trusted local output of a
            # prior run — never point this command at untrusted files.
            with open(pickle_path, 'rb') as f:
                return pickle.load(f)

        # No cache yet: build the index from the JSONL and persist it.
        index = {}
        with gzip.open(file_path, 'rt') as f:
            for line in tqdm(f, desc=f"Loading {file_path}"):
                record = orjson.loads(line)
                index[record['name']] = record

        logger.info(f"Saving index to {pickle_path}")
        with open(pickle_path, 'wb') as f:
            pickle.dump(index, f)

        return index

    logger.info(f"Loading {file1}")
    index1 = load_jsonl_index(file1)
    logger.info(f"Loading {file2}")
    index2 = load_jsonl_index(file2)

    names1 = set(index1.keys())
    names2 = set(index2.keys())

    only_in_file1 = [index1[name] for name in names1 - names2]
    only_in_file2 = [index2[name] for name in names2 - names1]
    names_in_both = names1 & names2
    changed = [[index1[name], index2[name]] for name in tqdm(names_in_both, desc="Changed") if index1[name] != index2[name]]
    # Deep diffs are expensive, so only the first 1000 changed pairs get one.
    changed_deep = [[diff.to_json(), item1, item2] for item1, item2 in tqdm(changed[:1000], desc="Changed (deep)") if (diff := deepdiff.DeepDiff(item1, item2, ignore_order=True))]

    # BUG FIX: this report-writing loop was commented out and replaced by a
    # stray breakpoint() — a debugging leftover. The breakpoint is removed
    # and the intended output restored.
    for suffix, items in [
        ('added', only_in_file2),
        ('removed', only_in_file1),
        ('changed', changed),
        ('changed_deep', changed_deep),
    ]:
        logger.info(f"Writing {suffix}: {len(items)}")
        output_path = file2.parent / f'{file2.stem}_{suffix}.jsonl.gz'
        with gzip.open(output_path, 'wt') as f:
            for item in tqdm(items, desc=suffix):
                f.write(json.dumps(item) + '\n')
|
|
|
|
|
2025-02-05 10:21:50 -05:00
|
|
|
if __name__ == "__main__":
    # Script entry point: hand control to the click command group, which
    # parses sys.argv and dispatches to `fetch` or `compare`.
    cli()
|