Cookbook

Ready-to-run snippets for common Pulse tasks.

Each recipe below is a self-contained snippet you can drop straight into a notebook or script. Swap in your own symbols, dates, and parameters as needed. All examples assume you have the SDK installed and an API key set — see Quickstart if not.

Simulations

Run, wait, and download in one blockPro

Submit a job, block until it finishes, then pull down the first simulation’s data — the most common Pro-tier pattern.

Custom simulation jobs consume Pro-tier credits. Check your remaining allowance on the Dashboard before running large batches.

python
import time
from simudyne import PulseABM

client = PulseABM()  # reads SIMUDYNE_API_KEY from environment

result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=5,
)
job_id  = result["job_id"]
sim_ids = result["queued_sim_ids"]
print(f"Submitted {job_id} — {len(sim_ids)} runs queued")

while True:
    status = client.simulation.get_job_status(job_id)
    done   = status["status_summary"].get("complete", 0)
    print(f"\r  {done}/{len(sim_ids)} complete", end="", flush=True)
    if status["is_complete"] or status.get("has_errors"):
        break
    time.sleep(30)

df = client.simulation.get_sim_data(sim_ids[0])
print(df.head())

Get the latest calibration date for a symbolFree

A handy helper that queries get_available_symbols() and returns the most recent date that is fully ready for simulation — so you never accidentally pass a stale or incomplete date to run().

python
def latest_cal_date(client, symbol: str) -> str | None:
    """Return the most recent fully-calibrated date for *symbol*."""
    for sym in client.data.get_available_symbols():
        if sym["name"] == symbol:
            ready = sorted(
                [
                    d["date"] for d in sym["available_dates"]
                    if d.get("status") == "complete"
                    and d.get("stage") == "model_calibration"
                ],
                reverse=True,
            )
            return ready[0] if ready else None
    return None

cal_date = latest_cal_date(client, "700.HK")
print(f"Latest ready date: {cal_date}")

Pin a seed for reproducible runsPro

Pass seed to run() to lock in the random state. The same seed, symbol, and date always produce the same Monte Carlo paths.

python
result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=5,
    seed=42,
)
job_id = result["job_id"]
# Re-run with the same call to reproduce identical sim_ids
print(result["queued_sim_ids"])

Useful when sharing results with a colleague — send the seed alongside the job config so they can regenerate the exact same paths.

Scenarios

Compare normal vs flash crash on the same datePro

Submit both scenarios concurrently and collect their job IDs, then poll them together before downloading results. The shared cal_date ensures the background market microstructure is identical across scenarios.

python
import time

scenarios = ["normal", "flash_crash"]
jobs: dict[str, str] = {}

for scenario in scenarios:
    result = client.simulation.run(
        symbol="700.HK",
        cal_date="2025-09-01",
        n_runs=5,
        scenario=scenario,
    )
    jobs[scenario] = result["job_id"]
    print(f"Submitted {scenario}: {result['job_id']}")

# Wait for all jobs
for scenario, job_id in jobs.items():
    while True:
        status = client.simulation.get_job_status(job_id)
        if status["is_complete"] or status.get("has_errors"):
            print(f"{scenario} done")
            break
        time.sleep(30)

Tune flash crash timing and severityPro

Use scenario_params to control when the crash starts and how hard it hits. impact_multiplier is the total scenario volume as a multiple of average total resting liquidity (bid + ask combined).

python
result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=5,
    scenario="flash_crash",
    scenario_params={
        "start_time": "11:00:00",   # trigger mid-session
        "impact_multiplier": 20.0,  # severe liquidity drain
        "order_freq": "500ms",      # high-frequency cascade
    },
)
job_id = result["job_id"]

See Simulations for the full list of scenario parameter names and their defaults.

Execution algorithms

Compare TWAP across multiple execution windowsPro

Pass multiple algo configs in a single exec_algos list. Each strategy runs against the same shared baseline, so cost comparisons are apples-to-apples.

python
windows = [
    ("09:30:00", "11:30:00"),  # morning only
    ("09:30:00", "16:00:00"),  # full day
    ("13:00:00", "16:00:00"),  # afternoon only
]

exec_algos = [
    {
        "type": "twap",
        "order_size": 50_000,
        "horizon": 7200,        # 2-hour execution window in seconds
        "start_time": start,
    }
    for start, _ in windows
]

result = client.simulation.run(
    symbol="700.HK",
    cal_date="2025-09-01",
    n_runs=5,
    exec_algos=exec_algos,
)
job_id = result["job_id"]

# sim_ids layout: baseline[0:n_runs], strategy_0, strategy_1, ...
sim_ids = result["queued_sim_ids"]
baseline_ids = sim_ids[:5]
strategy_ids  = [sim_ids[5 + i * 5 : 10 + i * 5] for i in range(len(windows))]

while True:
    s = client.simulation.get_job_status(result["job_id"])
    if s["is_complete"] or s.get("has_errors"):
        break
    time.sleep(30)

# Flat list of all algo sim IDs — used by the metrics recipe below
algo_sim_ids = [sid for group in strategy_ids for sid in group]

# Lookup map and metadata — used by the threadpool download recipe below
strategies = [{"start": start, "end": end} for start, end in windows]
algo_sim_map = {
    (strat_idx, run_idx): sid
    for strat_idx, group in enumerate(strategy_ids)
    for run_idx, sid in enumerate(group)
}

Retrieve execution quality metricsPro

Once a job with execution algorithms is complete, exec_results.parquet contains per-run slippage and impact metrics. Aggregate across runs for a summary.

