import datetime
import json
import logging
from pathlib import Path
import boto3
import geopandas as gpd
import leafmap
import numpy as np
import rasterio as rio
from botocore import UNSIGNED
from botocore.config import Config
from pystac_client import Client
from rasterio.mask import mask
from rasterio.warp import Resampling, reproject
# Configure logging; only errors are emitted so STAC/boto3 chatter stays quiet.
logging.basicConfig(level=logging.ERROR)

# Local root for downloaded imagery.
# NOTE: the original guarded this with `if imagery_dir is None`, but the Path
# literal assigned above can never be None, so the fallback assignment to
# ../data/imagery was dead code and has been removed.
imagery_dir = Path(r"E:\tassie-trees-geospatial\imagery")
if not imagery_dir.exists():
    imagery_dir.mkdir(parents=True, exist_ok=True)

# Area-of-interest vector file and the attribute filter selecting one region.
aoi_path = Path("../data/vector/proposed_study_sites.geojson")
aoi_subset = {"column": "region", "value": "Tarkine south"}

# Analysis window and the +/- day padding applied when searching for scenes.
start_date = "2025-01-01"
end_date = "2025-03-31"
search_range = 14
start_datetime = datetime.datetime.strptime(start_date, "%Y-%m-%d")
end_datetime = datetime.datetime.strptime(end_date, "%Y-%m-%d")

# Maximum acceptable scene cloud cover (percent).
max_cloud = 5
region_code = None  # confine search to this tile -- NOTE(review): currently unused below

