Python project

Particle Filter

Magnetic-field source localization using a particle filter, simulated sensor data, and a Biot-Savart measurement model.

Project type

Python simulation

Core tools

NumPy, Matplotlib

Live action

Lightweight run demo

Saved media

3 animation artifacts

Magnetic Field Source Localization with a Particle Filter

A particle filter that estimates 3D position and direction of a current carrying wire from three axis magnetic field measurements, using Biot-Savart law as the measurement model. Developed alongside a capstone project on remote underwater cable and pipe location.

Approach

The state of the wire is represented using its 3D position and 3D direction vector. Each particle proposes a hypothesis for the true location and direction of the cable. For every iteration the system takes in a magnetic field measurement and weights each particle based on how similar the estimated magnetic field reading is compared to the true reading. After weights are assigned, a new set particles are drawn from the probability distribution of weights making higher weighted particles more likely to be redrawn multiple times. Finally, propagation noise is added to prevent population collapse and weights are all reset to be uniform.

Because a line in 3D is the same no matter what point is chosen as the reference, particle positions are canonicalized onto the plane perpendicular to the particles direction vector at every step. This removes a redundant degree of freedom that would otherwise make the likelihood ill defined.

Results (static two sensor)

The current implementation uses two static sensors separated by a fixed baseline. Across 100 Monte Carlo trials with random wires:

  • Median position error: 0.23 units
  • Median direction error: 0.9 degrees
  • 72% of trials converged within 1 unit and 5 degrees
  • 43% of trials converged within 0.1 units and 1 degree

The remaining trials show a multimodal failure mode under symmetric sensor geometries, where two collinear sensors do not fully constrain the wire. Adding a third non collinear sensor would address this issue.

Particle Filter Animation Static Two Sensor

Development History

The project originally targeted a single moving sensor that would sweep across space and collect a sequence of measurements along its path. This approach naturally breaks ambiguities with the two sensor static localization since each movement constrains the wire from a different viewpoint. Two sweep geometries were prototyped:

Particle filter using two static sensors. Individual particles are not rendered due to animation file size; the blue arrow shows the mean estimated wire location.

Particle Filter Animation Linear Sweep

Particle filter with linear sweep in three dimensions.

Particle Filter Animation Arc Sweep

Particle filter with semicircle arc sweep in three dimensions.

When the sensor movement data proved unavailable from the capstone hardware, the formulation was reworked to use static sensors with a known baseline. This trades constraints of a sweep trajectory for a simpler setup that does not need to account for position telemetry at the cost of geometric failure mode as mentioned above.

Robustness to Sensor Position Drift

The sensor telemetry was originally going to be tracked using a 6 axis imu so small errors in positioning would compound over time. To verify the system would still function given this possibility of noise in the position data, drift was accounted for in the sweep simulations.

image
image

Running

python3 main.py

Requires numpy and matplotlib

Project Directory

Browse the repo files

main.py

PY

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation
import socket

GAIN = 1000.0

def canonicalize_particles(particles):
    d = particles[:, 3:6]
    p = particles[:, 0:3]
    particles[:, 0:3] = p - np.sum(p * d, axis=1, keepdims=True) * d
    return particles

###############################################################################
# Initialize particles with random positions and directions
# Each particle is represented as [x, y, z, dx, dy, dz]
# x, y, z are the coordinates of the particle
# dx, dy, dz are the components of the direction vector (normalized)
##############################################################################
def initialize_particles(N):
    # Create N particles with random coordinates and directions
    x = np.random.uniform(-10, 10, N)
    y = np.random.uniform(-10, 10, N)
    z = np.random.uniform(-10, 10, N)

    # Random directions on the unit sphere
    dirs = np.random.uniform(-1, 1, (N, 3))
    dirs /= np.linalg.norm(dirs, axis=1, keepdims=True)

    # Return particles as an Nx6 array: [x, y, z, dx, dy, dz]
    particles = np.column_stack((x, y, z, dirs))
    return canonicalize_particles(particles)

