Jack Cushman 2025-02-05 10:21:50 -05:00
This repository collects scripts to support the Library Innovation Lab's
[public data preservation project](
These scripts are used as part of internal pipelines, so may not be usable
for others as is, but are available for reference about the creation of our
data and for remix. We also welcome contributions if they fit our internal
pipelines and goals.
## Scripts
Scripts are organized into subfolders for general categories of tasks:
### collection
Scripts for working with a "collection," meaning a set of files stored on
cloud storage that were all gathered with a similar collection strategy.
This folder is for scripts that apply to multiple collections rather than
a single collection.
* copy static files from collections/ to configure the collections.
* generate static indexes of files in a collection.
* fetch and verify integrity of a BagIt archive in a collection.
* manage Cloudflare R2 buckets.
* manage S3 buckets.
### helpers
Util libraries used by other scripts.
* run tasks in parallel.
* load configuration from the user's home dir.
### data_gov
Scripts for working with the []( collection.
* fetch a jsonl file of the full API.
* fetch the full API and store updates in a sqlite database.
* database models for the sqlite database.
* use the sqlite database to fetch any datasets that require updating,
package with nabit, and upload to cloud storage.
* database models for the sqlite database.
* data_gov_diff/: scripts for identifying changes in past data created by or (WIP).
### github
Scripts for working with the [github]( collection.
* use [gitspoke]( to download all repositories listed in a CSV.

Static files to be added to individual collections.

"directory": "data_gov",
"aws_profile": "sc",
"s3_path": ""

<img src="" alt="Harvard Law School Library Innovation Lab logo"/>
This is a regularly updated mirror of all data files linked from [](
The repository is maintained by the Harvard Law School Library Innovation Lab as part
of our [project to preserve U.S. federal public data](
Collection Format
Each dataset on has a unique slug known as its `name`. We store each dataset
in this repository as:
We also store a metadata file for each dataset in the `metadata` directory:
`<version>` is a `v` followed by the number of times we have downloaded the dataset
(v1, v2, etc.)
For example, the dataset [](
is stored in this repository as:
* [collections/data_gov/fruit-and-vegetable-prices/](
* [metadata/data_gov/fruit-and-vegetable-prices/v1.json](
Dataset Format
Each dataset zip file is a BagIt package created by our [bag-nabit]( tool.
[BagIt]( is a simple file format, established by the
Library of Congress, consisting of a folder of metadata and text files. Our BagIt
files follow this directory structure:
* `data/`
* `files/`:
* `...`: these are the actual files you likely want to use as a researcher,
downloaded from the listing.
* `headers.warc`: request and response headers from HTTP fetches for files in `files/`
* `signed-metadata.json`: metadata including's API description of the dataset
The bags also contain these files, which are useful for authenticating the
provenance of the data:
* `bagit.txt`: standard BagIt file
* `bag-info.txt`: standard BagIt file
* `manifest-sha256.txt`: standard BagIt file
* `tagmanifest-sha256.txt`: standard BagIt file
* `signatures/`: directory of signature files
Metadata File Format
Each metadata JSON file contains three main sections:
1. `bag_info`: Contains the BagIt metadata including:
- Bag-Software-Agent: The version of nabit used to create the archive
- Bagging-Date: When the archive was created
2. `signed_metadata`: Contains detailed information about the dataset including:
- `id`: A UUID for this specific archive
- `url`: The URL for the dataset
- `description`: A brief description including the dataset title and creating organization
- `data_gov_metadata`: The complete metadata from's API, including:
- Dataset details (title, description, etc.)
- Organization information
- Resource listings
- Tags and other metadata
- `collection_tasks`: Records of the HTTP requests made to collect the dataset
3. `zip_entries`: Listing of each entry in the collection zip file, which can be used to fetch individual files from the zip file via range request without downloading the entire archive.
Rollup files
There are several rollup files at the top level to help with finding datasets
of interest:
* ``: zipped JSON lines file of all files contained in metadata/
* ``: zipped JSON lines file showing the s3 listing of all files in the repository
* `collections.html`: human-readable HTML file showing the title and link to each dataset (warning, very large file that may not load in some browsers)
Downloading data
To download an individual dataset by name you can construct its URL, such as:
To download large numbers of files, we recommend the `aws` or `rclone` command line tools:
aws s3 cp s3://<name>/ --no-sign-request
Data Limitations
---------------- includes multiple kinds of datasets, including some that link to actual data
files, such as CSV files, and some that link to HTML landing pages. Our process
runs a "shallow crawl" that collects only the directly linked files. Datasets
that link only to a landing page will need to be collected separately.
Source code
The source code used to generate this and other repositories is available at [](
We welcome conversation and collaboration in the issue tracker for that project.

Directory for local data files. Files in this directory are not tracked by git,
but may be used by scripts.
but may be used by scripts.

name = "data-vault"
version = "0.1.0"
readme = ""
requires-python = ">=3.12"
dependencies = [
requires = ["hatchling"]
build-backend = ""
dev-dependencies = [
nabit = { git = "" }
gitspoke = { git = "" }
packages = ["scripts"]

def hello() -> str:
return "Hello from data-mirror!"

def hello() -> str:
return "Hello from data-mirror!"

import logging
from pathlib import Path
import click
from cloudflare import Cloudflare
import os
from scripts.helpers.config import load_config
logger = logging.getLogger(__name__)
def generate_temp_key(account_id: str, bucket: str, parent_access_key_id: str, token: str,
permission: str = "object-read-write", ttl_seconds: int = 3600,
prefixes: list[str] | None = None, objects: list[str] | None = None):
"""Generate a temporary R2 access key using the Cloudflare API.
account_id: Cloudflare account ID
bucket: R2 bucket name
parent_access_key_id: Parent access key ID
token: Cloudflare API token
permission: Permission level ('object-read-write' or 'object-read')
ttl_seconds: Time-to-live in seconds
prefixes: Optional list of key prefixes to restrict access to
objects: Optional list of specific object keys to restrict access to
params = {
"account_id": account_id,
"bucket": bucket,
"parent_access_key_id": parent_access_key_id,
"permission": permission,
"ttl_seconds": ttl_seconds,
if prefixes:
params["prefixes"] = prefixes
if objects:
params["objects"] = objects
return Cloudflare(api_token=token).r2.temporary_credentials.create(**params)
def cli():
"""Cloudflare R2 utility commands."""
@click.option('--bucket', '-b', type=str, required=True,
help='R2 bucket name.')
@click.option('--permission', '-p', type=click.Choice(['object-read-write', 'object-read']),
help='Permission level for the temporary key.')
@click.option('--ttl', '-t', type=int, default=1,
help='Time-to-live in hours for the temporary key.')
@click.option('--prefixes', '-x', multiple=True,
help='Key prefixes to restrict access to. Can be specified multiple times.')
@click.option('--objects', '-o', multiple=True,
help='Specific object keys to restrict access to. Can be specified multiple times.')
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
help='Logging level.')
def generate_key(bucket: str, permission: str, ttl: int, prefixes: tuple[str, ...],
objects: tuple[str, ...], log_level: str):
"""Generate temporary Cloudflare R2 access credentials."""
# Setup logging
# Load config
config = load_config().get("temp_tokens", {})
if not config or any(key not in config for key in ['parent_access_key_id', 'account_id', 'token']):
raise click.ClickException("Config file must have 'temp_tokens' dict with 'parent_access_key_id', 'account_id', and 'token' keys")
# Generate temporary key
temp_cred = generate_temp_key(
ttl_seconds=ttl * 3600,
prefixes=list(prefixes) if prefixes else None,
objects=list(objects) if objects else None
# Output AWS config format
click.echo("\n# Add this to ~/.aws/config:")
click.echo("[profile r2-temp]")
click.echo(f"aws_access_key_id = {temp_cred.access_key_id}")
click.echo(f"aws_secret_access_key = {temp_cred.secret_access_key}")
click.echo(f"aws_session_token = {temp_cred.session_token}")
click.echo("region = auto")
click.echo(f"endpoint_url = https://{config['account_id']}")
# Output sample command using first prefix if available
click.echo("\n# Sample upload command:")
sample_path = objects[0] if objects else f"{prefixes[0].strip('/')}/" if prefixes else ""
click.echo(f"aws s3 cp local-file.txt s3://{bucket}/{sample_path} --profile r2-temp")
if __name__ == "__main__":

import click
from pathlib import Path
from scripts.data_gov.models import db, Dataset
import logging
from tqdm import tqdm
logger = logging.getLogger(__name__)
# Header template with styles
<title> Dataset Mirror</title>
<link rel="stylesheet" href="style.css">
<h1> Dataset Mirror</h1>
TABLE_START = ''' <table>
ROW_TEMPLATE = ''' <tr>
TABLE_END = ''' </tbody>
def render_html(datasets_query, output_path: Path) -> None:
"""Render the datasets to an HTML file, streaming content."""
with open(output_path / 'index.html', 'w', encoding='utf-8') as f:
# Write header
# Write table start
# Stream each dataset row
rows = []
for dataset in tqdm(datasets_query.iterator(), desc="Rendering datasets"):
org_title = dataset.organization.get('title') if dataset.organization else 'N/A'
row = ROW_TEMPLATE.format( or '',
if len(rows) >= 1000:
rows = []
if rows:
# Write table end
@click.argument('db_path', type=click.Path(path_type=Path), default='data/data.db')
@click.argument('output_path', type=click.Path(path_type=Path), default='data/processed/web')
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
help='Logging level.')
@click.option('--limit', '-n', type=int, default=None,
help='Maximum number of rows to display. Default: all rows.')
def main(db_path: Path, output_path: Path, log_level: str, limit: int | None):
"""Render the Dataset table to an HTML file."""
level=getattr(logging, log_level),
format='%(asctime)s - %(levelname)s - %(message)s'
)"Connecting to database at {db_path}")
try:"Starting HTML generation...")
datasets_query =
if limit:
datasets_query = datasets_query.limit(limit)"Limited to {limit} rows")"Rendering HTML to {output_path}")
render_html(datasets_query, output_path)"Done!")
if __name__ == "__main__":

import boto3
import click
from tqdm import tqdm
import logging
from itertools import islice
logger = logging.getLogger(__name__)
def get_delete_markers(s3_client, bucket: str, prefix: str):
"""Get all delete markers for objects with the given prefix."""
paginator = s3_client.get_paginator('list_object_versions')
for page in tqdm(paginator.paginate(Bucket=bucket, Prefix=prefix), desc="pages"):
if 'DeleteMarkers' in page:
yield [
'Key': marker['Key'],
'VersionId': marker['VersionId']
for marker in page['DeleteMarkers']
if marker['IsLatest']
def remove_delete_markers(s3_client, bucket: str, prefix: str, dry_run: bool = False):
"""Remove all delete markers for objects with the given prefix."""
for marker_batch in get_delete_markers(s3_client, bucket, prefix):
response = s3_client.delete_objects(
'Objects': marker_batch,
'Quiet': True
# Log any errors
if 'Errors' in response:
for error in response['Errors']:
logger.error(f"Failed to remove marker for {error['Key']}: {error['Message']}")
def get_empty_files(s3_client, bucket: str, prefix: str):
"""Get all objects with size zero under the given prefix."""
paginator = s3_client.get_paginator('list_objects_v2')
for page in tqdm(paginator.paginate(Bucket=bucket, Prefix=prefix), desc="pages"):
if 'Contents' in page:
yield [
{'Key': obj['Key']}
for obj in page['Contents']
if obj['Size'] == 0
def delete_empty_files(s3_client, bucket: str, prefix: str, dry_run: bool = False):
"""Delete all zero-size objects under the given prefix."""
pbar = tqdm(desc="deleted")
for empty_batch in get_empty_files(s3_client, bucket, prefix):
if not empty_batch:
if dry_run:
for obj in empty_batch:"Would delete empty file: {obj['Key']}")
response = s3_client.delete_objects(
'Objects': empty_batch,
'Quiet': True
# Log any errors
if 'Errors' in response:
for error in response['Errors']:
logger.error(f"Failed to delete {error['Key']}: {error['Message']}")
def cli():
"""S3 object management commands."""
@click.option('--profile', help='AWS profile name', default='sc-direct')
@click.option('--dry-run', is_flag=True, help='Show what would be done without actually doing it')
@click.option('--log-level', type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
default='INFO', help='Set logging level')
def undelete(s3_path: str, profile: str = None, dry_run: bool = False, log_level: str = 'INFO'):
"""Remove delete markers from versioned S3 objects, effectively undeleting them."""
bucket, prefix = s3_path.split('/', 1)
session = boto3.Session(profile_name=profile)
s3_client = session.client('s3')
remove_delete_markers(s3_client, bucket, prefix, dry_run)
@click.option('--profile', help='AWS profile name', default='sc-direct')
@click.option('--dry-run', is_flag=True, help='Show what would be done without actually doing it')
@click.option('--log-level', type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
default='INFO', help='Set logging level')
def delete_empty(s3_path: str, profile: str = None, dry_run: bool = False, log_level: str = 'INFO'):
"""Delete all zero-size objects under the given prefix."""
bucket, prefix = s3_path.split('/', 1)
session = boto3.Session(profile_name=profile)
s3_client = session.client('s3')
delete_empty_files(s3_client, bucket, prefix, dry_run)
if __name__ == '__main__':

import boto3
import click
import json
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
@click.option('--collections-file', '-c', type=click.Path(exists=True, path_type=Path),
help='Path to collections configuration file.')
def main(collections_file: Path):
# Load collections config
collections = json.loads(collections_file.read_text())
collections_dir = collections_file.parent
for collection in collections:
s3 = boto3.Session(profile_name=collection['aws_profile']).client('s3')
collection_path = collections_dir / collection['directory']
bucket_name, s3_prefix = collection['s3_path'].split('/', 1)
for file_path in collection_path.rglob('*'):
if file_path.is_file():
relative_path = file_path.relative_to(collection_path)
s3_key = f"{s3_prefix}/{relative_path}"
print(f"Uploading {file_path} to s3://{bucket_name}/{s3_key}")
s3.upload_file(str(file_path), bucket_name, s3_key)
if __name__ == '__main__':

from pathlib import Path
import json
import zipfile
import tempfile
import requests
import click
import logging
from nabit.bin.utils import cli_validate
logger = logging.getLogger(__name__)
def download_file(url: str, target_path: Path):
"""Download a file from URL to target path"""
response = requests.get(url, stream=True)
with'wb') as f:
for chunk in response.iter_content(chunk_size=2**20):
def verify_dataset(json_url: str, zip_url: str, output_dir: Path | None = None):
Verify a dataset by downloading and checking its JSON metadata and ZIP contents.
If output_dir is provided, write the uncompressed contents there.
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
# Download files"Downloading metadata from {json_url}...")
json_path = tmpdir / "metadata.json"
download_file(json_url, json_path)"Downloading archive from {zip_url}...")
zip_path = tmpdir / ""
download_file(zip_url, zip_path)
# Load metadata
metadata = json.loads(json_path.read_text())
# Create output directory
if not output_dir:
output_dir = tmpdir / "output"
output_dir.mkdir(parents=True, exist_ok=True)
# Verify file contents"Verifying file contents...")
with'rb') as f:
for entry in metadata['zip_entries']:"Checking {entry['filename']}...")['data_offset'])
zip_data =['compress_size'])
if entry['compress_type'] == zipfile.ZIP_STORED:
uncompressed = zip_data
decompressor = zipfile._get_decompressor(entry['compress_type'])
uncompressed = decompressor.decompress(zip_data)
# write the file
output_file = output_dir / entry['filename']
output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.write_bytes(uncompressed)"All files extracted successfully")
# verify dataset with nabit
# Return metadata for potential further use
return metadata
@click.argument('json_url', type=str)
@click.argument('zip_url', type=str)
@click.option('--output', '-o', type=click.Path(path_type=Path),
help='Directory to write uncompressed files')
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
help='Logging level.')
def main(json_url: str, zip_url: str, output: Path = None, log_level: str = 'INFO'):
"""Verify dataset from JSON and ZIP URLs"""
# Set up logging
level=getattr(logging, log_level),
format='%(asctime)s - %(levelname)s - %(message)s'
verify_dataset(json_url, zip_url, output)
if __name__ == '__main__':

import json
import click
from pathlib import Path
from typing import Dict, List, Set, Tuple
import logging
from tqdm import tqdm
logger = logging.getLogger(__name__)
def load_jsonl_data(jsonl_path: Path, keep_fields=None, compare_by: str = 'id') -> Dict[str, dict]:
Load data from JSONL file into a dictionary keyed by id.
Only includes fields that match the CSV format.
jsonl_path: Path to the JSONL file
Dictionary mapping id to filtered record data
# Fields to keep from JSONL records
data = {}
with open(jsonl_path, 'r', encoding='utf-8') as f:
for line in tqdm(f, desc="Loading JSONL"):
if line.strip(): # Skip empty lines
record = json.loads(line)
if keep_fields:
record = {k: v for k, v in record.items() if k in keep_fields}
data[record[compare_by]] = record
return data
def find_differences(csv_data: Dict[str, dict],
jsonl_data: Dict[str, dict]) -> Tuple[Set[str], Set[str], Set[str]]:
Find records that differ between CSV and JSONL data.
csv_data: Dictionary of CSV records keyed by id
jsonl_data: Dictionary of JSONL records keyed by id
Tuple of (csv_only_ids, jsonl_only_ids, different_ids)
csv_ids = set(csv_data.keys())
jsonl_ids = set(jsonl_data.keys())
# Find records only in CSV
csv_only = csv_ids - jsonl_ids
# Find records only in JSONL
jsonl_only = jsonl_ids - csv_ids
return csv_only, jsonl_only
@click.argument('old_path', type=click.Path(exists=True, path_type=Path))
@click.argument('new_path', type=click.Path(exists=True, path_type=Path))
@click.option('--compare-by', '-c',
help='Field to compare by.')
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
help='Logging level.')
def main(old_path: Path, new_path: Path, compare_by: str, log_level: str):
"""Compare records between CSV and JSONL files."""
level=getattr(logging, log_level),
format='%(asctime)s - %(levelname)s - %(message)s'
old_data = load_jsonl_data(old_path, compare_by=compare_by)
new_data = load_jsonl_data(new_path, compare_by=compare_by)
# Find differences
old_only, new_only = find_differences(old_data, new_data)
old_only_path = old_path.with_suffix(f'.only_{compare_by}.jsonl')
new_only_path = new_path.with_suffix(f'.only_{compare_by}.jsonl')"Writing {len(old_only)} records to {old_only_path}")
with open(old_only_path, 'w', encoding='utf-8') as f:
for id in old_only:
f.write(json.dumps(old_data[id]) + '\n')"Writing {len(new_only)} records to {new_only_path}")
with open(new_only_path, 'w', encoding='utf-8') as f:
for id in new_only:
f.write(json.dumps(new_data[id]) + '\n')
if __name__ == '__main__':
# import sqlite3
# import json
# # Connect to the database
# conn = sqlite3.connect('data/data.db')
# conn.row_factory = sqlite3.Row # This allows us to access columns by name
# # Open the output file
# with open('data/data_db_dump_20250130.jsonl', 'w') as f:
# # Execute the query and fetch rows in chunks
# cursor = conn.execute('''
# FROM dataset
# ''')
# written = 0
# while True:
# rows = cursor.fetchmany(1000) # Fetch 1000 rows at a time
# if not rows:
# break
# written += len(rows)
# # Write each row as a JSON line
# for row in rows:
# # Convert row to dict and write to file
# json_line = json.dumps(dict(row))
# f.write(json_line + '\n')
# print(f"Wrote {written} rows")
# conn.close()

import json
from collections import Counter, defaultdict
from pathlib import Path
# Read the JSONL file and count crawler_identified_date values
downloaded_counts = Counter()
identified_counts = Counter()
titles_by_org = defaultdict(list)
with open('data/data_db_dump_20250130.only_name.jsonl', 'r') as f:
for line in f:
data = json.loads(line)
org = json.loads(data.get('organization', '{}'))
identified_counts[(data.get('crawler_identified_date') or '')[:10]] += 1
# Print the counts sorted by date
for date, count in sorted(identified_counts.items()):
print(f"{date}: {count}")
# sort each list of titles by org
for org, titles in titles_by_org.items():
Path('data/titles_by_org.json').write_text(json.dumps(titles_by_org, indent=2))
# print urls
for path in Path('data/').glob('glass*'):
with open(path, 'r') as f:
for line in f:
data = json.loads(line)
print("* " + data['name'])
resources = data.get('resources', [])
if type(resources) == str:
resources = json.loads(resources)
for resource in resources:
print(' * ' + resource['url'])

from nabit.lib.archive import package
from nabit.lib.sign import KNOWN_TSAS, is_encrypted_key
from nabit.lib.backends.url import UrlCollectionTask
from pathlib import Path
import json
import uuid
import tempfile
import click
import os
from urllib.parse import urlparse
import re
from scripts.helpers.parallel import run_parallel
import zipfile
import struct
import boto3
import logging
from scripts.data_gov.models import db, Dataset
from playhouse.shortcuts import model_to_dict
from tqdm import tqdm
from datetime import datetime
logger = logging.getLogger(__name__)
## download datasets, create nabit archives, and upload to S3
# File extensions that are already compressed or wouldn't benefit from additional compression
# Already compressed archives
'zip', 'gz', 'tgz', 'bz2', '7z', 'rar', 'xz',
# Compressed images
'jpg', 'jpeg', 'png', 'gif', 'webp',
# Compressed video/audio
'mp4', 'mov', 'avi', 'wmv', 'ogv', 'mp3', 'm4a',
# Other compressed/binary formats
'pdf', 'docx', 'xlsx', 'pptx',
stats_counter = {}
def is_valid_url(url):
parsed = urlparse(url)
return parsed.scheme in ['http', 'https'] and'[^\.]\.[^\.]', parsed.netloc)
def extract_urls(data, urls = None):
urls = set() if urls is None else urls
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, str):
if is_valid_url(value):
elif isinstance(value, (dict, list)):
extract_urls(value, urls)
elif isinstance(data, list):
for item in data:
extract_urls(item, urls)
return urls
def create_archive(bag_dir, dataset: Dataset, signatures):
data_dict = model_to_dict(dataset)
for key, value in data_dict.items():
if isinstance(value, datetime):
data_dict[key] = value.isoformat()
data_gov_url = f'{}'
collect = [
*[UrlCollectionTask(url=url) for url in extract_urls(data_dict)],
]" - Downloading {len(collect)} files")
# sort fields from dataset
data_gov_metadata = {k: v for k, v in data_dict.items() if not k.startswith('crawler_')}
crawler_metadata = {k: v for k, v in data_dict.items() if k.startswith('crawler_')}
# Create the archive
'id': str(uuid.uuid4()),
'url': data_gov_url,
'description': f'Archive of dataset "{dataset.title}" created by {dataset.organization["title"]}. Full metadata stored in data_gov_metadata key.',
'data_gov_metadata': data_gov_metadata,
'crawler_metadata': crawler_metadata,
def zip_archive(bag_dir, archive_path):
# Create zip archive
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for file_path in bag_dir.rglob('*'):
if file_path.is_file():
arc_path = file_path.relative_to(bag_dir)
compression = (zipfile.ZIP_STORED
if file_path.suffix.lower().lstrip('.') in UNCOMPRESSED_EXTENSIONS
else zipfile.ZIP_DEFLATED)
zf.write(file_path, arc_path, compress_type=compression)
# Create metadata file
zip_info = []
with zipfile.ZipFile(archive_path, 'r') as zf:
for info in zf.filelist:
header_offset = info.header_offset
# Read header to calculate data offset
header =
fheader = struct.unpack(zipfile.structFileHeader, header)
fname_length = fheader[zipfile._FH_FILENAME_LENGTH]
extra_length = fheader[zipfile._FH_EXTRA_FIELD_LENGTH]
data_offset = header_offset + zipfile.sizeFileHeader + fname_length + extra_length
'filename': info.filename,
'file_size': info.file_size,
'compress_size': info.compress_size,
'compress_type': info.compress_type,
'header_offset': header_offset,
'data_offset': data_offset,
# Read the bag-info.txt and signed-metadata.json
bag_info = (bag_dir / 'bag-info.txt').read_text()
signed_metadata = json.loads((bag_dir / 'data/signed-metadata.json').read_text())
return {
'bag_info': bag_info,
'signed_metadata': signed_metadata,
'zip_entries': zip_info
def upload_archive(output_path, collection_path, metadata_path, s3_path, session_args):
s3 = boto3.Session(**session_args).client('s3')
bucket_name, s3_path = s3_path.split('/', 1)
# Upload zip file
s3_collection_key = os.path.join(s3_path, str(collection_path.relative_to(output_path)))
s3.upload_file(str(collection_path), bucket_name, s3_collection_key)" - Uploaded {collection_path.relative_to(output_path)} to {s3_collection_key}")
# Upload metadata file
s3_metadata_key = os.path.join(s3_path, str(metadata_path.relative_to(output_path)))
s3.upload_file(str(metadata_path), bucket_name, s3_metadata_key)" - Uploaded {metadata_path.relative_to(output_path)} to {s3_metadata_key}")
def run_pipeline(
dataset: Dataset,
output_path: Path,
metadata_path: Path,
collection_path: Path,
signatures: list = None,
session_args: dict = None,
s3_path: str = None,
no_delete: bool = False,
):"Processing dataset: {}")
# we have a db forked from the main process, so we need to close it and reopen if needed
# set this here so it makes it into the metadata
dataset.crawler_downloaded_date =
with tempfile.TemporaryDirectory(dir=str(output_path)) as temp_dir:"- Creating archive...")
# set up paths
temp_dir = Path(temp_dir)
bag_dir = temp_dir / 'bag'
archive_path = temp_dir / ''
# download data with nabit
create_archive(bag_dir, dataset, signatures)"- Zipping archive...")
# zip up data and create metadata
output_metadata = zip_archive(bag_dir, archive_path)"- Moving files to final location...")
# Move files to final location
collection_path.parent.mkdir(parents=True, exist_ok=True)
metadata_path.parent.mkdir(parents=True, exist_ok=True)
os.rename(str(archive_path), collection_path)
metadata_path.write_text(json.dumps(output_metadata) + '\n')
if s3_path:"Uploading to S3...")
upload_archive(output_path, collection_path, metadata_path, s3_path, session_args)
if not no_delete:"- Deleting zip archive...")
if collection_path.parent.exists() and not os.listdir(collection_path.parent):
os.rmdir(collection_path.parent)"- Setting crawler_downloaded_date...")
db.connect()"Processing complete")
def get_unprocessed_datasets(output_path: Path, collection: str, min_size: int = 0, dataset_name: str = None):
"""Get datasets from SQLite that don't have metadata files yet."""
query =
if dataset_name:
query = query.where( == dataset_name)
if min_size:
query = query.where(Dataset.size >= min_size)
# Initialize progress bars
stats_counter['total'] = tqdm(desc="Total records", unit="pkg")
stats_counter['skipped'] = tqdm(desc="Already processed", unit="pkg")
stats_counter['yielded'] = tqdm(desc="Processing", unit="pkg")
for dataset in query:
# Check if metadata file exists
name =
metadata_path = output_path / 'metadata' / collection / name / 'v1.json'
if metadata_path.exists():
yield dataset
@click.option('--db-path', '-d', type=click.Path(exists=True, path_type=Path), default='data/data.db')
@click.option('--output-path', '-o', type=click.Path(path_type=Path), default='data/processed',
help='Output path.')
@click.option('--collection', '-c', type=str, default='data_gov',
help='Collection name.')
@click.option('--workers', '-w', type=int, default=None,
help='Number of worker processes. Defaults to CPU count.')
@click.option('--min-size', '-s', type=int, default=0,
help='Minimum size of dataset to process.')
@click.option('--dataset-name', help='Dataset name to process.')
@click.option('--if-exists', '-e', type=click.Choice(['skip', 'replace', 'version']), default='skip',
help='Whether to skip, replace, or add a version if dataset already exists.')
@click.option('--signatures', help='JSON string of signature configuration.')
@click.option('--profile', '-p', help='AWS profile name')
@click.option('--s3-path', '-s', help='S3 path for uploads, e.g. "<bucket_name>/<path>"')
@click.option('--log-level', '-l', type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']), default=None,
help='Logging level.')
@click.option('--stop-after', help='Stop after processing this many collections', type=int)
@click.option('--no-delete', is_flag=True, help='Set to preserve zipped data on disk as well as metadata')
def main(db_path: Path, output_path: Path, collection: str, workers=None, min_size=0, dataset_name=None,
if_exists='skip', signatures=None, profile=None, s3_path=None, log_level=None, stop_after=None, no_delete=False):
if dataset_name:
workers = 1
stop_after = 1
if signatures:
signatures = json.loads(signatures)
for signature in signatures:
if signature['action'] == 'sign':
if is_encrypted_key(signature['params']['key']):
signature['params']['password'] = click.prompt(
f"Enter password for {signature['params']['key']}: ",
elif signature['action'] == 'timestamp':
if known_tsa := signature.pop('known_tsa', None):
signature['params'] = KNOWN_TSAS[known_tsa]
session_args = {}
if profile:
session_args['profile_name'] = profile
# Initialize database connection
def get_tasks():
processed = 0
for dataset in get_unprocessed_datasets(output_path, collection, min_size, dataset_name):
# handle existing datasets
name =
collection_path = output_path / 'collections' / collection / name / ''
metadata_path = output_path / 'metadata' / collection / name / 'v1.json'
if metadata_path.exists():
if if_exists == 'skip':
elif if_exists == 'replace':
if collection_path.exists():
elif if_exists == 'version':
version = 2
while True:
collection_path = output_path / 'collections' / collection / name / f'v{version}.zip'
metadata_path = output_path / 'metadata' / collection / name / f'v{version}.json'
if not metadata_path.exists():
version += 1
yield dataset, output_path, metadata_path, collection_path, signatures, session_args, s3_path, no_delete
processed += 1
if stop_after and processed >= stop_after:
run_parallel(run_pipeline, get_tasks(), workers, log_level=log_level, catch_errors=False)
# Close progress bars
for counter in stats_counter.values():
if __name__ == '__main__':

import httpx
from typing import Iterator, Dict, Any, List
import time
import click
from pathlib import Path
import logging
from datetime import datetime
from scripts.data_gov.models import db, Dataset, DatasetHistory
from tqdm import tqdm
from playhouse.shortcuts import model_to_dict
from jsondiff import diff
logger = logging.getLogger(__name__)
stats_counter = {}
def init_database(db_path: Path) -> None:
"""Initialize the database connection and create tables."""
db.create_tables([Dataset, DatasetHistory])
def save_to_database(results: List[Dict[str, Any]]) -> None:
Save a batch of packages to the database using Peewee.
if not results:
# Process datetime fields in incoming records
for package in results:
for field in ['metadata_created', 'metadata_modified']:
if package.get(field):
package[field] = datetime.fromisoformat(
package[field].replace('Z', '+00:00')
except ValueError:
package[field] = None
# Get all IDs from incoming packages
incoming_ids = [pkg['id'] for pkg in results]
# Fetch existing records as model instances
existing_records = { record
for record in << incoming_ids)
# Prepare bulk operations
history_records = []
new_records = []
# Compare records and prepare operations
for package_data in results:
# Create a new model instance from the package data
new_package = Dataset(**package_data)
existing = existing_records.get(package_data['id'])
if existing:
# Compare model instances using their dict representations
if diff(model_to_dict(existing), model_to_dict(new_package)):
# Record changed - add to history and update
# Record unchanged - skip
# New record - just add it
with db.atomic():
# Bulk move history records if any exist
if history_records:
Dataset.delete().where( << [ for h in history_records]).execute()
# Bulk insert new records
if new_records:
def save_packages_to_database(output_path: Path, rows_per_page: int = 1000, start_date: str | None = None) -> None:
Save fetched data to the database, resuming from last position if needed.
output_path: Path to save the database
rows_per_page: Number of results to fetch per page
start_date: Optional date to start fetching from
stats_counter['new'] = tqdm(desc="New records", unit="pkg")
stats_counter['updated'] = tqdm(desc="Updated records", unit="pkg")
stats_counter['skipped'] = tqdm(desc="Unchanged records", unit="pkg")
for results in tqdm(fetch_data_gov_packages(rows_per_page=rows_per_page, start_date=start_date, max_retries=10)):
def fetch_data_gov_packages(rows_per_page: int = 1000, start_date: str = None, max_retries: int = 3) -> Iterator[Dict[str, Any]]:
Fetch package data from API using date-based pagination.
rows_per_page: Number of results to fetch per page
start_date: Optional date to start fetching from (format: YYYY-MM-DDTHH:MM:SS.mmmmmm)
max_retries: Maximum number of retry attempts for 5xx errors
Dict containing package data for each result
base_url = ""
current_date = start_date
total_records = 0
while True:"Current date offset: {current_date}")
# Build date filter query
url = f"{base_url}?rows={rows_per_page}&sort=metadata_modified+desc"
if current_date:
# Format date to match Solr's expected format (dropping microseconds)
formatted_date = current_date.split('.')[0] + 'Z'
date_filter = f"+metadata_modified:[* TO {formatted_date}]"
url += f"&fq={date_filter}"
for attempt in range(max_retries):
start_time = time.time()
response = httpx.get(url, timeout=60.0)
request_time = time.time() - start_time
break # Success, exit retry loop
except httpx.HTTPStatusError as e:
if e.response.status_code >= 500 and attempt < max_retries - 1:
retry_wait = 2 ** attempt # Exponential backoff
logger.warning(f"Got {e.response.status_code}, retrying in {retry_wait}s... (attempt {attempt + 1}/{max_retries})")
logger.warning(f"Error URL: {url}")
# If not a 5xx error or we're out of retries, re-raise
logger.error(f"Error URL: {url}")
logger.error(f"Response content: {response.text}")
data = response.json()
results = data["result"]["results"]
if not results:
# Get date of last result for next query
current_date = results[-1]["metadata_modified"]
total_records += len(results)"Request took {request_time:.2f}s. Total records: {total_records}")
yield results
def get_dataset_history(dataset_name: str) -> None:
Fetch and display all versions of a dataset with the given ID,
from oldest to newest, showing only changed fields between versions.
# Get all versions including current
versions = [
model_to_dict(record, recurse=True)
for record in (DatasetHistory
.where( == dataset_name)
current_record = == dataset_name).first()
if current_record:
versions.append(model_to_dict(current_record, recurse=True))
if not versions:
print(f"No dataset found with name: {dataset_name}")
# Print each version with changed fields
prev = None
for curr in versions:
history_id = curr.pop('history_id', None)
if prev:
diff_fields = diff(prev, curr)
diff_fields = curr
print(f"*** Version: {curr.get('metadata_modified')} ***")
for k, v in diff_fields.items():
print(f"- {k}: {v}")
prev = curr
def cli():
""" dataset mirroring tools."""
# Modify the existing main function to be a command in the group
@click.argument('output_path', type=click.Path(path_type=Path), default='data/data.db')
@click.option('--rows-per-page', '-r', type=int, default=1000,
help='Number of results to fetch per page.')
@click.option('--start-date', '-s', type=str, default=None,
help='Date to start fetching from (format: YYYY-MM-DDTHH:MM:SS.mmmmmm)')
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
help='Logging level.')
def fetch(output_path: Path, rows_per_page: int, start_date: str, log_level: str):
"""Fetch package data from API and save to database."""
level=getattr(logging, log_level),
format='%(asctime)s - %(levelname)s - %(message)s'
save_packages_to_database(output_path, rows_per_page, start_date)
@click.argument('db_path', type=click.Path(path_type=Path), default='data/data.db')
def history(dataset_name: str, db_path: Path):
"""Show version history for a dataset with the given ID."""
@click.argument('db_path', type=click.Path(path_type=Path), default='data/data.db')
def delete_duplicate_history(db_path: Path):
"""Delete duplicate history records."""
# Get all unique dataset names in history
unique_names = (DatasetHistory
total_deleted = 0
for (name,) in tqdm(unique_names, desc="Processing datasets"):
# Get all versions for this dataset ordered by modification date
versions = [
for record in (DatasetHistory
.where( == name)
current_record = == name).first()
if current_record:
# Track IDs of duplicate records to delete
to_delete = []
# Compare adjacent versions
prev = versions[0]
prev_id = prev.pop('history_id')
for curr in versions[1:]:
curr_id = curr.pop('history_id', None)
# If versions are identical, mark current version for deletion
if not diff(prev, curr):
prev = curr
prev_id = curr_id
# Bulk delete duplicate records
if to_delete:
deleted = (DatasetHistory
.where(DatasetHistory.history_id << to_delete)
total_deleted += deleted
click.echo(f"Deleted {total_deleted} duplicate history records")
if __name__ == "__main__":

import httpx
import json
import time
import logging
from pathlib import Path
from typing import Iterator, Dict, Any, List
import click
from scripts.data_gov.fetch_index import fetch_data_gov_packages
logger = logging.getLogger(__name__)
@click.argument('output_path', type=click.Path(path_type=Path), default='data/data_20250130.jsonl')
@click.option('--rows-per-page', '-r', type=int, default=1000,
help='Number of results to fetch per page.')
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
help='Logging level.')
@click.option('--start-date', '-s', type=str, default=None,
help='Start date for fetching packages in YYYY-MM-DD format.')
def main(output_path: Path, rows_per_page: int, log_level: str, start_date: str):
"""Fetch all package data from API and save to JSONL file."""
level=getattr(logging, log_level),
format='%(asctime)s - %(levelname)s - %(message)s'
with open(output_path, 'a') as f:
for results in fetch_data_gov_packages(rows_per_page=rows_per_page, start_date=start_date):
for package in results:
f.write(json.dumps(package) + '\n')
if __name__ == "__main__":

from playhouse.migrate import *
from scripts.data_gov.models import db
migrator = SqliteMigrator(db)
def do_migrate():
crawler_identified_date = DateTimeField(null=True)
crawler_downloaded_date = DateTimeField(null=True)
with db.atomic():
# migrator.add_column('dataset', 'crawler_identified_date', crawler_identified_date),
# migrator.add_column('dataset', 'crawler_downloaded_date', crawler_downloaded_date),
# migrator.add_column('datasethistory', 'crawler_identified_date', crawler_identified_date),
# migrator.add_column('datasethistory', 'crawler_downloaded_date', crawler_downloaded_date),
if __name__ == '__main__':

from peewee import *
from playhouse.sqlite_ext import JSONField
from pathlib import Path
from datetime import datetime
db = SqliteDatabase(Path(__file__).parent.parent.parent / 'data/data.db', pragmas={
# tuning suggested by Claude:
'journal_mode': 'wal', # Write-Ahead Logging for better concurrency
'cache_size': -1024 * 64, # 64MB cache (negative number means kibibytes)
'synchronous': 'normal', # Good balance between safety and speed
'busy_timeout': 30000, # Wait up to 30 seconds when database is locked
'temp_store': 'memory', # Store temp tables in memory
'mmap_size': 268435456, # Memory-mapped I/O (256MB)
'page_size': 4096, # Optimal for most systems
class BaseModel(Model):
class Meta:
database = db
class Dataset(BaseModel):
# fields from
id = CharField(primary_key=True)
name = CharField(null=True)
title = CharField(null=True)
notes = TextField(null=True)
metadata_created = DateTimeField(null=True)
metadata_modified = DateTimeField(null=True)
private = BooleanField(null=True)
state = CharField(null=True)
version = CharField(null=True)
type = CharField(null=True)
num_resources = IntegerField(null=True)
num_tags = IntegerField(null=True)
isopen = BooleanField(null=True)
author = CharField(null=True)
author_email = CharField(null=True)
creator_user_id = CharField(null=True)
license_id = CharField(null=True)
license_url = CharField(null=True)
license_title = CharField(null=True)
maintainer = CharField(null=True)
maintainer_email = CharField(null=True)
owner_org = CharField(null=True)
url = CharField(null=True)
organization = JSONField(null=True)
extras = JSONField(null=True)
resources = JSONField(null=True)
tags = JSONField(null=True)
groups = JSONField(null=True)
relationships_as_subject = JSONField(null=True)
relationships_as_object = JSONField(null=True)
# fields starting with crawler_ are added by our crawler
crawler_identified_date = DateTimeField(null=True,
crawler_downloaded_date = DateTimeField(null=True)
class DatasetHistory(Dataset):
history_id = AutoField(primary_key=True)
id = CharField() # Regular CharField, not primary key
#deleted_by_date = DateTimeField(null=True) # New field to track deletion date

import csv
import logging
from pathlib import Path
from scripts.helpers.parallel import run_parallel
import click
from tqdm import tqdm
from gitspoke import Downloader, GitHubAPI
from gitspoke.cli import valid_include_items, get_token
import os
import json
import requests
from scripts.helpers.config import load_config
logger = logging.getLogger(__name__)
stats_counter = {}
CONFIG_PATH = (os.environ.get("XDG_CONFIG_HOME") or (Path.home() / ".config")) / "data-mirror" / "config.json"
def check_repo_exists(org_name, repo_name, token, output_path=None):
"""Check if a repository still exists on GitHub."""
exists = True
GitHubAPI(token).request(f"repos/{org_name}/{repo_name}", method="HEAD")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
exists = False
raise e
if not exists:
repo_link = f"{org_name}/{repo_name}"
if output_path:
with open(output_path, 'a') as output_file:
return exists
def run_pipeline(org_name, repo_name, collection_path, include, token, check_exists=False, output_path=None):
"""Process a single repository."""
if check_exists:
return check_repo_exists(org_name, repo_name, token, output_path)"Processing repository: {org_name}/{repo_name}")
Downloader(org_name, repo_name, token, max_retries=20).download_repo(collection_path, include=include)"Processing complete")
def get_tasks(csv_path: Path, output_path: Path, collection: str, skip_rows: int = 0, skip_existing: bool = False, stop_after: int = None, include: str = None,
check_exists: bool = False):
"""Get repositories from CSV that haven't been processed yet."""
# Initialize progress bars
if not check_exists:
stats_counter['total'] = tqdm(desc="Total records", unit="repo")
if skip_existing:
stats_counter['skipped'] = tqdm(desc="Skipped", unit="repo")
stats_counter['yielded'] = tqdm(desc="Processing", unit="repo")
# handle --include
if include:
include = include.split(',')
include = ['repo_info']
# import token or tokens
config = load_config()
if config.get('tokens'):
tokens = config['tokens']
tokens = [get_token(None)]
if tokens != [None]:
logger.warning(f"Using {len(tokens)} tokens")
logger.warning("Using unauthenticated rate limits")
with open(csv_path, 'r') as file:
reader = csv.DictReader(file)
# Skip specified number of rows
for _ in range(skip_rows):
processed = 0
for row in reader:
if not check_exists:
if not row['html_url']: # Skip empty rows
org_name, repo_name = row['html_url'].split('/')[-2:]
collection_path = output_path / 'collections' / collection / org_name / repo_name
if skip_existing:
if collection_path.exists():
# use tokens round robin
token = tokens[processed % len(tokens)]
yield org_name, repo_name, collection_path, include, token, check_exists, output_path
processed += 1
if stop_after and processed >= stop_after:
# Close progress bars
for counter in stats_counter.values():
@click.option('--output-path', '-o', type=click.Path(path_type=Path), default='data/processed',
help='Output path.')
@click.option('--collection', '-c', type=str, default='github_raw',
help='Collection name.')
@click.option('--workers', '-w', type=int, default=None,
help='Number of worker processes. Defaults to CPU count.')
@click.option('--skip-rows', type=int, default=0,
help='Number of rows to skip in the CSV.')
help='Comma-separated list of elements to include: ' + ', '.join(valid_include_items))
@click.option('--csv-path', '-csv', type=click.Path(path_type=Path), default='data/repos_by_cumulative_popularity.csv',
help='Path to the CSV file.')
@click.option('--log-level', '-l',
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
help='Logging level.')
@click.option('--stop-after', help='Stop after processing this many repositories', type=int)
@click.option('--skip-existing', is_flag=True, help='Set to skip existing repositories')
@click.option('--check-exists', is_flag=True, help='Only check if repositories still exist on GitHub')
def main(csv_path: Path, output_path: Path, collection: str, workers=None, skip_rows=0, include=None,
log_level=None, stop_after=None, skip_existing=False, check_exists=False):
get_tasks(csv_path, output_path, collection, skip_rows, skip_existing, stop_after, include, check_exists),
if __name__ == "__main__":

import json
import os
from pathlib import Path
CONFIG_PATH = (os.environ.get("XDG_CONFIG_HOME") or (Path.home() / ".config")) / "data-mirror" / "config.json"
def load_config():
"""Load configuration from config file."""
if CONFIG_PATH.exists():
config = json.loads(CONFIG_PATH.read_text())
config = {}
return config

from multiprocessing import Queue, Process
from queue import Empty
import os
from tqdm import tqdm
from typing import Callable, Iterable
import logging
# Set up logger
logger = logging.getLogger(__name__)
def worker(task_queue, task, catch_errors: bool = True):
while True:
args = task_queue.get(timeout=1)
if args is None:
logger.debug(f"[PID {os.getpid()}] Processing task")
except Empty:
except Exception as e:
if catch_errors:
logger.error(f"[PID {os.getpid()}] Worker error: {e}")
raise e
def run_parallel(processor: Callable, tasks: Iterable, workers = None, catch_errors: bool = True, log_level: str | None = None, task_count: int | None = None):
workers = workers or os.cpu_count() or 4
# Configure logging based on whether we're running in parallel or not
if log_level is None:
log_level = 'INFO' if workers == 1 else 'WARNING'
format='[%(process)d] %(message)s'
logger.debug(f"Starting processing with {workers} workers")
if workers > 1:
task_queue = Queue(maxsize=100)
# Start worker processes
processes = []
for _ in range(workers):
p = Process(target=worker, args=(task_queue, processor, catch_errors))
# Load tasks into queue
for task_item in tqdm(tasks, total=task_count):
if workers > 1:
if workers > 1:
# Signal workers to exit
for _ in range(workers):
# Wait for all processes to complete
for p in processes:

