Cookbook
Ready-to-run snippets for common Pulse tasks.
Each recipe below is a self-contained snippet you can drop straight into a notebook or script. Swap in your own symbols, dates, and parameters as needed. All examples assume you have the SDK installed and an API key set — see Quickstart if not.
Simulations
Run, wait, and download in one block (Pro)
Submit a job, block until it finishes, then pull down the first simulation’s data — the most common Pro-tier pattern.
Custom simulation jobs consume Pro-tier credits. Check your remaining allowance on the Dashboard before running large batches.
import time
from simudyne import PulseABM
client = PulseABM()  # reads SIMUDYNE_API_KEY from environment

# Submit the job; the response carries the job ID and the IDs of the queued runs.
result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=5,
)
job_id = result["job_id"]
sim_ids = result["queued_sim_ids"]
print(f"Submitted {job_id} — {len(sim_ids)} runs queued")

# Poll every 30 s until the job reports completion or errors.
while True:
    status = client.simulation.get_job_status(job_id)
    done = status["status_summary"].get("complete", 0)
    print(f"\r {done}/{len(sim_ids)} complete", end="", flush=True)
    if status["is_complete"] or status.get("has_errors"):
        break
    time.sleep(30)
print()  # terminate the in-place progress line so later output starts on a fresh line

# The loop also exits when the job reports errors, so surface that before downloading.
if status.get("has_errors"):
    print(f"Job {job_id} reported errors: {status['status_summary']}")

df = client.simulation.get_sim_data(sim_ids[0])
print(df.head())
Get the latest calibration date for a symbol (Free)
A handy helper that queries get_available_symbols() and returns the most recent date that is fully ready for simulation — so you never accidentally pass a stale or incomplete date to run().
def latest_cal_date(client, symbol: str) -> str | None:
"""Return the most recent fully-calibrated date for *symbol*."""
for sym in client.data.get_available_symbols():
if sym["name"] == symbol:
ready = sorted(
[
d["date"] for d in sym["available_dates"]
if d.get("status") == "complete"
and d.get("stage") == "model_calibration"
],
reverse=True,
)
return ready[0] if ready else None
return None
# Look up the most recent fully-calibrated date for 700.HK (None if there is none).
cal_date = latest_cal_date(client, "700.HK")
print(f"Latest ready date: {cal_date}")
Pin a seed for reproducible runs (Pro)
Pass seed to run() to lock in the random state. The same seed, symbol, and date always produce the same Monte Carlo paths.
# Submit the job with an explicit seed so the random state is pinned.
result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=5,
    seed=42,
)
job_id = result["job_id"]
# Re-running the same call with the same seed reproduces the same Monte Carlo paths.
# NOTE(review): whether the returned sim IDs themselves repeat is not shown here — confirm.
print(result["queued_sim_ids"])
Useful when sharing results with a colleague — send the seed alongside the job config so they can regenerate the exact same paths.
Scenarios
Compare normal vs flash crash on the same date (Pro)
Submit both scenarios up front and collect their job IDs, then poll each job in turn until it finishes before downloading results. The shared cal_date ensures the background market microstructure is identical across scenarios.
import time
scenarios = ["normal", "flash_crash"]

# Everything except the scenario name is shared, so both jobs use the same
# symbol, calibration date and run count.
common_args = {"symbol": "700.HK", "cal_date": "2025-09-01", "n_runs": 5}

# Maps scenario name -> job ID of the submitted job.
jobs: dict[str, str] = {}
for scenario in scenarios:
    result = client.simulation.run(scenario=scenario, **common_args)
    jobs[scenario] = result["job_id"]
    print(f"Submitted {scenario}: {result['job_id']}")
# Wait for all jobs
for scenario, job_id in jobs.items():
while True:
status = client.simulation.get_job_status(job_id)
if status["is_complete"] or status.get("has_errors"):
print(f"{scenario} done")
break
time.sleep(30)
Tune flash crash timing and severity (Pro)
Use scenario_params to control when the crash starts and how hard it hits. impact_multiplier is the total scenario volume as a multiple of average total resting liquidity (bid + ask combined).
# Flash-crash run with explicit timing and severity overrides via scenario_params.
result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=5,
    scenario="flash_crash",
    scenario_params={
        "start_time": "11:00:00",  # trigger mid-session
        # severe liquidity drain: total scenario volume as a multiple of the
        # average total resting liquidity (bid + ask combined)
        "impact_multiplier": 20.0,
        "order_freq": "500ms",  # high-frequency cascade
    },
)
job_id = result["job_id"]
See Simulations for the full list of scenario parameter names and their defaults.
Execution algorithms
Compare TWAP across multiple execution windows (Pro)
Pass multiple algo configs in a single exec_algos list. Each strategy runs against the same shared baseline, so cost comparisons are apples-to-apples.
from datetime import datetime