python
import polars as pl

rows = []
for sim_id in algo_sim_ids:
    df = client.simulation.get_sim_data(sim_id, "exec_results.parquet")
    rows.append(df.to_pandas().iloc[0].to_dict() | {"sim_id": sim_id})

results_df = pl.DataFrame(rows)
print(results_df.select(["sim_id", "market_slippage", "market_impact"]).describe())

Working with results

Plot simulated price paths across runsPro

Download mid-price data for all runs in a job and overlay them to visualise the distribution of price trajectories.

python
from concurrent.futures import ThreadPoolExecutor
import matplotlib.pyplot as plt

def fetch_mid(sim_id):
    return client.simulation.get_sim_data(sim_id, "mid_price_by_min.parquet")

with ThreadPoolExecutor(max_workers=8) as pool:
    frames = list(pool.map(fetch_mid, sim_ids))

fig, ax = plt.subplots(figsize=(12, 5))
for df in frames:
    ax.plot(df["time"].to_list(), df["mid_price"].to_list(),
            alpha=0.35, linewidth=0.8, color="steelblue")

ax.set_title("Simulated price paths")
ax.set_xlabel("Time")
ax.set_ylabel("Mid price (HKD)")
plt.tight_layout()
plt.show()

Threadpooled download of execution resultsPro

For faster download across a large set of simulation runs, use a thread pool to fetch files in parallel. This is especially useful when collecting exec_results.parquet across many strategy variants.

python
from concurrent.futures import ThreadPoolExecutor, as_completed

FILE_TO_DOWNLOAD = "exec_results.parquet"
DOWNLOAD_WORKERS = 8

def fetch_file(sim_id):
    """Download exec_results.parquet for a single sim_id."""
    try:
        df = client.simulation.get_sim_data(sim_id, FILE_TO_DOWNLOAD)
        return df.to_pandas().iloc[0].to_dict()
    except Exception as exc:
        return {"error": f"download_failed: {exc}"}

print(f"Downloading {len(algo_sim_map)} algo sims...")
all_rows = []
done_count = 0

with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as pool:
    futures = {}
    for (strat_idx, run_idx), sid in algo_sim_map.items():
        futures[pool.submit(fetch_file, sid)] = (strat_idx, run_idx, sid)

    for future in as_completed(futures):
        strat_idx, run_idx, sid = futures[future]
        row = future.result()
        row["sim_id"] = sid
        row["strategy_index"] = strat_idx
        row["run_index"] = run_idx
        row["start"] = str(strategies[strat_idx]["start"])
        row["end"] = str(strategies[strat_idx]["end"])
        all_rows.append(row)

        done_count += 1
        print(f"\r  {done_count}/{len(algo_sim_map)}", end="", flush=True)

List cached baselines and plot Monte Carlo paths by scenarioFree

Use list_cached() to discover available pre-computed baselines, then download and overlay every run coloured by scenario. No job submission or Pro credits required.

python
import matplotlib.pyplot as plt
from simudyne import PulseABM

client = PulseABM()  # reads SIMUDYNE_API_KEY from environment

COLORS  = {"normal": "steelblue", "flash_crash": "crimson"}
LABELS  = {"normal": "Normal",    "flash_crash": "Flash Crash"}

cached = client.simulation.list_cached(symbol="700.HK", date="2025-09-01")

# one entry per scenario — pick the variant with the most runs
by_scenario = {}
for s in cached["simulations"]:
    sc = s["scenario"]
    if sc not in by_scenario or s["n_runs"] > by_scenario[sc]["n_runs"]:
        by_scenario[sc] = s

fig, ax = plt.subplots(figsize=(14, 5))

for sim in by_scenario.values():
    scenario = sim["scenario"]
    if scenario not in LABELS:
        continue
    base  = sim["example_sim_id"].rsplit(":", 1)[0]
    first = True
    for i in range(sim["n_runs"]):
        print(f"\r  {LABELS[scenario]}  {i + 1}/{sim['n_runs']}", end="", flush=True)
        try:
            df = client.simulation.get_sim_data(f"{base}:{i:04d}", "mid_price_by_min.parquet")
        except Exception:
            continue
        ax.plot(df["time"].to_list(), df["mid_price"].to_list(),
                color=COLORS[scenario], linewidth=0.6, alpha=0.35,
                label=LABELS[scenario] if first else "_nolegend_")
        first = False
    print()

ax.set_title("700.HK  2025-09-01 — Monte Carlo paths (mid price by minute)")
ax.set_xlabel("Time")
ax.set_ylabel("Mid Price")
ax.legend(title="Scenario")
fig.autofmt_xdate()
plt.tight_layout()
plt.savefig("mc_paths_700HK_20250901.png", dpi=150)
plt.show()

Bulk download and read without saving to diskFree

Pull a set of simulations as a ZIP archive and read every parquet file directly into Polars — no intermediate files written.

python
import io, zipfile, polars as pl

cached   = client.simulation.list_cached(symbol="700.HK")
sim_ids  = [s["example_sim_id"] for s in cached["simulations"]]

zip_bytes = client.simulation.get_bulk_data(
    sim_ids=sim_ids,
    include_sim_data=True,
    include_mid_price=True,
)

frames: dict[str, pl.DataFrame] = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if name.endswith(".parquet"):
            frames[name] = pl.read_parquet(io.BytesIO(zf.read(name)))

print(f"Loaded {len(frames)} files")
for name, df in frames.items():
    print(f"  {name}: {df.shape}")