Rockfall monitoring¶
Notebook contributors: Ronald Tabernig, William Albert, Hannah Weiser, July 2025
This notebook demonstrates the preparation of a rockfall monitoring example, including a change detection workflow using py4dgeo. The dataset contains six LiDAR scans of a rockfall area in Trier, Germany, captured hourly during the night from 25 to 26 August 2024.
Related publications:
- Czerwonka-Schröder, D., Schulte, F., Albert, W., Hosseini, K., Tabernig, R., Yang, Y., Höfle, B., Holst, C. & Zimmermann, K. (2025): AImon5.0: Echtzeitüberwachung gravitativer Massenbewegungen – Eine Fallstudie am Trierer Augenscheiner [AImon5.0: Real-time monitoring of gravitational mass movements – a case study at the Trierer Augenscheiner]. 23. Internationale Geodätische Woche, 23, 1-13. https://doi.org/10.11588/heidok.00036222
- Tabernig, R., Albert, W., Weiser, H., Fritzmann, P., Anders, K., Rutzinger, M., & Höfle, B. (2025). Temporal aggregation of point clouds improves permanent laser scanning of landslides in forested areas. Science of Remote Sensing, 12, 100254. https://doi.org/10.1016/j.srs.2025.100254
Imports¶
import py4dgeo
import laspy
import numpy as np
import os
import sys
import json
sys.path.insert(0, "../src")
from fourdgeo import projection
from fourdgeo import utilities
from fourdgeo import change
# File download and handling
from pathlib import Path
import pooch
from tqdm import tqdm
# Image handling
from PIL import Image, ImageDraw, ImageFont, ImageSequence
from IPython.display import HTML
Get the data¶
# Download and unzip the dataset (skipped if it already exists locally)
data_url = "https://zenodo.org/api/records/16153369/files/rockfall_trier.zip/content"
data_hash = "77b343183c86cbc3f72e0edbe362cc3219d41e00fcf4389ab650a09a13b1b1ec"
file_name = "rockfall_trier.zip"
data_folder = "data/rockfall_trier"
if not Path(data_folder).exists():
fnames = pooch.retrieve(url=data_url,
known_hash=data_hash,
path="./",
fname=file_name,
processor=pooch.Unzip(extract_dir=data_folder),
progressbar=True)
os.remove(file_name)
Change detection¶
Here, we perform change analysis between consecutive pairs of epochs, i.e., between the first and the second, the second and the third, and so on. We compute point cloud distances using the M3C2 algorithm (Lague et al. 2013) as implemented in py4dgeo, keep only significant changes, cluster the masked points using DBSCAN, and finally extract change objects, which are defined by a polygon outline, attributes such as the mean M3C2 magnitude and the size, as well as a timestamp.
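A change is considered significant when the absolute M3C2 distance exceeds the level of detection (LoD). Following Lague et al. (2013), the LoD at 95% confidence is estimated as

$$\mathrm{LoD}_{95\%} = \pm\left(1.96\,\sqrt{\frac{\sigma_1^2(d)}{n_1} + \frac{\sigma_2^2(d)}{n_2}} + \mathrm{reg}\right)$$

where $\sigma_1(d)$ and $\sigma_2(d)$ denote the roughness of the two point clouds along the normal direction, $n_1$ and $n_2$ the number of points within the projection cylinders, and $\mathrm{reg}$ the registration error between the epochs (here 0.025 m).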
m3c2_settings = {"cyl_radius":1,
"normal_radii":[1.0,],
"max_distance": 10.0,
"registration_error":0.025
}
# DBSCAN parameters
dbscan_eps = 1
min_cluster_size = 100
observations = {"observations": []}
# Gather & sort only the .laz files
laz_paths = list(Path(data_folder).glob("*.laz"))
laz_paths = sorted(laz_paths)
# Walk through each consecutive pair
for prev_path, curr_path in zip(laz_paths, laz_paths[1:]):
prev_fname = os.path.basename(prev_path)
curr_fname = os.path.basename(curr_path)
startDateTime = utilities.iso_timestamp(prev_fname) + "Z"
endDateTime = utilities.iso_timestamp(curr_fname) + "Z"
# Load point clouds
epoch_0 = py4dgeo.read_from_las(prev_path.resolve()) # make absolute, so py4dgeo does not download and check if the file is in py4dgeo-test-data
epoch_1 = py4dgeo.read_from_las(curr_path.resolve())
# Compute M3C2
m3c2 = py4dgeo.M3C2(
epochs=(epoch_0, epoch_1),
corepoints=epoch_0.cloud,
cyl_radius=m3c2_settings["cyl_radius"],
normal_radii=m3c2_settings["normal_radii"],
max_distance=m3c2_settings["max_distance"],
registration_error=m3c2_settings["registration_error"],
)
distances, uncertainties = m3c2.run()
# Mask & stack only significant changes
mask = np.abs(distances) >= uncertainties["lodetection"]
if not mask.any():
print(f"No significant changes between {prev_fname} → {curr_fname}")
continue
significant_pts = epoch_0.cloud[mask]
significant_d = distances[mask]
changes = np.column_stack((significant_pts, significant_d))
# Cluster & extract geoObjects
labeled = change.cluster_m3c2_changes(changes, dbscan_eps, min_cluster_size)
geoObjects = change.extract_geoObjects_from_clusters(labeled, endDateTime, prev_fname, curr_fname)
observations["observations"].append({
"backgroundImageData": {},
"startDateTime": startDateTime,
"endDateTime": endDateTime,
"geoObjects": geoObjects,
})
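For orientation, the clustering step is conceptually equivalent to the following sketch (an assumption about what change.cluster_m3c2_changes does internally; the actual fourdgeo implementation may differ):

# Conceptual sketch of the clustering step, assuming it runs DBSCAN on the
# 3D coordinates of the significant points (the fourdgeo implementation
# may differ).
from sklearn.cluster import DBSCAN

labels = DBSCAN(eps=dbscan_eps, min_samples=min_cluster_size).fit_predict(changes[:, :3])
labeled_sketch = np.column_stack((changes, labels))[labels != -1]  # drop DBSCAN noise (-1)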
Let's have a look at what a change geoObject contains:
observations["observations"][2]["geoObjects"][0]
{'id': '096c5b5106264abb891be775a61dd656', 'type': 'unknown', 'dateTime': '2024-08-26T01:00:06Z', 'geometry': {'type': 'Polygon', 'coordinates': [[-258.019, 158.27100000000002, 5.363], [-258.015, 158.19400000000002, 5.831], [-258.005, 158.187, 5.751], [-258.005, 158.18800000000002, 5.672], [-258.397, 158.428, 5.36], [-258.177, 158.293, 5.04], [-258.129, 158.151, 5.9510000000000005], [-258.677, 158.487, 5.244], [-258.25100000000003, 158.225, 5.156], [-258.298, 158.255, 5.0], [-258.615, 158.356, 6.099], [-258.32, 158.17600000000002, 6.009], [-259.543, 158.731, 5.434], [-259.471, 158.687, 5.272], [-259.38, 158.52700000000002, 6.253], [-259.473, 158.584, 6.098], [-259.046, 158.323, 4.811], [-258.974, 158.214, 4.847], [-260.13100000000003, 158.64000000000001, 5.308], [-259.866, 158.478, 5.15], [-260.162, 158.534, 6.9750000000000005], [-260.031, 158.454, 6.892], [-259.777, 158.3, 5.12], [-260.225, 158.491, 7.009], [-260.595, 158.431, 6.273], [-260.55400000000003, 158.406, 6.111], [-260.559, 158.409, 6.027], [-260.64300000000003, 158.46, 5.547], [-261.731, 158.92000000000002, 6.343], [-261.337, 158.681, 5.849], [-261.803, 158.873, 6.354], [-261.598, 158.673, 7.025], [-261.765, 158.775, 6.625], [-261.791, 158.793, 6.465], [-261.66700000000003, 158.607, 6.676]]}, 'customAttributes': {'X_centroid': np.float64(-259.8313391304348), 'Y_centroid': np.float64(158.48924782608697), 'Z_centroid': np.float64(5.991530434782609), 'm3c2_magnitude_abs_average_per_cluster': np.float64(1.0733661294671384), 'volume': 1.2427361073333316, 'surface_area': 11.436782819482161, 'surface_to_volume_ratio': 9.20290538916042, 'cluster_size_points': 230}}
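Next, we define the configuration for the fourdgeo point cloud projection. It controls the project name, the output folder, and how the scans are rendered into 2D range images (e.g., the image resolution in centimetres and the virtual camera position):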
configuration = {
"project_setting": {
"project_name": "Rockfall_monitoring",
"output_folder": "./out/rockfall_monitoring",
"temporal_format": "%y%m%d_%H%M%S",
"silent_mode": True,
"include_timestamp": False
},
"pc_projection": {
"pc_path": "",
"make_range_image": True,
"make_color_image": False,
"top_view": False,
"save_rot_pc": False,
"resolution_cm": 12.5,
"camera_position": [
0.0,
0.0,
0.0
],
"rgb_light_intensity": 100,
"range_light_intensity": 10,
"epsg": None
}
}
Generate the background images¶
We now generate the background images. For this, we are using classes and functions from the fourdgeo library. The class PCloudProjection directly takes our configuration as input and writes the resulting images into our specified output_folder. The field of view and image resolution derived from the first epoch are reused for all subsequent epochs, so that the background images are aligned across the time series.
images = []
list_background_projections = []
pcs = sorted(laz_paths)
for enum, pc in enumerate(pcs):
lf = laspy.read(pc)
configuration['pc_projection']['pc_path'] = pc
project_name = configuration['project_setting']['project_name']
output_folder = configuration['project_setting']['output_folder']
background_projection = projection.PCloudProjection(
configuration = configuration,
project_name = project_name,
projected_image_folder = output_folder,
)
# First projection
if enum == 0:
(
ref_h_fov, ref_v_fov, ref_anchor_point_xyz,
ref_h_img_res, ref_v_img_res
) = background_projection.project_pc(buffer_m = 0.5)
# Next projections using reference data
else:
background_projection.project_pc(
ref_theta=ref_h_fov[0],
ref_phi=ref_v_fov[0],
ref_anchor_point_xyz=None,
ref_h_fov=ref_h_fov,
ref_v_fov=ref_v_fov,
ref_h_img_res=ref_h_img_res,
ref_v_img_res=ref_v_img_res
)
bg_img = background_projection.bg_image_filename[0]
if bg_img[0] == ".":
bg_img = bg_img[2:]
outfile = bg_img.split('.')[0] + f"_{enum}_{enum+1}." + bg_img.split('.')[1]
try:
os.rename(bg_img, outfile)
except FileExistsError:
os.remove(outfile)
os.rename(bg_img, outfile)
images.append(outfile)
background_projection.bg_image_filename[0] = outfile
list_background_projections.append(background_projection)
Convert .tif images to .png images¶
Because the dashboard (currently) does not support TIF files, we need to convert the generated background images to the PNG format.
png_images = []
for background_projection in list_background_projections:
    image_path = background_projection.bg_image_filename[0]
    out_path = os.path.splitext(image_path)[0] + ".png"
    try:
        with Image.open(image_path) as im:
            # Save the first page of the (single-page) TIF as PNG
            page = next(ImageSequence.Iterator(im))
            if not os.path.isfile(out_path):
                page.save(out_path)
            png_images.append(out_path)
    except OSError as e:
        print(f"Could not convert {image_path}: {e}")
Project the change events onto the image background¶
Here, we also project the change events onto the background images using the ProjectChange class. The observation GeoJSON files are written to the output_folder.
project_name = configuration["project_setting"]["project_name"]
list_observation_projection = []
for epoch_id, observation in enumerate(observations['observations']):
background_projection = list_background_projections[epoch_id]
observation_projection = projection.ProjectChange(observation=observation,
project_name=f"{project_name}_{epoch_id}_{epoch_id+1}",
projected_image_path=background_projection.bg_image_filename[0],
projected_events_folder=output_folder,
epsg=None)
if observation["geoObjects"] is not None:
observation_projection.project_change()
list_observation_projection.append(observation_projection)
Display the rockfall event in the study site as soon as it is detected¶
We generate a GIF of the time series and overlay the polygon outlines of the rockfall event as soon as it is detected.
frames = []
gif_path = "../docs/img/rockfall_projections_plus_observations.gif"
font = ImageFont.load_default(size = 30)
for enum, img in enumerate(images[1:]):
frm = Image.open(img).convert("RGB")
draw = ImageDraw.Draw(frm)
draw.text((500, 36), f"Epoch: {enum}", fill=(255, 255, 255), font=font)
observation_projection = list_observation_projection[enum]
# Load geojson
if observation_projection.observation["geoObjects"] is not None:
with open(observation_projection.geojson_name, 'r') as f:
geojson_data = json.load(f)
for feature in geojson_data["features"]:
coords = np.array(feature["geometry"]['coordinates'][0])
coords[:,1] *= -1
coords = coords.reshape(len(coords)*2)
draw.polygon(list(coords), outline='yellow', width=4)
frames.append(frm)
frames[0].save(
gif_path,
save_all=True,
append_images=frames[1:],
duration=1000,
loop=0
)
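The animation can be displayed inline in the notebook with the HTML helper imported above (assuming the GIF path is resolvable relative to the notebook's location):

# Embed the generated GIF in the notebook output.
HTML(f'<img src="{gif_path}">')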

Extract the final JSON¶
In order to include this data in the dashboard, we now need to convert the created GeoJSON data to the dashboard data model. For this, we iterate through all GeoJSON files, create their observation objects, aggregate them into one final data model object, and store it in the output_folder. This file can then be loaded with the 4DGeo Dashboard.
Note: The path to the background image files has to be a working URL. In this example, we use a temporarily hosted localhost server, as explained in the section below. Later, this can and should be replaced by an actual server hosting the files.
aggregated_data = utilities.DataModel([])
for (i, observation_projection) in enumerate(list_observation_projection):
if observation_projection is None:
continue
elif observation_projection.observation["geoObjects"] is None:
img_size = Image.open(png_images[i + 1]).convert("RGB").size
aggregated_data.observations.append(utilities.Observation(
startDateTime=observation_projection.observation["startDateTime"],
endDateTime=observation_projection.observation["endDateTime"],
geoObjects=[],
backgroundImageData=utilities.ImageData(
url=str("http://localhost:8001/" + png_images[i + 1]).replace("\\", "/"),
width=img_size[0],
height=img_size[1]
)
))
continue
with open(observation_projection.geojson_name, 'r') as f:
geojson_data = json.load(f)
img_size = Image.open(png_images[i + 1]).convert("RGB").size
new_observations = utilities.convert_geojson_to_datamodel(
geojson=geojson_data,
bg_img=str("http://localhost:8001/" + png_images[i + 1]).replace("\\", "/"),
width=img_size[0],
height=img_size[1]
)
aggregated_data.observations.extend(new_observations.observations)
with open(f"{output_folder}/final_data_model.json", "w") as f:
f.write(aggregated_data.toJSON())
Visualise the data in the dashboard¶
To see our results in the actual dashboard, we need to host the created JSON file to make it accessible. As a quick and easy solution, we can use Python's built-in http.server module. The script server_host.py hosts all the files in its directory. To set up this local hosting, navigate to the 4DGeo/docs folder and execute the following command in a command line:
python server_host.py
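For reference, a minimal version of such a hosting script could look as follows (a sketch based on Python's built-in http.server, not necessarily the actual server_host.py; the CORS header is an assumption that allows the dashboard, served from a different origin, to fetch the files):

# Minimal sketch of a local file server on port 8001 (an assumption, not
# necessarily the actual server_host.py). The CORS header lets the
# dashboard fetch the hosted files from a different origin.
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer

class CORSRequestHandler(SimpleHTTPRequestHandler):
    def end_headers(self):
        self.send_header("Access-Control-Allow-Origin", "*")
        super().end_headers()

ThreadingHTTPServer(("localhost", 8001), CORSRequestHandler).serve_forever()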
Lastly, inside the dashboard, set the data source to http://localhost:8001/out/rockfall_monitoring/final_data_model.json.