Tutorial: Write a Sampler¶
In case you want to extend the sampling facilities its rather simple to write an own Sampler.
First thing you have to decide is what kind of sampler you want to write. PySurf comes with two base Sampling classes:
CrdSampler:
- Most basic sampler, it only returns coordinates of the system.
TODO: add features
DynSampler: Sampler used for dynamics simulations, it needs to return initial condition, meaning positions and velocities.
TODO: add features
All DynSampler are also at the same time CrdSampler.
Example: Gromacs Trajectory Sampler¶
In this tutorial, we show how to write a simple DynSampler, that uses MDAnalysis to read information from a Gromacs Trajectory and reads both coordinates and velocities.
Step 0: Basic Features¶
As we are writing a DynSampler, we need to import DynSamplerBase. Additionally, we will use numpy an MDAnalysis in this tutorial, so we start with:
1import numpy as np
2import MDAnalysis as mda
3from pysurf.sampler import DynSamplerBase
Step 1: Basic Features¶
Every DynSamplerBase has to implement 3 functions:
- from_config(cls, config), classmethod:
classmethod to initialize the class using the information provided from the user
- get_condition(self):
returns the next condition
- get_init(self):
returns the initial structure
1class GromacsSampler(DynSamplerBase):
2 """Sample from Gromacs Trajectory"""
3
4 @classmethod
5 def from_config(cls, config):
6 """Initialize the object"""
7 return cls()
8
9 def get_condition(self):
10 """return the next condition"""
11
12 def get_init(self):
13 """return the initial condition"""
Step 2: User Input¶
PySurf is build around the Colt framework, developed along the lines of this project. To specify specific input needed for your class you simply use the _questions string: In our example we will need 4 user inputs:
- tpr_file, existing_file:
the tpr file of Gromacs
- trr_file, existing_file:
the trr file of Gromacs
- every, int:
read only every nth step
- selection, str, optional:
save only a specific MDAnalysis-Selection, if not set, use all atoms
1class GromacsSampler(DynSamplerBase):
2 """Sample from Gromacs Trajectory"""
3
4 _questions = """
5 # name of the tpr file
6 tpr_file = :: existing_file
7 # name of the trr file
8 trr_file = :: existing_file
9 # read every nth trajectory from the file
10 every = 1 :: int :: >1
11 # selection
12 selection = :: str, optional
13 """
14
15 @classmethod
16 def from_config(cls, config):
17 return cls(config['tpr_file'], config['trr_file'], every=config['every'],
18 selection=config['selection'])
19
20 def __init__(self, tpr_file, trr_file, every=1, selection=None):
21 # MDA Universe
22 self.universe = mda.Universe(tpr_file, trr_file)
23 # Maximum number of frames
24 self.n_max = len(self.universe.trajectory)
25 # current active frame
26 self.current = 0
27 # save every
28 self.every = every
29 # get the selection
30 if selection is None:
31 # use all atoms
32 self.atoms = self.universe.atoms
33 else:
34 # use a specific selection
35 self.atoms = self.universe.select_atoms(selection)
Step 3: Get Condition¶
The next and last step is to implement the get_condidion function, to actually return the correct information back the the sampler:
1class GromacsSampler(DynSamplerBase):
2 """Sample from Gromacs Trajectory"""
3
4 ...
5
6 def get_condition(self):
7 if self.current < self.n_max:
8 return self.next_condition()
9 raise IndexError(f"Trajectory has only {self.n_max} elements, cannot access {self.current}")
We check that the current trajectory is available, and if so, we read it and move to the next condition. If not we raise an IndexError.
1class GromacsSampler(DynSamplerBase):
2 """Sample from Gromacs Trajectory"""
3
4 ...
5
6 def get_condition(self):
7 if self.current < self.n_max:
8 return self.next_condition()
9 raise IndexError(f"Trajectory has only {self.n_max} elements, cannot access {self.current}")
10
11 def next_condition(self):
12 crd = np.copy(self.atoms.positions)
13 veloc = np.copy(self.atoms.velocities)
14 #
15 self.update_condition()
16 return self.condition(crd=crd, veloc=veloc, state=None)
17
18 def update_condition(self):
19 try:
20 for _ in range(self.every):
21 next(self.universe.trajectory)
22 self.current += self.every
23 except StopIteration:
24 self.current = self.n_max