mirror of https://github.com/harvard-lil/data-vault.git (synced 2025-07-05 06:00:36 -04:00)
Refactoring, github pipeline, s3 creation
This commit is contained in:
parent a7c99e264d
commit b245fd44eb
21 changed files with 718 additions and 281 deletions
scripts/data_gov/__init__.py (Normal file, 0 changes)
@@ -61,17 +61,8 @@ def find_differences(csv_data: Dict[str, dict],
@click.option('--compare-by', '-c',
              default='id',
              help='Field to compare by.')
@click.option('--log-level', '-l',
              type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
              default='INFO',
              help='Logging level.')
def main(old_path: Path, new_path: Path, compare_by: str, log_level: str):
    """Compare records between CSV and JSONL files."""
    logging.basicConfig(
        level=getattr(logging, log_level),
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    old_data = load_jsonl_data(old_path, compare_by=compare_by)
    new_data = load_jsonl_data(new_path, compare_by=compare_by)
@@ -3,36 +3,38 @@ from collections import Counter, defaultdict
from pathlib import Path


# Read the JSONL file and count crawler_identified_date values
downloaded_counts = Counter()
identified_counts = Counter()
titles_by_org = defaultdict(list)
with open('data/data_db_dump_20250130.only_name.jsonl', 'r') as f:
    for line in f:
        data = json.loads(line)
        org = json.loads(data.get('organization', '{}'))
        identified_counts[(data.get('crawler_identified_date') or '')[:10]] += 1
        titles_by_org[org['title']].append(data["title"])
if __name__ == "__main__":

    # Print the counts sorted by date
    for date, count in sorted(identified_counts.items()):
        print(f"{date}: {count}")

    # sort each list of titles by org
    for org, titles in titles_by_org.items():
        titles_by_org[org].sort()
    Path('data/titles_by_org.json').write_text(json.dumps(titles_by_org, indent=2))


    # print urls
    for path in Path('data/').glob('glass*'):
        print(path)
        with open(path, 'r') as f:
# Read the JSONL file and count crawler_identified_date values
downloaded_counts = Counter()
identified_counts = Counter()
titles_by_org = defaultdict(list)
with open('data/data_db_dump_20250130.only_name.jsonl', 'r') as f:
    for line in f:
        data = json.loads(line)
        print("* " + data['name'])
        resources = data.get('resources', [])
        if type(resources) == str:
            resources = json.loads(resources)
        for resource in resources:
            print(' * ' + resource['url'])
        org = json.loads(data.get('organization', '{}'))
        identified_counts[(data.get('crawler_identified_date') or '')[:10]] += 1
        titles_by_org[org['title']].append(data["title"])

# Print the counts sorted by date
for date, count in sorted(identified_counts.items()):
    print(f"{date}: {count}")

# sort each list of titles by org
for org, titles in titles_by_org.items():
    titles_by_org[org].sort()
Path('data/titles_by_org.json').write_text(json.dumps(titles_by_org, indent=2))


# print urls
for path in Path('data/').glob('glass*'):
    print(path)
    with open(path, 'r') as f:
        for line in f:
            data = json.loads(line)
            print("* " + data['name'])
            resources = data.get('resources', [])
            if type(resources) == str:
                resources = json.loads(resources)
            for resource in resources:
                print(' * ' + resource['url'])
@@ -10,31 +10,17 @@ import os
from urllib.parse import urlparse
import re
from scripts.helpers.parallel import run_parallel
import zipfile
import struct
import boto3
import logging
from scripts.data_gov.models import db, Dataset
from playhouse.shortcuts import model_to_dict
from tqdm import tqdm
from datetime import datetime
from scripts.helpers.bag import zip_archive, upload_archive, cleanup_files, fetch_and_upload

logger = logging.getLogger(__name__)

## download data.gov datasets, create nabit archives, and upload to S3

# File extensions that are already compressed or wouldn't benefit from additional compression
UNCOMPRESSED_EXTENSIONS = {
    # Already compressed archives
    'zip', 'gz', 'tgz', 'bz2', '7z', 'rar', 'xz',
    # Compressed images
    'jpg', 'jpeg', 'png', 'gif', 'webp',
    # Compressed video/audio
    'mp4', 'mov', 'avi', 'wmv', 'ogv', 'mp3', 'm4a',
    # Other compressed/binary formats
    'pdf', 'docx', 'xlsx', 'pptx',
}

stats_counter = {}

def is_valid_url(url):
@@ -55,95 +41,6 @@ def extract_urls(data, urls = None):
            extract_urls(item, urls)
    return urls

def create_archive(bag_dir, dataset: Dataset, signatures):
    data_dict = model_to_dict(dataset)
    for key, value in data_dict.items():
        if isinstance(value, datetime):
            data_dict[key] = value.isoformat()
    data_gov_url = f'https://catalog.data.gov/dataset/{dataset.name}'
    collect = [
        *[UrlCollectionTask(url=url) for url in extract_urls(data_dict)],
    ]
    logger.info(f" - Downloading {len(collect)} files")

    # sort fields from dataset
    data_gov_metadata = {k: v for k, v in data_dict.items() if not k.startswith('crawler_')}
    crawler_metadata = {k: v for k, v in data_dict.items() if k.startswith('crawler_')}

    # Create the archive
    package(
        output_path=bag_dir,
        collect=collect,
        collect_errors='ignore',
        signed_metadata={
            'id': str(uuid.uuid4()),
            'url': data_gov_url,
            'description': f'Archive of data.gov dataset "{dataset.title}" created by {dataset.organization["title"]}. Full metadata stored in data_gov_metadata key.',
            'data_gov_metadata': data_gov_metadata,
            'crawler_metadata': crawler_metadata,
        },
        signatures=signatures,
    )

def zip_archive(bag_dir, archive_path):
    # Create zip archive
    with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for file_path in bag_dir.rglob('*'):
            if file_path.is_file():
                arc_path = file_path.relative_to(bag_dir)
                compression = (zipfile.ZIP_STORED
                               if file_path.suffix.lower().lstrip('.') in UNCOMPRESSED_EXTENSIONS
                               else zipfile.ZIP_DEFLATED)
                zf.write(file_path, arc_path, compress_type=compression)

    # Create metadata file
    zip_info = []
    with zipfile.ZipFile(archive_path, 'r') as zf:
        for info in zf.filelist:
            header_offset = info.header_offset

            # Read header to calculate data offset
            zf.fp.seek(header_offset)
            header = zf.fp.read(zipfile.sizeFileHeader)
            fheader = struct.unpack(zipfile.structFileHeader, header)
            fname_length = fheader[zipfile._FH_FILENAME_LENGTH]
            extra_length = fheader[zipfile._FH_EXTRA_FIELD_LENGTH]
            data_offset = header_offset + zipfile.sizeFileHeader + fname_length + extra_length

            zip_info.append({
                'filename': info.filename,
                'file_size': info.file_size,
                'compress_size': info.compress_size,
                'compress_type': info.compress_type,
                'header_offset': header_offset,
                'data_offset': data_offset,
            })

    # Read the bag-info.txt and signed-metadata.json
    bag_info = (bag_dir / 'bag-info.txt').read_text()
    signed_metadata = json.loads((bag_dir / 'data/signed-metadata.json').read_text())

    return {
        'bag_info': bag_info,
        'signed_metadata': signed_metadata,
        'zip_entries': zip_info
    }

def upload_archive(output_path, collection_path, metadata_path, s3_path, session_args):
    s3 = boto3.Session(**session_args).client('s3')
    bucket_name, s3_path = s3_path.split('/', 1)

    # Upload zip file
    s3_collection_key = os.path.join(s3_path, str(collection_path.relative_to(output_path)))
    s3.upload_file(str(collection_path), bucket_name, s3_collection_key)
    logger.info(f" - Uploaded {collection_path.relative_to(output_path)} to {s3_collection_key}")

    # Upload metadata file
    s3_metadata_key = os.path.join(s3_path, str(metadata_path.relative_to(output_path)))
    s3.upload_file(str(metadata_path), bucket_name, s3_metadata_key)
    logger.info(f" - Uploaded {metadata_path.relative_to(output_path)} to {s3_metadata_key}")


def run_pipeline(
    dataset: Dataset,
    output_path: Path,
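Note: the per-entry header_offset/data_offset recorded by zip_archive above appears intended to let a consumer pull one member out of the uploaded zip without downloading the whole archive. A minimal sketch of that read path, assuming a bucket, key, and one entry from the metadata JSON produced by zip_archive (names here are illustrative, not part of this commit):

import zlib
import zipfile
import boto3

def read_zip_member(bucket: str, key: str, entry: dict) -> bytes:
    """Fetch one zip member via a ranged GET, using the data_offset and
    compress_size recorded in the archive's metadata file."""
    s3 = boto3.client('s3')
    start = entry['data_offset']
    end = start + entry['compress_size'] - 1  # HTTP Range is inclusive
    body = s3.get_object(Bucket=bucket, Key=key,
                         Range=f'bytes={start}-{end}')['Body'].read()
    if entry['compress_type'] == zipfile.ZIP_DEFLATED:
        # zip members are raw DEFLATE streams (no zlib header), hence wbits=-15
        d = zlib.decompressobj(-15)
        return d.decompress(body) + d.flush()
    return body  # ZIP_STORED members are written uncompressed

# e.g. entry = metadata['zip_entries'][0]
#      data = read_zip_member('some-bucket', 'collections/.../archive.zip', entry)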
@@ -162,36 +59,43 @@ def run_pipeline(
    # set this here so it makes it into the metadata
    dataset.crawler_downloaded_date = datetime.now()

    with tempfile.TemporaryDirectory(dir=str(output_path)) as temp_dir:
        logger.info("- Creating archive...")
        # set up paths
        temp_dir = Path(temp_dir)
        bag_dir = temp_dir / 'bag'
        archive_path = temp_dir / 'archive.zip'
    def create_archive(temp_dir):
        data_dict = model_to_dict(dataset)
        for key, value in data_dict.items():
            if isinstance(value, datetime):
                data_dict[key] = value.isoformat()
        data_gov_url = f'https://catalog.data.gov/dataset/{dataset.name}'
        collect = [
            *[UrlCollectionTask(url=url) for url in extract_urls(data_dict)],
        ]
        logger.info(f" - Downloading {len(collect)} files")

        # download data with nabit
        create_archive(bag_dir, dataset, signatures)
        # sort fields from dataset
        data_gov_metadata = {k: v for k, v in data_dict.items() if not k.startswith('crawler_')}
        crawler_metadata = {k: v for k, v in data_dict.items() if k.startswith('crawler_')}

        logger.info("- Zipping archive...")
        # zip up data and create metadata
        output_metadata = zip_archive(bag_dir, archive_path)
        return {
            'collect': collect,
            'signed_metadata': {
                'id': str(uuid.uuid4()),
                'url': data_gov_url,
                'description': f'Archive of data.gov dataset "{dataset.title}" created by {dataset.organization["title"]}. Full metadata stored in data_gov_metadata key.',
                'data_gov_metadata': data_gov_metadata,
                'crawler_metadata': crawler_metadata,
            },
        }

    logger.info("- Moving files to final location...")
    # Move files to final location
    collection_path.parent.mkdir(parents=True, exist_ok=True)
    metadata_path.parent.mkdir(parents=True, exist_ok=True)
    os.rename(str(archive_path), collection_path)
    metadata_path.write_text(json.dumps(output_metadata) + '\n')

    if s3_path:
        logger.info("Uploading to S3...")
        upload_archive(output_path, collection_path, metadata_path, s3_path, session_args)

    if not no_delete:
        logger.info("- Deleting zip archive...")
        os.remove(collection_path)
        if collection_path.parent.exists() and not os.listdir(collection_path.parent):
            os.rmdir(collection_path.parent)
    # Use common pipeline
    fetch_and_upload(
        output_path=output_path,
        collection_path=collection_path,
        metadata_path=metadata_path,
        create_archive_callback=create_archive,
        signatures=signatures,
        session_args=session_args,
        s3_path=s3_path,
        no_delete=no_delete
    )

    logger.info("- Setting crawler_downloaded_date...")
    db.connect()
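Note: the create_archive callback converts every datetime in the model_to_dict output to an ISO 8601 string before the record is embedded in the signed metadata, because json.dumps cannot serialize datetime objects. A self-contained sketch of that conversion using a trimmed stand-in model (not the repo's full schema):

import json
from datetime import datetime
from peewee import SqliteDatabase, Model, CharField, DateTimeField
from playhouse.shortcuts import model_to_dict

db = SqliteDatabase(':memory:')

class Dataset(Model):
    name = CharField()
    crawler_identified_date = DateTimeField(default=datetime.now)
    class Meta:
        database = db

db.connect()
db.create_tables([Dataset])
row = Dataset.create(name='example-dataset')

data_dict = model_to_dict(row)
for key, value in data_dict.items():
    if isinstance(value, datetime):       # same conversion as the pipeline above
        data_dict[key] = value.isoformat()

print(json.dumps(data_dict, indent=2))    # now JSON-serializable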
@@ -244,12 +148,10 @@ def get_unprocessed_datasets(output_path: Path, collection: str, min_size: int =
@click.option('--signatures', help='JSON string of signature configuration.')
@click.option('--profile', '-p', help='AWS profile name')
@click.option('--s3-path', '-s', help='S3 path for uploads, e.g. "<bucket_name>/<path>"')
@click.option('--log-level', '-l', type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']), default=None,
              help='Logging level.')
@click.option('--stop-after', help='Stop after processing this many collections', type=int)
@click.option('--no-delete', is_flag=True, help='Set to preserve zipped data on disk as well as metadata')
def main(db_path: Path, output_path: Path, collection: str, workers=None, min_size=0, dataset_name=None,
         if_exists='skip', signatures=None, profile=None, s3_path=None, log_level=None, stop_after=None, no_delete=False):
         if_exists='skip', signatures=None, profile=None, s3_path=None, stop_after=None, no_delete=False):

    if dataset_name:
        workers = 1
@@ -151,17 +151,8 @@ def cli():
              help='Number of results to fetch per page.')
@click.option('--start-date', '-s', type=str, default=None,
              help='Date to start fetching from (format: YYYY-MM-DDTHH:MM:SS.mmmmmm)')
@click.option('--log-level', '-l',
              type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
              default='WARNING',
              help='Logging level.')
def fetch(output_path: Path, rows_per_page: int, start_date: str, log_level: str):
def fetch(output_path: Path, rows_per_page: int, start_date: str):
    """Fetch package data from data.gov API and save to database."""
    logging.basicConfig(
        level=getattr(logging, log_level),
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    save_packages_to_database(output_path, rows_per_page, start_date)

@cli.command()
@@ -22,19 +22,10 @@ def cli():
@click.argument('output_path', type=click.Path(path_type=Path))
@click.option('--rows-per-page', '-r', type=int, default=1000,
              help='Number of results to fetch per page.')
@click.option('--log-level', '-l',
              type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
              default='INFO',
              help='Logging level.')
@click.option('--start-date', '-s', type=str, default=None,
              help='Start date for fetching packages in YYYY-MM-DD format.')
def fetch(output_path: Path, rows_per_page: int, log_level: str, start_date: str):
    """Fetch all package data from data.gov API and save to gzipped JSONL file."""
    logging.basicConfig(
        level=getattr(logging, log_level),
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    if output_path.is_dir():
        current_date = datetime.now().strftime('%Y%m%d')
        output_path = output_path / f'data_{current_date}.jsonl.gz'
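Note: the gzipped JSONL target written here is just line-delimited JSON inside a gzip stream; a minimal standard-library sketch (file name illustrative):

import gzip
import json

records = [{"name": "example-dataset", "title": "Example"}]

# Write: one JSON object per line inside a gzip stream.
with gzip.open('data_20250101.jsonl.gz', 'wt', encoding='utf-8') as f:
    for record in records:
        f.write(json.dumps(record) + '\n')

# Read it back the same way.
with gzip.open('data_20250101.jsonl.gz', 'rt', encoding='utf-8') as f:
    loaded = [json.loads(line) for line in f]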
@@ -49,17 +40,8 @@ def fetch(output_path: Path, rows_per_page: int, log_level: str, start_date: str
@cli.command()
@click.argument('file1', type=click.Path(exists=True, path_type=Path))
@click.argument('file2', type=click.Path(exists=True, path_type=Path))
@click.option('--log-level', '-l',
              type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
              default='INFO',
              help='Logging level.')
def compare(file1: Path, file2: Path, log_level: str):
    """Compare two gzipped JSONL files by indexing on the 'name' key."""
    logging.basicConfig(
        level=getattr(logging, log_level),
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    def load_jsonl_index(file_path: Path) -> Dict[str, Any]:
        # Check for pickle file
        pickle_path = file_path.with_suffix('.pickle')
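Note: several hunks in this commit drop a per-command --log-level option together with its logging.basicConfig call. The replacement isn't shown in this excerpt; one common way to centralize it (sketched here as an assumption, not the repo's actual code) is a single option on the click group:

import logging
import click

@click.group()
@click.option('--log-level', '-l',
              type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
              default='INFO', help='Logging level.')
def cli(log_level):
    """Configure logging once for every subcommand."""
    logging.basicConfig(
        level=getattr(logging, log_level),
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

@cli.command()
def fetch():
    logging.info("fetch runs with the group-level logging configuration")

if __name__ == '__main__':
    cli()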
@@ -60,7 +60,7 @@ class Dataset(BaseModel):
    # fields starting with crawler_ are added by our crawler
    crawler_identified_date = DateTimeField(null=True, default=datetime.now)
    crawler_downloaded_date = DateTimeField(null=True)
    crawler_last_crawl_id = ForeignKeyField('Crawl', backref='datasets', null=True)
    crawler_last_crawl_id = ForeignKeyField(Crawl, backref='datasets', null=True)


class DatasetHistory(Dataset):
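Note: the one-line models.py change replaces the string 'Crawl' with the Crawl model class. peewee's ForeignKeyField expects the model class itself (only the string 'self' is special-cased); a model defined later in the module would instead be referenced with DeferredForeignKey. A minimal illustration with trimmed stand-in models, not the repo's full schema:

from peewee import SqliteDatabase, Model, CharField, ForeignKeyField

db = SqliteDatabase(':memory:')

class BaseModel(Model):
    class Meta:
        database = db

class Crawl(BaseModel):
    name = CharField()

class Dataset(BaseModel):
    title = CharField()
    # Pass the model class, not a name string; for a model defined further
    # down the module, peewee's DeferredForeignKey('Crawl', ...) would be used.
    crawler_last_crawl_id = ForeignKeyField(Crawl, backref='datasets', null=True)

db.connect()
db.create_tables([Crawl, Dataset])

crawl = Crawl.create(name='2025-01 crawl')
Dataset.create(title='Example dataset', crawler_last_crawl_id=crawl)
print(crawl.datasets.count())  # backref gives the reverse accessor -> 1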