################################################################################
# Visualize particles in 3D space, showing their positions and directions
# Optionally, also plot the true wire position and direction if provided
################################################################################
def plot_particles(particles, true_p=None, true_d=None, sensor_pos1=None, sensor_pos2=None):
    # Extract positions from particles
    x = particles[:, 0]
    y = particles[:, 1] 
    z = particles[:, 2]

    # Extract directions from particles
    dx = particles[:, 3]
    dy = particles[:, 4]
    dz = particles[:, 5]

    # Create a new 3D plot
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')

    # Mark the sensor position at the origin
    ax.scatter(sensor_pos1[0], sensor_pos1[1], sensor_pos1[2], s=120, color='black', marker='x')
    ax.scatter(sensor_pos2[0], sensor_pos2[1], sensor_pos2[2], s=120, color='black', marker='x')

    # Plot particles as points and their directions as arrows
    # Adjust scale and alpha for better visibility
    ax.scatter(x, y, z, s=5, alpha=0.5)
    scale = 5
    ax.quiver(x, y, z, dx*scale, dy*scale, dz*scale, length=0.2, normalize=False, alpha=0.3)

    # Plot the true wire if provided
    if true_p is not None and true_d is not None:

        # Convert to numpy arrays and normalize direction
        true_p = np.array(true_p)
        true_d = np.array(true_d)
        true_d = true_d / np.linalg.norm(true_d)

        # plot a long line for the true wire
        t = np.linspace(-10, 10, 100)
        line = true_p[None, :] + t[:, None] * true_d[None, :]
        ax.plot(line[:, 0], line[:, 1], line[:, 2],
                color='red', linewidth=3, label='True wire')

        # also mark the closest point to origin
        ax.scatter(true_p[0], true_p[1], true_p[2],
                   color='red', s=50)
        
    # Plot the mean position of the particles as a blue arrow from the origin
    mean_pos = np.mean(particles[:, 0:3], axis=0)
    direction = mean_pos - sensor_pos1

    ax.quiver(sensor_pos1[0], sensor_pos1[1], sensor_pos1[2],
            direction[0], direction[1], direction[2],
            color='blue', linewidth=2, label='Mean position')
    
    ax.plot(
        sensor_pos1[0],
        sensor_pos1[1],
        sensor_pos1[2],
        color='red',
        linewidth=2,
        label='Sensor Path'
)

    # Set equal aspect ratio for all axes
    ax.set_box_aspect([1,1,1])

    # Set labels for axes
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')

    # Add legend if true wire is plotted
    if true_p is not None:
        ax.legend()

    # Show plot
    plt.show()

#################################################################################
# Compute the predicted magnetic field vector for every particle at one sensor
#################################################################################
def predict_field(particles, sensor_pos, current):
    d = particles[:, 3:6]
    p = particles[:, 0:3]

    # Distance from each particle to the sensor
    r = sensor_pos - p

    # Project r onto the direction d to find the perpendicular component
    dot_r = np.sum(r * d, axis=1, keepdims=True)
    r_perp = r - dot_r * d
    distance = np.linalg.norm(r_perp, axis=1) + 1e-12

    return (current * np.cross(d, r_perp) / (distance[:, None]**2 + 1e-12)) * GAIN

#################################################################################
# Update particles using the fixed two-sensor measurement snapshot
#################################################################################
def update_particles(particles, V_meas1, V_meas2, sensor_pos1, sensor_pos2, current, sigma, position_noise, direction_noise):

    # Number of particles
    N = particles.shape[0]

    B1_pred = predict_field(particles, sensor_pos1, current)
    B2_pred = predict_field(particles, sensor_pos2, current)

    errors1 = np.sum((B1_pred - V_meas1)**2, axis=1)
    errors2 = np.sum((B2_pred - V_meas2)**2, axis=1)
    total_error = errors1 + errors2
    mean_error = np.mean(total_error)
    best_idx = np.argmin(total_error)
    best_particle = particles[best_idx].copy()
    best_error = total_error[best_idx]

    # Recompute weights from scratch each round so the same static measurement
    # snapshot is used for refinement, not compounded as new evidence.
    log_weights = -total_error / (2 * sigma**2)
    log_weights -= np.max(log_weights)
    weights = np.exp(log_weights)
    weight_sum = np.sum(weights)
    if not np.isfinite(weight_sum) or weight_sum == 0:
        weights = np.ones(N) / N
    else:
        weights /= weight_sum

    # Resample particles based on weights, higher weighted particles are selected more frequently
    idx = np.random.choice(N, N, p=weights)
    particles = particles[idx]

    particles[:, 0:3] += np.random.normal(0, position_noise, particles[:, 0:3].shape)
    particles[:, 3:6] += np.random.normal(0, direction_noise, particles[:, 3:6].shape)
    particles[:, 3:6] /= np.linalg.norm(particles[:, 3:6], axis=1, keepdims=True)
    particles = canonicalize_particles(particles)

    return particles, mean_error, best_error, best_particle

#################################################################################
# Generates a fake measurement of the magnetic field vector
#################################################################################
def generate_fake_measurement(sensor_pos, p_true, d_true, current):

    r = sensor_pos - p_true
    dot_r = np.dot(r, d_true)
    r_perp = r - dot_r * d_true

    B = current * np.cross(d_true, r_perp) / (np.linalg.norm(r_perp)**2 + 1e-12)
    return B * GAIN, r_perp