# (start, end) wall-clock bounds of each execution window to compare.
windows = [
    ("09:30:00", "11:30:00"),  # morning only
    ("09:30:00", "16:00:00"),  # full day
    ("13:00:00", "16:00:00"),  # afternoon only
]


def _window_seconds(start: str, end: str) -> int:
    """Return the wall-clock length of the HH:MM:SS window from *start* to *end*, in seconds."""
    fmt = "%H:%M:%S"
    span = datetime.strptime(end, fmt) - datetime.strptime(start, fmt)
    return int(span.total_seconds())


# One TWAP config per window. The horizon is derived from each window's own
# start and end: a fixed 7200 s would make "full day" identical to
# "morning only" and stop the afternoon window an hour early.
exec_algos = [
    {
        "type": "twap",
        "order_size": 50_000,
        "horizon": _window_seconds(start, end),  # execution window length in seconds
        "start_time": start,
    }
    for start, end in windows
]
import time

# Single source of truth for the run count: it is passed to run() and also
# drives the sim_id slicing below, so the two cannot drift apart.
n_runs = 5

result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=n_runs,
    exec_algos=exec_algos,
)
job_id = result["job_id"]

# sim_ids layout: baseline[0:n_runs], strategy_0, strategy_1, ...
sim_ids = result["queued_sim_ids"]
baseline_ids = sim_ids[:n_runs]
strategy_ids = [
    sim_ids[(k + 1) * n_runs : (k + 2) * n_runs] for k in range(len(windows))
]

# Poll every 30 s until the job reports completion or errors.
while True:
    s = client.simulation.get_job_status(job_id)
    if s["is_complete"] or s.get("has_errors"):
        break
    time.sleep(30)

# Flat list of all algo sim IDs — used by the metrics recipe below
algo_sim_ids = [sid for group in strategy_ids for sid in group]

# Lookup map and metadata — used by the threadpool download recipe below
strategies = [{"start": start, "end": end} for start, end in windows]
algo_sim_map = {
(strat_idx, run_idx): sid
for strat_idx, group in enumerate(strategy_ids)
for run_idx, sid in enumerate(group)
}
Retrieve execution quality metrics (Pro)
Once a job with execution algorithms is complete, exec_results.parquet contains per-run slippage and impact metrics. Aggregate across runs for a summary.
import polars as pl
# Take the first row of each run's exec_results.parquet and tag it with its sim_id.
# NOTE(review): presumably the file holds one summary row per run — confirm.
rows = []
for sim_id in algo_sim_ids:
    df = client.simulation.get_sim_data(sim_id, "exec_results.parquet")
    # Polars-native row access: avoids a pandas round-trip, which would also
    # coerce a mixed int/float row to a single dtype.
    rows.append(df.row(0, named=True) | {"sim_id": sim_id})
results_df = pl.DataFrame(rows)
print(results_df.select(["sim_id", "market_slippage", "market_impact"]).describe())
Working with results
Plot simulated price paths across runs (Pro)
Download mid-price data for all runs in a job and overlay them to visualise the distribution of price trajectories.
from concurrent.futures import ThreadPoolExecutor
import matplotlib.pyplot as plt
def fetch_mid(sim_id):
    """Download mid_price_by_min.parquet for a single sim_id."""
    return client.simulation.get_sim_data(sim_id, "mid_price_by_min.parquet")


# Download in parallel; results are collected in the same order as sim_ids.
with ThreadPoolExecutor(max_workers=8) as pool:
    pending = [pool.submit(fetch_mid, sim_id) for sim_id in sim_ids]
    frames = [job.result() for job in pending]

# Overlay every run on one axis with a shared, semi-transparent line style.
line_style = {"alpha": 0.35, "linewidth": 0.8, "color": "steelblue"}
fig, ax = plt.subplots(figsize=(12, 5))
for df in frames:
    ax.plot(df["time"].to_list(), df["mid_price"].to_list(), **line_style)

ax.set_title("Simulated price paths")
ax.set_xlabel("Time")
ax.set_ylabel("Mid price (HKD)")
plt.tight_layout()
plt.show()
Threadpooled download of execution results (Pro)
For faster download across a large set of simulation runs, use a thread pool to fetch files in parallel. This is especially useful when collecting exec_results.parquet across many strategy variants.
from concurrent.futures import ThreadPoolExecutor, as_completed
FILE_TO_DOWNLOAD = "exec_results.parquet"
DOWNLOAD_WORKERS = 8


