WIP: Icechunk opener #1135
base: main
Conversation
|
I will automatically update this comment whenever this PR is modified
|
User jbusecke does not have permission to run integration tests. A maintainer must perform a security review of the code changes in this pull request and re-run the failed integration tests jobs, if the code is deemed safe. |
earthaccess/__init__.py
Outdated
)
from .auth import Auth
from .dmrpp_zarr import open_virtual_dataset, open_virtual_mfdataset
from .icechunk import _open_icechunk_from_url
Why is there an underscore prefix? This does not match up with the entry you added to __all__.
Ah thanks for catching this @chuckwondo. My intention here was to hack most of this together and develop it further after an initial test (with lots of hardcoded bits) passes.
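For context, the mismatch being pointed at looks roughly like this; a minimal sketch only, since the final public name is still undecided in this PR:

```python
# Illustrative sketch of keeping the import and __all__ consistent
# (names here are assumptions, not the PR's final API).
from .icechunk import open_icechunk_from_url  # public, no leading underscore

__all__ = [
    "open_virtual_dataset",
    "open_virtual_mfdataset",
    "open_icechunk_from_url",  # must match the imported name exactly
]
```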
earthaccess/icechunk.py
Outdated
# TODO: Figure out how to ensure authentication here.


def _get_daac_provider_from_url(url: str) -> str:
The return type is annotated incorrectly.
earthaccess/icechunk.py
Outdated
@@ -0,0 +1,109 @@
from datetime import datetime
from typing import Dict, List, Optional
Suggested change: remove `from typing import Dict, List, Optional`.
Co-authored-by: Chuck Daniels <cjdaniels4@gmail.com>
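On Python 3.9+ the typing aliases aren't needed; a quick sketch of what the annotations could look like with built-in generics (the function names below are illustrative, not code from this PR):

```python
# Built-in generics (PEP 585) and unions (PEP 604) replace Dict/List/Optional.
def build_bucket_mapping(urls: list[str], endpoint: str) -> dict[str, str]:
    """Hypothetical helper: map each bucket URL to a credentials endpoint."""
    return {url: endpoint for url in urls}


def lookup_endpoint(mapping: dict[str, str], bucket: str) -> str | None:
    """Return the endpoint for a bucket, or None if unknown."""
    return mapping.get(bucket)
```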
|
Thank you so much for looking into this @chuckwondo. |
|
Follow-up on a Slack discussion I had with @betolink and Ryan Abbott. The main thing I need for this functionality is a mapping from bucket (+ prefix) to a credentials endpoint. With the help of Claude I created a little script that crawls CMR to find this mapping and, crucially, to check whether the mapping between each bucket and credentials endpoint is unique:
Crawl Script v0
#!/usr/bin/env python3
"""
Async CMR Query - Map S3 Buckets to Auth Endpoints
Crawls NASA CMR API and maps buckets (no prefix) to S3 credentials endpoints.
Warns about buckets with conflicting endpoints.
"""
import asyncio
import aiohttp
import json
from typing import Dict, Set, Tuple
from collections import defaultdict
async def fetch_page(
session: aiohttp.ClientSession,
base_url: str,
page_num: int,
page_size: int,
cloud_hosted: bool = True
) -> Tuple[int, list, int]:
"""
Fetch a single page from CMR API.
Returns:
Tuple of (page_num, items, total_hits)
"""
params = {
"page_size": page_size,
"page_num": page_num
}
if cloud_hosted:
params["cloud_hosted"] = "true"
try:
async with session.get(base_url, params=params, timeout=aiohttp.ClientTimeout(total=30)) as response:
response.raise_for_status()
data = await response.json()
items = data.get("items", [])
total_hits = int(response.headers.get("CMR-Hits", 0))
return page_num, items, total_hits
except Exception as e:
print(f" ✗ Error fetching page {page_num}: {e}")
return page_num, [], 0
async def query_cmr_async(
base_url: str = "https://cmr.earthdata.nasa.gov/search/collections.umm_json",
cloud_hosted: bool = True,
max_pages: int = 100,
page_size: int = 100,
concurrent_requests: int = 10
) -> Dict[str, Dict]:
"""
Asynchronously query CMR API and collect DirectDistributionInformation.
Args:
base_url: CMR API endpoint
cloud_hosted: Filter for cloud-hosted collections
max_pages: Maximum number of pages to fetch
page_size: Results per page
concurrent_requests: Number of concurrent requests
Returns:
Dictionary mapping concept_id to DirectDistributionInformation
"""
print(f"Starting async CMR query...")
print(f" Max pages: {max_pages}")
print(f" Page size: {page_size}")
print(f" Concurrent requests: {concurrent_requests}")
print()
results = {}
async with aiohttp.ClientSession() as session:
# First, fetch page 1 to get total hits
_, items, total_hits = await fetch_page(session, base_url, 1, page_size, cloud_hosted)
if items:
for item in items:
concept_id = item.get("meta", {}).get("concept-id")
if concept_id:
direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
if direct_dist_info:
results[concept_id] = direct_dist_info
print(f"Total collections available: {total_hits}")
total_pages = min(max_pages, (total_hits + page_size - 1) // page_size)
print(f"Will fetch {total_pages} page(s)\n")
if total_pages <= 1:
return results
# Fetch remaining pages concurrently
tasks = []
for page_num in range(2, total_pages + 1):
task = fetch_page(session, base_url, page_num, page_size, cloud_hosted)
tasks.append(task)
# Process in batches to limit concurrency
if len(tasks) >= concurrent_requests:
batch_results = await asyncio.gather(*tasks)
for page_num, items, _ in batch_results:
if items:
print(f" ✓ Page {page_num}: {len(items)} collections")
for item in items:
concept_id = item.get("meta", {}).get("concept-id")
if concept_id:
direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
if direct_dist_info:
results[concept_id] = direct_dist_info
tasks = []
# Process remaining tasks
if tasks:
batch_results = await asyncio.gather(*tasks)
for page_num, items, _ in batch_results:
if items:
print(f" ✓ Page {page_num}: {len(items)} collections")
for item in items:
concept_id = item.get("meta", {}).get("concept-id")
if concept_id:
direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
if direct_dist_info:
results[concept_id] = direct_dist_info
print(f"\n✓ Collected {len(results)} collections with DirectDistributionInformation\n")
return results
def extract_bucket(s3_path: str) -> str:
"""Extract just the bucket name from S3 path."""
if s3_path.startswith('s3://'):
s3_path = s3_path[5:]
bucket = s3_path.split('/')[0]
return bucket
def create_bucket_mapping(results: Dict[str, Dict]) -> Tuple[Dict[str, str], Dict[str, Set[str]]]:
"""
Create mapping from bucket to endpoint.
Returns:
Tuple of (bucket_to_endpoint, bucket_conflicts)
where bucket_conflicts contains buckets with multiple endpoints
"""
print("Processing bucket mappings...")
# Track all endpoints seen for each bucket
bucket_endpoints = defaultdict(set)
for concept_id, info in results.items():
endpoint = info.get('S3CredentialsAPIEndpoint')
if not endpoint:
continue
s3_paths = info.get('S3BucketAndObjectPrefixNames', [])
for s3_path in s3_paths:
bucket = extract_bucket(s3_path)
bucket_endpoints[bucket].add(endpoint)
# Create final mapping (using first endpoint alphabetically for conflicts)
bucket_to_endpoint = {}
bucket_conflicts = {}
for bucket, endpoints in bucket_endpoints.items():
if len(endpoints) > 1:
# Conflict detected
bucket_conflicts[bucket] = endpoints
# Use first endpoint alphabetically
bucket_to_endpoint[bucket] = sorted(endpoints)[0]
else:
bucket_to_endpoint[bucket] = next(iter(endpoints))
print(f"✓ Mapped {len(bucket_to_endpoint)} unique buckets to endpoints")
print(f"⚠ Found {len(bucket_conflicts)} bucket(s) with conflicting endpoints\n")
return bucket_to_endpoint, bucket_conflicts
def print_conflicts(conflicts: Dict[str, Set[str]]):
"""Print warning about buckets with multiple endpoints."""
if not conflicts:
print("="*80)
print("✓ NO CONFLICTS - All buckets have consistent endpoints")
print("="*80)
return
print("="*80)
print("⚠ WARNING: BUCKETS WITH MULTIPLE ENDPOINTS")
print("="*80)
print(f"\nFound {len(conflicts)} bucket(s) with conflicting endpoints:\n")
for bucket, endpoints in sorted(conflicts.items()):
print(f"Bucket: {bucket}")
for endpoint in sorted(endpoints):
print(f" - {endpoint}")
print()
def print_summary(mapping: Dict[str, str], conflicts: Dict[str, Set[str]]):
"""Print summary statistics."""
print("="*80)
print("SUMMARY")
print("="*80)
unique_endpoints = len(set(mapping.values()))
print(f"Total unique buckets: {len(mapping)}")
print(f"Unique endpoints: {unique_endpoints}")
print(f"Buckets with conflicts: {len(conflicts)}")
# Group by endpoint
endpoint_groups = defaultdict(list)
for bucket, endpoint in mapping.items():
endpoint_groups[endpoint].append(bucket)
print(f"\nBuckets per endpoint:")
for endpoint, buckets in sorted(endpoint_groups.items(), key=lambda x: len(x[1]), reverse=True):
print(f" {endpoint}")
print(f" → {len(buckets)} bucket(s)")
async def main():
"""Main execution."""
print("\n" + "="*80)
print("NASA CMR ASYNC S3 BUCKET TO ENDPOINT MAPPER")
print("="*80 + "\n")
# Configuration
MAX_PAGES = 10000 # Adjust this to crawl more/fewer pages
PAGE_SIZE = 100 # Max is 2000, but 100 is more stable
CONCURRENT_REQUESTS = 10 # Number of simultaneous requests
# Step 1: Query CMR asynchronously
results = await query_cmr_async(
max_pages=MAX_PAGES,
page_size=PAGE_SIZE,
concurrent_requests=CONCURRENT_REQUESTS
)
if not results:
print("No results collected. Exiting.")
return
# Step 2: Create bucket mapping and detect conflicts
mapping, conflicts = create_bucket_mapping(results)
# Step 3: Print conflicts
print_conflicts(conflicts)
# Step 4: Print summary
print()
print_summary(mapping, conflicts)
# Step 5: Save outputs
print(f"\n{'='*80}")
print("SAVING RESULTS")
print("="*80)
with open('bucket_to_endpoint.json', 'w') as f:
json.dump(mapping, f, indent=2, sort_keys=True)
print("✓ Saved bucket_to_endpoint.json")
if conflicts:
conflicts_serializable = {k: list(v) for k, v in conflicts.items()}
with open('bucket_conflicts.json', 'w') as f:
json.dump(conflicts_serializable, f, indent=2, sort_keys=True)
print("✓ Saved bucket_conflicts.json")
with open('cmr_raw_results.json', 'w') as f:
json.dump(results, f, indent=2)
print("✓ Saved cmr_raw_results.json")
print(f"\n{'='*80}")
print("COMPLETE!")
print("="*80 + "\n")
if __name__ == "__main__":
asyncio.run(main())
What I get as a result is that the mapping is mostly unique:
{
"TestBucket": "www.testexample.com",
"asdc-prod-protected": "https://data.asdc.earthdata.nasa.gov/s3credentials",
"asf-cumulus-prod-alos2-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-aria-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-browse": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-opera-browse": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-product": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-seasat-products": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-grd-7d1b4348": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-ocn-1e29d408": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-raw-98779950": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-slc-7b420b89": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-xml-8cf7476b": "https://sentinel1.asf.alaska.edu/s3credentials",
"csda-cumulus-prod-protected-5047": "https://data.csdap.earthdata.nasa.gov/s3credentials",
"gesdisc-cumulus-prod-protected": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
"gesdisc-cumulus-prod-protectedAqua_AIRS_Level2": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
"ghrcw-protected": "https://data.ghrc.earthdata.nasa.gov/s3credentials",
"ghrcwuat-protected": "https://data.ghrc.uat.earthdata.nasa.gov/s3credentials",
"lp-prod-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-prod-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-sit-protected": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
"lp-sit-public": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
"nsidc-cumulus-prod-protected": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
"nsidc-cumulus-prod-public": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
"ob-cumulus-prod-public": "https://obdaac-tea.earthdatacloud.nasa.gov/s3credentials",
"ob-cumulus-sit-public": "https://obdaac-tea.sit.earthdatacloud.nasa.gov/s3credentials",
"ob-cumulus-uat-public": "https://obdaac-tea.uat.earthdatacloud.nasa.gov/s3credentials",
"ornl-cumulus-prod-protected": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
"ornl-cumulus-prod-public": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
"podaac-ops-cumulus-docs": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
"podaac-ops-cumulus-protected": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
"podaac-ops-cumulus-public": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
"podaac-swot-ops-cumulus-protected": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
"podaac-swot-ops-cumulus-public": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
"prod-lads": "https://data.laadsdaac.earthdatacloud.nasa.gov/s3credentials"
}with a few exceptions: {
"asf-cumulus-prod-opera-browse": [
"https://cumulus.asf.alaska.edu/s3credentials",
"https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials"
],
"asf-cumulus-prod-opera-products": [
"https://cumulus.asf.alaska.edu/s3credentials",
"https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials"
]
}
I'll keep digging to see if the bucket + first 'folder' is in fact unique! |
|
OK, I think this might just be sufficient for now.
Crawl script V1
#!/usr/bin/env python3
"""
Async CMR Query - Map S3 Buckets to Auth Endpoints
Crawls NASA CMR API and maps bucket/prefix keys to S3 credentials endpoints.
Warns about bucket/prefix keys with conflicting endpoints.
Note: In S3, there are no actual "folders" - only keys with delimiters (/).
What looks like a folder structure is just part of the object key.
"""
import asyncio
import aiohttp
import json
from typing import Dict, Set, Tuple
from collections import defaultdict
async def fetch_page(
session: aiohttp.ClientSession,
base_url: str,
page_num: int,
page_size: int,
cloud_hosted: bool = True
) -> Tuple[int, list, int]:
"""
Fetch a single page from CMR API.
Returns:
Tuple of (page_num, items, total_hits)
"""
params = {
"page_size": page_size,
"page_num": page_num
}
if cloud_hosted:
params["cloud_hosted"] = "true"
try:
async with session.get(base_url, params=params, timeout=aiohttp.ClientTimeout(total=30)) as response:
response.raise_for_status()
data = await response.json()
items = data.get("items", [])
total_hits = int(response.headers.get("CMR-Hits", 0))
return page_num, items, total_hits
except Exception as e:
print(f" ✗ Error fetching page {page_num}: {e}")
return page_num, [], 0
async def query_cmr_async(
base_url: str = "https://cmr.earthdata.nasa.gov/search/collections.umm_json",
cloud_hosted: bool = True,
max_pages: int = 100,
page_size: int = 100,
concurrent_requests: int = 10
) -> Dict[str, Dict]:
"""
Asynchronously query CMR API and collect DirectDistributionInformation.
Args:
base_url: CMR API endpoint
cloud_hosted: Filter for cloud-hosted collections
max_pages: Maximum number of pages to fetch
page_size: Results per page
concurrent_requests: Number of concurrent requests
Returns:
Dictionary mapping concept_id to DirectDistributionInformation
"""
print(f"Starting async CMR query...")
print(f" Max pages: {max_pages}")
print(f" Page size: {page_size}")
print(f" Concurrent requests: {concurrent_requests}")
print()
results = {}
async with aiohttp.ClientSession() as session:
# First, fetch page 1 to get total hits
_, items, total_hits = await fetch_page(session, base_url, 1, page_size, cloud_hosted)
if items:
for item in items:
concept_id = item.get("meta", {}).get("concept-id")
if concept_id:
direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
if direct_dist_info:
results[concept_id] = direct_dist_info
print(f"Total collections available: {total_hits}")
total_pages = min(max_pages, (total_hits + page_size - 1) // page_size)
print(f"Will fetch {total_pages} page(s)\n")
if total_pages <= 1:
return results
# Fetch remaining pages concurrently
tasks = []
for page_num in range(2, total_pages + 1):
task = fetch_page(session, base_url, page_num, page_size, cloud_hosted)
tasks.append(task)
# Process in batches to limit concurrency
if len(tasks) >= concurrent_requests:
batch_results = await asyncio.gather(*tasks)
for page_num, items, _ in batch_results:
if items:
print(f" ✓ Page {page_num}: {len(items)} collections")
for item in items:
concept_id = item.get("meta", {}).get("concept-id")
if concept_id:
direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
if direct_dist_info:
results[concept_id] = direct_dist_info
tasks = []
# Process remaining tasks
if tasks:
batch_results = await asyncio.gather(*tasks)
for page_num, items, _ in batch_results:
if items:
print(f" ✓ Page {page_num}: {len(items)} collections")
for item in items:
concept_id = item.get("meta", {}).get("concept-id")
if concept_id:
direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
if direct_dist_info:
results[concept_id] = direct_dist_info
print(f"\n✓ Collected {len(results)} collections with DirectDistributionInformation\n")
return results
def extract_bucket_prefix_key(s3_path: str, prefix_depth: int = 0) -> str:
"""
Extract bucket and prefix up to specified depth from S3 path.
In S3, there are no actual folders - only object keys with '/' delimiters.
This function extracts the bucket and the first N prefix components.
Args:
s3_path: S3 path like 's3://bucket/prefix/component1/component2'
prefix_depth: How many prefix components to include (0 = bucket only)
Returns:
String like 'bucket' (depth=0) or 'bucket/prefix/component1' (depth=2)
Examples:
extract_bucket_prefix_key('s3://my-bucket/data/2024/file.txt', 0) -> 'my-bucket'
extract_bucket_prefix_key('s3://my-bucket/data/2024/file.txt', 1) -> 'my-bucket/data'
extract_bucket_prefix_key('s3://my-bucket/data/2024/file.txt', 2) -> 'my-bucket/data/2024'
"""
if s3_path.startswith('s3://'):
s3_path = s3_path[5:]
parts = s3_path.split('/')
bucket = parts[0]
if prefix_depth == 0:
return bucket
# Include bucket + prefix_depth components
# Handle case where path doesn't have enough components
end_idx = min(1 + prefix_depth, len(parts))
key = '/'.join(parts[:end_idx])
return key
def create_bucket_key_mapping_recursive(
results: Dict[str, Dict],
max_depth: int = 5
) -> Tuple[Dict[str, str], Dict[str, Set[str]]]:
"""
Create mapping from bucket/prefix key to endpoint.
Recursively increases depth only for keys that have conflicts.
Strategy:
1. Start at depth 0 (bucket only)
2. For any key with multiple endpoints, increase depth by 1
3. Repeat until no conflicts or max_depth reached
Args:
results: Dictionary of DirectDistributionInformation
max_depth: Maximum prefix depth to try
Returns:
Tuple of (key_to_endpoint, remaining_conflicts)
"""
print(f"Building bucket/prefix mapping with recursive conflict resolution...")
print(f"Maximum depth: {max_depth}\n")
# First, collect all S3 paths and their endpoints
path_endpoints = [] # List of (s3_path, endpoint)
for concept_id, info in results.items():
endpoint = info.get('S3CredentialsAPIEndpoint')
if not endpoint:
continue
s3_paths = info.get('S3BucketAndObjectPrefixNames', [])
for s3_path in s3_paths:
path_endpoints.append((s3_path, endpoint))
print(f"Total S3 paths to process: {len(path_endpoints)}\n")
# Track final mapping and paths that still need processing
final_mapping = {}
paths_to_process = path_endpoints # Start with all paths at depth 0
for depth in range(max_depth + 1):
if not paths_to_process:
break
print(f"Processing depth {depth}...")
# Build mapping at current depth for paths still being processed
key_endpoints = defaultdict(set)
key_original_paths = defaultdict(list) # Track which original paths map to each key
for s3_path, endpoint in paths_to_process:
key = extract_bucket_prefix_key(s3_path, depth)
key_endpoints[key].add(endpoint)
key_original_paths[key].append((s3_path, endpoint))
# Separate unique keys from conflicting keys
paths_still_conflicting = []
resolved_count = 0
conflict_count = 0
for key, endpoints in key_endpoints.items():
if len(endpoints) == 1:
# No conflict at this depth - add to final mapping
final_mapping[key] = next(iter(endpoints))
resolved_count += 1
else:
# Still conflicting - need to go deeper
if depth < max_depth:
# Add these paths back for processing at next depth
paths_still_conflicting.extend(key_original_paths[key])
conflict_count += 1
else:
# Max depth reached, pick first endpoint alphabetically
final_mapping[key] = sorted(endpoints)[0]
resolved_count += 1
conflict_count += 1
print(f" Resolved: {resolved_count} unique keys")
print(f" Conflicts: {conflict_count} keys")
if depth < max_depth and paths_still_conflicting:
print(f" → Moving {len(paths_still_conflicting)} paths to depth {depth + 1}")
paths_to_process = paths_still_conflicting
# Find any remaining conflicts (shouldn't happen unless max_depth reached)
remaining_conflicts = {}
key_endpoints_final = defaultdict(set)
for key, endpoint in final_mapping.items():
key_endpoints_final[key].add(endpoint)
# Re-scan to find actual conflicts in final mapping
# (can happen if max_depth is reached)
for concept_id, info in results.items():
endpoint = info.get('S3CredentialsAPIEndpoint')
if not endpoint:
continue
s3_paths = info.get('S3BucketAndObjectPrefixNames', [])
for s3_path in s3_paths:
# Find which key this path mapped to
for depth in range(max_depth + 1):
key = extract_bucket_prefix_key(s3_path, depth)
if key in final_mapping:
if final_mapping[key] != endpoint:
if key not in remaining_conflicts:
remaining_conflicts[key] = set()
remaining_conflicts[key].add(endpoint)
remaining_conflicts[key].add(final_mapping[key])
break
print(f"\n✓ Final mapping has {len(final_mapping)} unique bucket/prefix keys")
print(f"⚠ Unresolved conflicts: {len(remaining_conflicts)} keys\n")
return final_mapping, remaining_conflicts
def print_conflicts(conflicts: Dict[str, Set[str]]):
"""Print warning about bucket/prefix keys with multiple endpoints."""
if not conflicts:
print("="*80)
print("✓ NO CONFLICTS - All bucket/prefix keys have unique endpoints")
print("="*80)
return
print("="*80)
print("⚠ WARNING: BUCKET/PREFIX KEYS WITH MULTIPLE ENDPOINTS")
print("="*80)
print(f"\nFound {len(conflicts)} key(s) with unresolved conflicts:\n")
print("(These conflicts could not be resolved even at maximum depth)\n")
for key, endpoints in sorted(conflicts.items()):
depth = key.count('/')
print(f"Key: {key} (depth={depth})")
for endpoint in sorted(endpoints):
print(f" - {endpoint}")
print()
def print_summary(mapping: Dict[str, str], conflicts: Dict[str, Set[str]]):
"""Print summary statistics."""
print("="*80)
print("SUMMARY")
print("="*80)
unique_endpoints = len(set(mapping.values()))
print(f"Total unique bucket/prefix keys: {len(mapping)}")
print(f"Unique endpoints: {unique_endpoints}")
print(f"Unresolved conflicts: {len(conflicts)}")
# Show depth distribution
depth_counts = defaultdict(int)
for key in mapping.keys():
depth = key.count('/')
depth_counts[depth] += 1
print(f"\nDepth distribution:")
for depth in sorted(depth_counts.keys()):
print(f" Depth {depth}: {depth_counts[depth]} keys")
# Group by endpoint
endpoint_groups = defaultdict(list)
for key, endpoint in mapping.items():
endpoint_groups[endpoint].append(key)
print(f"\nKeys per endpoint:")
for endpoint, keys in sorted(endpoint_groups.items(), key=lambda x: len(x[1]), reverse=True):
print(f" {endpoint}")
print(f" → {len(keys)} key(s)")
# Show a few examples with their depths
if len(keys) <= 3:
for key in sorted(keys):
depth = key.count('/')
print(f" - {key} (depth={depth})")
else:
for key in sorted(keys)[:3]:
depth = key.count('/')
print(f" - {key} (depth={depth})")
print(f" ... and {len(keys) - 3} more")
async def main():
"""Main execution."""
print("\n" + "="*80)
print("NASA CMR ASYNC S3 BUCKET/PREFIX TO ENDPOINT MAPPER")
print("(Recursive Conflict Resolution)")
print("="*80 + "\n")
# Configuration
MAX_PAGES = 1000 # Adjust this to crawl more/fewer pages
PAGE_SIZE = 100 # Max is 2000, but 100 is more stable
CONCURRENT_REQUESTS = 10 # Number of simultaneous requests
MAX_DEPTH = 5 # Maximum prefix depth to try for conflict resolution
print(f"Configuration:")
print(f" Max depth for conflict resolution: {MAX_DEPTH}")
print(f" Strategy: Start at depth 0, increase depth only for conflicts")
print()
# Step 1: Query CMR asynchronously
results = await query_cmr_async(
max_pages=MAX_PAGES,
page_size=PAGE_SIZE,
concurrent_requests=CONCURRENT_REQUESTS
)
if not results:
print("No results collected. Exiting.")
return
# Step 2: Create bucket/prefix mapping with recursive conflict resolution
mapping, conflicts = create_bucket_key_mapping_recursive(results, max_depth=MAX_DEPTH)
# Step 3: Print conflicts
print_conflicts(conflicts)
# Step 4: Print summary
print()
print_summary(mapping, conflicts)
# Step 5: Save outputs
print(f"\n{'='*80}")
print("SAVING RESULTS")
print("="*80)
with open('bucket_to_endpoint.json', 'w') as f:
json.dump(mapping, f, indent=2, sort_keys=True)
print("✓ Saved bucket_to_endpoint.json")
if conflicts:
conflicts_serializable = {k: list(v) for k, v in conflicts.items()}
with open('bucket_conflicts.json', 'w') as f:
json.dump(conflicts_serializable, f, indent=2, sort_keys=True)
print("✓ Saved bucket_conflicts.json")
with open('cmr_raw_results.json', 'w') as f:
json.dump(results, f, indent=2)
print("✓ Saved cmr_raw_results.json")
print(f"\n{'='*80}")
print("COMPLETE!")
print("="*80 + "\n")
if __name__ == "__main__":
asyncio.run(main())
gives:
{
"TestBucket": "www.testexample.com",
"asdc-prod-protected": "https://data.asdc.earthdata.nasa.gov/s3credentials",
"asf-cumulus-prod-alos2-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-aria-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-browse": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-opera-browse/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-browse/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-browse/OPERA_L4_TROPO-ZENITH_V1": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-opera-product": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L4_TROPO-ZENITH_V1": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-seasat-products": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-grd-7d1b4348": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-ocn-1e29d408": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-raw-98779950": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-slc-7b420b89": "https://sentinel1.asf.alaska.edu/s3credentials",
"asf-ngap2w-p-s1-xml-8cf7476b": "https://sentinel1.asf.alaska.edu/s3credentials",
"csda-cumulus-prod-protected-5047": "https://data.csdap.earthdata.nasa.gov/s3credentials",
"gesdisc-cumulus-prod-protected": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
"gesdisc-cumulus-prod-protectedAqua_AIRS_Level2": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
"ghrcw-protected": "https://data.ghrc.earthdata.nasa.gov/s3credentials",
"ghrcwuat-protected": "https://data.ghrc.uat.earthdata.nasa.gov/s3credentials",
"lp-prod-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-prod-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"lp-sit-protected": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
"lp-sit-public": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
"nsidc-cumulus-prod-protected": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
"nsidc-cumulus-prod-public": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
"ob-cumulus-prod-public": "https://obdaac-tea.earthdatacloud.nasa.gov/s3credentials",
"ob-cumulus-sit-public": "https://obdaac-tea.sit.earthdatacloud.nasa.gov/s3credentials",
"ob-cumulus-uat-public": "https://obdaac-tea.uat.earthdatacloud.nasa.gov/s3credentials",
"ornl-cumulus-prod-protected": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
"ornl-cumulus-prod-public": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
"podaac-ops-cumulus-docs": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
"podaac-ops-cumulus-protected": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
"podaac-ops-cumulus-public": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
"podaac-swot-ops-cumulus-protected": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
"podaac-swot-ops-cumulus-public": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
"prod-lads": "https://data.laadsdaac.earthdatacloud.nasa.gov/s3credentials"
}
I am inclined to just commit this mapping to the repo and add the script so we can update it quickly if things change. Or is this a really crappy idea? |
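If the mapping does get committed, the runtime lookup could try the most specific key first (bucket plus deepest prefix) and fall back to the bare bucket; a rough sketch of that idea, where the file name and helper are assumptions rather than part of this PR:

```python
import json


def find_credentials_endpoint(s3_url: str, mapping_path: str = "bucket_to_endpoint.json") -> str | None:
    """Hypothetical longest-prefix lookup against the committed bucket/prefix mapping."""
    with open(mapping_path) as f:
        mapping: dict[str, str] = json.load(f)

    # 's3://bucket/a/b/file.nc' -> ['bucket', 'a', 'b', 'file.nc']
    parts = s3_url.removeprefix("s3://").split("/")

    # Try the most specific key first, then progressively shorter prefixes.
    for depth in range(len(parts), 0, -1):
        key = "/".join(parts[:depth])
        if key in mapping:
            return mapping[key]
    return None
```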
|
This probably makes the most sense for now.
These both should work identically, but it'd be worth prioritizing the |
|
Also, these ones: {
"asf-cumulus-prod-opera-browse/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-browse/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-browse/OPERA_L4_TROPO-ZENITH_V1": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
"asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials",
}
All have a prefix included with the bucket name and could just be:
{
"asf-cumulus-prod-opera-browse": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
"asf-cumulus-prod-opera-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials"
} |
Ah, that is super helpful. @jhkennedy, will you be at the hack on Tuesday by any chance? |
|
Another question for the DAAC folks here: is there any way to add a dummy icechunk store to any of the EDL-authenticated buckets? I think that might be the easiest way to test the top-level functionality here. I am revising the structure of the code quite a bit at the moment. My plan is to support two main use cases:
1. "Full EDL" case: the icechunk store and any virtual chunks (if present) are within EDL buckets
from earthaccess.icechunk import open_icechunk_from_url
url = 's3://some-edl-bucket/pointing/to/ic/store'  # how to get this url will be solved by different logic
store = open_icechunk_from_url(url)
Simple as that, but at this point this is a non-existent use case AFAICT?
2. "Virtual EDL Chunks" case: the icechunk store is wherever, but all the virtual chunks point to one or more EDL buckets:
import icechunk as ic
from earthaccess.icechunk import get_virtual_chunk_credentials
storage = ... # configure your custom icechunk storage
vchunk_credentials = get_virtual_chunk_credentials(storage)
repo = ic.Repository.open(storage=storage, authorize_virtual_chunk_access=vchunk_credentials)
...
This is not quite as automatic, but it will actually help a lot of current use cases, I think. It is also quite a lot shorter than what I have to do here, for example. I think this would even enable more 'frankenstein-ish' cases, where an icechunk repo points to some EDL and some non-EDL buckets (to be tested). |
|
@betolink, could you assign me to this PR so we can track it more easily on the DevSeed side? |
|
@jbusecke, looks like integration tests are failing due to h5netcdf engine not being installed. See https://github.com/nsidc/earthaccess/actions/runs/20077100690/job/57595175133?pr=1135#step:8:768 |
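For what it's worth, a quick check like the one below (assuming xarray is already a test dependency) makes it obvious in the test output when the backend is missing; it is only a debugging aid, not part of the PR:

```python
import xarray as xr

# list_engines() returns the backends xarray can actually use in this environment;
# if "h5netcdf" is absent, the package needs to be added to the test dependencies.
available = xr.backends.list_engines()
assert "h5netcdf" in available, f"h5netcdf backend missing; available engines: {sorted(available)}"
```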
|
@jbusecke, integration tests are still failing due to missing dependency: https://github.com/nsidc/earthaccess/actions/runs/20254367444/job/58196492650?pr=1135#step:8:1079 |
|
OK, I think this is getting into shape. I have tested the top-level functions locally today (basically all the tests in
Disclaimer: This is all currently still based on this hardcoded mapping, which needs to be refactored after #1154 is fixed. So this should not be merged, but I think the functionality can be reviewed nonetheless. I'll post a few questions I had on the code directly after this post.
I did run the integration tests locally on the VEDA hub and got these (seemingly unrelated?) errors. Could somebody advise whether I interpreted this correctly, and maybe also enable me to run the integration tests here? @jhkennedy @betolink @chuckwondo, would any of you have a bit of time to take a look at this?
Details
================================================================================== ERRORS =================================================================================== |
raise ValueError(
    "A valid Earthdata login instance is required to retrieve credentials for icechunk stores"
)
I adopted this from earthaccess.get_s3_credentials. Please let me know if you think I should change the wording.
| """ | ||
| # get config and extract virtual containers | ||
| config = ic.Repository.fetch_config(storage=storage) | ||
| # TODO: accommodate case without virtual chunk containers. |
Suggested change: remove `# TODO: accommodate case without virtual chunk containers.`
This currently works but emits a warning. We can discuss these details further.
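To make that concrete, the no-container path could look roughly like this; a sketch only, where `virtual_chunk_containers` as the config attribute is my assumption about the icechunk API rather than code from this PR:

```python
import warnings

import icechunk as ic


def _virtual_chunk_containers_or_warn(storage) -> dict:
    """Return the repo's virtual chunk containers, warning when there are none."""
    config = ic.Repository.fetch_config(storage=storage)
    containers = getattr(config, "virtual_chunk_containers", None) or {}
    if not containers:
        warnings.warn(
            "No virtual chunk containers found in the icechunk config; "
            "no virtual chunk credentials will be generated."
        )
    return dict(containers)
```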
# try to build authentication for all virtual chunk containers. If any of the virtual
# chunk containers is not 'approved' it will raise an error in `_get_credential_endpoint`.
# We will catch the error here, warn, and only return the authenticated urls.
# Users will then get an error for the remaining containers and need to add those manually!
Suggested change:
- # Users will then get an error for the remaining containers and need to add those manually!
+ # Users will then get a warning for the remaining containers and need to add those manually!
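The catch-and-warn behavior described in that comment could be sketched like this; the credential-endpoint callable stands in for the PR's `_get_credential_endpoint`, and treating the containers as a name-to-URL-prefix mapping is a simplification:

```python
import warnings
from typing import Callable


def credentials_for_containers(
    containers: dict[str, str],
    get_credential_endpoint: Callable[[str], str],
) -> dict[str, str]:
    """Map container name -> credentials endpoint, warning about and skipping failures."""
    credential_mapping: dict[str, str] = {}
    for name, url_prefix in containers.items():
        try:
            credential_mapping[name] = get_credential_endpoint(url_prefix)
        except ValueError as err:
            warnings.warn(
                f"Could not resolve credentials for container {name!r}: {err}. "
                "Credentials for this container need to be added manually."
            )
    return credential_mapping
```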
If the URL is a non EDL bucket, you have to manually construct credentials (...)"
)

# TODO: Check how easy it is to 'splice' this output with manually created credentials
I will address this in the docs.
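For reference, 'splicing' could be as simple as merging the earthaccess-derived mapping with manually constructed entries before opening the repository; a sketch under the assumption that `get_virtual_chunk_credentials` returns a plain mapping of container name to credentials (the manual entry below is a placeholder):

```python
import icechunk as ic

from earthaccess.icechunk import get_virtual_chunk_credentials  # name from this PR, may still change

storage = ...  # your icechunk storage, configured elsewhere
edl_credentials = get_virtual_chunk_credentials(storage)

# Hypothetical manually constructed credentials for a non-EDL container.
manual_credentials = {"other-container": ...}  # whatever icechunk credentials object fits that bucket

# Splice the two: manual entries win where container names overlap.
combined = {**edl_credentials, **manual_credentials}
repo = ic.Repository.open(storage=storage, authorize_virtual_chunk_access=combined)
```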
return ic.containers_credentials(credential_mapping)


# TODO: Review datacube vocab? Do we want to use this? What is a good general term for zarr-ish data?
More general question (tagging in @DeanHenze): what vocabulary should we use here and throughout ea?
This does not influence functionality at all; it is just a matter of consistency. The current vocab has granule and dataset?
- virtual-zarr (not all stores have to be virtual, so this might not be general enough?)
- virtual dataset (we could have e.g. datatrees too)
- datacube (my current favorite, but happy to change this here)
Curious to hear what others think.
.
"""
# currently only supports s3
# How would this support e.g. http, which other protocols make sense?
Would love to discuss this, but I think this is better served in a separate PR?
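If it helps that later discussion, restricting to s3 for now could be an explicit scheme check that fails loudly for anything else (sketch only, not the PR's code):

```python
from urllib.parse import urlparse


def validate_icechunk_url(url: str) -> str:
    """Accept only s3:// URLs for now; other protocols could be added in a follow-up PR."""
    scheme = urlparse(url).scheme
    if scheme != "s3":
        raise NotImplementedError(
            f"Protocol {scheme!r} is not supported yet; only 's3://' URLs are handled."
        )
    return url
```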
)

# return readonly store from main
# TODO: should this be configurable?
Maybe we want to give users at least the tag/branch id as a configurable input parameter?
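Exposing that could look roughly like the following; a sketch where the `readonly_session` call reflects my understanding of the icechunk API (not code from this PR), with `main` as the default per the comment above:

```python
import icechunk as ic


def open_readonly_store(repo: ic.Repository, *, branch: str = "main", tag: str | None = None):
    """Return a read-only store, from a tag if given, otherwise from a branch (default: main)."""
    session = repo.readonly_session(tag=tag) if tag is not None else repo.readonly_session(branch=branch)
    return session.store
```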
# TODO: Is this the way to do it?
earthaccess.login()
This is one of the actual technical questions remaining: Is this the way to ensure a login within the integration tests? It worked locally (when I set up env variables), but I was not sure if there is another pattern that you use for testing specifically.
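One common pattern, sketched below, is to perform the login once in a session-scoped pytest fixture and rely on the environment strategy (this assumes EARTHDATA_USERNAME/EARTHDATA_PASSWORD are set in CI; it is a suggestion, not necessarily the repo's existing convention):

```python
import earthaccess
import pytest


@pytest.fixture(scope="session", autouse=True)
def edl_auth():
    """Log in once per test session using credentials from environment variables."""
    auth = earthaccess.login(strategy="environment")
    if not auth.authenticated:
        pytest.skip("No Earthdata credentials available; skipping icechunk integration tests")
    return auth
```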
|
@betolink can correct me if I'm wrong, but is ea already refactored for zarr v3? For the use case where the icechunk store is outside of a DAAC, I'm not sure why this requires alternate code. For kerchunk JSONs, I've created and stored a JSON locally and used it to access PO.DAAC data, so is icechunk different, or what am I missing?
I need to catch up: vocab within the ea package, or user-facing vocab? "granule" and "collection" are the two terms to use internally to be consistent with NASA Earthdata terms. Is there a user-facing front-end case you're thinking of?
Could I get more context here as well? Is this internal ea package vocab we're referring to? |
This is a first start towards building an icechunk opener for earthaccess (see #1132 for context).
This PR depends on #1154
This is still very rough and might change a lot.
This implements a very minimalist test and opener function in earthaccess.icechunk._open_icechunk_from_url. There are a ton of todos and questions, but let me try to point out the most pressing ones.
Pull Request (PR) draft checklist - click to expand
contributing documentation
before getting started.
title such as "Add testing details to the contributor section of the README".
Example PRs: #763
example
closes #1. See GitHub docs - Linking a pull request to an issue.
CHANGELOG.md with details about your change in a section titled ## Unreleased. If such a section does not exist, please create one. Follow Common Changelog for your additions.
Example PRs: #763
README.md with details of changes to the earthaccess interface, if any. Consider new environment variables, function names,
decorators, etc.
Click the "Ready for review" button at the bottom of the "Conversation" tab in GitHub
once these requirements are fulfilled. Don't worry if you see any test failures in
GitHub at this point!
Pull Request (PR) merge checklist - click to expand
Please do your best to complete these requirements! If you need help with any of these
requirements, you can ping the
@nsidc/earthaccess-support team in a comment and we will help you out!
Request containing "pre-commit.ci autofix" to automate this.
📚 Documentation preview 📚: https://earthaccess--1135.org.readthedocs.build/en/1135/