# "pre" brackets the start of the window, "post" brackets the end; each value
# is a [min, max] pair of ISO date strings spanning +/- search_range days.
date_ranges = {
    "pre": [
        (start_datetime - datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
        (start_datetime + datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
    ],
    "post": [
        (end_datetime - datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
        (end_datetime + datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
    ],
}

# DEA Sentinel-2 A/B/C analysis-ready-data collections and the STAC endpoint.
collections = ["ga_s2am_ard_3", "ga_s2bm_ard_3", "ga_s2cm_ard_3"]
stac_api_url = "https://explorer.dea.ga.gov.au/stac/"
# Open the STAC client and load the study-area polygons.
# (Original line fused two statements from adjacent notebook cells.)
client = Client.open(stac_api_url)
aoi = gpd.read_file(aoi_path)

# Restrict to the configured region and fail loudly if nothing matches.
aoi = aoi[aoi[aoi_subset["column"]] == aoi_subset["value"]]
if aoi.empty:
    raise ValueError(f"No AOI found for {aoi_subset['column']} = {aoi_subset['value']}")

# STAC `intersects` expects WGS84 coordinates.
if aoi.crs != "EPSG:4326":
    aoi = aoi.to_crs("EPSG:4326")

# Merge all site polygons into a single search geometry (GeoJSON-like dict).
aoi_dis = aoi.dissolve()
if aoi_dis.empty:
    raise ValueError("AOI geometry is empty after dissolving.")
geom = aoi_dis.geometry.values[0].__geo_interface__

# Accumulators: best STAC item per date group, and downloaded asset records.
final_items = {}
datasets = []
# For each date group ("pre"/"post"), find the least-cloudy STAC item that
# intersects the AOI, widening the search window by a week at a time (outward
# only: earlier for "pre", later for "post") until a scene with cloud cover
# <= max_cloud is found, then download its NBART and s2cloudless assets from
# the public DEA S3 bucket.
for date_set, range_minmax in date_ranges.items():
    # TODO: ensure tiles are the same for pre and post dates
    logging.info(
        f"Searching for {date_set} date range: {range_minmax[0]} to {range_minmax[1]}"
    )
    search_offset = 0  # extra days added outward each failed attempt
    item = None
    while item is None or item.properties["eo:cloud_cover"] > max_cloud:
        # Widen only away from the fire window: "pre" extends the start
        # earlier, "post" extends the end later.
        if date_set == "pre":
            expanded_start = (
                datetime.datetime.strptime(range_minmax[0], "%Y-%m-%d")
                - datetime.timedelta(days=search_offset)
            ).strftime("%Y-%m-%d")
            expanded_end = range_minmax[1]
        elif date_set == "post":
            expanded_start = range_minmax[0]
            expanded_end = (
                datetime.datetime.strptime(range_minmax[1], "%Y-%m-%d")
                + datetime.timedelta(days=search_offset)
            ).strftime("%Y-%m-%d")
        logging.info(
            f"Searching with expanded range: {expanded_start} to {expanded_end}"
        )
        search = client.search(
            collections=collections,
            datetime=f"{expanded_start}/{expanded_end}",
            intersects=geom,
            max_items=100,
        )
        # NOTE(review): get_items() is deprecated in newer pystac-client
        # releases in favour of items(); kept as-is here.
        items = list(search.get_items())
        if not items:
            search_offset += 7
            if search_offset > 365:
                raise ValueError(
                    f"No items found for {date_set} within 1 year. Please check the date range or AOI."
                )
            continue
        # Pick the least-cloudy candidate from this window.
        item = min(items, key=lambda x: x.properties["eo:cloud_cover"])
        if item.properties["eo:cloud_cover"] > max_cloud:
            # Best candidate still too cloudy: widen the window and retry.
            search_offset += 7
            if search_offset > 365:
                raise ValueError(
                    f"No items with cloud cover <= {max_cloud}% found for {date_set} within 1 year. Please check the date range or AOI."
                )
            item = None
        else:
            logging.info(
                f"Found item for {date_set} with {item.properties['eo:cloud_cover']}% cloud cover"
            )
    # Footprints of every candidate from the final search. The comprehension's
    # `item` only shadows the chosen scene inside the comprehension scope.
    gdf = gpd.GeoDataFrame.from_features([item.to_dict() for item in items])
    if gdf.empty:
        raise ValueError("GeoDataFrame is empty after converting search results.")
    final_items[date_set] = item
    # Anonymous (unsigned) S3 access -- presumably the DEA bucket is public.
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    for asset, properties in item.assets.items():
        # Keep surface-reflectance bands plus the s2cloudless mask/probability.
        if asset.startswith("nbart_") or asset in [
            "oa_s2cloudless_mask",
            "oa_s2cloudless_prob",
        ]:
            # hrefs look like s3://<bucket>/<key...>; split bucket from key.
            bucket = properties.href.split("/")[2]
            key = "/".join(properties.href.split("/")[3:])
            asset_source_basename = key.split("/")[-1]
            local_path = imagery_dir / item.properties["title"] / asset_source_basename
            if local_path.exists():
                logging.info(f"File {local_path} already exists, skipping download.")
            else:
                logging.info(f"Downloading {asset} from {bucket}/{key} to {local_path}")
                local_path.parent.mkdir(parents=True, exist_ok=True)
                s3.download_file(bucket, key, str(local_path))
            # Record the asset whether freshly downloaded or already cached.
            datasets.append(
                {
                    "asset": asset,
                    "filepath": Path(local_path).resolve(),
                    "date_group": date_set,
                }
            )
        else:
            logging.info(
                f"Skipping asset {asset} as it is not a NBAR or cloudless asset."
            )
    # Persist the chosen scene's footprint and full asset metadata alongside
    # the downloaded imagery for provenance.
    item_footprint_filename = (
        imagery_dir / item.properties["title"] / f"{item.properties['title']}.geojson"
    )
    gdf[gdf["title"] == item.properties["title"]].to_file(
        item_footprint_filename, driver="GeoJSON"
    )
    asset_metadata_filename = (
        imagery_dir
        / item.properties["title"]
        / f"{item.properties['title']}_assets.json"
    )
    # NOTE: `key`/`asset` here shadow the download-loop variables above.
    assets_dict = {key: asset.to_dict() for key, asset in item.assets.items()}
    with open(asset_metadata_filename, "w") as f:
        json.dump(assets_dict, f, indent=2)
def calculate_nbr(datasets, date_group):
    """Compute the Normalised Burn Ratio (NBR) raster for one date group.

    NBR = (NIR - SWIR) / (NIR + SWIR), using the ``nbart_nir_2`` and
    ``nbart_swir_2`` bands previously recorded in ``datasets``, and writes the
    result to ``imagery_dir / f"nbr_{date_group}.tif"``.

    Args:
        datasets: list of dicts with keys "asset", "filepath", "date_group".
        date_group: "pre" or "post" -- which date group's bands to use.

    Returns:
        Tuple of (output GeoTIFF path, float32 NBR array with NaN where the
        denominator is zero).

    Raises:
        ValueError: if the required NIR or SWIR band is missing for the group
            (the original raised an opaque StopIteration here).
    """
    group_datasets = [d for d in datasets if d["date_group"] == date_group]
    nir_dataset = next(
        (d for d in group_datasets if d["asset"] == "nbart_nir_2"), None
    )
    swir_dataset = next(
        (d for d in group_datasets if d["asset"] == "nbart_swir_2"), None
    )
    if nir_dataset is None or swir_dataset is None:
        raise ValueError(
            f"Missing nbart_nir_2/nbart_swir_2 assets for date group '{date_group}'."
        )
    with rio.open(nir_dataset["filepath"]) as nir_src:
        nir_data = nir_src.read(1).astype(np.float32)
        profile = nir_src.profile
    with rio.open(swir_dataset["filepath"]) as swir_src:
        swir_data = swir_src.read(1).astype(np.float32)
    # Zero denominators (e.g. nodata pixels) become NaN instead of dividing;
    # this already handles the division-by-zero the old TODO worried about.
    denominator = nir_data + swir_data
    nbr = np.where(denominator != 0, (nir_data - swir_data) / denominator, np.nan)
    profile.update(dtype=rio.float32, nodata=np.nan)
    output_path = imagery_dir / f"nbr_{date_group}.tif"
    with rio.open(output_path, "w", **profile) as dst:
        dst.write(nbr, 1)
    logging.info(f"NBR for {date_group} saved to {output_path}")
    return output_path, nbr
# Difference the pre- and post-fire NBR rasters (dNBR): positive values
# indicate burn severity, negative values regrowth.
# (Original line fused two statements from adjacent notebook cells.)
nbr_pre_path, nbr_pre = calculate_nbr(datasets, "pre")
nbr_post_path, nbr_post = calculate_nbr(datasets, "post")
dnbr = nbr_pre - nbr_post

# Write dNBR on the pre-fire raster's grid.
dnbr_output_path = imagery_dir / "dnbr.tif"
with rio.open(nbr_pre_path) as src:
    profile = src.profile
profile.update(dtype=rio.float32, nodata=np.nan)
with rio.open(dnbr_output_path, "w", **profile) as dst:
    dst.write(dnbr, 1)

# USGS-style dNBR burn-severity classes. "min"/"max" record the nominal
# published ranges; the classifier below uses contiguous bins (each class's
# lower edge up to the next class's lower edge) so values falling in the
# small gaps between ranges (e.g. -0.1005) are no longer left unclassified.
dnbr_classes = [
    {
        "class": 1,
        "label": "Enhanced regrowth",
        "min": -0.500,
        "max": -0.251,
    },
    {
        "class": 2,
        "label": "Regrowth",
        "min": -0.250,
        "max": -0.101,
    },
    {"class": 3, "label": "Unburned", "min": -0.100, "max": 0.099},
    {"class": 4, "label": "Low burn", "min": 0.100, "max": 0.269},
    {"class": 5, "label": "Moderate-low burn", "min": 0.270, "max": 0.439},
    {"class": 6, "label": "Moderate-high burn", "min": 0.440, "max": 0.659},
    {"class": 7, "label": "High burn", "min": 0.660, "max": 1.300},
]

# 0 = unclassified/nodata; NaN compares False in every bin and stays 0.
dnbr_classified = np.zeros_like(dnbr, dtype=np.uint8)
for idx, cls in enumerate(dnbr_classes):
    lower = cls["min"]
    if idx + 1 < len(dnbr_classes):
        # Half-open bin [lower, next_lower) keeps classes contiguous.
        cls_mask = (dnbr >= lower) & (dnbr < dnbr_classes[idx + 1]["min"])
    else:
        # Last class keeps its published upper bound, inclusive.
        cls_mask = (dnbr >= lower) & (dnbr <= cls["max"])
    dnbr_classified[cls_mask] = cls["class"]

dnbr_classified_path = imagery_dir / "dnbr_classified.tif"
profile.update(dtype=rio.uint8, nodata=0)
with rio.open(dnbr_classified_path, "w", **profile) as dst:
    dst.write(dnbr_classified, 1)
logging.info(f"Classified dNBR saved to {dnbr_classified_path}")


def create_false_color_composite(datasets, date_group):
    """Write a SWIR2/NIR2/Green false-colour composite GeoTIFF for one date group.

    The green band defines the output grid; SWIR2 and NIR2 are reprojected
    onto it with bilinear resampling, then each band is contrast-stretched to
    bytes with a 2-98 percentile clip and stacked as a 3-band JPEG-compressed
    tiled TIFF. (Original line fused the preceding logging call with this def.)

    Args:
        datasets: list of dicts with keys "asset", "filepath", "date_group".
        date_group: "pre" or "post".

    Returns:
        Path of the composite written to ``imagery_dir``.
    """
    group_datasets = [d for d in datasets if d["date_group"] == date_group]
    green_dataset = next(d for d in group_datasets if d["asset"] == "nbart_green")
    swir2_dataset = next(d for d in group_datasets if d["asset"] == "nbart_swir_2")
    nir2_dataset = next(d for d in group_datasets if d["asset"] == "nbart_nir_2")

    # Green defines the output grid/CRS for the composite.
    with rio.open(green_dataset["filepath"]) as green_src:
        green_data = green_src.read(1)
        profile = green_src.profile.copy()

    # Resample SWIR2 and NIR2 onto the green band's grid.
    with rio.open(swir2_dataset["filepath"]) as swir2_src:
        swir2_data = np.empty_like(green_data, dtype=np.int16)
        reproject(
            source=swir2_src.read(1),
            destination=swir2_data,
            src_transform=swir2_src.transform,
            src_crs=swir2_src.crs,
            dst_transform=profile["transform"],
            dst_crs=profile["crs"],
            resampling=Resampling.bilinear,
        )
    with rio.open(nir2_dataset["filepath"]) as nir2_src:
        nir2_data = np.empty_like(green_data, dtype=np.int16)
        reproject(
            source=nir2_src.read(1),
            destination=nir2_data,
            src_transform=nir2_src.transform,
            src_crs=nir2_src.crs,
            dst_transform=profile["transform"],
            dst_crs=profile["crs"],
            resampling=Resampling.bilinear,
        )

    def normalize_to_byte(data):
        """Stretch valid (>0) pixels to 0-255 with a 2-98 percentile clip."""
        valid = data[data > 0]
        # Guard the degenerate cases the original crashed on: an all-nodata
        # band (percentile of an empty array) and a constant band
        # (p98 == p2 division by zero).
        if valid.size == 0:
            return np.zeros_like(data, dtype=np.uint8)
        p2, p98 = np.percentile(valid, [2, 98])
        if p98 == p2:
            return np.zeros_like(data, dtype=np.uint8)
        stretched = np.clip((data - p2) / (p98 - p2) * 255, 0, 255)
        return stretched.astype(np.uint8)

    swir2_norm = normalize_to_byte(swir2_data)
    nir2_norm = normalize_to_byte(nir2_data)
    green_norm = normalize_to_byte(green_data)

    # Band order SWIR2/NIR2/Green renders burned areas in red tones.
    composite_data = np.stack([swir2_norm, nir2_norm, green_norm], axis=0)
    profile.update(
        count=3,
        dtype=rio.uint8,
        nodata=0,
        compress="jpeg",
        photometric="ycbcr",
        interleave="pixel",
        tiled=True,
        blockxsize=512,
        blockysize=512,
    )
    band_descriptions = [
        "SWIR2",
        "NIR2",
        "Green",
    ]
    output_path = imagery_dir / f"false_color_{date_group}.tif"
    with rio.open(output_path, "w", **profile) as dst:
        dst.write(composite_data)
        for i, desc in enumerate(band_descriptions, 1):
            dst.set_band_description(i, desc)
    logging.info(f"False color composite for {date_group} saved to {output_path}")
    return output_path
# Build both composites, then show them in an interactive side-by-side map.
# (Original line fused two statements from adjacent notebook cells.)
false_color_pre = create_false_color_composite(datasets, "pre")
false_color_post = create_false_color_composite(datasets, "post")

m = leafmap.Map()
m.split_map(
    left_layer=str(false_color_pre),
    right_layer=str(false_color_post),
    left_label="pre-fire (2024-11-12)",
    right_label="Post-fire (2025-03-27)",
)

# Leaflet path style for the study-site outlines.
aoi_style = {
    "stroke": True,
    "color": "red",
    "weight": 2,
    "opacity": 1,  # fixed typo: was "opactity", which Leaflet silently ignored
    "fill": True,
    "fillColor": "#ffffff",
    "fillOpacity": 0.0,
    "clickable": True,
}
m.add_gdf(
    aoi[aoi["name"].isin(["Harman River", "Stanley River", "Wilson River"])],
    layer_name="Study site (proposed)",
    style=aoi_style,
    hover_style={"weight": aoi_style["weight"] + 1, "fillOpacity": 0.1},
)
m.zoom_to_gdf(aoi)
m  # display the map (notebook cell output; stray "Loading..." text removed)