def fetch_file(sim_id):
    """Download FILE_TO_DOWNLOAD for a single sim_id and return its first row as a dict.

    Best-effort: any failure is returned as ``{"error": "download_failed: ..."}``
    instead of being raised, so one bad run does not abort a parallel batch.
    """
    try:
        df = client.simulation.get_sim_data(sim_id, FILE_TO_DOWNLOAD)
        # Polars-native row access: avoids a pandas round-trip, which would
        # also coerce a mixed int/float row to a single dtype.
        return df.row(0, named=True)
    except Exception as exc:
        return {"error": f"download_failed: {exc}"}
print(f"Downloading {len(algo_sim_map)} algo sims...")
all_rows = []
done_count = 0
with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as pool:
futures = {}
for (strat_idx, run_idx), sid in algo_sim_map.items():
futures[pool.submit(fetch_file, sid)] = (strat_idx, run_idx, sid)
for future in as_completed(futures):
strat_idx, run_idx, sid = futures[future]
row = future.result()
row["sim_id"] = sid
row["strategy_index"] = strat_idx
row["run_index"] = run_idx
row["start"] = str(strategies[strat_idx]["start"])
row["end"] = str(strategies[strat_idx]["end"])
all_rows.append(row)
done_count += 1
print(f"\r {done_count}/{len(algo_sim_map)}", end="", flush=True)
List cached baselines and plot Monte Carlo paths by scenario (Free)
Use list_cached() to discover available pre-computed baselines, then download and overlay every run coloured by scenario. No job submission or Pro credits required.
import matplotlib.pyplot as plt
from simudyne import PulseABM
client = PulseABM()  # reads SIMUDYNE_API_KEY from environment

COLORS = {"normal": "steelblue", "flash_crash": "crimson"}
LABELS = {"normal": "Normal", "flash_crash": "Flash Crash"}

cached = client.simulation.list_cached(symbol="700.HK", date="2025-09-01")

# one entry per scenario — pick the variant with the most runs
by_scenario = {}
for s in cached["simulations"]:
    sc = s["scenario"]
    if sc not in by_scenario or s["n_runs"] > by_scenario[sc]["n_runs"]:
        by_scenario[sc] = s

fig, ax = plt.subplots(figsize=(14, 5))
for sim in by_scenario.values():
    scenario = sim["scenario"]
    if scenario not in LABELS:
        continue  # only plot scenarios that have a colour and a label

    # Run IDs are built from the example ID's prefix plus a zero-padded
    # run index after the last ":".
    base = sim["example_sim_id"].rsplit(":", 1)[0]
    first = True
    failed = []  # (run_id, exception) for every run that could not be fetched
    for i in range(sim["n_runs"]):
        print(f"\r {LABELS[scenario]} {i + 1}/{sim['n_runs']}", end="", flush=True)
        run_id = f"{base}:{i:04d}"
        try:
            df = client.simulation.get_sim_data(run_id, "mid_price_by_min.parquet")
        except Exception as exc:
            # Best-effort: a missing run must not abort the plot, but record
            # it so the gap is reported instead of silently dropped.
            failed.append((run_id, exc))
            continue
        ax.plot(df["time"].to_list(), df["mid_price"].to_list(),
                color=COLORS[scenario], linewidth=0.6, alpha=0.35,
                # label only the first plotted line so the legend has one entry per scenario
                label=LABELS[scenario] if first else "_nolegend_")
        first = False
    print()
    if failed:
        bad_id, bad_exc = failed[0]
        print(f" skipped {len(failed)}/{sim['n_runs']} {LABELS[scenario]} runs "
              f"(first failure: {bad_id}: {bad_exc})")

ax.set_title("700.HK 2025-09-01 — Monte Carlo paths (mid price by minute)")
ax.set_xlabel("Time")
ax.set_ylabel("Mid Price")
ax.legend(title="Scenario")
fig.autofmt_xdate()
plt.tight_layout()
plt.savefig("mc_paths_700HK_20250901.png", dpi=150)
plt.show()
Bulk download and read without saving to disk (Free)
Pull a set of simulations as a ZIP archive and read every parquet file directly into Polars — no intermediate files written.
import io, zipfile, polars as pl
# Collect one example sim ID per cached simulation for the symbol.
cached = client.simulation.list_cached(symbol="700.HK")
sim_ids = [entry["example_sim_id"] for entry in cached["simulations"]]

# Fetch everything as a single in-memory ZIP archive.
zip_bytes = client.simulation.get_bulk_data(
    sim_ids=sim_ids,
    include_sim_data=True,
    include_mid_price=True,
)

# Read each parquet member straight from the archive bytes, keyed by member name.
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    frames: dict[str, pl.DataFrame] = {
        member: pl.read_parquet(io.BytesIO(zf.read(member)))
        for member in zf.namelist()
        if member.endswith(".parquet")
    }

print(f"Loaded {len(frames)} files")
for name, df in frames.items():
    print(f" {name}: {df.shape}")