#!/usr/bin/py
"""
GravHopper
==========
A package for performing N-body simulations in Python, named in honor of Grace Hopper.
Written by Jeremy Bailin, University of Alabama
First initial last name at ua dot edu
Contains the following classes:
Simulation : Main class for creating and performing an N-body simulation.
IC : Static functions for generating a variety of useful initial conditions for
N-body simulations.
grav : Module containing the functions that calculate the N-body forces.
GravHopperException : Exceptions raised within GravHopper
"""
from . import jbgrav
import numpy as np
from scipy.differentiate import derivative
from scipy.interpolate import interp1d
from scipy import special, integrate
from astropy import units as u, constants as const
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# Check if any of pynbody, galpy, gala, and/or Agama are there. If so, import them.
try:
import pynbody as pyn
from . import unitconverter
USE_PYNBODY = True
except ImportError:
USE_PYNBODY = False
pass
try:
import galpy.util
galpy.util.__config__.set('astropy', 'astropy-units', 'True')
import galpy.potential
USE_GALPY = True
except ImportError:
USE_GALPY = False
pass
try:
import gala.potential
USE_GALA = True
except ImportError:
USE_GALA = False
pass
try:
import agama
USE_AGAMA = True
except ImportError:
USE_AGAMA = False
pass
[docs]
class GravHopperException(Exception):
"""Parent class for all error exceptions."""
pass
[docs]
class UninitializedSimulationException(GravHopperException):
"""Exception for trying to run a simulation without any initial conditions."""
pass
[docs]
class ICException(GravHopperException):
"""Exception for trying to add ICs that don't make sense."""
def __init__(self,msg):
print(msg)
[docs]
class UnknownAlgorithmException(GravHopperException):
"""Exception for using an unknown N-body algorithm name."""
pass
[docs]
class ExternalPackageException(GravHopperException):
"""Exception for trying to call a pynbody/galpy/gala/agama function when not using them."""
def __init__(self,msg):
print(msg)
[docs]
class Simulation(object):
"""Main class for N-body simulation.
Attributes
----------
Np : int
Number of particles
Nsnap : int
Number of snapshots
positions : Quantity array of dimension (Nsnap,Np,3)
numpy array of the positions of each particle at each snapshot
velocities : Quantity array of dimension (Nsnap,Np,3)
numpy array of the velocities of each particle at each snapshot
masses : Quantity array of length Np
numpy array of the masses of each particle
times : Quantity array of length Nsnap
numpy array of the time of each snapshot
lenunit : Unit
Default internal length unit
velunit : Unit
Default internal velocity unit
accelunit : Unit
Default internal acceleration unit
"""
[docs]
def __init__(self, dt=1*u.Myr, eps=100*u.pc, algorithm='tree'):
"""Initializes Simulation object.
Parameters
----------
dt : Quantity
Time step length. Default: 1 Myr
eps : Quantity
Gravitational softening length. Default: 100 pc
algorithm : str ('tree' or 'direct')
Gravitational force calculation algorithm. Default: 'tree'
Example
-------
To create a new simulation with a timestep of 1 year::
sim = Simulation(dt=1.0*u.yr)
"""
self.ICarrays = False
self.Np = 0
self.Nsnap = 0
self.timestep = 0
self.running = False
# positions and velocities have dimension (Nstep, Npart, 3)
self.positions = None
self.velocities = None
# masses are (Npart)
self.masses = None
self.times = None
self.extra_force_functions = []
self.extra_timedependent_force_functions = []
self.extra_velocitydependent_force_functions = []
# Things can come in in various units, but use these internally
self.lenunit = u.kpc
self.velunit = u.km/u.s
self.massunit = u.Msun
self.timeunit = u.Myr
self.accelunit = self.velunit / self.timeunit
# Parameters. For historic reasons, these go in params dict.
# Use the set_ methods which sanity check the inputs
self.params = {}
self.set_dt(dt)
self.set_eps(eps)
self.set_algorithm(algorithm)
# Useful information to have stored while in the middle of generating a movie
# so we don't need to keep figuring out what we're plotting
self._plot_parms = None
[docs]
def set_dt(self, dt):
"""Sets the simulation time step.
Parameters
----------
dt : Quantity with dimensions of time
Time step length
Raises
------
ValueError
If dt does not have dimensions of time.
See also
--------
:meth:`~gravhopper.Simulation.get_dt` : Returns simulation time step.
"""
try:
# Make sure that it has dimensions of time
_ = dt.to(u.Myr)
except u.UnitConversionError:
raise ValueError("dt must have dimensions of time.")
self.params['dt'] = dt
return
[docs]
def get_dt(self):
"""Returns simulation time step.
Returns
-------
dt : Quantity
Time step length
See also
--------
:meth:`~gravhopper.Simulation.set_dt` : Sets simulation time step.
"""
return self.params['dt']
[docs]
def set_eps(self, eps):
"""Sets the simulation gravitational softening length.
Parameters
----------
eps : Quantity with dimensions of length
Gravitational softening length
Raises
------
ValueError
If eps does not have dimensions of length.
See also
--------
:meth:`~gravhopper.Simulation.get_eps` : Returns gravitational softening length.
"""
try:
# Make sure that it has dimensions of length
_ = eps.to(u.kpc)
except u.UnitConversionError:
raise ValueError("eps must have dimensions of length.")
self.params['eps'] = eps
return
[docs]
def get_eps(self):
"""Returns simulation gravitational softening length.
Returns
-------
eps : Quantity
Gravitational softening length
See also
--------
:meth:`~gravhopper.Simulation.set_eps` : Sets gravitational softening length.
"""
return self.params['eps']
[docs]
def set_algorithm(self, algorithm):
"""Sets the simulation gravitational force algorithm.
Parameters
----------
algorithm : str ('tree' or 'direct')
Direct summation or Barnes-Hut tree algorithm
Raises
------
ValueError
If algorithm is not 'tree' or 'direct'.
See also
--------
:meth:`~gravhopper.Simulation.get_algorithm` : Returns current gravitational force algorithm.
"""
if algorithm in ('tree', 'direct'):
self.params['algorithm'] = algorithm
else:
raise ValueError("algorithm must be 'tree' or 'direct'.")
return
[docs]
def get_algorithm(self):
"""Returns current simulation gravitational algorithm.
Returns
-------
algorithm : str
'direct' for direct summation, 'tree' for Barnes-Hut tree
See also
--------
:meth:`~gravhopper.Simulation.set_algorithm` : Sets the gravitational force algorithm.
"""
return self.params['algorithm']
[docs]
def run(self, N=1):
"""Run N timesteps of the simulation. Will either initialize a simulation that has
not yet been run, or continue from the last snapshot if it has.
Parameters
----------
N : int
Number of time steps to perform
"""
# Initialize if first timestep, expand arrays for output if not
if self.running==False:
self.init_run(N)
else:
self.Nsnap += N
# Expand arrays
extra_vec = np.zeros((N, self.Np, 3))
extra_scalar = np.zeros((N, self.Np))
extra_single = np.zeros((N))
self.positions = np.concatenate((self.positions,extra_vec*self.lenunit), axis=0)
self.velocities = np.concatenate((self.velocities,extra_vec*self.velunit), axis=0)
self.times = np.concatenate((self.times,extra_single*self.timeunit), axis=0)
# Perform time steps
for i in range(self.timestep, self.timestep+N):
self.timestep += 1
self.perform_timestep()
self.times[self.timestep] = self.times[self.timestep-1] + self.params['dt']
def init_run(self, Nsnap=None):
"""Initialize an N-body run."""
if self.ICarrays==False:
raise UninitializedSimulationException
self.Nsnap = Nsnap+1
self.Np = len(self.ICarrays['pos'])
# Create the pos, vel, and mass arrays and put ICs in index 0
self.positions = np.zeros((self.Nsnap, self.Np, 3)) * self.lenunit
self.velocities = np.zeros((self.Nsnap, self.Np, 3)) * self.velunit
self.masses = np.zeros((self.Np)) * self.massunit
self.times = np.zeros((self.Nsnap)) * self.timeunit
self.positions[0,:,:] = self.ICarrays['pos']
self.velocities[0,:,:] = self.ICarrays['vel']
self.masses[:] = self.ICarrays['mass']
self.running = True
[docs]
def reset(self):
"""Reset a simulation to the state before init_run() was called. Initial conditions
and external forces are preserved."""
self.running = False
self.positions = None
self.velocities = None
self.masses = None
self.times = None
self.Nsnap = 0
self.timestep = 0
[docs]
def snap(self, step):
"""Return the given snapshot.
Parameters
----------
step : int
Time step to return
Returns
-------
snap : dict
snap['pos'] is an (Np,3) array of positions
snap['vel'] is an (Np,3) array of velocities
snap['mass'] is a length-Np array of masses
"""
return {'pos':self.positions[step, :, :], 'vel':self.velocities[step, :, :], \
'mass':self.masses[:]}
[docs]
def current_snap(self):
"""Return the current snapshot.
Returns
-------
snap : dict
* **snap['pos']** is an (Np,3) array of positions
* **snap['vel']** is an (Np,3) array of velocities
* **snap['mass']** is a length-Np array of masses
"""
return self.snap(self.timestep)
[docs]
def prev_snap(self):
"""Return the snapshot before the current snapshot.
Returns
-------
snap : dict
* **snap['pos']** is an (Np,3) array of positions
* **snap['vel']** is an (Np,3) array of velocities
* **snap['mass']** is a length-Np array of masses
"""
return self.snap(self.timestep-1)
def perform_timestep(self):
"""Advance the N-body simulation by one snapshot using a DKD leapfrog integrator."""
# drift-kick-drift leapfrog:
# half-step drift
self.current_snap()['pos'][:] = self.prev_snap()['pos'] + 0.5 * self.prev_snap()['vel'] * self.params['dt']
# full-step kick
# For time-dependent forces, acceleration is calculated on the half step
kick_time = self.times[self.timestep-1] + 0.5*self.params['dt']
accelerations = self.calculate_acceleration(time=kick_time)
self.current_snap()['vel'][:] = self.prev_snap()['vel'] + accelerations * self.params['dt']
# half-step drift
self.current_snap()['pos'][:] += 0.5 * self.current_snap()['vel'] * self.params['dt']
[docs]
def calculate_acceleration(self, time=None):
"""Calculate acceleration for particle positions at current_snap() due to gravitational N-body force, plus any external
forces that have been added.
Parameters
----------
time : Quantity
Time at which to calculate any time-dependent external forces. Default: None
Returns
-------
acceleration : array
An (Np,3) numpy array of the acceleration vector calculated for each particle
Raises
------
UnknownAlgorithmException
If the algorithm parameter is not 'tree' or 'direct'
"""
# gravity
# If there is only one particle, there is no force, and the C code will give
# a divide by zero... so just force a zero.
if self.Np > 1:
if self.params['algorithm']=='direct':
nbody_gravity = jbgrav.direct_summation(self.current_snap(), self.params['eps'])
elif self.params['algorithm']=='tree':
nbody_gravity = jbgrav.tree_force(self.current_snap(), self.params['eps'])
else:
raise UnknownAlgorithmException()
else:
nbody_gravity = np.zeros((self.Np, 3)) * self.accelunit
# any extra forces that have been added. Note that accelerations are computed
# before the kick, so we need to use the previous snap's velocity in case there
# are velocity-dependent forces
extra_accel = self.calculate_extra_acceleration(self.current_snap()['pos'], nbody_gravity, \
time=time, vel=self.prev_snap()['vel'])
totaccel = nbody_gravity + extra_accel
return totaccel
def calculate_extra_acceleration(self, pos, template_array, time=None, vel=None):
"""Calculates the acceleration just due to external added forces."""
extaccel = np.zeros_like(template_array)
for fn, args in self.extra_force_functions:
extaccel += fn(pos, args)
# and the time-dependent extra forces
for fn, args in self.extra_timedependent_force_functions:
extaccel += fn(pos, time, args)
# and the velocity-dependent extra forces
for fn, args in self.extra_velocitydependent_force_functions:
extaccel += fn(pos, vel, args)
return extaccel
def gala_potential_wrapper(self, galapot, pos, time=None):
"""Wrapper to calculate acceleration from a gala Potential object."""
if time is None:
accel = galapot.acceleration(pos.T).T
else:
accel = galapot.acceleration(pos.T, t=time).T
return accel
def galpy_potential_wrapper(self, galpypot, pos, time=None):
"""Wrapper to calculate acceleration from a galpy potential object."""
# Galpy works exclusively in cylindrical coordinate frame, so we need to
# convert there and back again, a vector's tale.
R, phi, z = galpy.util.coords.rect_to_cyl(pos[:,0], pos[:,1], pos[:,2])
# Some galpy potentials work on arrays of coordinates, but not all. For
# potentials where it doesn't, loop through each position (note: much slower,
# so only do those where I know it doesn't work).
single_potentials = [galpy.potential.RazorThinExponentialDiskPotential]
if any([isinstance(galpypot, s) for s in single_potentials]):
# Do them one by one
if(len(pos.shape)) > 1:
Npos = pos.shape[0]
else:
Npos = 1
accel_R = np.zeros((Npos)) * self.accelunit
accel_phi = np.zeros((Npos)) * self.accelunit
accel_z = np.zeros((Npos)) * self.accelunit
for parti in range(Npos):
if time is None:
accel_R[parti] = galpy.potential.evaluateRforces(galpypot, R[parti], z[parti], phi=phi[parti])
accel_phi[parti] = galpy.potential.evaluatephitorques(galpypot, R[parti], z[parti], phi=phi[parti]) / R[parti]
accel_z[parti] = galpy.potential.evaluatezforces(galpypot, R[parti], z[parti], phi=phi[parti])
else:
accel_R[parti] = galpy.potential.evaluateRforces(galpypot, R[parti], z[parti], phi=phi[parti], t=time)
# See above.
accel_phi[parti] = galpy.potential.evaluatephitorques(galpypot, R[parti], z[parti], phi=phi[parti], t=time) / R[parti]
accel_z[parti] = galpy.potential.evaluatezforces(galpypot, R[parti], z[parti], phi=phi[parti], t=time)
ax, ay, az = galpy.util.coords.cyl_to_rect_vec(accel_R, accel_phi, accel_z, phi=phi)
else:
# Do it vectorized
if time is None:
accel_R = galpy.potential.evaluateRforces(galpypot, R, z, phi=phi)
# phitorques returns dPhi/dphi, which is R times the phi force.
# So we need to divide by R to get a physical force that we can transform
# as a vector.
accel_phi = galpy.potential.evaluatephitorques(galpypot, R, z, phi=phi) / R
accel_z = galpy.potential.evaluatezforces(galpypot, R, z, phi=phi)
ax, ay, az = galpy.util.coords.cyl_to_rect_vec(accel_R, accel_phi, accel_z, phi=phi)
else:
accel_R = galpy.potential.evaluateRforces(galpypot, R, z, phi=phi, t=time)
# See above.
accel_phi = galpy.potential.evaluatephitorques(galpypot, R, z, phi=phi, t=time) / R
accel_z = galpy.potential.evaluatezforces(galpypot, R, z, phi=phi, t=time)
ax, ay, az = galpy.util.coords.cyl_to_rect_vec(accel_R, accel_phi, accel_z, phi=phi)
accel = np.vstack((ax,ay,az)).T
return accel
def galpy_dissipativeforce_wrapper(self, galpypot, pos, vel=None):
"""Wrapper to calculate acceleration from a galpy DissipativeForce object."""
# Galpy works exclusively in cylindrical coordinate frame, so we need to
# convert there and back again, a vector's tale.
R, phi, z = galpy.util.coords.rect_to_cyl(pos[:,0], pos[:,1], pos[:,2])
vR, vphi, vz = galpy.util.coords.rect_to_cyl_vec(vel[:,0], vel[:,1], vel[:,2], pos[:,0], pos[:,1], pos[:,2])
# Some galpy potentials work on arrays of coordinates, but not all. For
# potentials where it doesn't, loop through each position (note: much slower,
# so only do those where I know it doesn't work).
single_potentials = [galpy.potential.ChandrasekharDynamicalFrictionForce]
if any([isinstance(galpypot, s) for s in single_potentials]):
# Do them one by one
if(len(pos.shape)) > 1:
Npos = pos.shape[0]
else:
Npos = 1
accel_R = np.zeros((Npos)) * self.accelunit
accel_phi = np.zeros((Npos)) * self.accelunit
accel_z = np.zeros((Npos)) * self.accelunit
for parti in range(Npos):
veli = [vR[parti], vphi[parti], vz[parti]]
accel_R[parti] = galpy.potential.evaluateRforces(galpypot, R[parti], z[parti], phi=phi[parti], v=veli)
accel_phi[parti] = galpy.potential.evaluatephitorques(galpypot, R[parti], z[parti], phi=phi[parti], v=veli) / R[parti]
accel_z[parti] = galpy.potential.evaluatezforces(galpypot, R[parti], z[parti], phi=phi[parti], v=veli)
ax, ay, az = galpy.util.coords.cyl_to_rect_vec(accel_R, accel_phi, accel_z, phi=phi)
else:
# Do it vectorized
vel_cyl = np.vstack((vR, vphi, vz))
accel_R = galpy.potential.evaluateRforces(galpypot, R, z, phi=phi, v=vel_cyl)
accel_phi = galpy.potential.evaluatephitorques(galpypot, R, z, phi=phi, v=vel_cyl) / R
accel_z = galpy.potential.evaluatezforces(galpypot, R, z, phi=phi, v=vel_cyl)
ax, ay, az = galpy.util.coords.cyl_to_rect_vec(accel_R, accel_phi, accel_z, phi=phi)
accel = np.vstack((ax,ay,az)).T
return accel
def agama_potential_wrapper(self, agamapot, pos, agama_units=None, time=None):
"""Wrapper to calculate acceleration from an Agama Potential object."""
pos_without_units = pos.to(agama_units['lenunit']).value
if time is None:
accel = agamapot.force(pos_without_units)
else:
timeunit = np.sqrt(agama_units['lenunit'] / agama_units['accelunit'])
time_without_units = time.to(timeunit).value
accel = agamapot.force(pos_without_units, t=time_without_units)
return accel*agama_units['accelunit']
[docs]
def add_external_force(self, fn, args=None, agama_units=None):
"""Add an external position-dependent force to the simulation.
Forces can be in the form of:
1. A function that takes two arguments: an (Np,3) array of positions
(must be Quantities) that contains the positions where the accelerations
are to be calculated, and an additional argument that is a dictionary
containing any extra parameters you need.
2. A galpy potential object.
3. A gala Potential object.
4. An Agama Potential object.
You may add as many external forces as you want - they will be summed
together along with the N-body force.
Parameters
----------
fn : function or galpy potential.Potential object or gala potential.PotentialBase object
External force function to add
args : dict
Any extra parameters that should be passed to the function when it is called
agama_units : dict
Agama potentials take unitless values, so assume that the Agama potential has
been defined with this set of units. The dict should contain the keys
'lenunit' and 'accelunit', which should each by an Astropy Unit or Quantity.
Required for Agama potentials, ignored otherwise.
Example
-------
This function calculates an external force from a single point source::
def my_point_source_force(pos, args):
# acceleration is G M / r^2
GM = const.G * args['mass']
d_pos = pos - args['pos']
r2 = (d_pos**2).sum(axis=1)
rhat = d_pos / np.sqrt(r2)
return rhat * GM / r2
To add the force from a particle at 10kpc on the x-axis with
a mass of 1e8 Msun::
mysimulation = Simulation()
mysimulation.add_external_force(my_point_source_force, {'mass':1e8*u.Msun,
'pos':np.array([10,0,0])*u.kpc})
"""
if isinstance(fn, list):
# Probably a galpy combined potential. Add each one individually.
for item in fn:
self.add_external_force(item, args)
return
if USE_GALA:
# Check if it's a gala potential
if isinstance(fn, gala.potential.PotentialBase):
gala_fn = lambda x, a: self.gala_potential_wrapper(fn, x)
self.extra_force_functions.append( (gala_fn, args) )
return
if USE_GALPY:
# Check if it's a galpy potential
if isinstance(fn, galpy.potential.Potential):
galpy_fn = lambda x, a: self.galpy_potential_wrapper(fn, x)
self.extra_force_functions.append( (galpy_fn, args) )
return
if USE_AGAMA:
# Check if it's an Agama potential
if isinstance(fn, agama.Potential):
# Make sure units were defined.
if (agama_units is None) or ('lenunit' not in agama_units) or \
('accelunit' not in agama_units):
raise ExternalPackageException("Agama potential requires agama_units.")
agama_fn = lambda x, a: self.agama_potential_wrapper(fn, x, agama_units=agama_units)
self.extra_force_functions.append( (agama_fn, args) )
return
# If it hasn't returned before now, it's presumably just a function
self.extra_force_functions.append( (fn,args) )
[docs]
def add_external_timedependent_force(self, fn, agama_units=None, args=None):
"""
Add an external time-dependent force to the simulation.
Forces can be in the form of:
1. A function that takes three arguments: an (Np,3) array of positions (must be
Quantities) that contains the positions where the accelerations are to be
calculated, a scalar time, and an additional argument that is a dictionary
containing any extra parameters you need.
2. A galpy potential object.
3. A gala Potential object.
4. An Agama Potential object.
You may add as many external forces as you want - they will be summed
together along with the N-body force.
Parameters
----------
fn : function or galpy potential.Potential object or gala potential.PotentialBase object
External force function to add
args : dict
Any extra parameters that should be passed to the function when it is called
agama_units : dict
Agama potentials take unitless values, so assume that the Agama potential has
been defined with this set of units. The dict should contain the keys
'lenunit' and 'accelunit', which should each by an Astropy Unit or Quantity.
Required for Agama potentials, ignored otherwise.
Example
-------
This function calculates an external force
from a single point source whose mass oscilllates in time::
def my_oscillating_point_source_force(pos, time, args):
# args has 3 parameters:
# args['pos']: position of mass
# args['massamplitude']: amplitude of oscillating mass (should have mass units)
# args['period']: period of oscillation of mass (should have time units)
# acceleration is G M / r^2
GM = const.G * args['massamplitude'] * (np.cos(2. * np.pi * time / args['period']) + 1.)
d_pos = pos - args['pos']
r2 = (d_pos**2).sum(axis=1)
rhat = d_pos / np.sqrt(r2)
return rhat * GM / r2
To add the force from a particle at 10kpc on the x-axis with
a mass of 10:sup:`8` M:sub:`sun` that oscillates every 100 Myr::
mysimulation = Simulation()
mysimulation.add_external_timedependent_force(my_point_source_force, {'massamplitude':1e8*u.Msun,
'period':100*u.Myr, 'pos':np.array([10,0,0])*u.kpc})
"""
if isinstance(fn, list):
# Probably a galpy combined potential. Add each one individually.
for item in fn:
self.add_external_timedependent_force(item, args)
return
if USE_GALA:
# Check if it's a gala potential
if isinstance(fn, gala.potential.PotentialBase):
gala_fn = lambda x, t, a: self.gala_potential_wrapper(fn, x, time=t)
self.extra_timedependent_force_functions.append( (gala_fn, args) )
return
if USE_GALPY:
# Check if it's a galpy potential
if isinstance(fn, galpy.potential.Potential):
galpy_fn = lambda x, t, a: self.galpy_potential_wrapper(fn, x, time=t)
self.extra_timedependent_force_functions.append( (galpy_fn, args) )
return
if USE_AGAMA:
# Check if it's an Agama potential
if isinstance(fn, agama.Potential):
# Make sure units were defined.
if (agama_units is None) or ('lenunit' not in agama_units) or \
('accelunit' not in agama_units):
raise ExternalPackageException("Agama potential requires agama_units.")
agama_fn = lambda x, t, a: self.agama_potential_wrapper(fn, x, agama_units=agama_units, time=t)
self.extra_timedependent_force_functions.append( (agama_fn, args) )
return
# If it hasn't returned before now, it's presumably just a function
self.extra_timedependent_force_functions.append( (fn,args) )
[docs]
def add_external_velocitydependent_force(self, fn, args=None):
"""Add an external velocity-dependent force to the simulation.
Forces can be in the form of:
1. A function that takes three arguments: an (Np,3) array of positions (must
be Quantities) that contains the positions where the accelerations
are to be calculated, an (Np,3) array of velocities (must be Quantities),
and an additional argument that is a dictionary containing any extra
parameters you need.
2. A galpy potential object (must derive from galpy.potential.DissipativeForce).
You may add as many external forces as you want - they will be summed
together along with the N-body force.
Parameters
----------
fn : function or galpy.potential.DissipativeForcePotential object
External force function to add
args : dict
Any extra parameters that should be passed to the function when it is called
Example
-------
This function adds on an external force that goes in the opposite directly of the
current velocity of every particle with magnitude abs(velocity) / timescale given in args::
def my_friction_force(pos, vel, args):
# args has 1 parameter:
# args['t0']: timescale (should have time units)
velmag = np.sqrt(np.sum(vel**2, axis=1))
forcemag = velmag / args['t0']
forcearray = -vel/velmag[:,np.newaxis] * forcemag[:,np.newaxis]
return forcearray
Then to add a force that slows all particles down on a 100 Myr timescale::
mysimulation = Simulation()
mysimulation.add_external_velocitydependent_force(my_friction_force, {'t0':100*u.Myr})
"""
if isinstance(fn, list):
# Probably a galpy combined potential. Add each one individually.
for item in fn:
self.add_external_velocitydependent_force(item, args)
return
if USE_GALPY:
# Check if it's a galpy velocity-dependent potential
if isinstance(fn, galpy.potential.DissipativeForce.DissipativeForce):
galpy_fn = lambda x, v, a: self.galpy_dissipativeforce_wrapper(fn, x, vel=v)
self.extra_velocitydependent_force_functions.append( (galpy_fn, args) )
return
# If it hasn't returned before now, it's presumably just a function
self.extra_velocitydependent_force_functions.append( (fn,args) )
def nrows(self, array):
"""Return the number of rows in an array or scalar."""
return array.shape[0] if array.ndim>1 else 1
[docs]
def add_IC(self, newIC):
"""Adds particles to the initial conditions.
Parameters
----------
newIC : dict
Properties of new particles to add. Must have the following key-value pairs:
* **pos:** an array of positions
* **vel:** an array of velocities
* **mass:** an array of masses
All should be astropy Quantities, with shape (Np,3) or just (3) if a single particle.
Example
-------
Create a simulation object whose initial conditions consist of a
particle of mass 10:sup:`8` M:sub:`sun` at a position 10 kpc from the origin on
the x-axis, and a velocity of 200 km/s in the positive y-direction::
sim = Simulation()
sim.add_IC( {'pos':np.array([10,0,0])*u.kpc, 'vel':np.array([0,200,0])*u.km/u.s,
'mass':np.array([1e8])*u.Msun} )
See Also
--------
IC : Class containing static functions that generate initial conditions that can be added using add_IC().
"""
#sanity check that all pieces are there and have the same number of particles
if 'pos' not in newIC:
raise ICException("Missing 'pos' key in initial conditions function.")
if 'vel' not in newIC:
raise ICException("Missing 'vel' key in initial conditions function.")
if 'mass' not in newIC:
raise ICException("Missing 'mass' key in initial conditions function.")
npos = self.nrows(newIC['pos'])
nvel = self.nrows(newIC['vel'])
nmass = len(newIC['mass'])
if (npos != nvel) | (npos != nmass):
raise ICException('Inconsistent number of particles in initial conditions function.')
if self.ICarrays==False:
# need to initialize arrays
self.ICarrays = {}
self.ICarrays['pos'] = newIC['pos']
self.ICarrays['vel'] = newIC['vel']
self.ICarrays['mass'] = newIC['mass']
else:
self.ICarrays['pos'] = np.vstack( (self.ICarrays['pos'], newIC['pos']) )
self.ICarrays['vel'] = np.vstack( (self.ICarrays['vel'], newIC['vel']) )
self.ICarrays['mass'] = np.hstack( (self.ICarrays['mass'], newIC['mass']) )
[docs]
def pyn_snap(self, timestep=None):
"""Return snapshot given by the timestep as a pynbody SimSnap.
Parameters
----------
timestep : int
Snapshot number to return. Returns final snapshot if timestep is None (default: None)
Returns
-------
sim : pynbody SimSnap
Snapshot
Notes
-----
Returned snapshot will attempt to put the SimSnap in equivalent units as what
the Simulation object is using internally.
Raises
------
ExternalPackageException
If pynbody could not be imported.
"""
if USE_PYNBODY:
if timestep is None:
timestep = self.timestep
ap_pos_unit = self.positions.unit
pyn_pos_unit = unitconverter.astropy_to_pynbody(ap_pos_unit)
ap_vel_unit = self.velocities.unit
pyn_vel_unit = unitconverter.astropy_to_pynbody(ap_vel_unit)
ap_mass_unit = self.masses.unit
pyn_mass_unit = unitconverter.astropy_to_pynbody(ap_mass_unit)
sim = pyn.new(self.Np)
sim['pos'] = pyn.array.SimArray(self.snap(timestep)['pos'].to(ap_pos_unit).value,\
pyn_pos_unit)
sim['vel'] = pyn.array.SimArray(self.snap(timestep)['vel'].to(ap_vel_unit).value, \
pyn_vel_unit)
sim['mass'] = pyn.array.SimArray(self.snap(timestep)['mass'].to(ap_mass_unit).value, \
pyn_mass_unit)
sim['eps'] = pyn.array.SimArray(self.params['eps'].to(ap_pos_unit).value, pyn_pos_unit)
sim.physical_units(distance=pyn_pos_unit, velocity=pyn_vel_unit, mass=pyn_mass_unit)
return sim
else:
raise ExternalPackageException("Could not import pynbody to use pyn_snap().")
[docs]
def plot_particles(self, parm='pos', coords='xy', snap='final', xlim=None, ylim=None, \
s=0.2, unit=None, ax=None, timeformat='{0:.1f}', nolabels=False, particle_range=None, **kwargs):
"""
Scatter plot of particle positions or velocities for a snapshot.
Parameters
----------
parms : str
'pos' or 'vel' to plot particle positions or velocities (default: 'pos')
coords : str
'xy', 'xz', or 'yz' to plot the different Cartesian projections (default 'xy')
snap : int or str
Which snapshot to plot. Either a snapshot number, 'final' for the last snapshot,
or 'IC' for the initial conditions (default: 'final')
xlim : array-like, optional
x limits of plot
ylim : array-like, optional
y limits of plot
s : float
Scatter plot point size (default: 0.2)
unit: astropy Unit, optional
Plot the quantities in the given unit system. The default is whatever units
the positions or velocities array is in, which is kpc or km/s by default but
can be changed.
ax : matplotlib Axis, optional
Axis on which to plot. If missing or None, uses plt.subplot(111, aspect=1.0) to
create a new Axis object.
timeformat : str or False
Format string for snapshot time in title, or False for no title (default: '{0:.1f}')
nolabels : bool, optional
If True, do not label x and y axes (default: False)
particle_range: array-like, optional
Only plot particles with indices in the slice particle_range[0]:particle_range[1]
**kwargs : dict
All additional keyword arguments are passed onto plt.scatter()
Returns
-------
scatter : matplotlib PathCollection
The scatter plot
"""
# Create axis if necessary.
if ax is None:
ax = plt.subplot(111, aspect=1.0)
# Get the things that will be plotted
if parm=='pos':
data = self.positions
elif parm=='vel':
data = self.velocities
else:
raise ValueError("parm must be 'pos' or 'vel'")
# Figure out coordinate indices
if len(coords) != 2:
raise ValueError("coords must have length 2")
if (coords[0] not in 'xyz') or (coords[1] not in 'xyz'):
raise ValueError("coords characters must be x, y, or z")
xindex = ord(coords[0]) - ord('x')
yindex = ord(coords[1]) - ord('x')
# Figure out snapshot number
if snap=='final':
snapnum = self.timestep
elif snap=='IC':
snapnum = 0
else:
# it's the snapshot number, hopefully
snapnum = snap
# If unit is None, get default unit
if unit is None:
unit = data.unit
# Full range of particles if not specified
if particle_range is None:
particle_range = (0, self.Np)
# Make the plot!
output = ax.scatter(data[snapnum, particle_range[0]:particle_range[1] ,xindex].to(unit).value,\
data[snapnum, particle_range[0]:particle_range[1], yindex].to(unit).value, s=s, **kwargs)
# Store things we'll need for following frames if making a movie
self._plot_parms = {'data_parm':parm, 'particle_range':particle_range, 'xindex':xindex, \
'yindex':yindex, 'unit':unit}
# Set ranges, axis labels, and title
if xlim is not None:
ax.set_xlim(xlim)
if ylim is not None:
ax.set_ylim(ylim)
if nolabels == False:
if parm=='pos':
xlabel = coords[0]
ylabel = coords[1]
elif parm=='vel':
xlabel = 'v_'+coords[0]
ylabel = 'v_'+coords[1]
ax.set_xlabel('${0}$ ({1})'.format(xlabel, str(unit)))
ax.set_ylabel('${0}$ ({1})'.format(ylabel, str(unit)))
self._plot_particles_settitle(ax, snapnum, timeformat)
return output
def _plot_particles_settitle(self, ax, snapnum, timeformat=False):
"""Utility routine to set title of axis to the time of snapnum using the given format."""
if timeformat != False:
ax.set_title(timeformat.format(self.times[snapnum]))
def _plot_particles_setoffsets(self, scatterplot, snapnum):
"""Update a previously-made plot_particles() with a new snapshot number."""
# Get the things that will be plotted
if self._plot_parms['data_parm']=='pos':
data = self.positions
elif self._plot_parms['data_parm']=='vel':
data = self.velocities
else:
raise ValueError("data_parm must be 'pos' or 'vel'")
xdat = data[snapnum, self._plot_parms['particle_range'][0]:self._plot_parms['particle_range'][1], self._plot_parms['xindex']].to(self._plot_parms['unit']).value
ydat = data[snapnum, self._plot_parms['particle_range'][0]:self._plot_parms['particle_range'][1], self._plot_parms['yindex']].to(self._plot_parms['unit']).value
scatterplot.set_offsets(np.c_[xdat, ydat])
return
[docs]
def movie_particles(self, fname, fps=25, ax=None, skip=None, timeformat='{0:.1f}', *args, **kwargs):
"""Create a movie of the particles using the :meth:`~gravhopper.Simulation.plot_particles` function.
Parameters
----------
fname : str
Movie output file name
fps : int
Frames per second (default: 25)
ax : matplotlib Axis, optional
Axis on which to plot. If missing or None, uses plt.subplot(111, aspect=1.0) to
create a new Axis object and closes the figure after the movie is made.
skip : int, optional
Skip every N frames (e.g. skip=5 only has 1/5th of the full number of frames).
xlim : array-like, optional
x limits of the movie (default: encompasses the full extent any particle
reaches)
ylim : array-like, optional
y limits of the movie (default: encompasses the full extent any particle
reaches)
*args : object
Passed through to :meth:`~gravhopper.Simulation.plot_particles`
**kwargs : dict
Passed through to :meth:`~gravhopper.Simulation.plot_particles`
"""
# Create axis if necessary.
if ax is None:
fig = plt.figure()
ax = fig.add_subplot(111, aspect=1.0)
close_plot = True
else:
fig = ax.get_figure()
close_plot = False
# If skip is None, equivalent to 1
if skip is None:
skip = 1
# Initial frame
particles = self.plot_particles(*args, ax=ax, snap='IC', timeformat=timeformat, **kwargs)
# Default x and y limits are min/max over the whole simulation
# Get the things that will be plotted
if self._plot_parms['data_parm']=='pos':
data = self.positions
elif self._plot_parms['data_parm']=='vel':
data = self.velocities
xdat = data[:, self._plot_parms['particle_range'][0]:self._plot_parms['particle_range'][1], self._plot_parms['xindex']].to(self._plot_parms['unit']).value
ydat = data[:, self._plot_parms['particle_range'][0]:self._plot_parms['particle_range'][1], self._plot_parms['yindex']].to(self._plot_parms['unit']).value
if 'xlim' not in kwargs:
kwargs['xlim'] = [np.min(xdat), np.max(xdat)]
ax.set_xlim(kwargs['xlim'])
if 'ylim' not in kwargs:
kwargs['ylim'] = [np.min(ydat), np.max(ydat)]
ax.set_ylim(kwargs['ylim'])
# Function that updates each frame
def animate(frame):
framesnap = frame * skip
self._plot_particles_setoffsets(particles, framesnap)
self._plot_particles_settitle(ax, framesnap, timeformat)
return particles
ms_per_frame = 1000 / fps
anim = FuncAnimation(fig, animate, frames=(self.timestep+1) // skip, interval=ms_per_frame)
anim.save(fname)
if close_plot:
plt.close(fig)
[docs]
class IC(object):
"""Namespace for holding static methods that create a variety of initial conditions."""
[docs]
@staticmethod
def from_galpy_df(df, N=None, totmass=None, center_pos=None, center_vel=None, force_origin=True):
"""Sample a galpy sphericaldf distribution function object and return as an IC.
Parameters
----------
df : galpy df.sphericaldf object
Distribution function to sample. Assumed to be in the ro=8, vo=220 unit system.
N : int
Number of particles
totmass : astropy Quantity
Total mass
center_pos : 3 element array-like Quantity, optional
Force the center of mass of the IC to be at this position
center_vel : 3 element array-like Quantity, optional
Force the mean velocity of the IC to have this velocity
force_origin : bool
Force the center of mass to be at the origin and the mean velocity to be zero;
equivalent to setting center_pos=[0,0,0]*u.kpc and
center_vel=[0,0,0]*u.km/u.s. Default is True unless center_pos and
center_vel is set. If force_origin is True and only one of center_pos or
center_vel is set, the other is set to zero.
Returns
-------
IC : dict
Properties of new particles to add, which sample the given distribution function. Contains
the following key/value pairs:
* **pos:** an array of positions
* **vel:** an array of velocities
* **mass:** an array of masses
Each are astropy Quantities, with shape (Np,3).
Note
----
Up to at least galpy v1.7, galpy df objects don't fully incorporate astropy Quantity inputs and
outputs. Therefore, it is recommended that you define any relevant potential using
the default ro=8, vo=220 unit system, turn off physical output for the potential,
create the df object from the potential, use ``from_galpy_df()`` to create the ICs,
and then turn physical output back on again afterwards if needed (see Example).
Example
-------
Sample an NFW halo with scale radius 20 kpc, scale amplitude 2x10:sup:`11` solar masses, and a maximum
radius of 1 Mpc with 10,000 particles::
from astropy import units as u
from galpy import potential, df
NFWamp = 2e11 * u.Msun
NFWrs = 20 * u.kpc
ro = 8.
vo = 220.
rmax = 1 * u.Mpc
rmax_over_ro = (rmax/(ro*u.kpc)).to(1).value
Nhalo = 10000
NFWpot = potential.NFWPotential(amp=NFWamp, a=NFWrs)
NFWmass = potential.mass(NFWpot, rmax)
potential.turn_physical_off(NFWpot)
NFWdf = df.isotropicNFWdf(pot=NFWpot, rmax=rmax_over_ro)
halo_IC = IC.from_galpy_df(NFWdf, N=Nhalo, totmass=NFWmass)
potential.turn_physical_on(NFWpot) # optional if needed later
Raises
------
ExternalPackageException
If galpy could not be imported.
"""
if USE_GALPY:
if not isinstance(df, galpy.df.sphericaldf):
raise ICException("from_galpy_df() only currently works on galpy.df.sphericaldf objects")
R,vR,vT,z,vz,phi = df.sample(n=N, return_orbit=False)
# galpy.df absolutely refuses to return physical units. So I will assume
# that it's returned in "length_unit=8kpc velocity_unit=220km/s" units.
lenunit = 8*u.kpc
velunit = 220*u.km/u.s
x, y, z = galpy.util.coords.cyl_to_rect(R, phi, z) * lenunit
vx, vy, vz = galpy.util.coords.cyl_to_rect_vec(vR, vT, vz, phi) * velunit
m = np.ones((N)) * (totmass/N)
# Force COM and/or COV
positions, velocities = force_centers(np.vstack((x,y,z)).T, np.vstack((vx,vy,vz)).T, \
center_pos=center_pos, center_vel=center_vel, force_origin=force_origin)
outIC = {'pos':positions, 'vel':velocities, 'mass': m}
return outIC
else:
raise ExternalPackageException("Could not import galpy to use from_galpy_df()")
[docs]
@staticmethod
def from_pyn_snap(pynsnap):
"""Turn a pynbody SimSnap into a set of GravHopper initial conditions.
Parameters
----------
pynsnap : SimSnap
pynbody snapshot to convert
Returns
-------
IC : dict
Properties of new particles to add, which sample the given distribution function. Contains
the following key/value pairs:
* **pos:** an array of positions
* **vel:** an array of velocities
* **mass:** an array of masses
Each are astropy Quantities, with shape (Np,3).
Note
----
The IC will try to use equivalent units to what are in the SimSnap.
Raises
------
ExternalPackageException
If pynbody could not be imported.
"""
if USE_PYNBODY:
# Figure out original units and their equivalents in astropy
pyn_pos_unit = pynsnap['pos'].units
ap_pos_unit = unitconverter.pynbody_to_astropy(pyn_pos_unit)
pyn_vel_unit = pynsnap['vel'].units
ap_vel_unit = unitconverter.pynbody_to_astropy(pyn_vel_unit)
pyn_mass_unit = pynsnap['mass'].units
ap_mass_unit = unitconverter.pynbody_to_astropy(pyn_mass_unit)
# Grab as numpy arrays and switch from pynsnap units to astropy units
positions = pynsnap['pos'].in_units(pyn_pos_unit).view(type=np.ndarray) * ap_pos_unit
velocities = pynsnap['vel'].in_units(pyn_vel_unit).view(type=np.ndarray) * ap_vel_unit
masses = pynsnap['mass'].in_units(pyn_mass_unit).view(type=np.ndarray) * ap_mass_unit
outIC = {'pos':positions, 'vel':velocities, 'mass':masses}
return outIC
else:
raise ExternalPackageException("Could not import pynbody to use from_pyn_snap()")
[docs]
@staticmethod
def TSIS(N=None, maxrad=None, totmass=None, center_pos=None, center_vel=None, force_origin=True, seed=None):
"""Generate the initial conditions for a Truncated Singular Isothermal Sphere
(e.g. BT eqs 4.103 and 4.104 with a maximum radius imposed). Note that this is
not a true equilibrium because of the truncation -- it will be in apporoximate
equilibrium in the inner regions, at least at first, but not in the outer regions.
Parameters
----------
N : int
Number of particles
maxrad : astropy Quantity with length dimensions
Truncation radius
totmass : astropy Quantity with mass dimensions
Total mass
center_pos : 3 element array-like Quantity, optional
Force the center of mass of the IC to be at this position
center_vel : 3 element array-like Quantity, optional
Force the mean velocity of the IC to have this velocity
force_origin : bool
Force the center of mass to be at the origin and the mean velocity to be zero;
equivalent to setting center_pos=[0,0,0]*u.kpc and
center_vel=[0,0,0]*u.km/u.s. Default is True unless center_pos and
center_vel is set. If force_origin is True and only one of center_pos or
center_vel is set, the other is set to zero.
seed : {None, int, array_like[ints], SeedSequence, BitGenerator, Generator}, optional
Seed to initialize random number generator to enable repeatable ICs.
Returns
-------
IC : dict
Properties of new particles to add, which sample the given distribution function. Contains
the following key/value pairs:
* **pos:** an array of positions
* **vel:** an array of velocities
* **mass:** an array of masses
Each are astropy Quantities, with shape (Np,3).
Example
-------
To create a truncated singular isothermal sphere with a total mass of 10:sup:`11` solar masses,
a truncation radius of 100 kpc, sampled with 10,000 particles::
particles = IC.TSiS(N=10000, maxrad=100*u.kpc, totmass=1e11*u.Msun)
"""
if (N is None) or (maxrad is None) or (totmass is None):
raise ICException("TSIS requires N, maxrad, and totmass.")
rng = np.random.default_rng(seed)
sigma = np.sqrt(totmass * const.G / (2 * maxrad)).to(u.km/u.s)
# generate random coordinates and velocities, and compute particle mass
radius = rng.uniform(0.0, maxrad.value, size=N) * maxrad.unit
costheta = rng.uniform(-1.0, 1.0, size=N)
phi = rng.uniform(0.0, 2.0*np.pi, size=N)
sintheta = np.sqrt(1.0 - costheta**2)
x = radius * sintheta * np.cos(phi)
y = radius * sintheta * np.sin(phi)
z = radius * costheta
vx = rng.normal(0.0, sigma.value, size=N) * sigma.unit
vy = rng.normal(0.0, sigma.value, size=N) * sigma.unit
vz = rng.normal(0.0, sigma.value, size=N) * sigma.unit
m = np.ones((N)) * (totmass/N)
# Force COM and/or COV
positions, velocities = force_centers(np.vstack((x,y,z)).T, np.vstack((vx,vy,vz)).T, \
center_pos=center_pos, center_vel=center_vel, force_origin=force_origin)
outIC = {'pos':positions, 'vel':velocities, 'mass': m}
return outIC
[docs]
@staticmethod
def Plummer(N=None, b=None, totmass=None, center_pos=None, center_vel=None, force_origin=True, seed=None):
"""Generate the initial conditions for an isotropic Plummer model (BT eqs. 4.83 with n=5, 4.92, 2.44b).
Parameters
----------
N : int
Number of particles
b : astropy Quantity of dimension length
Scale radius
totmass : astropy Quantity of dimensions mass
Total mass
center_pos : 3 element array-like Quantity, optional
Force the center of mass of the IC to be at this position
center_vel : 3 element array-like Quantity, optional
Force the mean velocity of the IC to have this velocity
force_origin : bool
Force the center of mass to be at the origin and the mean velocity to be zero;
equivalent to setting center_pos=[0,0,0]*u.kpc and
center_vel=[0,0,0]*u.km/u.s. Default is True unless center_pos and
center_vel is set. If force_origin is True and only one of center_pos or
center_vel is set, the other is set to zero.
seed : {None, int, array_like[ints], SeedSequence, BitGenerator, Generator}, optional
Seed to initialize random number generator to enable repeatable ICs.
Returns
-------
IC : dict
Properties of new particles to add, which sample the given distribution function. Contains
the following key/value pairs:
* **pos:** an array of positions
* **vel:** an array of velocities
* **mass:** an array of masses
Each are astropy Quantities, with shape (Np,3).
Example
-------
To create a Plummer sphere with scale radius 1 pc and a total mass of 10:sup:`6` M:sub:`sun`
sampled with 10,000 particles::
particles = IC.Plummer(N=10000, b=1*u.pc, totmass=1e6*u.Msun)
"""
if (N is None) or (b is None) or (totmass is None):
raise ICException("Plummer requires N, b, and totmass.")
rng = np.random.default_rng(seed)
# generate random coordinates and velocities. Uses the law of
# transformation of probabilities.
rad_xi = rng.uniform(0.0, 1.0, size=N)
radius = b / np.sqrt(rad_xi**(-2./3) - 1)
costheta = rng.uniform(-1.0, 1.0, size=N)
phi = rng.uniform(0.0, 2.0*np.pi, size=N)
sintheta = np.sqrt(1.0 - costheta**2)
x = radius * sintheta * np.cos(phi)
y = radius * sintheta * np.sin(phi)
z = radius * costheta
# need to do the velocity component numerically
# from Aarseth+ 1974, we want to draw q from q^2 (1-q^2)^(7/2)
# and then assign the magnitude of v to be
# v = q sqrt(2) (1 + r^2/b^2)^(-1/4)
qax = np.arange(0, 1.01, 0.01)
q_prob = qax**2 * (1. - qax**2)**(3.5)
q_cumprob = np.cumsum(q_prob) # cumulative probability
q_cumprob /= q_cumprob[-1] # normalized correctly to end up at 1
probtransform = interp1d(q_cumprob, qax) # reverse interpolation
# now get the uniform random deviate and transform it
vel_xi = rng.uniform(0.0, 1.0, size=N)
q = probtransform(vel_xi)
velocity = q * np.sqrt(2. * const.G * totmass / b).to(u.km/u.s) * (1. + (radius/b)**2)**(-0.25)
cosveltheta = rng.uniform(-1.0, 1.0, size=N)
velphi = rng.uniform(0.0, 2.0*np.pi, size=N)
sinveltheta = np.sqrt(1.0 - cosveltheta**2)
vx = velocity * sinveltheta * np.cos(velphi)
vy = velocity * sinveltheta * np.sin(velphi)
vz = velocity * cosveltheta
m = np.ones((N)) * (totmass/N)
# Force COM and/or COV
positions, velocities = force_centers(np.vstack((x,y,z)).T, np.vstack((vx,vy,vz)).T, \
center_pos=center_pos, center_vel=center_vel, force_origin=force_origin)
outIC = {'pos':positions, 'vel':velocities, 'mass': m}
return outIC
[docs]
@staticmethod
def Hernquist(N=None, a=None, totmass=None, cutoff=10., center_pos=None, center_vel=None, force_origin=True, seed=None):
"""Generate the initial conditions for an isotropic Hernquist model (Hernquist 1990).
Parameters
----------
N : int
Number of particles
a : astropy Quantity of dimension length
Scale radius
totmass : astropy Quantity of dimension mass
Total mass
cutoff : float
Cut off the distribution at cutoff times the scale radius
center_pos : 3 element array-like Quantity, optional
Force the center of mass of the IC to be at this position
center_vel : 3 element array-like Quantity, optional
Force the mean velocity of the IC to have this velocity
force_origin : bool
Force the center of mass to be at the origin and the mean velocity to be zero;
equivalent to setting center_pos=[0,0,0]*u.kpc and
center_vel=[0,0,0]*u.km/u.s. Default is True unless center_pos and
center_vel is set. If force_origin is True and only one of center_pos or
center_vel is set, the other is set to zero.
seed : {None, int, array_like[ints], SeedSequence, BitGenerator, Generator}, optional
Seed to initialize random number generator to enable repeatable ICs.
Returns
-------
IC : dict
Properties of new particles to add, which sample the given distribution function. Contains
the following key/value pairs:
* **pos:** an array of positions
* **vel:** an array of velocities
* **mass:** an array of masses
Each are astropy Quantities, with shape (Np,3).
Example
-------
To create a Hernquist sphere with a total mass of 10:sup:`10` solar masses, a scale radius
of 1 kpc, sampled with 10,000 particles::
particles = IC.Hernquist(N=10000, a=1*u.kpc, totmass=1e10*u.Msun)
"""
rng = np.random.default_rng(seed)
# Based on equation 10 from Hernquist 1990, I get r/a = 1/(xi^(-1/2) - 1)
# and xi = r^2 / (1 + r)^2 where r means r/a
# get positions
# turn cutoff into max xi
xi_cutoff = cutoff**2 / ((1. + cutoff)**2)
rad_xi = rng.uniform(0.0, xi_cutoff, size=N)
r_over_a = 1./(1./np.sqrt(rad_xi) - 1.)
radius = r_over_a * a
costheta = rng.uniform(-1.0, 1.0, size=N)
phi = rng.uniform(0.0, 2.*np.pi, size=N)
sintheta = np.sqrt(1.0 - costheta**2)
x = radius * sintheta * np.cos(phi)
y = radius * sintheta * np.sin(phi)
z = radius * costheta
# For velocities, define f(E) from (17) (as re-expressed by Baes & Dejonghe 2002) and
# sample the cumulative function for interpolation.
def fE(E):
return (np.sqrt(E)*(1-2.*E)*(8.*E*E - 8*E - 3.)/((1.-E)**2) + \
3.*np.arcsin(np.sqrt(E))/((1.-E)**(5./2)))/(8.*np.sqrt(2)*np.pi**3)
Eax = np.arange(0.0, 1.0, 0.002)
cumulative_fE = [integrate.quad(fE, 0.0, Etop)[0] for Etop in Eax]
# binding E must be greater than -potential
potential = -1./(1. + r_over_a)
# check and make sure the top of the interpolation bound is okay, otherwise add an extra point
most_bound_potential = np.max(-potential)
if most_bound_potential > Eax.max():
np.append(Eax, most_bound_potential)
cumulative_fE.append(integrate.quad(fE, 0.0, most_bound_potential)[0])
cumulative_fE = np.array(cumulative_fE)/np.max(cumulative_fE)
# build interpolation functions
Einterp = interp1d(cumulative_fE, Eax)
inverse_Einterp = interp1d(Eax, cumulative_fE)
# instead of going from 0 to 1, go from 0 to max possible for that radius
# use inverse interpolation function to find maximum possible xi for a given radius
max_possible_xi = inverse_Einterp(-potential)
E_xi = rng.uniform(0.0, max_possible_xi, size=N)
bindingE = Einterp(E_xi)
# convert to velocity in real units
energy_units = const.G * totmass / a
# E = - 0.5 v^2 - Phi(r)
# so v = sqrt(2 |E+Phi(r)|)
velocity = np.sqrt(2. * (-(bindingE+potential))*energy_units).to(u.km/u.s)
# give it a random direction
cosveltheta = rng.uniform(-1.0, 1.0, size=N)
velphi = rng.uniform(0.0, 2.0*np.pi, size=N)
sinveltheta = np.sqrt(1.0 - cosveltheta**2)
vx = velocity * sinveltheta * np.cos(velphi)
vy = velocity * sinveltheta * np.sin(velphi)
vz = velocity * cosveltheta
m = np.ones((N)) * (totmass/N)
# Force COM and/or COV
positions, velocities = force_centers(np.vstack((x,y,z)).T, np.vstack((vx,vy,vz)).T, \
center_pos=center_pos, center_vel=center_vel, force_origin=force_origin)
outIC = {'pos':positions, 'vel':velocities, 'mass': m}
return outIC
[docs]
@staticmethod
def expdisk(sigma0=None, Rd=None, z0=None, sigmaR_Rd=None, external_rotcurve=None, N=None, \
center_pos=None, center_vel=None, force_origin=True, seed=None):
"""Generates initial conditions of an exponential disk with a sech^2 vertical distribution that is
in (very) approximate equilibrium: rho(R,z) = (sigma0 / 2 z0) exp(-R/Rd) sech^2(z/z0)
Parameters
----------
sigma0 : astropy Quantity with dimensions of surface density
Central surface density
Rd : astropy Quantity with dimensions of length
Radial exponential scale length
z0 : astropy Quantity with dimensions of length
Vertical scale height
sigmaR_Rd : astropy Quantity with dimensions of velocity
Radial velocity dispersion at R=Rd
external_rotcurve : function or None
Function that returns the circular velocity of any external potential that contributes
to the rotation curve aside from the disk itself. The function should accept input
as an astropy Quantity of dimension length, and should return an astropy Quantity of
dimension velocity.
N : int
Number of particles
center_pos : 3 element array-like Quantity, optional
Force the center of mass of the IC to be at this position
center_vel : 3 element array-like Quantity, optional
Force the mean velocity of the IC to have this velocity
force_origin : bool
Force the center of mass to be at the origin and the mean velocity to be zero;
equivalent to setting center_pos=[0,0,0]*u.kpc and
center_vel=[0,0,0]*u.km/u.s. Default is True unless center_pos and
center_vel is set. If force_origin is True and only one of center_pos or
center_vel is set, the other is set to zero.
seed : {None, int, array_like[ints], SeedSequence, BitGenerator, Generator}, optional
Seed to initialize random number generator to enable repeatable ICs.
Returns
-------
IC : dict
Properties of new particles to add, which sample the given distribution function. Contains
the following key/value pairs:
* **pos:** an array of positions
* **vel:** an array of velocities
* **mass:** an array of masses
Each are astropy Quantities, with shape (Np,3).
Example
-------
To create an exponential disk that is in a background logarithmic halo potential that
generates a flat rotation curve of 200 km/s::
particles = IC.expdisk(N=10000, sigma0=200*u.Msun/u.pc**2, Rd=2*u.kpc,
z0=0.5*u.kpc, sigmaR_Rd=10*u.km/u.s,
external_rotcurve=lambda x: 200*u.km/u.s)
"""
rng = np.random.default_rng(seed)
Rd_kpc = Rd.to(u.kpc).value
totmass = (np.pi * Rd**2 * sigma0).to(u.Msun)
# cylindrical radius transformation to give an exponential
Rax = np.arange(0.001*Rd_kpc, 10*Rd_kpc, 0.01*Rd_kpc)
R_cumprob = Rd_kpc**2 - Rd_kpc*np.exp(-Rax/Rd_kpc)*(Rax+Rd_kpc)
R_cumprob /= R_cumprob[-1]
probtransform = interp1d(R_cumprob, Rax) # reverse interpolation
# now get the uniform random deviate and transform it
R_xi = rng.uniform(0.0, 1.0, size=N)
R = probtransform(R_xi) * u.kpc
# use random azimuth
phi = rng.uniform(0.0, 2.0*np.pi, size=N)
x = R * np.cos(phi)
y = R * np.sin(phi)
# get z from uniform random deviate
z_xi = rng.uniform(0, 1.0, size=N)
z = 2 * z0 * np.arctanh(z_xi)
z *= (2 * (rng.uniform(0, 1, size=N) < 0.5)) - 1
# the velocity dispersions go as:
# sigma_R = sigmaR_Rd * exp(-R/Rd)
# sigma2_phi = sigmaR^2 * kappa^2 / 4 Omega^2
# sigma2_z = pi G Sigma(R) z0 / 2
# and the mean azimuthal velocity is
# <vphi> = vc
def om2(rad):
# Input must be in kpc but with units stripped, because it's passed into np.derivative.
y_R = rad/(2.*Rd_kpc)
# Disk contribution
omega2 = np.pi * const.G * sigma0 / Rd * (special.iv(0,y_R)*special.kv(0,y_R) -
special.iv(1,y_R)*special.kv(1,y_R))
# Halo contribution
if external_rotcurve is not None:
omega_halo = external_rotcurve(rad*u.kpc) / (rad*u.kpc)
omega2 += omega_halo**2
return omega2
Omega2 = om2(R.to(u.kpc).value)
kappa2 = 4.*Omega2 + R * derivative(om2, R.to(u.kpc).value, 1e-3) / u.kpc
sigma_R = sigmaR_Rd * np.exp(0.25*(1-R/Rd))
sigma2_phi = sigma_R**2 * 4 * Omega2 / kappa2
sigma2_z = np.pi * const.G * z0 * sigma0 * 0.5 * np.exp(-R/Rd)
vphi_mean = (R * np.sqrt(Omega2)).to(u.km/u.s)
vphi = np.sqrt(sigma2_phi).to(u.km/u.s) * rng.normal(size=N) + vphi_mean
vR = sigma_R.to(u.km/u.s) * rng.normal(size=N)
vx = -vphi * np.sin(phi) + vR * np.cos(phi)
vy = vphi * np.cos(phi) + vR * np.sin(phi)
vz = np.sqrt(sigma2_z).to(u.km/u.s) * rng.normal(size=N)
m = np.ones((N)) * (totmass/N)
# Force COM and/or COV
positions, velocities = force_centers(np.vstack((x,y,z)).T, np.vstack((vx,vy,vz)).T, \
center_pos=center_pos, center_vel=center_vel, force_origin=force_origin)
outIC = {'pos':positions, 'vel':velocities, 'mass': m}
return outIC
def force_centers(positions, velocities, center_pos=None, center_vel=None, force_origin=True):
"""Move positions and velocities to have the desired center of mass position and mean velocity.
Parameters
----------
positions : array of Quantities of dimension length
(Np,3) array of particle positions
velocities : array of Quantities of dimension velocity
(Np,3) array of particle velocities
center_pos : 3 element array-like Quantity, optional
Force the center of mass of the IC to be at this position
center_vel : 3 element array-like Quantity, optional
Force the mean velocity of the IC to have this velocity
force_origin : bool
Force the center of mass to be at the origin and the mean velocity to be zero;
equivalent to setting center_pos=np.array([0,0,0])*u.kpc and
center_vel=np.array([0,0,0])*u.km/u.s. Default is True unless center_pos and
center_vel is set. If force_origin is True and only one of center_pos or
center_vel is set, the other is set to zero.
Returns
-------
newpositions : array of Quantities of dimension length
New shifted positions
newvelocities : array of Quantities of dimension velocity
New shifted velocities
"""
newpos = positions
newvel = velocities
if force_origin:
if center_pos is None:
center_pos = np.array([0,0,0])*u.kpc
if center_vel is None:
center_vel = np.array([0,0,0])*u.km/u.s
if center_pos is not None:
sampled_com = np.mean(positions, axis=0)
dpos = center_pos - sampled_com
newpos += dpos
if center_vel is not None:
sampled_cov = np.mean(velocities, axis=0)
dvel = center_vel - sampled_cov
newvel += dvel
return (newpos, newvel)