# data-vault/scripts/data_gov/fetch_data.py
"""Download data.gov datasets, create nabit archives, and upload them to S3."""

import json
import logging
import re
import uuid
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse

import click
from nabit.lib.backends.url import UrlCollectionTask
from nabit.lib.sign import KNOWN_TSAS, is_encrypted_key
from playhouse.shortcuts import model_to_dict
from tqdm import tqdm

from scripts.data_gov.models import db, Dataset
from scripts.helpers.bag import fetch_and_upload
from scripts.helpers.parallel import run_parallel

logger = logging.getLogger(__name__)

# Progress bars shared between get_unprocessed_datasets() and main().
stats_counter = {}

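# Example invocation (illustrative only; the module path, worker count, and bucket are
# placeholders, not taken from this repo's docs):
#
#   python -m scripts.data_gov.fetch_data \
#       --db-path data/data.db \
#       --output-path data/processed \
#       --collection data_gov \
#       --workers 4 \
#       --s3-path "<bucket_name>/<path>"
#
# Each worker fetches the URLs referenced in a dataset's metadata, packages them into a
# nabit archive, and hands the result to fetch_and_upload() for zipping and upload.
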
def is_valid_url(url):
    parsed = urlparse(url)
    return parsed.scheme in ['http', 'https'] and re.search(r'[^\.]\.[^\.]', parsed.netloc)


def extract_urls(data, urls=None):
    urls = set() if urls is None else urls
    if isinstance(data, dict):
        for key, value in data.items():
            if isinstance(value, str):
                if is_valid_url(value):
                    urls.add(value)
            elif isinstance(value, (dict, list)):
                extract_urls(value, urls)
    elif isinstance(data, list):
        for item in data:
            extract_urls(item, urls)
    return urls

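# For example, extract_urls() on a hypothetical record such as
#     {"resources": [{"url": "https://example.gov/data.csv"}], "notes": "plain text"}
# returns {"https://example.gov/data.csv"}.
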
def run_pipeline(
    dataset: Dataset,
    output_path: Path,
    metadata_path: Path,
    collection_path: Path,
    signatures: list = None,
    session_args: dict = None,
    s3_path: str = None,
    no_delete: bool = False,
):
    logger.info(f"Processing dataset: {dataset.name}")

    # we have a db connection forked from the main process, so close it and reopen if needed
    db.close()

    # set this here so it makes it into the metadata
    dataset.crawler_downloaded_date = datetime.now()

    def create_archive(temp_dir):
        data_dict = model_to_dict(dataset)
        for key, value in data_dict.items():
            if isinstance(value, datetime):
                data_dict[key] = value.isoformat()
        data_gov_url = f'https://catalog.data.gov/dataset/{dataset.name}'
        collect = [UrlCollectionTask(url=url) for url in extract_urls(data_dict)]
        logger.info(f" - Downloading {len(collect)} files")

        # split fields from the dataset record into data.gov metadata and crawler metadata
        data_gov_metadata = {k: v for k, v in data_dict.items() if not k.startswith('crawler_')}
        crawler_metadata = {k: v for k, v in data_dict.items() if k.startswith('crawler_')}

        return {
            'collect': collect,
            'signed_metadata': {
                'id': str(uuid.uuid4()),
                'url': data_gov_url,
                'description': f'Archive of data.gov dataset "{dataset.title}" created by {dataset.organization["title"]}. Full metadata stored in data_gov_metadata key.',
                'data_gov_metadata': data_gov_metadata,
                'crawler_metadata': crawler_metadata,
            },
        }

    # Use the common fetch/zip/upload pipeline
    fetch_and_upload(
        output_path=output_path,
        collection_path=collection_path,
        metadata_path=metadata_path,
        create_archive_callback=create_archive,
        signatures=signatures,
        session_args=session_args,
        s3_path=s3_path,
        no_delete=no_delete,
    )

    logger.info("- Setting crawler_downloaded_date...")
    db.connect()
    dataset.save()

    logger.info("Processing complete")

def get_unprocessed_datasets(output_path: Path, collection: str, min_size: int = 0, dataset_name: str = None):
    """Get datasets from SQLite that don't have metadata files yet."""
    query = Dataset.select()
    if dataset_name:
        query = query.where(Dataset.name == dataset_name)
    if min_size:
        query = query.where(Dataset.size >= min_size)

    # Initialize progress bars
    stats_counter['total'] = tqdm(desc="Total records", unit="pkg")
    stats_counter['skipped'] = tqdm(desc="Already processed", unit="pkg")
    stats_counter['yielded'] = tqdm(desc="Processing", unit="pkg")

    for dataset in query:
        stats_counter['total'].update(1)

        # Check if metadata file exists
        name = dataset.name
        metadata_path = output_path / 'metadata' / collection / name / 'v1.json'
        if metadata_path.exists():
            stats_counter['skipped'].update(1)
            continue

        stats_counter['yielded'].update(1)
        yield dataset

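# On-disk layout implied by the paths built above and in get_tasks() below (the files
# themselves are presumably written by fetch_and_upload()):
#   <output_path>/metadata/<collection>/<name>/v<N>.json
#   <output_path>/collections/<collection>/<name>/v<N>.zip
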
@click.command()
@click.option('--db-path', '-d', type=click.Path(exists=True, path_type=Path), default='data/data.db')
@click.option('--output-path', '-o', type=click.Path(path_type=Path), default='data/processed',
              help='Output path.')
@click.option('--collection', '-c', type=str, default='data_gov',
              help='Collection name.')
@click.option('--workers', '-w', type=int, default=None,
              help='Number of worker processes. Defaults to CPU count.')
@click.option('--min-size', '-s', type=int, default=0,
              help='Minimum size of dataset to process.')
@click.option('--dataset-name', help='Dataset name to process.')
@click.option('--if-exists', '-e', type=click.Choice(['skip', 'replace', 'version']), default='skip',
              help='Whether to skip, replace, or add a version if dataset already exists.')
@click.option('--signatures', help='JSON string of signature configuration.')
@click.option('--profile', '-p', help='AWS profile name')
@click.option('--s3-path', '-s', help='S3 path for uploads, e.g. "<bucket_name>/<path>"')
@click.option('--stop-after', help='Stop after processing this many datasets', type=int)
# assumed option: supplies the log_level passed to run_parallel() below
@click.option('--log-level', type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR']), default='INFO',
              help='Log level for worker processes.')
@click.option('--no-delete', is_flag=True, help='Set to preserve zipped data on disk as well as metadata')
def main(db_path: Path, output_path: Path, collection: str, workers=None, min_size=0, dataset_name=None,
         if_exists='skip', signatures=None, profile=None, s3_path=None, stop_after=None, log_level='INFO',
         no_delete=False):
    if dataset_name:
        workers = 1
        stop_after = 1

    if signatures:
        signatures = json.loads(signatures)
        for signature in signatures:
            if signature['action'] == 'sign':
                if is_encrypted_key(signature['params']['key']):
                    signature['params']['password'] = click.prompt(
                        f"Enter password for {signature['params']['key']}",
                        hide_input=True,
                    )
            elif signature['action'] == 'timestamp':
                if known_tsa := signature.pop('known_tsa', None):
                    signature['params'] = KNOWN_TSAS[known_tsa]
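    # The --signatures JSON parsed above is expected to be a list of steps along the lines of
    # (key path and TSA name are placeholders; params beyond 'key' and 'known_tsa' are not shown
    # in this script):
    #   [{"action": "sign", "params": {"key": "path/to/key.pem"}},
    #    {"action": "timestamp", "known_tsa": "<tsa_name>"}]
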
    session_args = {}
    if profile:
        session_args['profile_name'] = profile

    # Initialize database connection
    db.init(db_path)
    db.connect()

    def get_tasks():
        processed = 0
        for dataset in get_unprocessed_datasets(output_path, collection, min_size, dataset_name):
            # handle existing datasets
            name = dataset.name
            collection_path = output_path / 'collections' / collection / name / 'v1.zip'
            metadata_path = output_path / 'metadata' / collection / name / 'v1.json'
            if metadata_path.exists():
                if if_exists == 'skip':
                    continue
                elif if_exists == 'replace':
                    metadata_path.unlink()
                    if collection_path.exists():
                        collection_path.unlink()
                elif if_exists == 'version':
                    version = 2
                    while True:
                        collection_path = output_path / 'collections' / collection / name / f'v{version}.zip'
                        metadata_path = output_path / 'metadata' / collection / name / f'v{version}.json'
                        if not metadata_path.exists():
                            break
                        version += 1

            yield dataset, output_path, metadata_path, collection_path, signatures, session_args, s3_path, no_delete

            processed += 1
            if stop_after and processed >= stop_after:
                break

    try:
        run_parallel(run_pipeline, get_tasks(), workers, log_level=log_level, catch_errors=False)
    finally:
        # Close progress bars
        for counter in stats_counter.values():
            counter.close()
        db.close()


if __name__ == '__main__':
    main()