def main():

    MONTE_CARLO = False
    NUM_TRIALS = 100 if MONTE_CARLO else 1

    pos_error_list = []
    dir_error_list = []

    for k in range(NUM_TRIALS):

        # Number of particles
        N = 10000
        current = 2
        rounds = 100

        sensor_pos1 = np.array([0, 0, 0])
        sensor_pos2 = np.array([2, 0, 0])

        # Store the history of static refinement for plotting convergence
        errors_history = []
        best_error_history = []
        sigma_history = []
        particles_history = []

        # Randomly generate a wire position and direction
        p_true = np.random.uniform(-10, 10, 3)
        d_true = np.random.uniform(-1, 1, 3)
        d_true /= np.linalg.norm(d_true)
        p_true = p_true - np.dot(p_true, d_true) * d_true

        V_meas1, _ = generate_fake_measurement(sensor_pos1, p_true, d_true, current)
        V_meas2, _ = generate_fake_measurement(sensor_pos2, p_true, d_true, current)

        # Initialize particles once, then repeatedly refine them using the same
        # fixed two-sensor snapshot.
        particles = initialize_particles(N)
        particles_history.append(particles.copy())

        #plot_particles(particles, p_true, d_true, sensor_pos1, sensor_pos2)

        for i in range(rounds):
            alpha = i / max(rounds - 1, 1)
            sigma = (0.2 - 0.18 * alpha) * GAIN
            position_noise = 0.4 - 0.35 * alpha
            direction_noise = 0.05 - 0.04 * alpha

            particles, mean_error, best_error, _ = update_particles(
                particles,
                V_meas1,
                V_meas2,
                sensor_pos1,
                sensor_pos2,
                current=current,
                sigma=sigma,
                position_noise=position_noise,
                direction_noise=direction_noise,
            )

            errors_history.append(mean_error)
            best_error_history.append(best_error)
            sigma_history.append(sigma)
            particles_history.append(particles.copy())

        mean_pos = np.mean(particles[:, 0:3], axis=0)
        mean_dir = np.mean(particles[:, 3:6], axis=0)
        mean_dir /= np.linalg.norm(mean_dir)

        pos_error_list.append(np.linalg.norm(mean_pos - p_true))
        dir_error_deg = np.degrees(np.arccos(np.clip(abs(np.dot(mean_dir, d_true)), 0, 1)))
        dir_error_list.append(dir_error_deg)

    pos = np.array(pos_error_list)
    dir_arr = np.array(dir_error_list)

    print(f"Position error — median: {np.median(pos):.4f}, mean: {np.mean(pos):.4f}")
    print(f"Direction error — median: {np.median(dir_arr):.4f}°, mean: {np.mean(dir_arr):.4f}°")
    print(f"Convergence rate (pos<0.1, dir<1°): {np.mean((pos<0.1) & (dir_arr<1)):.0%}")
    print(f"Convergence rate (pos<1.0, dir<5°): {np.mean((pos<1.0) & (dir_arr<5)):.0%}")

    # Show the distribution
    print("\nPosition error percentiles:")
    for p in [10, 25, 50, 75, 90, 95, 99]:
        print(f"  p{p}: {np.percentile(pos, p):.4f}")

    if not MONTE_CARLO:
        # Final plot of particles after convergence, showing true wire position/direction
        plot_particles(particles, p_true, d_true, sensor_pos1, sensor_pos2)

        plt.figure()
        plt.plot(errors_history, label="Mean Error")
        plt.plot(best_error_history, label="Best Error")
        plt.plot(sigma_history, label="Sigma")
        plt.xlabel("Refinement Round")
        plt.ylabel("Error / Sigma")
        plt.title("Static Two-Sensor Particle Filter Convergence")
        plt.legend()
        plt.grid(True)
        plt.show()

        t = np.linspace(-10, 10, 100)
        line = p_true[None, :] + t[:, None] * d_true[None, :]

        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')

        def update(frame):
            ax.cla()

            particles = particles_history[frame]

            x = particles[:, 0]
            y = particles[:, 1]
            z = particles[:, 2]

            dx = particles[:, 3]
            dy = particles[:, 4]
            dz = particles[:, 5]

            #ax.scatter(x, y, z, s=5, alpha=0.5)
            #ax.quiver(x, y, z, dx, dy, dz, length=0.2, alpha=0.3)

            # sensor
            ax.scatter(*sensor_pos1, color='black', s=50, marker='x')
            ax.scatter(*sensor_pos2, color='black', s=50, marker='x')

            # mean arrow (fixed version)
            mean_pos = np.mean(particles[:, 0:3], axis=0)
            direction = mean_pos - sensor_pos1
            ax.quiver(*sensor_pos1, *direction, color='blue')

            ax.plot(line[:, 0], line[:, 1], line[:, 2], color='red', linewidth=3)
            
            ax.scatter(*p_true, color='red', s=50)

            ax.set_xlim(-10, 10)
            ax.set_ylim(-10, 10)
            ax.set_zlim(-10, 10)
            ax.set_box_aspect([1,1,1])

        ani = FuncAnimation(fig, update, frames=len(particles_history), interval=10)
        ani.save("pf.mp4", fps=10)

        plt.show()

    if MONTE_CARLO:

        print(f"Final position error: {np.mean(pos_error_list):.3f}")
        print(f"Final direction error: {np.mean(dir_error_list):.3f} degrees")

    return 0    

if __name__ == "__main__":
    main()

Live Demo

Run the project output

Runs a lightweight simulation pass and formats the results into readable metrics and media cards.

Higher particle counts and more iterations improve detail but take longer to render.

Use the button above to run demo and load the generated animation plus the plots produced during the run.