Cleanup to prep for diffing

Jack Cushman 2025-02-24 16:45:50 -05:00
parent 7af7f9cf3e
commit a7c99e264d
9 changed files with 290 additions and 122 deletions

View file (likely scripts/data_gov/fetch_index.py)

@@ -1,10 +1,9 @@
import httpx
from typing import Iterator, Dict, Any, List
import time
from typing import Dict, Any, List
import click
from pathlib import Path
import logging
from datetime import datetime
from scripts.data_gov.helpers import fetch_data_gov_packages
from scripts.data_gov.models import db, Dataset, DatasetHistory
from tqdm import tqdm
from playhouse.shortcuts import model_to_dict
@@ -104,71 +103,6 @@ def save_packages_to_database(output_path: Path, rows_per_page: int = 1000, star
finally:
db.close()
def fetch_data_gov_packages(rows_per_page: int = 1000, start_date: str = None, max_retries: int = 3) -> Iterator[Dict[str, Any]]:
"""
Fetch package data from data.gov API using date-based pagination.
Args:
rows_per_page: Number of results to fetch per page
start_date: Optional date to start fetching from (format: YYYY-MM-DDTHH:MM:SS.mmmmmm)
max_retries: Maximum number of retry attempts for 5xx errors
Yields:
Dict containing package data for each result
"""
base_url = "https://catalog.data.gov/api/3/action/package_search"
current_date = start_date
total_records = 0
while True:
logger.info(f"Current date offset: {current_date}")
# Build date filter query
url = f"{base_url}?rows={rows_per_page}&sort=metadata_modified+desc"
if current_date:
# Format date to match Solr's expected format (dropping microseconds)
formatted_date = current_date.split('.')[0] + 'Z'
date_filter = f"+metadata_modified:[* TO {formatted_date}]"
url += f"&fq={date_filter}"
for attempt in range(max_retries):
try:
start_time = time.time()
response = httpx.get(url, timeout=60.0)
request_time = time.time() - start_time
response.raise_for_status()
break # Success, exit retry loop
except httpx.HTTPStatusError as e:
if e.response.status_code >= 500 and attempt < max_retries - 1:
retry_wait = 2 ** attempt # Exponential backoff
logger.warning(f"Got {e.response.status_code}, retrying in {retry_wait}s... (attempt {attempt + 1}/{max_retries})")
logger.warning(f"Error URL: {url}")
time.sleep(retry_wait)
continue
# If not a 5xx error or we're out of retries, re-raise
logger.error(f"Error URL: {url}")
logger.error(f"Response content: {response.text}")
raise
data = response.json()
results = data["result"]["results"]
if not results:
break
# Get date of last result for next query
current_date = results[-1]["metadata_modified"]
total_records += len(results)
logger.info(f"Request took {request_time:.2f}s. Total records: {total_records}")
yield results
time.sleep(1)
def get_dataset_history(dataset_name: str) -> None:
"""
Fetch and display all versions of a dataset with the given ID,

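Note: the fetch logic removed here (and moved to the new helpers module below) pages through package_search by date rather than by offset: each request sorts by metadata_modified descending and filters to records no newer than the last result already seen. A minimal sketch of just that query construction, reusing the parameter names from the code above (illustrative, not a drop-in replacement for the full fetcher):

from typing import Optional

# Sketch of the date-cursor pagination used above; only the URL construction is shown.
# BASE_URL is the public CKAN endpoint the code above queries.
BASE_URL = "https://catalog.data.gov/api/3/action/package_search"

def build_page_url(rows_per_page: int, current_date: Optional[str]) -> str:
    """Build a package_search URL that pages by metadata_modified instead of an offset."""
    url = f"{BASE_URL}?rows={rows_per_page}&sort=metadata_modified+desc"
    if current_date:
        # Solr expects a date without microseconds, e.g. 2025-01-30T12:00:00Z.
        formatted_date = current_date.split(".")[0] + "Z"
        url += f"&fq=+metadata_modified:[* TO {formatted_date}]"
    return url

# First page has no cursor; later pages pass the metadata_modified of the last result seen.
print(build_page_url(1000, None))
print(build_page_url(1000, "2025-01-30T12:00:00.123456"))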
View file

@@ -1,16 +1,25 @@
import httpx
import json
import time
import logging
import gzip
import pickle
from pathlib import Path
from typing import Iterator, Dict, Any, List
import click
from scripts.data_gov.fetch_index import fetch_data_gov_packages
from scripts.data_gov.helpers import fetch_data_gov_packages
from datetime import datetime
from typing import Dict, Any
from tqdm import tqdm
import deepdiff
import orjson
logger = logging.getLogger(__name__)
@click.command()
@click.argument('output_path', type=click.Path(path_type=Path), default='data/data_20250130.jsonl')
@click.group()
def cli():
"""Data.gov package management commands."""
pass
@cli.command()
@click.argument('output_path', type=click.Path(path_type=Path))
@click.option('--rows-per-page', '-r', type=int, default=1000,
help='Number of results to fetch per page.')
@click.option('--log-level', '-l',
@@ -19,17 +28,87 @@ logger = logging.getLogger(__name__)
help='Logging level.')
@click.option('--start-date', '-s', type=str, default=None,
help='Start date for fetching packages in YYYY-MM-DD format.')
def main(output_path: Path, rows_per_page: int, log_level: str, start_date: str):
"""Fetch all package data from data.gov API and save to JSONL file."""
def fetch(output_path: Path, rows_per_page: int, log_level: str, start_date: str):
"""Fetch all package data from data.gov API and save to gzipped JSONL file."""
logging.basicConfig(
level=getattr(logging, log_level),
format='%(asctime)s - %(levelname)s - %(message)s'
)
with open(output_path, 'a') as f:
if output_path.is_dir():
current_date = datetime.now().strftime('%Y%m%d')
output_path = output_path / f'data_{current_date}.jsonl.gz'
logger.info(f"Writing to {output_path}")
with gzip.open(output_path, 'at') as f:
for results in fetch_data_gov_packages(rows_per_page=rows_per_page, start_date=start_date):
for package in results:
f.write(json.dumps(package) + '\n')
@cli.command()
@click.argument('file1', type=click.Path(exists=True, path_type=Path))
@click.argument('file2', type=click.Path(exists=True, path_type=Path))
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
default='INFO',
help='Logging level.')
def compare(file1: Path, file2: Path, log_level: str):
"""Compare two gzipped JSONL files by indexing on the 'name' key."""
logging.basicConfig(
level=getattr(logging, log_level),
format='%(asctime)s - %(levelname)s - %(message)s'
)
def load_jsonl_index(file_path: Path) -> Dict[str, Any]:
# Check for pickle file
pickle_path = file_path.with_suffix('.pickle')
if pickle_path.exists():
logger.info(f"Loading cached index from {pickle_path}")
with open(pickle_path, 'rb') as f:
return pickle.load(f)
# If no pickle file exists, load from JSONL and create pickle
index = {}
with gzip.open(file_path, 'rt') as f:
for line in tqdm(f, desc=f"Loading {file_path}"):
record = orjson.loads(line)
index[record['name']] = record
# Save to pickle for future runs
logger.info(f"Saving index to {pickle_path}")
with open(pickle_path, 'wb') as f:
pickle.dump(index, f)
return index
logger.info(f"Loading {file1}")
index1 = load_jsonl_index(file1)
logger.info(f"Loading {file2}")
index2 = load_jsonl_index(file2)
names1 = set(index1.keys())
names2 = set(index2.keys())
only_in_file1 = [index1[name] for name in names1 - names2]
only_in_file2 = [index2[name] for name in names2 - names1]
names_in_both = names1 & names2
changed = [[index1[name], index2[name]] for name in tqdm(names_in_both, desc="Changed") if index1[name] != index2[name]]
changed_deep = [[diff.to_json(), item1, item2] for item1, item2 in tqdm(changed[:1000], desc="Changed (deep)") if (diff := deepdiff.DeepDiff(item1, item2, ignore_order=True))]
# for suffix, items in [
# ('added', only_in_file2),
# ('removed', only_in_file1),
# ('changed', changed),
# ('changed_deep', changed_deep)
# ]:
# logger.info(f"Writing {suffix}: {len(items)}")
# output_path = file2.parent / f'{file2.stem}_{suffix}.jsonl.gz'
# with gzip.open(output_path, 'wt') as f:
# for item in tqdm(items, desc=suffix):
# f.write(json.dumps(item) + '\n')
breakpoint()
if __name__ == "__main__":
main()
cli()
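Note: the new compare command keys both snapshots on each record's name, takes set differences to find added and removed packages, and runs DeepDiff over records present in both. A hedged sketch of that comparison pattern on in-memory toy records (only deepdiff.DeepDiff and its to_json method are assumed from the library):

import deepdiff

# Two snapshots keyed by 'name', as load_jsonl_index builds them; records here are toy data.
index1 = {"a": {"name": "a", "title": "A"}, "b": {"name": "b", "title": "B"}}
index2 = {"b": {"name": "b", "title": "B v2"}, "c": {"name": "c", "title": "C"}}

names1, names2 = set(index1), set(index2)
only_in_file1 = [index1[n] for n in names1 - names2]  # removed between snapshots
only_in_file2 = [index2[n] for n in names2 - names1]  # added between snapshots
changed = [(index1[n], index2[n]) for n in names1 & names2 if index1[n] != index2[n]]

for old, new in changed:
    diff = deepdiff.DeepDiff(old, new, ignore_order=True)
    if diff:
        # to_json() gives a serializable summary, like the changed_deep entries above.
        print(old["name"], diff.to_json())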

View file (likely scripts/data_gov/helpers.py, new file)

@@ -0,0 +1,71 @@
import httpx
import time
from typing import Any, Dict, Iterator
import logging
logger = logging.getLogger(__name__)
def fetch_data_gov_packages(rows_per_page: int = 1000, start_date: str = None, max_retries: int = 3) -> Iterator[Dict[str, Any]]:
"""
Fetch package data from data.gov API using date-based pagination.
Args:
rows_per_page: Number of results to fetch per page
start_date: Optional date to start fetching from (format: YYYY-MM-DDTHH:MM:SS.mmmmmm)
max_retries: Maximum number of retry attempts for 5xx errors
Yields:
Dict containing package data for each result
"""
base_url = "https://catalog.data.gov/api/3/action/package_search"
current_date = start_date
total_records = 0
while True:
logger.info(f"Current date offset: {current_date}")
# Build date filter query
url = f"{base_url}?rows={rows_per_page}&sort=metadata_modified+desc"
if current_date:
# Format date to match Solr's expected format (dropping microseconds)
formatted_date = current_date.split('.')[0] + 'Z'
date_filter = f"+metadata_modified:[* TO {formatted_date}]"
url += f"&fq={date_filter}"
for attempt in range(max_retries):
try:
start_time = time.time()
response = httpx.get(url, timeout=60.0)
request_time = time.time() - start_time
response.raise_for_status()
break # Success, exit retry loop
except httpx.HTTPStatusError as e:
if e.response.status_code >= 500 and attempt < max_retries - 1:
retry_wait = 2 ** attempt # Exponential backoff
logger.warning(f"Got {e.response.status_code}, retrying in {retry_wait}s... (attempt {attempt + 1}/{max_retries})")
logger.warning(f"Error URL: {url}")
time.sleep(retry_wait)
continue
# If not a 5xx error or we're out of retries, re-raise
logger.error(f"Error URL: {url}")
logger.error(f"Response content: {response.text}")
raise
data = response.json()
results = data["result"]["results"]
if not results:
break
# Get date of last result for next query
current_date = results[-1]["metadata_modified"]
total_records += len(results)
logger.info(f"Request took {request_time:.2f}s. Total records: {total_records}")
yield results
time.sleep(1)
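Note: a usage sketch of the extracted helper for reference: the generator yields one page of package dicts per request, so callers can stream results straight into gzipped JSONL. This mirrors the fetch command above; the output filename is illustrative.

import gzip
import json

from scripts.data_gov.helpers import fetch_data_gov_packages

# Stream pages of packages into a gzipped JSONL file, one record per line.
with gzip.open("data/data_example.jsonl.gz", "at") as f:  # illustrative path
    for page in fetch_data_gov_packages(rows_per_page=1000, start_date=None):
        for package in page:
            f.write(json.dumps(package) + "\n")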

View file

@@ -1,17 +1,19 @@
from playhouse.migrate import *
from scripts.data_gov.models import db
from scripts.data_gov.models import db, Crawl
migrator = SqliteMigrator(db)
def do_migrate():
crawler_identified_date = DateTimeField(null=True)
crawler_downloaded_date = DateTimeField(null=True)
crawler_last_run_id = ForeignKeyField(Crawl, null=True)
deleted_by = ForeignKeyField(Crawl, null=True)
with db.atomic():
# Create the Run table first
db.create_tables([Crawl])
migrate(
# migrator.add_column('dataset', 'crawler_identified_date', crawler_identified_date),
# migrator.add_column('dataset', 'crawler_downloaded_date', crawler_downloaded_date),
# migrator.add_column('datasethistory', 'crawler_identified_date', crawler_identified_date),
# migrator.add_column('datasethistory', 'crawler_downloaded_date', crawler_downloaded_date),
migrator.add_column('dataset', 'crawler_last_run_id', crawler_last_run_id),
migrator.add_column('datasethistory', 'deleted_by', deleted_by),
)
if __name__ == '__main__':

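Note: the migration pairs with the model changes below: create the new Crawl table, then add crawl-tracking columns to the existing dataset and datasethistory tables with playhouse's SqliteMigrator. A hedged, standalone sketch of that add-column pattern (the database path and DateTimeField column are illustrative; foreign keys follow the same add_column call):

from peewee import DateTimeField, SqliteDatabase
from playhouse.migrate import SqliteMigrator, migrate

# Illustrative database path; the real db object comes from scripts.data_gov.models.
db = SqliteDatabase("data.db")
migrator = SqliteMigrator(db)

with db.atomic():
    migrate(
        # Add a nullable column to an existing table without recreating it.
        migrator.add_column("dataset", "crawler_downloaded_date", DateTimeField(null=True)),
    )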
View file (likely scripts/data_gov/models.py)

@@ -18,6 +18,12 @@ class BaseModel(Model):
class Meta:
database = db
class Crawl(BaseModel):
id = AutoField(primary_key=True)
start_date = DateTimeField()
end_date = DateTimeField(null=True)
class Dataset(BaseModel):
# fields from data.gov
id = CharField(primary_key=True)
@@ -54,8 +60,10 @@ class Dataset(BaseModel):
# fields starting with crawler_ are added by our crawler
crawler_identified_date = DateTimeField(null=True, default=datetime.now)
crawler_downloaded_date = DateTimeField(null=True)
crawler_last_crawl_id = ForeignKeyField('Crawl', backref='datasets', null=True)
class DatasetHistory(Dataset):
history_id = AutoField(primary_key=True)
id = CharField() # Regular CharField, not primary key
#deleted_by_date = DateTimeField(null=True) # New field to track deletion date
deleted_by_date = DateTimeField(null=True)
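Note: with the new Crawl model and crawl-tracking fields, each run can be recorded and referenced from datasets. A hedged sketch of how those fields might be used (model and field names come from the diff above; the workflow and the example id are assumptions):

from datetime import datetime

from scripts.data_gov.models import db, Crawl, Dataset

db.connect()
crawl = Crawl.create(start_date=datetime.now())  # open a new crawl run

# Tag a dataset as last touched by this crawl; the id here is illustrative.
dataset = Dataset.get_or_none(Dataset.id == "example-dataset-id")
if dataset:
    dataset.crawler_last_crawl_id = crawl
    dataset.save()

crawl.end_date = datetime.now()  # close out the run
crawl.save()
db.close()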