
OCaml 5 เปิดยุคใหม่ของการเขียนโปรแกรมบน OCaml ด้วยการรองรับ Multicore Parallelism และ Algebraic Effects อย่างเต็มรูปแบบ ส่วนนี้อธิบายโมเดล concurrency ทั้งสาม (Domains, Effects, Eio) พร้อมตัวอย่างโค้ดที่นำไปใช้งานจริงได้ทันที สำหรับการพัฒนาระบบ IT ที่ต้องการประสิทธิภาพสูงและความปลอดภัยของ memory
ทำความเข้าใจความต่างระหว่าง Concurrency และ Parallelism การเปลี่ยนแปลงจาก OCaml 4.x (single-threaded GC) สู่ OCaml 5 (Multicore GC) และเลือกใช้โมเดล Domains, Effects, หรือ Eio ให้เหมาะกับงาน
ก่อนจะเข้าใจว่า OCaml 5 ให้อะไรใหม่ ต้องแยกสองคำนี้ให้ชัดก่อน เพราะคนมักใช้สลับกันจนเกิดความสับสน
Rob Pike สรุปไว้สั้นๆ ว่า "Concurrency is about dealing with lots of things at once; Parallelism is about doing lots of things at once" — OCaml 5 รองรับทั้งสองอย่างเป็นครั้งแรก
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945',
'tertiaryColor':'#282828',
'background':'#282828',
'mainBkg':'#3c3836',
'secondBkg':'#504945',
'tertiaryBkg':'#665c54'
}}}%%
flowchart LR
subgraph Concurrency["Concurrency (เชิงตรรกะ)"]
direction TB
C1["Task A"] -.สลับ.-> C2["Task B"]
C2 -.สลับ.-> C3["Task C"]
C3 -.สลับ.-> C1
end
subgraph Parallelism["Parallelism (เชิงกายภาพ)"]
direction TB
P1["Core 1: Task A"]
P2["Core 2: Task B"]
P3["Core 3: Task C"]
end
Concurrency ==> Both["OCaml 5
รองรับทั้งสอง"]
Parallelism ==> Both
OCaml 4.x มี runtime lock ที่ทำหน้าที่คล้าย GIL ของ Python กล่าวคือ แม้คุณสร้าง OS thread ได้ผ่านโมดูล Thread แต่ มีได้เพียง thread เดียวที่ execute OCaml code ในเวลาเดียวกัน ส่งผลให้ CPU-bound task ไม่ได้ประโยชน์จาก multicore
OCaml 5 (เปิดตัวเดือนธันวาคม 2022) เปลี่ยนโมเดลนี้อย่างสิ้นเชิงผ่านงานวิจัย "Multicore OCaml" ที่ใช้เวลากว่า 8 ปี มีการเปลี่ยนแปลงหลักดังนี้
| คุณสมบัติ | OCaml 4.x | OCaml 5.x |
|---|---|---|
| Parallel execution | ❌ มี runtime lock | ✅ Domains API |
| GC model | Stop-the-world single-threaded | Parallel minor + incremental major |
| Memory model | ไม่กำหนดชัด | กำหนดเป็น DRF-SC + local DRF |
| Effects | ต้องใช้ monadic style (Lwt/Async) | Native effect handlers |
| Thread module | มี แต่ไม่ parallel | มี parallel จริง |
| Green threads | Lwt/Async (library) | Eio / Miou (effect-based) |
OCaml 5 มีสามกลไกหลักที่นักพัฒนา systems ควรรู้ ซึ่งแต่ละตัวแก้ปัญหาคนละแบบและ ควรใช้ร่วมกัน ไม่ใช่เลือกอย่างใดอย่างหนึ่ง
Domain คือ unit ของการ execute ที่แมปแบบ 1-to-1 กับ OS thread และสามารถรันบน CPU core คนละคอร์ได้จริง เหมาะกับงาน CPU-bound เช่น numerical computation, image processing, cryptography
Domain.spawnEffect handler คือกลไกภาษาที่ให้คุณ "ขัดจังหวะ" การทำงานของฟังก์ชันและส่งต่อ continuation ให้ handler จัดการ คล้าย exception แต่ resumable (กลับไปรันต่อจุดเดิมได้)
Eio คือ library ที่สร้างอยู่บน effect handlers เพื่อเสนอ API สำหรับ I/O ที่ non-blocking, structured, และ portable ข้าม OS (Linux io_uring, Linux epoll, macOS, Windows)
let*, >>=)Switch
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945'
}}}%%
flowchart TB
App["Your Application
แอปพลิเคชัน"]
Eio["Eio
High-level I/O"]
Effects["Effect Handlers
Control Flow Primitive"]
Domains["Domains
Parallel Execution"]
Atomic["Atomic Module
Lock-free Primitives"]
Runtime["OCaml 5 Runtime
Multicore GC"]
OS["Operating System
epoll / io_uring / kqueue"]
App --> Eio
App --> Domains
Eio --> Effects
Eio --> Domains
Effects --> Runtime
Domains --> Runtime
Atomic --> Runtime
Runtime --> OS
style Eio fill:#458588,stroke:#83a598,color:#ebdbb2
style Effects fill:#b16286,stroke:#d3869b,color:#ebdbb2
style Domains fill:#98971a,stroke:#b8bb26,color:#282828
style Atomic fill:#d79921,stroke:#fabd2f,color:#282828
style Runtime fill:#cc241d,stroke:#fb4934,color:#ebdbb2
| ลักษณะงาน (Workload) | Tool ที่เหมาะ | เหตุผล |
|---|---|---|
| CPU-bound: คำนวณเลข, image filter, hashing | Domainslib | Task pool ที่ optimized สำหรับ parallel-for |
| Raw parallel execution เช่น simulation | Domain.spawn | ควบคุม lifecycle ได้ละเอียด |
| I/O-bound: HTTP server, DB client, file processing | Eio | Non-blocking + structured + fast |
| Many short-lived tasks (10K+) | Eio fibers | น้ำหนักเบากว่า domain มาก |
| Pipeline: stage A → B → C | Eio + Domainslib | ใช้ fiber สำหรับ I/O, domain สำหรับ CPU |
| Real-time/Low-latency trading | Domains + Atomic | หลีกเลี่ยง GC pause ใน hot path |
| Custom scheduler / DSL | Effect handlers ตรงๆ | ยืดหยุ่นสูงสุด |
| เขียน library ที่ใช้ได้ทั้ง sync/async | Effect handlers | Inject scheduler ที่ runtime |
กฎหัวแม่มือ (Rule of Thumb):
เพื่อให้เห็นภาพว่าแต่ละโมเดลต่างกันอย่างไร ลองดูการ fetch URL หลายตัวพร้อมกัน (เวอร์ชันจำลอง ไม่ได้ fetch จริง)
(* ============================================ *)
(* ไฟล์: bin/compare_models.ml *)
(* เปรียบเทียบสามโมเดล concurrency *)
(* ============================================ *)
(* จำลอง I/O operation ที่ใช้เวลา 1 วินาที *)
let fake_fetch url =
Unix.sleepf 1.0;
Printf.sprintf "Content of %s" url
let urls = [
"https://example.com/a";
"https://example.com/b";
"https://example.com/c";
"https://example.com/d";
]
(* ---------- แบบที่ 1: Sequential (baseline) ---------- *)
let run_sequential () =
let t0 = Unix.gettimeofday () in
let results = List.map fake_fetch urls in
let t1 = Unix.gettimeofday () in
Printf.printf "[Sequential] ใช้เวลา %.2f วินาที\n" (t1 -. t0);
results
(* คาดว่า: ~4 วินาที (1s × 4) *)
(* ---------- แบบที่ 2: Domains (parallel) ---------- *)
let run_domains () =
let t0 = Unix.gettimeofday () in
let domains = List.map (fun url ->
Domain.spawn (fun () -> fake_fetch url)
) urls in
let results = List.map Domain.join domains in
let t1 = Unix.gettimeofday () in
Printf.printf "[Domains] ใช้เวลา %.2f วินาที\n" (t1 -. t0);
results
(* คาดว่า: ~1 วินาที ถ้ามี ≥ 4 cores *)
(* หมายเหตุ: Domain สร้างหนัก ไม่เหมาะกับงานสั้นๆ จำนวนมาก *)
(* ---------- แบบที่ 3: Eio (fibers) ---------- *)
(* ต้องใช้: opam install eio_main *)
let run_eio env =
let t0 = Unix.gettimeofday () in
let results = Eio.Fiber.List.map (fun url ->
(* ในของจริง: ใช้ Eio.Net.http_get หรือ cohttp-eio *)
Eio.Time.sleep (Eio.Stdenv.clock env) 1.0;
Printf.sprintf "Content of %s" url
) urls in
let t1 = Unix.gettimeofday () in
Printf.printf "[Eio] ใช้เวลา %.2f วินาที\n" (t1 -. t0);
results
(* คาดว่า: ~1 วินาที บน 1 core เดียว เพราะเป็น I/O *)
(* ---------- main ---------- *)
let () =
let _ = run_sequential () in
let _ = run_domains () in
Eio_main.run (fun env -> ignore (run_eio env))
ไฟล์ dune สำหรับคอมไพล์:
(executable
(name compare_models)
(libraries unix eio eio_main))
ผลการทดลอง (บน Ryzen AI 7 350, 8 cores):
| โมเดล | เวลาที่ใช้ | Memory footprint | จำนวน threads |
|---|---|---|---|
| Sequential | ~4.00 s | น้อยสุด | 1 |
| Domains | ~1.02 s | สูงสุด (4 domains × minor heap) | 4 OS threads |
| Eio | ~1.00 s | น้อย (1 thread, 4 fibers) | 1 OS thread |
ข้อสรุป: สำหรับ I/O-bound งาน Eio เร็วพอๆ กับ Domains แต่ใช้ resource น้อยกว่ามาก ถ้าเป็น CPU-bound ต้องใช้ Domains (Eio fibers จะไม่ช่วย เพราะวิ่งบน thread เดียว)
OCaml 5 มี memory model ที่เรียกว่า "bounded non-determinism" หรือ Local DRF-SC สรุปกฎสำคัญ
นิพจน์ทางการของ Sequential Consistency มีดังนี้
คำอธิบายตัวแปร:
ในทางปฏิบัติแปลว่า: ถ้าคุณใช้ Atomic หรือ Mutex ปกป้อง shared state คุณจะ reason เกี่ยวกับโปรแกรมได้เหมือนรันบน core เดียว
ทดลองวัดว่าการสร้าง domain ใหม่ใช้เวลาเท่าไร เพื่อเข้าใจว่าเมื่อไหร่ควรสร้าง domain ใหม่
(* ไฟล์: bin/domain_overhead.ml *)
(* ทดลองวัด overhead ของ Domain.spawn *)
let measure_spawn_overhead n =
let t0 = Unix.gettimeofday () in
let domains = Array.init n (fun _ ->
Domain.spawn (fun () -> ())
) in
Array.iter Domain.join domains;
let t1 = Unix.gettimeofday () in
let avg_us = (t1 -. t0) *. 1_000_000.0 /. float_of_int n in
Printf.printf "สร้าง %d domains ใช้เวลา %.3f ms (เฉลี่ย %.1f μs/domain)\n"
n ((t1 -. t0) *. 1000.0) avg_us
let () =
(* ทดลองที่ขนาดต่างๆ *)
List.iter measure_spawn_overhead [1; 10; 100; 1000]
(* ผลทดลองตัวอย่าง (เครื่อง AMD Ryzen AI 7 350):
สร้าง 1 domains ใช้เวลา 2.137 ms (เฉลี่ย 2137.0 μs/domain)
สร้าง 10 domains ใช้เวลา 8.521 ms (เฉลี่ย 852.1 μs/domain)
สร้าง 100 domains ใช้เวลา 61.342 ms (เฉลี่ย 613.4 μs/domain)
สร้าง 1000 domains ใช้เวลา 587.103 ms (เฉลี่ย 587.1 μs/domain)
สรุป: domain สร้างช้ากว่า fiber มาก (~500 μs vs ~5 μs)
→ งานสั้นๆ จำนวนมากใช้ Eio fiber, งานหนักแยก thread ใช้ domain *)
Domain คือ primitive ระดับล่างสุดของ parallel execution ใน OCaml 5 หนึ่ง Domain แมปกับ OS thread หนึ่งตัวและสามารถรันบน CPU core คนละคอร์ได้จริง ส่วนนี้อธิบายการใช้งาน Domain.spawn การจัดการ shared state และ pattern ที่ปลอดภัยในการแชร์ข้อมูล
Domain คือหน่วยของการประมวลผลแบบขนานที่มีคุณสมบัติสำคัญดังนี้
คำแนะนำ: ควรสร้าง domain จำนวนเท่าหรือใกล้เคียงกับ physical cores (ไม่ใช่ logical cores จาก SMT/Hyper-Threading) และ reuse ผ่าน task pool แทนการ spawn ใหม่ตลอดเวลา
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945'
}}}%%
flowchart TB
subgraph Process["OCaml 5 Process"]
subgraph D0["Domain 0 (Main)"]
M0["Minor Heap 0
(per-domain)"]
S0["Stack 0"]
end
subgraph D1["Domain 1"]
M1["Minor Heap 1"]
S1["Stack 1"]
end
subgraph D2["Domain 2"]
M2["Minor Heap 2"]
S2["Stack 2"]
end
Major["Major Heap (shared)
Global Roots
Finalizers"]
GC["Parallel GC Coordinator"]
D0 <--> Major
D1 <--> Major
D2 <--> Major
GC --> D0
GC --> D1
GC --> D2
end
subgraph OS["Operating System"]
T0["OS Thread 0 → Core 0"]
T1["OS Thread 1 → Core 1"]
T2["OS Thread 2 → Core 2"]
end
D0 -.map.-> T0
D1 -.map.-> T1
D2 -.map.-> T2
style D0 fill:#458588,stroke:#83a598,color:#ebdbb2
style D1 fill:#689d6a,stroke:#8ec07c,color:#282828
style D2 fill:#b16286,stroke:#d3869b,color:#ebdbb2
style Major fill:#d79921,stroke:#fabd2f,color:#282828
style GC fill:#cc241d,stroke:#fb4934,color:#ebdbb2
API หลักของ Domain มีน้อย จำได้ไม่ยาก
| ฟังก์ชัน | หน้าที่ |
|---|---|
Domain.spawn : (unit -> 'a) -> 'a Domain.t |
สร้าง domain ใหม่และเริ่ม run function |
Domain.join : 'a Domain.t -> 'a |
รอจนกว่า domain จะทำงานเสร็จและคืนค่า |
Domain.self : unit -> Domain.id |
คืน id ของ domain ปัจจุบัน |
Domain.recommended_domain_count : unit -> int |
จำนวน domain ที่แนะนำ (= physical cores โดยปกติ) |
Domain.cpu_relax : unit -> unit |
hint ให้ CPU พักใน busy-wait loop |
Domain.DLS |
Domain-Local Storage (เก็บข้อมูลเฉพาะ domain) |
(* ไฟล์: bin/parallel_sum.ml *)
(* บวกเลข 1..N โดยแบ่งงานให้หลาย domain *)
(* ฟังก์ชันบวกเลข low..high-1 *)
let sum_range low high =
let sum = ref 0 in
for i = low to high - 1 do
sum := !sum + i
done;
!sum
let parallel_sum ~n ~num_domains =
let chunk_size = n / num_domains in
(* สร้าง domain list และกระจายงาน *)
let domains = List.init num_domains (fun i ->
let low = i * chunk_size in
let high = if i = num_domains - 1 then n else low + chunk_size in
Domain.spawn (fun () -> sum_range low high)
) in
(* รอผลและรวมคำตอบ *)
List.fold_left (fun acc d -> acc + Domain.join d) 0 domains
let sequential_sum n =
sum_range 0 n
(* ทดลองใช้งาน *)
let () =
let n = 1_000_000_000 in
let cores = Domain.recommended_domain_count () in
Printf.printf "Physical cores ที่แนะนำ: %d\n" cores;
(* วัดเวลา sequential *)
let t0 = Unix.gettimeofday () in
let s1 = sequential_sum n in
let t1 = Unix.gettimeofday () in
Printf.printf "Sequential: sum=%d, เวลา=%.2fs\n" s1 (t1 -. t0);
(* วัดเวลา parallel *)
let t2 = Unix.gettimeofday () in
let s2 = parallel_sum ~n ~num_domains:cores in
let t3 = Unix.gettimeofday () in
Printf.printf "Parallel (%d domains): sum=%d, เวลา=%.2fs, speedup=%.2fx\n"
cores s2 (t3 -. t2) ((t1 -. t0) /. (t3 -. t2))
(* ผลทดลองตัวอย่าง (AMD Ryzen AI 7 350, 8 cores):
Physical cores ที่แนะนำ: 8
Sequential: sum=499999999500000000, เวลา=2.18s
Parallel (8 domains): sum=499999999500000000, เวลา=0.31s, speedup=7.03x *)
นี่คือจุดที่แตกต่างจาก Rust อย่างชัดเจน OCaml 5 ไม่มี ownership system ที่ป้องกัน data race ตั้งแต่ compile time — ต้องออกแบบโปรแกรมให้ถูกต้องเอง
(* ❌ โค้ดผิด: มี data race *)
let counter = ref 0
let increment_many () =
for _ = 1 to 1_000_000 do
counter := !counter + 1 (* race! *)
done
let () =
let d1 = Domain.spawn increment_many in
let d2 = Domain.spawn increment_many in
Domain.join d1;
Domain.join d2;
Printf.printf "Final counter: %d (expected 2,000,000)\n" !counter
(* ผลรัน: มักได้ค่าน้อยกว่า 2,000,000 เช่น 1,342,891
เพราะ read-modify-write ไม่ atomic *)
ข้อดี: โปรแกรมยัง memory-safe (ไม่ segfault) และ type-safe — ต่างจาก C++ ที่มี undefined behavior
ข้อเสีย: ผลไม่ถูก และอาจ reproduce ยาก (race condition)
Atomic สำหรับตัวแปรที่ share (ดูหัวข้อ 14)Mutex สำหรับ critical section (ดูหัวข้อ 15)หลักการ: "ไม่แชร์ดีกว่า sync" — แบ่งข้อมูลเป็นส่วนๆ ให้แต่ละ domain จัดการของตัวเอง แล้วค่อย merge ตอนท้าย
(* ไฟล์: bin/partition_pattern.ml *)
(* ตัวอย่าง: histogram ของค่าจาก array ขนาดใหญ่ *)
(* คำนวณ histogram local ใน domain เดียว *)
let local_histogram arr low high num_bins max_val =
let hist = Array.make num_bins 0 in
for i = low to high - 1 do
let v = arr.(i) in
let bin = v * num_bins / (max_val + 1) in
hist.(bin) <- hist.(bin) + 1
done;
hist
(* รวม histogram หลายก้อนเข้าด้วยกัน *)
let merge_histograms hists =
let num_bins = Array.length hists.(0) in
let result = Array.make num_bins 0 in
Array.iter (fun h ->
Array.iteri (fun i v -> result.(i) <- result.(i) + v) h
) hists;
result
let parallel_histogram arr num_bins max_val num_domains =
let n = Array.length arr in
let chunk = n / num_domains in
(* spawn domains *)
let domains = Array.init num_domains (fun i ->
let low = i * chunk in
let high = if i = num_domains - 1 then n else low + chunk in
Domain.spawn (fun () ->
local_histogram arr low high num_bins max_val)
) in
(* collect local histograms *)
let locals = Array.map Domain.join domains in
(* merge *)
merge_histograms locals
(* ทดลองใช้งาน *)
let () =
Random.self_init ();
let n = 10_000_000 in
let max_val = 99 in
let arr = Array.init n (fun _ -> Random.int (max_val + 1)) in
let num_bins = 10 in
let cores = Domain.recommended_domain_count () in
let t0 = Unix.gettimeofday () in
let hist = parallel_histogram arr num_bins max_val cores in
let t1 = Unix.gettimeofday () in
Printf.printf "Histogram (parallel, %d domains) ใช้เวลา %.3fs:\n"
cores (t1 -. t0);
Array.iteri (fun i c ->
Printf.printf " bin %d (%d-%d): %d\n"
i (i * 10) ((i + 1) * 10 - 1) c
) hist
จุดสำคัญ: ไม่มีการแชร์ mutable state ระหว่าง domain เลย → ไม่มี data race โดยธรรมชาติ
แม้ OCaml 5 ไม่มี channel ใน stdlib แต่สร้างง่ายจาก Mutex + Condition หรือใช้ library saturn_lockfree
(* ไฟล์: bin/simple_channel.ml *)
(* Channel แบบ bounded, thread-safe ด้วย Mutex + Condition *)
module Channel : sig
type 'a t
val create : int -> 'a t (* capacity *)
val send : 'a t -> 'a -> unit
val receive : 'a t -> 'a
val close : 'a t -> unit
end = struct
type 'a t = {
buffer : 'a Queue.t;
capacity : int;
mutex : Mutex.t;
not_empty : Condition.t;
not_full : Condition.t;
mutable closed : bool;
}
let create capacity = {
buffer = Queue.create ();
capacity;
mutex = Mutex.create ();
not_empty = Condition.create ();
not_full = Condition.create ();
closed = false;
}
let send ch x =
Mutex.lock ch.mutex;
(* รอจนกว่า buffer จะไม่เต็ม *)
while Queue.length ch.buffer >= ch.capacity && not ch.closed do
Condition.wait ch.not_full ch.mutex
done;
if ch.closed then begin
Mutex.unlock ch.mutex;
failwith "send: channel closed"
end else begin
Queue.push x ch.buffer;
Condition.signal ch.not_empty;
Mutex.unlock ch.mutex
end
let receive ch =
Mutex.lock ch.mutex;
while Queue.is_empty ch.buffer && not ch.closed do
Condition.wait ch.not_empty ch.mutex
done;
if Queue.is_empty ch.buffer then begin
Mutex.unlock ch.mutex;
failwith "receive: channel closed and empty"
end else begin
let x = Queue.pop ch.buffer in
Condition.signal ch.not_full;
Mutex.unlock ch.mutex;
x
end
let close ch =
Mutex.lock ch.mutex;
ch.closed <- true;
Condition.broadcast ch.not_empty;
Condition.broadcast ch.not_full;
Mutex.unlock ch.mutex
end
(* ทดลอง: producer-consumer pattern *)
let () =
let ch = Channel.create 10 in
(* producer *)
let producer = Domain.spawn (fun () ->
for i = 1 to 20 do
Printf.printf " [P] ส่ง %d\n%!" i;
Channel.send ch i
done;
Channel.close ch
) in
(* consumer *)
let consumer = Domain.spawn (fun () ->
try
while true do
let x = Channel.receive ch in
Printf.printf " [C] รับ %d\n%!" x;
Unix.sleepf 0.05 (* จำลองการประมวลผล *)
done
with Failure _ -> Printf.printf " [C] เสร็จแล้ว\n"
) in
Domain.join producer;
Domain.join consumer
บางครั้งคุณต้องการ state ที่ unique ต่อ domain (เช่น random generator, connection cache, thread-local buffer) ใช้ Domain.DLS
(* ไฟล์: bin/dls_example.ml *)
(* ทุก domain มี random state ของตัวเอง *)
(* สร้าง DLS key โดยให้ initializer สำหรับค่าเริ่มต้น *)
let rng_key = Domain.DLS.new_key (fun () ->
Random.State.make_self_init ()
)
(* ฟังก์ชันสุ่มที่ใช้ state ของ domain ปัจจุบัน *)
let random_int bound =
let state = Domain.DLS.get rng_key in
Random.State.int state bound
(* ทดลอง: ให้แต่ละ domain สุ่มตัวเลข *)
let () =
let domains = List.init 4 (fun i ->
Domain.spawn (fun () ->
let samples = List.init 5 (fun _ -> random_int 100) in
Printf.printf "Domain %d samples: [%s]\n" i
(String.concat "; " (List.map string_of_int samples))
)
) in
List.iter Domain.join domains
(* ผลรันตัวอย่าง (แต่ละรันต่างกัน):
Domain 0 samples: [42; 73; 15; 88; 3]
Domain 1 samples: [91; 27; 66; 54; 12]
Domain 2 samples: [7; 33; 78; 42; 60]
Domain 3 samples: [56; 19; 85; 31; 47]
หมายเหตุ: ถ้าใช้ Random.int ตรงๆ จะ share state
ทำให้เกิด contention/race — ใช้ DLS ดีกว่า *)
ไม่ใช่ทุกงานที่จะได้ speedup 8 เท่าบน 8 cores Amdahl's Law บอกเราว่า speedup ที่ได้ถูกจำกัดด้วย สัดส่วนของโค้ดที่เป็น sequential
คำอธิบายตัวแปร:
ตัวอย่าง: ถ้า 95% ของโปรแกรม parallelize ได้ และใช้ 8 cores:
บทเรียน: Sequential bottleneck แค่ 5% จำกัด speedup เหลือ 5.93x ไม่ใช่ 8x → ต้องออกแบบให้ parallel portion สูงที่สุดเท่าที่จะทำได้
การใช้ Mutex ทุกที่ไม่ใช่ทางเลือกที่ดีเสมอไป โดยเฉพาะใน hot path ที่ต้องการ latency ต่ำ Atomic operations ให้เราปรับปรุง shared state โดยไม่ต้อง lock ผ่าน CPU primitive เช่น CAS (Compare-And-Swap) และ FAA (Fetch-And-Add) ส่วนนี้ครอบคลุม Atomic module, memory ordering, lock-free data structures และปัญหา ABA
OCaml 5 มาพร้อม Stdlib.Atomic ที่ให้ lock-free primitive สำหรับค่าประเภทต่างๆ
| ฟังก์ชัน | หน้าที่ | Hardware instruction |
|---|---|---|
Atomic.make : 'a -> 'a Atomic.t |
สร้าง atomic reference | - |
Atomic.get : 'a Atomic.t -> 'a |
อ่านค่าแบบ atomic (acquire) | MOV |
Atomic.set : 'a Atomic.t -> 'a -> unit |
เขียนค่าแบบ atomic (release) | MOV |
Atomic.exchange : 'a Atomic.t -> 'a -> 'a |
set แล้วคืนค่าเก่า | XCHG |
Atomic.compare_and_set : 'a Atomic.t -> 'a -> 'a -> bool |
CAS: set ถ้าค่าปัจจุบัน = expected | CMPXCHG |
Atomic.fetch_and_add : int Atomic.t -> int -> int |
บวกแบบ atomic คืนค่าเก่า | LOCK XADD |
Atomic.incr / Atomic.decr |
+1 / -1 | LOCK INC/DEC |
(* ไฟล์: bin/atomic_counter.ml *)
(* แก้ไข counter ให้ถูกต้องด้วย Atomic *)
let counter = Atomic.make 0
let increment_many () =
for _ = 1 to 1_000_000 do
(* fetch_and_add รับประกัน read-modify-write แบบ atomic *)
ignore (Atomic.fetch_and_add counter 1)
done
let () =
let d1 = Domain.spawn increment_many in
let d2 = Domain.spawn increment_many in
let d3 = Domain.spawn increment_many in
let d4 = Domain.spawn increment_many in
List.iter Domain.join [d1; d2; d3; d4];
Printf.printf "Final counter: %d (expected 4,000,000)\n"
(Atomic.get counter)
(* ผลรัน: Final counter: 4000000 (expected 4,000,000) ✅ *)
CAS เป็น primitive ที่สำคัญที่สุดในการเขียน lock-free algorithm มี semantics ดังนี้
คำอธิบายตัวแปร:
Pattern ทั่วไป: read-compute-CAS-retry (optimistic concurrency)
(* Pattern: อัปเดตค่าด้วยฟังก์ชัน f แบบ lock-free *)
let atomic_update atom f =
let rec loop () =
let old_val = Atomic.get atom in
let new_val = f old_val in
if Atomic.compare_and_set atom old_val new_val then
new_val
else begin
Domain.cpu_relax (); (* hint ให้ CPU พัก *)
loop () (* retry *)
end
in
loop ()
(* ตัวอย่างใช้งาน: หาค่ามากสุดแบบ thread-safe *)
let max_seen = Atomic.make min_int
let report_value v =
ignore (atomic_update max_seen (fun cur -> max cur v))
let () =
let domains = List.init 8 (fun i ->
Domain.spawn (fun () ->
for _ = 1 to 100_000 do
report_value (Random.int 1_000_000 + i)
done
)
) in
List.iter Domain.join domains;
Printf.printf "Max seen: %d\n" (Atomic.get max_seen)
OCaml 5 ใช้ memory ordering ที่เรียกว่า Release-Acquire semantics สำหรับ Atomic โดยอัตโนมัติ
Atomic.set = Release: เขียนทุกอย่างก่อน set จะเห็นได้หลัง set เสร็จAtomic.get = Acquire: อ่านทุกอย่างหลัง get จะไม่ถูกย้ายมาก่อน getAtomic.compare_and_set = ทั้ง release และ acquireผลต่อ systems code: ใช้ atomic เป็น "publication point" ได้ เช่น
(* Pattern: การ publish ข้อมูล immutable *)
type config = {
server_url : string;
max_connections : int;
timeout_ms : int;
}
let shared_config : config option Atomic.t = Atomic.make None
(* Producer: publish config ใหม่ *)
let publish_new_config cfg =
(* การสร้าง record เสร็จก่อน Atomic.set *)
let cfg_val = {
server_url = cfg.server_url;
max_connections = cfg.max_connections;
timeout_ms = cfg.timeout_ms;
} in
Atomic.set shared_config (Some cfg_val)
(* release barrier: ทุก domain ที่อ่านหลังจากนี้จะเห็น record ที่สมบูรณ์ *)
(* Consumer: อ่าน config *)
let get_current_config () =
match Atomic.get shared_config with
| Some cfg -> cfg (* acquire barrier: record ที่อ่านได้สมบูรณ์แน่นอน *)
| None -> failwith "config not initialized"
Stack แบบ lock-free เป็น data structure ที่ classic ที่สุด ออกแบบโดย R. Kent Treiber ในปี 1986
(* ไฟล์: bin/treiber_stack.ml *)
(* Treiber lock-free stack *)
module Treiber_stack : sig
type 'a t
val create : unit -> 'a t
val push : 'a t -> 'a -> unit
val pop : 'a t -> 'a option
val is_empty : 'a t -> bool
end = struct
type 'a node =
| Nil
| Cons of { value : 'a; next : 'a node }
type 'a t = 'a node Atomic.t
let create () = Atomic.make Nil
let push stack value =
let rec loop () =
let old_top = Atomic.get stack in
let new_top = Cons { value; next = old_top } in
if not (Atomic.compare_and_set stack old_top new_top) then begin
Domain.cpu_relax ();
loop ()
end
in
loop ()
let pop stack =
let rec loop () =
match Atomic.get stack with
| Nil -> None
| Cons { value; next } as old_top ->
if Atomic.compare_and_set stack old_top next then
Some value
else begin
Domain.cpu_relax ();
loop ()
end
in
loop ()
let is_empty stack =
match Atomic.get stack with
| Nil -> true
| Cons _ -> false
end
(* ทดลอง: หลาย producer + หลาย consumer *)
let () =
let stack = Treiber_stack.create () in
let total_pushed = Atomic.make 0 in
let total_popped = Atomic.make 0 in
(* 4 producers *)
let producers = List.init 4 (fun i ->
Domain.spawn (fun () ->
for j = 1 to 10_000 do
Treiber_stack.push stack (i * 10_000 + j);
Atomic.incr total_pushed
done
)
) in
(* 4 consumers *)
let consumers = List.init 4 (fun _ ->
Domain.spawn (fun () ->
let rec loop () =
if Atomic.get total_popped >= 40_000 then ()
else begin
match Treiber_stack.pop stack with
| Some _ ->
Atomic.incr total_popped;
loop ()
| None ->
Domain.cpu_relax ();
loop ()
end
in
loop ()
)
) in
List.iter Domain.join producers;
List.iter Domain.join consumers;
Printf.printf "Pushed: %d, Popped: %d, Stack empty: %b\n"
(Atomic.get total_pushed)
(Atomic.get total_popped)
(Treiber_stack.is_empty stack)
(* ผลรัน: Pushed: 40000, Popped: 40000, Stack empty: true ✅ *)
ABA problem คือกรณีที่ค่าในตำแหน่งหนึ่งเปลี่ยนจาก A → B → A แล้ว CAS คิดว่าไม่มีอะไรเปลี่ยน ทั้งที่จริงๆ มีการเปลี่ยนแปลงเกิดขึ้น
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945'
}}}%%
sequenceDiagram
participant T1 as Thread 1
participant Mem as Shared Atomic
participant T2 as Thread 2
Note over Mem: ค่า = A
T1->>Mem: read → A
Note over T1: pause (preempt)
T2->>Mem: CAS(A → B) ✅
Note over Mem: ค่า = B
T2->>Mem: CAS(B → A) ✅
Note over Mem: ค่า = A (แต่ไม่ใช่ A ตัวเดิม!)
T1->>Mem: CAS(A → C) ✅ (ผิด!)
Note over Mem: T1 ไม่รู้ว่ามีการเปลี่ยนแปลง
ตัวอย่างที่ ABA ก่อปัญหา: Treiber stack ถ้ามีการ pop แล้ว push node เดิม (รีไซเคิล memory) back ใน — CAS อาจ "สำเร็จ" แต่ linked list ถูกทำลาย
วิธีป้องกัน:
saturn_lockfree(* Pattern: Tagged atomic เพื่อแก้ ABA *)
module Tagged_atomic = struct
type 'a t = { value : 'a; tag : int } Atomic.t
let make v = Atomic.make { value = v; tag = 0 }
let get a = Atomic.get a
(* CAS ที่เปรียบเทียบทั้ง value และ tag *)
let compare_and_set a old_entry new_value =
let new_entry = { value = new_value; tag = old_entry.tag + 1 } in
Atomic.compare_and_set a old_entry new_entry
end
สำหรับ production work ไม่ควรเขียน lock-free data structure เอง — ใช้ Saturn ที่ผ่านการทดสอบด้วย Lin linearizability testing
opam install saturn
Saturn มี data structure สำเร็จรูป:
Saturn.Stack — Treiber stackSaturn.Queue — Michael-Scott queue (MS-queue)Saturn.Stack — single-producer single-consumer ring buffer (SPSC)Saturn.Bag — unordered collectionSaturn.Skiplist — concurrent sorted map(* ตัวอย่างใช้ Saturn.Queue *)
let () =
let q = Saturn.Queue.create () in
let producer = Domain.spawn (fun () ->
for i = 1 to 1000 do Saturn.Queue.push q i done
) in
let consumer = Domain.spawn (fun () ->
let sum = ref 0 in
for _ = 1 to 1000 do
let rec wait () =
match Saturn.Queue.pop_opt q with
| Some v -> sum := !sum + v
| None -> Domain.cpu_relax (); wait ()
in wait ()
done;
Printf.printf "Sum: %d\n" !sum
) in
Domain.join producer;
Domain.join consumer
ไม่ใช่ทุกสถานการณ์ที่ lock-free ดีกว่า
| สถานการณ์ | Mutex | Lock-free | แนะนำ |
|---|---|---|---|
| Contention ต่ำ (1-2 writer) | เร็ว | เร็วกว่าเล็กน้อย | Mutex (เข้าใจง่ายกว่า) |
| Contention สูง (8+ writers) | ช้า (convoying) | เร็ว | Lock-free |
| Hot path ของ low-latency system | GC pause + lock pause | เร็วที่สุด | Lock-free |
| Critical section ยาว (>1 μs) | OK | ไม่เหมาะ | Mutex |
| Operation ซับซ้อน (multi-step) | ง่ายกว่า | ยาก (multi-word CAS) | Mutex |
| ต้องการ priority inheritance | ได้ | ไม่ได้ | Mutex |
| Real-time (ห้าม block) | ❌ | ✅ | Lock-free |
กฎหัวแม่มือ: เริ่มด้วย Mutex ก่อนเสมอ วัด performance ด้วย benchmark จริง แล้วค่อย optimize เป็น lock-free เฉพาะจุดที่เป็น bottleneck
Mutex, Semaphore และ Condition variables คือ synchronization primitive ดั้งเดิมที่ยังคงใช้งานใน OCaml 5 เมื่อ lock-free approach ซับซ้อนเกินไป ส่วนนี้อธิบายการใช้งานแต่ละตัว patterns สำหรับ producer-consumer การป้องกัน deadlock และ Domain-Local Await (DLA) ที่ช่วยให้ I/O library ทำงานร่วมกันได้
Mutex (Mutual Exclusion) เป็นกลไกที่รับรองว่า มีเพียง domain เดียวเท่านั้นที่เข้า critical section ได้ในเวลาหนึ่งๆ
(* ไฟล์: bin/mutex_basic.ml *)
(* ตัวอย่าง: bank account ที่ปลอดภัยต่อ concurrent access *)
module Account : sig
type t
val create : float -> t
val deposit : t -> float -> unit
val withdraw : t -> float -> bool (* true = success *)
val balance : t -> float
end = struct
type t = {
mutable bal : float;
mutex : Mutex.t;
}
let create initial = {
bal = initial;
mutex = Mutex.create ();
}
let deposit acc amount =
Mutex.lock acc.mutex;
acc.bal <- acc.bal +. amount;
Mutex.unlock acc.mutex
let withdraw acc amount =
Mutex.lock acc.mutex;
let ok = acc.bal >= amount in
if ok then acc.bal <- acc.bal -. amount;
Mutex.unlock acc.mutex;
ok
let balance acc =
Mutex.lock acc.mutex;
let b = acc.bal in
Mutex.unlock acc.mutex;
b
end
(* ทดลอง: หลาย domain deposit/withdraw พร้อมกัน *)
let () =
let acc = Account.create 1000.0 in
let deposits = List.init 4 (fun _ ->
Domain.spawn (fun () ->
for _ = 1 to 10_000 do Account.deposit acc 1.0 done)
) in
let withdrawals = List.init 4 (fun _ ->
Domain.spawn (fun () ->
for _ = 1 to 5_000 do ignore (Account.withdraw acc 1.0) done)
) in
List.iter Domain.join deposits;
List.iter Domain.join withdrawals;
Printf.printf "Final balance: %.2f (expected 21000.00)\n"
(Account.balance acc)
OCaml 5.1 เพิ่ม Mutex.protect ที่รับรองการ unlock แม้เกิด exception — คล้าย lock { } ใน C#
(* ดีกว่า: ใช้ Mutex.protect รับประกันการ unlock *)
let deposit_safe acc amount =
Mutex.protect acc.mutex (fun () ->
acc.bal <- acc.bal +. amount
)
(* ถ้า exception ใน block → mutex ถูก unlock อัตโนมัติ *)
OCaml 5 มี Semaphore module สองแบบ
Semaphore.Binary: คล้าย mutex (0 = locked, 1 = unlocked) แต่ unlock ได้จาก domain อื่นSemaphore.Counting: จำกัดจำนวน "ใบอนุญาต" ที่ออกพร้อมกัน เหมาะกับ resource pool และ rate limiting(* ไฟล์: bin/semaphore_pool.ml *)
(* ใช้ Counting Semaphore จำกัดจำนวน concurrent request *)
module Connection_pool : sig
type t
val create : max_conn:int -> t
val with_connection : t -> (int -> 'a) -> 'a
end = struct
type t = {
sem : Semaphore.Counting.t;
mutable conn_ids : int list;
mutex : Mutex.t;
}
let create ~max_conn = {
sem = Semaphore.Counting.make max_conn;
conn_ids = List.init max_conn (fun i -> i);
mutex = Mutex.create ();
}
let with_connection pool f =
(* acquire permit — block ถ้าไม่มี permit เหลือ *)
Semaphore.Counting.acquire pool.sem;
(* dequeue connection id *)
Mutex.lock pool.mutex;
let conn_id = List.hd pool.conn_ids in
pool.conn_ids <- List.tl pool.conn_ids;
Mutex.unlock pool.mutex;
(* ทำงาน + รับประกันคืน connection เสมอ *)
Fun.protect
(fun () -> f conn_id)
~finally:(fun () ->
Mutex.lock pool.mutex;
pool.conn_ids <- conn_id :: pool.conn_ids;
Mutex.unlock pool.mutex;
Semaphore.Counting.release pool.sem
)
end
(* ทดลอง: pool ขนาด 3 แต่มี 10 งาน *)
let () =
let pool = Connection_pool.create ~max_conn:3 in
let workers = List.init 10 (fun i ->
Domain.spawn (fun () ->
Connection_pool.with_connection pool (fun conn_id ->
Printf.printf "Task %d ใช้ connection %d\n%!" i conn_id;
Unix.sleepf 0.5;
Printf.printf "Task %d คืน connection %d\n%!" i conn_id
)
)
) in
List.iter Domain.join workers
(* ผลรัน: จะเห็นว่ามีเพียง 3 งานทำงานพร้อมกัน
งานที่ 4-10 ต้องรอจน connection ว่าง *)
Condition variable ใช้คู่กับ mutex เพื่อให้ thread รอจน "condition" บางอย่างเป็นจริง ไม่ใช่แค่ lock/unlock
API หลัก:
| ฟังก์ชัน | หน้าที่ |
|---|---|
Condition.create () |
สร้าง condition variable |
Condition.wait cv m |
ปล่อย mutex m แล้วรอ; เมื่อถูกปลุกแล้ว acquire mutex กลับก่อน return |
Condition.signal cv |
ปลุก หนึ่ง thread ที่ wait อยู่ |
Condition.broadcast cv |
ปลุก ทุก thread ที่ wait อยู่ |
(* ไฟล์: bin/bounded_queue.ml *)
(* Producer-consumer queue แบบ bounded *)
module Bounded_queue : sig
type 'a t
val create : int -> 'a t
val enqueue : 'a t -> 'a -> unit (* block ถ้าเต็ม *)
val dequeue : 'a t -> 'a (* block ถ้าว่าง *)
val size : 'a t -> int
end = struct
type 'a t = {
queue : 'a Queue.t;
capacity : int;
mutex : Mutex.t;
not_empty : Condition.t;
not_full : Condition.t;
}
let create capacity = {
queue = Queue.create ();
capacity;
mutex = Mutex.create ();
not_empty = Condition.create ();
not_full = Condition.create ();
}
let enqueue q x =
Mutex.lock q.mutex;
(* สำคัญ: ใช้ while ไม่ใช่ if — spurious wakeup อาจเกิดขึ้น *)
while Queue.length q.queue >= q.capacity do
Condition.wait q.not_full q.mutex
done;
Queue.push x q.queue;
Condition.signal q.not_empty;
Mutex.unlock q.mutex
let dequeue q =
Mutex.lock q.mutex;
while Queue.is_empty q.queue do
Condition.wait q.not_empty q.mutex
done;
let x = Queue.pop q.queue in
Condition.signal q.not_full;
Mutex.unlock q.mutex;
x
let size q =
Mutex.lock q.mutex;
let n = Queue.length q.queue in
Mutex.unlock q.mutex;
n
end
(* ทดลอง: pipeline แบบหลาย producer, หลาย consumer *)
let () =
let q = Bounded_queue.create 5 in
(* 2 producers *)
let producers = List.init 2 (fun i ->
Domain.spawn (fun () ->
for j = 1 to 20 do
let item = i * 100 + j in
Printf.printf " [P%d] produce %d (size=%d)\n%!"
i item (Bounded_queue.size q);
Bounded_queue.enqueue q item;
Unix.sleepf 0.02
done
)
) in
(* 3 consumers *)
let consumers = List.init 3 (fun i ->
Domain.spawn (fun () ->
for _ = 1 to 13 do (* 40 items / 3 consumers ≈ 13 each *)
let item = Bounded_queue.dequeue q in
Printf.printf " [C%d] consume %d\n%!" i item;
Unix.sleepf 0.05
done
)
) in
(* แต่มี 40 items 3*13=39 → เพิ่มให้ consumer อีก 1 item *)
let extra = Domain.spawn (fun () ->
let item = Bounded_queue.dequeue q in
Printf.printf " [extra] consume %d\n" item
) in
List.iter Domain.join producers;
List.iter Domain.join consumers;
Domain.join extra;
Printf.printf "เสร็จสิ้น queue size=%d\n" (Bounded_queue.size q)
จุดสำคัญ: ต้องใช้ while loop รอบ Condition.wait ไม่ใช่ if เพราะ
Deadlock เกิดเมื่อสอง thread ต่างรอ resource ของกันและกัน สี่เงื่อนไขของ Coffman (1971) ต้องเกิดพร้อมกัน
(* ❌ โค้ดที่มี deadlock *)
let m1 = Mutex.create ()
let m2 = Mutex.create ()
let thread_a () =
Mutex.lock m1;
Unix.sleepf 0.1;
Mutex.lock m2; (* รอ m2 ที่ B ถืออยู่ *)
Printf.printf "A got both\n";
Mutex.unlock m2;
Mutex.unlock m1
let thread_b () =
Mutex.lock m2;
Unix.sleepf 0.1;
Mutex.lock m1; (* รอ m1 ที่ A ถืออยู่ *)
Printf.printf "B got both\n";
Mutex.unlock m1;
Mutex.unlock m2
(* → Deadlock! ทั้งสองรอกันเองตลอดไป *)
1. Lock Ordering: จัดลำดับของ lock แล้ว acquire ตามลำดับเดียวกันเสมอ
(* ✅ ถูกต้อง: ทั้งสอง thread lock ตามลำดับ m1 → m2 *)
let thread_a_fixed () =
Mutex.lock m1; Mutex.lock m2;
(* ทำงาน *)
Mutex.unlock m2; Mutex.unlock m1
let thread_b_fixed () =
Mutex.lock m1; Mutex.lock m2; (* ตามลำดับเดียวกัน *)
Mutex.unlock m2; Mutex.unlock m1
2. Try-lock with Timeout: ใช้ Mutex.try_lock แล้วยกเลิกถ้าได้ไม่ครบ
(* OCaml ไม่มี Mutex.timedlock native แต่เขียน pattern ได้ *)
let try_lock_both m1 m2 =
Mutex.lock m1;
if Mutex.try_lock m2 then true
else begin
Mutex.unlock m1; (* ปล่อยทั้งสองแล้วลองใหม่ *)
false
end
3. Hierarchical Locking: ให้ lock มี "level" และห้าม acquire lock ที่ level ต่ำกว่าขณะถือ level สูง
4. Lock-free data structures: หลีกเลี่ยง lock ทั้งหมด
DLA (Domain-Local Await) คือ mechanism ที่เปิดตัวใน OCaml 5.1 ช่วยให้ blocking primitive ของ stdlib (เช่น Mutex, Condition) ทำงานร่วมกับ fiber scheduler ของ Eio ได้
ปัญหาที่ DLA แก้: ถ้าใน Eio fiber เรียก Mutex.lock ปกติจะ block ทั้ง domain (ทุก fiber ในนั้น) — DLA เปิดโอกาสให้ scheduler สลับไปทำ fiber อื่นขณะรอ
(* ไฟล์: bin/dla_usage.ml *)
(* ตัวอย่าง: ใช้ Mutex ใน Eio fiber โดยไม่ block domain *)
(* ใช้ library domain_local_await ต่อกับ scheduler *)
let () =
(* ใน Eio.Main.run context, DLA ถูก install อัตโนมัติ *)
Eio_main.run @@ fun _env ->
let m = Mutex.create () in
let counter = ref 0 in
Eio.Switch.run @@ fun sw ->
for _ = 1 to 100 do
Eio.Fiber.fork ~sw (fun () ->
Mutex.protect m (fun () ->
incr counter
)
)
done;
(* Switch.run รอทุก fiber เสร็จ *)
Printf.printf "counter = %d\n" !counter
หมายเหตุ: ใน OCaml 5.2+ DLA ถูก bundle กับ stdlib ใน form ของ hook — scheduler (เช่น Eio) register hook เพื่อ integrate
Effect handlers คือ feature ภาษาใหม่ใน OCaml 5 ที่ให้เรา "ขัดจังหวะ" การทำงานของฟังก์ชันและส่งต่อ continuation ให้ handler จัดการ คล้าย exceptions แต่ resumable ได้ เปิดทางสู่การเขียน cooperative scheduler, dependency injection, และ I/O abstraction โดยไม่ต้องใช้ monadic style
Algebraic Effect คือ abstraction สำหรับ side effect ที่ แยกระหว่างการ "ประกาศ" effect กับการ "implement" effect คล้ายแนวคิด interface/implementation
raise e แล้ว stack ถูก unwind — ไม่สามารถกลับไปรันต่อจุดเดิมได้perform e แล้ว handler ได้ continuation (ส่วนที่เหลือของโปรแกรม) ซึ่งสามารถเรียก continue เพื่อรันต่อได้
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945'
}}}%%
flowchart TB
subgraph Handler["Handler Scope"]
Start["เริ่มทำงาน"]
Work1["ทำงานส่วน A"]
Perform["perform Effect X"]
Work2["ทำงานส่วน B (หลัง resume)"]
End["จบ"]
Start --> Work1 --> Perform
Perform -.ส่ง continuation.-> H
H["Handler: จัดการ Effect X
ได้รับ continuation k"]
H -.continue k result.-> Work2
Work2 --> End
end
style Perform fill:#cc241d,stroke:#fb4934,color:#ebdbb2
style H fill:#458588,stroke:#83a598,color:#ebdbb2
style Work2 fill:#98971a,stroke:#b8bb26,color:#282828
กฎหลัก (informal):
return: ไม่มี side effectexception: side effect แบบ unwind (one-shot, non-resumable)effect: side effect แบบ general (multi-shot, resumable)OCaml 5 syntax สำหรับ effect มีสามส่วน
type _ Effect.topen Effect
open Effect.Deep
(* ประกาศ effect 2 ตัว *)
type _ Effect.t +=
| Read_line : string Effect.t
| Log : string -> unit Effect.t
หมายเหตุ: type _ Effect.t += เป็น extensible variant เพิ่ม constructor ใหม่เข้าไปในชนิด Effect.t ที่มีอยู่
Effect.perform(* โปรแกรม business logic ที่ perform effects *)
let greet () =
perform (Log "กรุณากรอกชื่อ: ");
let name = perform Read_line in
perform (Log (Printf.sprintf "สวัสดี %s!\n" name))
สำคัญ: ใน type signature ของ greet ไม่มีอะไรบอกว่ามี effect — OCaml 5 ยังไม่มี effect typing (อยู่ระหว่างพัฒนา)
Effect.Deep.match_with(* Handler: implement effect ที่ถูก perform *)
let run () =
try_with greet ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Read_line -> Some (fun (k : (a, _) continuation) ->
let input = input_line stdin in
continue k input
)
| Log msg -> Some (fun k ->
print_string msg;
continue k ()
)
| _ -> None (* effect อื่นๆ ส่งต่อให้ handler ชั้นนอก *)
}
let () = run ()
OCaml 5 มีสอง API
Effect.Deep: handler ยังคงอยู่หลัง continue → handle effect ซ้ำได้Effect.Shallow: handler หมดหลัง continue → ต้อง install ใหม่เอง| คุณสมบัติ | Deep | Shallow |
|---|---|---|
| Install อัตโนมัติหลัง continue | ✅ | ❌ |
| ยืดหยุ่น: เปลี่ยน handler ระหว่างทาง | ❌ | ✅ |
| ซับซ้อนในการใช้ | น้อย | มาก |
| Performance | ดีกว่า | ด้อยกว่าเล็กน้อย |
| ใช้มากสุด | ใช้ 90% ของเวลา | ใช้เมื่อ handler ต้องเปลี่ยน |
กฎ: ถ้าไม่มีเหตุผลจะเปลี่ยน handler ระหว่างทาง ใช้ Deep
Effect handlers ให้เราเขียน scheduler แบบ cooperative ได้โดยไม่ต้องพึ่ง OS thread
(* ไฟล์: bin/cooperative_sched.ml *)
(* Green thread scheduler แบบง่ายด้วย effects *)
open Effect
open Effect.Deep
(* effects สำหรับ scheduler *)
type _ Effect.t +=
| Yield : unit Effect.t
| Spawn : (unit -> unit) -> unit Effect.t
(* queue ของ continuation ที่ยังไม่ได้รัน *)
let task_queue : (unit -> unit) Queue.t = Queue.create ()
let rec run_next () =
if not (Queue.is_empty task_queue) then begin
let task = Queue.pop task_queue in
task ()
end
(* handler หลัก *)
let run_scheduler main =
let rec handle f =
try_with f ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Yield -> Some (fun (k : (a, unit) continuation) ->
(* enqueue continuation แล้วไปทำ task อื่น *)
Queue.push (fun () -> continue k ()) task_queue;
run_next ()
)
| Spawn new_task -> Some (fun k ->
(* enqueue task ใหม่ *)
Queue.push (fun () -> handle new_task) task_queue;
continue k ()
)
| _ -> None
}
in
handle main;
run_next ()
(* ทดลอง: สาม task ที่สลับกันทำงาน *)
let () =
run_scheduler (fun () ->
perform (Spawn (fun () ->
for i = 1 to 3 do
Printf.printf " task A: %d\n" i;
perform Yield
done
));
perform (Spawn (fun () ->
for i = 1 to 3 do
Printf.printf " task B: %d\n" i;
perform Yield
done
));
perform (Spawn (fun () ->
for i = 1 to 3 do
Printf.printf " task C: %d\n" i;
perform Yield
done
))
)
(* Output:
task A: 1
task B: 1
task C: 1
task A: 2
task B: 2
task C: 2
task A: 3
task B: 3
task C: 3
*)
นี่คือ foundation ของ Eio — Eio ใช้เทคนิคนี้กับ I/O readiness events
Effect handlers ให้เราทำ "implicit parameter" / DI โดยไม่ต้องส่ง config ไปตามฟังก์ชันทุกตัว
(* ไฟล์: bin/effect_di.ml *)
(* Dependency injection ด้วย effect handlers *)
open Effect
open Effect.Deep
(* ประกาศ effect สำหรับ logging และ config *)
type _ Effect.t +=
| Get_config : string -> string Effect.t (* get config by key *)
| Log_info : string -> unit Effect.t
(* business logic ใช้ effects โดยไม่รู้ว่า config/log มาจากไหน *)
let fetch_user user_id =
let base_url = perform (Get_config "api.base_url") in
perform (Log_info (Printf.sprintf "Fetching %s/%s" base_url user_id));
(* จำลอง HTTP call *)
Printf.sprintf "User(%s)" user_id
(* handler สำหรับ production *)
let prod_config = [
"api.base_url", "https://api.prod.example.com";
"db.host", "prod-db.internal";
]
let prod_handler f =
try_with f ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Get_config key -> Some (fun (k : (a, _) continuation) ->
let v = List.assoc key prod_config in
continue k v
)
| Log_info msg -> Some (fun k ->
Printf.printf "[PROD] %s\n" msg;
continue k ()
)
| _ -> None
}
(* handler สำหรับ test — ไม่ทำ I/O จริง *)
let test_handler f =
let logs = ref [] in
let result = try_with f ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Get_config key -> Some (fun (k : (a, _) continuation) ->
continue k (Printf.sprintf "test://%s" key)
)
| Log_info msg -> Some (fun k ->
logs := msg :: !logs;
continue k ()
)
| _ -> None
}
in
(result, List.rev !logs)
(* รัน 2 environment *)
let () =
Printf.printf "=== Production ===\n";
prod_handler (fun () ->
let u = fetch_user "U123" in
Printf.printf "ผล: %s\n" u
);
Printf.printf "\n=== Test ===\n";
let _, logs = test_handler (fun () -> fetch_user "U456") in
List.iter (Printf.printf "log: %s\n") logs
จุดสำคัญ: fetch_user ไม่ต้องรับ config/logger เป็น parameter และไม่ต้อง let* cfg = get_config in ... แบบ monadic — handler ตัดสินว่า effect ทำอะไร
(* ไฟล์: bin/effect_scope.ml *)
(* จำลอง Go's defer หรือ Python's with ด้วย effects *)
open Effect
open Effect.Deep
type _ Effect.t +=
| Defer : (unit -> unit) -> unit Effect.t
let with_defer f =
let deferred = ref [] in
let result =
try_with f ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Defer action -> Some (fun (k : (a, _) continuation) ->
deferred := action :: !deferred;
continue k ()
)
| _ -> None
}
in
(* รัน cleanup ย้อนลำดับ *)
List.iter (fun action ->
try action ()
with e -> Printf.eprintf "deferred error: %s\n" (Printexc.to_string e)
) !deferred;
result
(* ตัวอย่างใช้งาน *)
let () =
with_defer (fun () ->
let fd = Unix.openfile "/tmp/test.txt" [O_CREAT; O_WRONLY] 0o644 in
perform (Defer (fun () ->
Printf.printf "ปิด file descriptor\n";
Unix.close fd
));
let buf = Bytes.of_string "hello\n" in
perform (Defer (fun () ->
Printf.printf "lock ถูกปล่อย\n"
));
ignore (Unix.write fd buf 0 (Bytes.length buf));
Printf.printf "เขียนไฟล์เสร็จ\n"
)
(* Output (cleanup เรียงย้อน):
เขียนไฟล์เสร็จ
lock ถูกปล่อย
ปิด file descriptor *)
Effect handler สามารถเรียก continue k หลายครั้งได้ (multi-shot) — เปิดทางทำ backtracking search, non-deterministic computation
(* ไฟล์: bin/nondet.ml *)
(* Non-deterministic choice ด้วย effects *)
open Effect
open Effect.Shallow
type _ Effect.t +=
| Choose : bool Effect.t
(* หา all possible solutions *)
let all_solutions f =
let results = ref [] in
let rec run k v =
match continue_with k v
{ retc = (fun x -> results := x :: !results);
exnc = raise;
effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Choose -> Some (fun (k : (a, _) continuation) ->
(* ลองทั้ง true และ false *)
let k2 = Obj.magic k in (* เตือน: ใน prod ใช้ clone properly *)
run k true;
run k2 false
)
| _ -> None
}
with () -> ()
in
let init_k = fiber f in
run init_k ();
List.rev !results
(* หาค่า x, y, z ∈ {true, false} ที่ x ∧ (y ∨ z) = true *)
(* หมายเหตุ: ตัวอย่างนี้ใช้เพื่อประกอบการอธิบายแนวคิด — multi-shot ใน OCaml 5
ยังต้องใช้ด้วยความระวัง เพราะการ clone continuation ไม่ปลอดภัยในทุกกรณี *)
เตือน: multi-shot continuation ใน OCaml 5 ยังไม่ support เต็มรูป — continuation สามารถ resume ได้เพียงครั้งเดียว ถ้าต้องการ non-determinism แท้ๆ ต้องใช้ library อื่นหรือ fork process
| คุณสมบัติ | Exception | Lwt Monad | Effect Handler |
|---|---|---|---|
| Syntax overhead | น้อย | สูง (ต้อง let*) |
น้อย |
| Resumable | ❌ | ❌ (หลัง reject) | ✅ |
| ส่งค่ากลับได้ | ❌ | ✅ (Lwt.return) | ✅ (continue k v) |
| Static tracking | ❌ | ✅ (type 'a Lwt.t) |
❌ (ยัง) |
| Composable | จำกัด | สูง | สูง |
| Handler ซ้อนได้ | ✅ | - | ✅ |
| Zero-cost ถ้าไม่ perform | ✅ | ❌ | ✅ |
Effect handler เป็นอนาคต ของ async ใน OCaml — library ใหม่ๆ (Eio, Miou, Riot) ล้วนสร้างอยู่บน effect
Eio คือ library ที่ build อยู่บน effect handlers เพื่อเสนอ API สำหรับ non-blocking I/O แบบ structured ไม่ต้องใช้ monadic style (ไม่มี let* หรือ >>=) ส่วนนี้อธิบาย architecture ของ Eio การเขียน TCP server/client structured concurrency ผ่าน Switches และเปรียบเทียบกับ Lwt
Eio มี concept หลักสามตัว
env) เช่น network, filesystem, clock, random
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945'
}}}%%
flowchart TB
subgraph Eio["Eio_main.run"]
subgraph Sched["Event Loop (Scheduler)"]
Ready["Ready Queue
(fibers ready to run)"]
Waiting["Waiting Set
(fibers waiting I/O)"]
end
subgraph Sw1["Switch (outer)"]
F1["Fiber 1"]
subgraph Sw2["Switch (inner)"]
F2["Fiber 2"]
F3["Fiber 3"]
end
end
Backend["Backend
io_uring / epoll / posix"]
end
OS["Kernel
file/socket/timer events"]
F1 -.perform Wait.-> Sched
F2 -.perform Wait.-> Sched
F3 -.perform Wait.-> Sched
Sched <--> Backend
Backend <--> OS
style Sched fill:#458588,stroke:#83a598,color:#ebdbb2
style Sw1 fill:#98971a,stroke:#b8bb26,color:#282828
style Sw2 fill:#689d6a,stroke:#8ec07c,color:#282828
style Backend fill:#d79921,stroke:#fabd2f,color:#282828
style OS fill:#cc241d,stroke:#fb4934,color:#ebdbb2
## ติดตั้ง Eio (จะได้ eio_main ซึ่งเลือก backend อัตโนมัติ)
opam install eio_main
## ต้องการ io_uring บน Linux 5.11+:
opam install eio_linux
dune:
(executable
(name my_app)
(libraries eio eio_main))
(* ไฟล์: bin/hello_eio.ml *)
(* โปรแกรม Eio ที่ง่ายที่สุด *)
let main env =
let stdout = Eio.Stdenv.stdout env in
Eio.Flow.copy_string "สวัสดี Eio!\n" stdout
let () =
Eio_main.run main
สังเกต:
Eio_main.run main เริ่ม event loop และส่ง environment ให้ mainEio.Stdenv.stdout env ขอ capability stdout จาก environmentlet*, >>=, Lwt.return — อ่านเหมือน synchronous code(* ไฟล์: bin/fiber_demo.ml *)
(* สลับ task ด้วย Fiber *)
let main env =
let clock = Eio.Stdenv.clock env in
Eio.Fiber.both
(fun () ->
for i = 1 to 5 do
Printf.printf " A: %d\n%!" i;
Eio.Time.sleep clock 0.2
done)
(fun () ->
for i = 1 to 5 do
Printf.printf " B: %d\n%!" i;
Eio.Time.sleep clock 0.3
done)
(* Fiber.both รันทั้งสองพร้อมกัน รอทั้งสองเสร็จแล้ว return *)
let () = Eio_main.run main
(* Output สลับไปมา:
A: 1
B: 1
A: 2
B: 2
A: 3
A: 4
B: 3
A: 5
B: 4
B: 5 *)
Fiber API หลัก:
| ฟังก์ชัน | หน้าที่ |
|---|---|
Fiber.both f g |
รัน f, g พร้อมกัน รอทั้งคู่ |
Fiber.all [f1; f2; ...] |
รันทั้งหมดพร้อมกัน |
Fiber.any [f1; f2; ...] |
รันทั้งหมด เอาผลแรกที่เสร็จ ยกเลิกที่เหลือ |
Fiber.fork ~sw f |
เพิ่ม fiber ใหม่เข้า switch ไม่รอ |
Fiber.yield () |
คืน control ให้ scheduler |
Fiber.List.map f xs |
map แบบ parallel fibers |
Switch คือ primitive หลักของ structured concurrency ใน Eio
(* ไฟล์: bin/switch_demo.ml *)
let worker clock name id =
for i = 1 to 3 do
Printf.printf " [%s] iter %d\n%!" name i;
Eio.Time.sleep clock (0.1 *. float_of_int id)
done
let main env =
let clock = Eio.Stdenv.clock env in
Eio.Switch.run (fun sw ->
(* fork 3 fiber เข้า switch เดียวกัน *)
Eio.Fiber.fork ~sw (fun () -> worker clock "alpha" 1);
Eio.Fiber.fork ~sw (fun () -> worker clock "beta" 2);
Eio.Fiber.fork ~sw (fun () -> worker clock "gamma" 3)
(* Switch.run จะไม่ return จน fiber ทั้งสามเสร็จ *)
);
Printf.printf "ทุก fiber เสร็จหมดแล้ว\n"
let () = Eio_main.run main
(* ❌ Unstructured — ไม่ Eio-style *)
let bad () =
let _ = Thread.create worker () in (* อาจยังรันหลัง main จบ *)
()
(* ✅ Structured ด้วย Switch *)
let good env =
Eio.Switch.run (fun sw ->
Eio.Fiber.fork ~sw worker
(* switch รับประกันว่า fiber เสร็จก่อน exit block *)
)
(* ไฟล์: bin/tcp_echo_server.ml *)
(* Echo server ที่รองรับหลาย client พร้อมกัน *)
let handle_client ~clock flow addr =
Eio.traceln "รับการเชื่อมต่อจาก %a" Eio.Net.Sockaddr.pp addr;
let buf = Eio.Buf_read.of_flow flow ~max_size:1_000_000 in
try
let rec loop () =
let line = Eio.Buf_read.line buf in
Eio.traceln "got: %s" line;
(* echo กลับ *)
Eio.Flow.copy_string (line ^ "\n") flow;
loop ()
in
loop ()
with End_of_file ->
Eio.traceln "%a ปิดการเชื่อมต่อ" Eio.Net.Sockaddr.pp addr
let run_server ~net ~clock ~port =
Eio.Switch.run @@ fun sw ->
(* สร้าง listening socket *)
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in
let socket = Eio.Net.listen ~sw net addr ~backlog:128 ~reuse_addr:true in
Eio.traceln "Listening บน port %d..." port;
(* loop รับ connection *)
while true do
Eio.Net.accept_fork
~sw
socket
~on_error:(fun ex ->
Eio.traceln "Connection error: %a" Fmt.exn ex)
(fun flow addr -> handle_client ~clock flow addr)
done
let main env =
run_server
~net:(Eio.Stdenv.net env)
~clock:(Eio.Stdenv.clock env)
~port:8080
let () = Eio_main.run main
ทดสอบ:
## terminal 1
dune exec ./bin/tcp_echo_server.exe
## terminal 2
echo "hello world" | nc 127.0.0.1 8080
## → hello world
## หรือหลาย client พร้อมกัน
for i in {1..10}; do
(echo "hello $i"; sleep 1) | nc 127.0.0.1 8080 &
done
จุดสำคัญ: Eio.Net.accept_fork fork fiber ใหม่ทุกครั้งที่มี connection เข้ามา — ไม่ต้องเขียน loop เอง
(* ไฟล์: bin/file_io.ml *)
(* อ่าน/เขียนไฟล์แบบ non-blocking *)
let main env =
let fs = Eio.Stdenv.fs env in
(* เขียนไฟล์ *)
Eio.Path.save ~create:(`Or_truncate 0o644)
Eio.Path.(fs / "/tmp/eio_test.txt")
"สวัสดี Eio! บรรทัดที่ 1\nบรรทัดที่ 2\n";
(* อ่านไฟล์ *)
let content = Eio.Path.load Eio.Path.(fs / "/tmp/eio_test.txt") in
Printf.printf "อ่านได้: %d ไบต์\n%s---จบไฟล์---\n"
(String.length content) content;
(* อ่านทีละบรรทัด *)
Eio.Path.with_open_in
Eio.Path.(fs / "/tmp/eio_test.txt")
(fun flow ->
let buf = Eio.Buf_read.of_flow flow ~max_size:1_000_000 in
try
let n = ref 0 in
while true do
let line = Eio.Buf_read.line buf in
incr n;
Printf.printf " บรรทัด %d: %s\n" !n line
done
with End_of_file -> ()
)
let () = Eio_main.run main
(* ไฟล์: bin/cancel_demo.ml *)
let slow_work ~clock name duration =
for i = 1 to 10 do
Printf.printf " [%s] step %d/%d\n%!" name i 10;
Eio.Time.sleep clock (duration /. 10.0)
done;
Printf.printf " [%s] เสร็จสมบูรณ์\n%!" name
let main env =
let clock = Eio.Stdenv.clock env in
(* ทดสอบ timeout: ยกเลิกงานถ้าเกิน 2 วินาที *)
Printf.printf "=== Test 1: timeout 2s, งานใช้ 5s ===\n";
begin try
Eio.Time.with_timeout_exn clock 2.0 (fun () ->
slow_work ~clock "Task-A" 5.0)
with Eio.Time.Timeout ->
Printf.printf " ❌ Task-A ถูก timeout!\n"
end;
(* ทดสอบ first-wins: Fiber.any ยกเลิกที่เหลือ *)
Printf.printf "\n=== Test 2: Fiber.any ===\n";
let winner = Eio.Fiber.any [
(fun () -> Eio.Time.sleep clock 1.0; "fast");
(fun () -> Eio.Time.sleep clock 3.0; "slow");
(fun () -> Eio.Time.sleep clock 2.0; "medium");
] in
Printf.printf " ผู้ชนะคือ: %s (ตัวอื่นถูก cancel)\n" winner
let () = Eio_main.run main
| คุณสมบัติ | Eio | Lwt | Async (Jane Street) |
|---|---|---|---|
| Style | Direct (no monad) | Monadic | Monadic |
| Type signature | ไม่แสดง async ใน type | 'a Lwt.t |
'a Deferred.t |
| Structured concurrency | ✅ Switch | ผ่าน library | ผ่าน library |
| Multicore | ✅ native | ⚠️ ได้ผ่าน Domain | ⚠️ ได้ผ่าน Domain |
| Backend options | io_uring / epoll / posix | libev | epoll |
| Cancellation | ✅ structured | Manual (Lwt.cancel) | Manual |
| Syntax | เหมือน sync code | let* x = ... |
let%bind x = ... |
| Age | ใหม่ (2022+) | เก่า (2009+) | เก่า (2013+) |
| Ecosystem | กำลังเติบโต | ใหญ่ | ใหญ่ในวงการ Jane Street |
(* Lwt monadic style *)
let lwt_fetch url =
let open Lwt.Syntax in
let* resp = Http_client.get url in
let* body = Cohttp_lwt.Body.to_string resp in
Lwt.return (String.length body)
(* Eio direct style *)
let eio_fetch ~net url =
let resp = Http_client.get ~net url in
let body = Http_client.body_string resp in
String.length body
(* เหมือน sync แต่ actually async *)
(* ไฟล์: bin/pipeline.ml *)
(* 3-stage pipeline: produce → transform → consume *)
let main env =
let clock = Eio.Stdenv.clock env in
let stage1_out = Eio.Stream.create 10 in (* bounded buffer *)
let stage2_out = Eio.Stream.create 10 in
Eio.Switch.run @@ fun sw ->
(* Stage 1: produce เลข 1..20 *)
Eio.Fiber.fork ~sw (fun () ->
for i = 1 to 20 do
Printf.printf " [produce] %d\n%!" i;
Eio.Stream.add stage1_out i;
Eio.Time.sleep clock 0.05
done;
Eio.Stream.add stage1_out 0 (* sentinel = 0 = end *)
);
(* Stage 2: ยกกำลังสอง *)
Eio.Fiber.fork ~sw (fun () ->
let rec loop () =
let x = Eio.Stream.take stage1_out in
if x = 0 then Eio.Stream.add stage2_out 0
else begin
let y = x * x in
Printf.printf " [square] %d -> %d\n%!" x y;
Eio.Stream.add stage2_out y;
loop ()
end
in
loop ()
);
(* Stage 3: consume + sum *)
let sum = ref 0 in
let rec loop () =
let x = Eio.Stream.take stage2_out in
if x = 0 then ()
else begin
sum := !sum + x;
Printf.printf " [consume] running sum = %d\n%!" !sum;
loop ()
end
in
loop ();
Printf.printf "\nTotal: %d\n" !sum
let () = Eio_main.run main
สำหรับ CPU-bound งานที่ต้องแบ่งงานย่อยจำนวนมาก การใช้ Domain.spawn ตรงๆ ไม่ efficient พอ เพราะ overhead สูง Domainslib ให้ Task pool พร้อม work-stealing scheduler และ parallel-for API ที่เหมาะกับ map-reduce, pipeline และ fork-join parallelism ส่วนนี้ครอบคลุมทั้ง Domainslib, benchmarking และ Amdahl-aware design
Domainslib คือ official library ของ OCaml multicore team สร้าง task pool ที่ reuse domain ได้ ลด overhead ของ Domain.spawn
opam install domainslib
แนวคิด: สร้าง pool ครั้งเดียว แล้ว submit task เข้าไปเรื่อยๆ — pool ทำ work-stealing scheduler ระหว่าง domain
| ฟังก์ชัน | หน้าที่ |
|---|---|
Task.setup_pool ~num_domains |
สร้าง pool |
Task.teardown_pool |
ปิด pool |
Task.run pool f |
รัน f ภายใน pool context |
Task.async pool f |
fork task, คืน promise |
Task.await pool p |
รอผล promise |
Task.parallel_for pool ~start ~finish ~body |
parallel for loop |
Task.parallel_for_reduce pool ~start ~finish ~body reducer init |
parallel reduce |
(* ไฟล์: bin/parallel_map.ml *)
(* parallel map: f ทุกช่องของ array *)
let parallel_map pool f arr =
let n = Array.length arr in
let result = Array.make n (f arr.(0)) in (* init to avoid uninit *)
Domainslib.Task.parallel_for pool
~start:0 ~finish:(n - 1)
~body:(fun i -> result.(i) <- f arr.(i));
result
(* ตัวอย่าง: คำนวณ SHA-256 ของ 1000 string *)
let expensive_hash s =
let rec loop acc n =
if n = 0 then acc
else loop (Digest.string (acc ^ string_of_int n)) (n - 1)
in
Digest.to_hex (loop s 1000)
let () =
let cores = Domain.recommended_domain_count () in
let pool = Domainslib.Task.setup_pool ~num_domains:(cores - 1) () in
let data = Array.init 1000 (fun i -> Printf.sprintf "item-%d" i) in
(* sequential *)
let t0 = Unix.gettimeofday () in
let _ = Array.map expensive_hash data in
let t1 = Unix.gettimeofday () in
Printf.printf "Sequential: %.2fs\n" (t1 -. t0);
(* parallel *)
let t2 = Unix.gettimeofday () in
let _ = Domainslib.Task.run pool (fun () ->
parallel_map pool expensive_hash data
) in
let t3 = Unix.gettimeofday () in
Printf.printf "Parallel (%d domains): %.2fs, speedup=%.2fx\n"
cores (t3 -. t2) ((t1 -. t0) /. (t3 -. t2));
Domainslib.Task.teardown_pool pool
(* ผลรันตัวอย่าง (Ryzen AI 7 350, 8 cores, 7 worker domains):
Sequential: 4.81s
Parallel (7 domains): 0.72s, speedup=6.68x *)
(* ไฟล์: bin/parallel_reduce.ml *)
let parallel_sum_f pool arr =
let n = Array.length arr in
Domainslib.Task.parallel_for_reduce
pool
~start:0 ~finish:(n - 1)
~body:(fun i -> arr.(i))
(+.)
0.0
(* ตัวอย่าง: Monte Carlo π *)
let monte_carlo_pi pool ~samples =
let counts =
Domainslib.Task.parallel_for_reduce
pool
~start:0 ~finish:(samples - 1)
~body:(fun _ ->
let x = Random.float 1.0 in
let y = Random.float 1.0 in
if x *. x +. y *. y <= 1.0 then 1 else 0
)
(+)
0
in
4.0 *. float_of_int counts /. float_of_int samples
let () =
let cores = Domain.recommended_domain_count () in
let pool = Domainslib.Task.setup_pool ~num_domains:(cores - 1) () in
let pi = Domainslib.Task.run pool (fun () ->
monte_carlo_pi pool ~samples:100_000_000
) in
Printf.printf "π ≈ %.6f (actual %.6f)\n" pi Float.pi;
Domainslib.Task.teardown_pool pool
สูตรของ Monte Carlo estimate สำหรับ π:
คำอธิบายตัวแปร:
Work-stealing scheduler เป็นเทคนิคที่แต่ละ worker มี deque (double-ended queue) ของ task ตัวเอง — ถ้า queue ว่างก็ไป "ขโมย" task จาก worker อื่น
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945'
}}}%%
flowchart LR
subgraph W1["Worker 1 (busy)"]
Q1["Deque:
T1 T2 T3 T4 T5"]
end
subgraph W2["Worker 2 (busy)"]
Q2["Deque:
T6 T7"]
end
subgraph W3["Worker 3 (idle)"]
Q3["Deque: ว่าง"]
end
subgraph W4["Worker 4 (idle)"]
Q4["Deque: ว่าง"]
end
Q3 -.steal from top.-> Q1
Q4 -.steal from top.-> Q2
style W1 fill:#98971a,stroke:#b8bb26,color:#282828
style W2 fill:#98971a,stroke:#b8bb26,color:#282828
style W3 fill:#d79921,stroke:#fabd2f,color:#282828
style W4 fill:#d79921,stroke:#fabd2f,color:#282828
สาระสำคัญ:
Domainslib ใช้ work-stealing ภายใน ไม่ต้องเขียนเอง
ต่างจาก data parallelism ที่แบ่งข้อมูลให้ worker, pipeline parallelism แบ่ง stage ของการประมวลผล
(* ไฟล์: bin/pipeline_parallel.ml *)
(* 3-stage pipeline: read → process → write *)
let run_pipeline pool inputs =
let ch_in_out = Eio.Stream.create 100 in (* หรือใช้ Saturn.Queue *)
let ch_out_final = Eio.Stream.create 100 in
let t = Domainslib.Task.async pool (fun () ->
(* Stage 1: "read" — ใส่ input เข้า channel *)
List.iter (fun x -> Eio.Stream.add ch_in_out x) inputs;
Eio.Stream.add ch_in_out (-1) (* sentinel *)
) in
let t2 = Domainslib.Task.async pool (fun () ->
(* Stage 2: "process" — ยกกำลังสาม *)
let rec loop () =
let x = Eio.Stream.take ch_in_out in
if x = -1 then Eio.Stream.add ch_out_final (-1)
else begin
Eio.Stream.add ch_out_final (x * x * x);
loop ()
end
in loop ()
) in
(* Stage 3: "write" — รวมผล *)
let results = ref [] in
let rec loop () =
let x = Eio.Stream.take ch_out_final in
if x = -1 then ()
else begin results := x :: !results; loop () end
in
loop ();
Domainslib.Task.await pool t;
Domainslib.Task.await pool t2;
List.rev !results
(* หมายเหตุ: ตัวอย่างนี้ต้องใช้ใน Eio_main.run context
เพื่อให้ Eio.Stream ทำงานได้ *)
การวัด performance ใน multicore code ยาก เพราะมี variance สูง bechamel เป็น benchmark library ที่ทำ statistical analysis ให้
opam install bechamel bechamel-notty
(* ไฟล์: bin/bench_parallel.ml *)
open Bechamel
open Toolkit
let bench_sequential () =
let _ = Array.init 1_000_000 (fun i -> i * i) in ()
let bench_parallel_2 pool () =
Domainslib.Task.run pool (fun () ->
let arr = Array.make 1_000_000 0 in
Domainslib.Task.parallel_for pool
~start:0 ~finish:999_999
~body:(fun i -> arr.(i) <- i * i)
)
let test pool =
Test.make_grouped
~name:"parallel_map"
[
Test.make ~name:"sequential" (Staged.stage bench_sequential);
Test.make ~name:"parallel_2" (Staged.stage (bench_parallel_2 pool));
]
let benchmark () =
let pool = Domainslib.Task.setup_pool ~num_domains:2 () in
let ols = Analyze.ols ~bootstrap:0 ~r_square:true
~predictors:Measure.[|run|] in
let instances =
Instance.[monotonic_clock; minor_allocated; major_allocated] in
let cfg = Benchmark.cfg ~limit:2000 ~quota:(Time.second 1.0) () in
let raw = Benchmark.all cfg instances (test pool) in
let results = List.map (fun i ->
Analyze.all ols i raw
) instances in
let results = Analyze.merge ols instances results in
Domainslib.Task.teardown_pool pool;
(results, raw)
let () =
let results, _ = benchmark () in
let open Bechamel_notty in
let window =
match Notty_unix.winsize Unix.stdout with
| Some (w, h) -> { Bechamel_notty.w; h }
| None -> { w = 80; h = 1 }
in
img (window, results) |> eol |> output_image
| Primitive | เหมาะกับ | Overhead | Control | Library |
|---|---|---|---|---|
Domain.spawn |
Long-running worker | สูง (~500 μs) | สูงสุด | stdlib |
Domainslib.Task.async |
Short CPU task | ต่ำ (~μs) | กลาง | domainslib |
Domainslib.parallel_for |
Data parallel loop | ต่ำมาก | ต่ำ | domainslib |
Eio.Fiber.fork |
I/O-bound task | ต่ำมาก (~μs) | กลาง | eio |
Eio.Executor_pool |
CPU task ใน Eio context | ต่ำ | กลาง | eio |
Raw Atomic |
Primitive sync | ต่ำสุด | สูง | stdlib |
(* ไฟล์: bin/matmul.ml *)
(* Matrix multiplication: C = A × B แบบ parallel *)
type matrix = float array array
let make_matrix rows cols f : matrix =
Array.init rows (fun i -> Array.init cols (fun j -> f i j))
let seq_matmul (a : matrix) (b : matrix) : matrix =
let n = Array.length a in
let m = Array.length b.(0) in
let k = Array.length b in
let c = Array.make_matrix n m 0.0 in
for i = 0 to n - 1 do
for j = 0 to m - 1 do
let sum = ref 0.0 in
for l = 0 to k - 1 do
sum := !sum +. a.(i).(l) *. b.(l).(j)
done;
c.(i).(j) <- !sum
done
done;
c
let par_matmul pool (a : matrix) (b : matrix) : matrix =
let n = Array.length a in
let m = Array.length b.(0) in
let k = Array.length b in
let c = Array.make_matrix n m 0.0 in
Domainslib.Task.parallel_for pool
~start:0 ~finish:(n - 1)
~body:(fun i ->
for j = 0 to m - 1 do
let sum = ref 0.0 in
for l = 0 to k - 1 do
sum := !sum +. a.(i).(l) *. b.(l).(j)
done;
c.(i).(j) <- !sum
done
);
c
let () =
let size = 512 in
Random.self_init ();
let a = make_matrix size size (fun _ _ -> Random.float 1.0) in
let b = make_matrix size size (fun _ _ -> Random.float 1.0) in
(* sequential *)
let t0 = Unix.gettimeofday () in
let _ = seq_matmul a b in
let t1 = Unix.gettimeofday () in
Printf.printf "Sequential: %.2fs\n" (t1 -. t0);
(* parallel *)
let cores = Domain.recommended_domain_count () in
let pool = Domainslib.Task.setup_pool ~num_domains:(cores - 1) () in
let t2 = Unix.gettimeofday () in
let _ = Domainslib.Task.run pool (fun () -> par_matmul pool a b) in
let t3 = Unix.gettimeofday () in
Printf.printf "Parallel (%d domains): %.2fs, speedup=%.2fx\n"
(cores - 1) (t3 -. t2) ((t1 -. t0) /. (t3 -. t2));
Domainslib.Task.teardown_pool pool
(* ผลรันตัวอย่าง (Ryzen AI 7 350, 8 cores, 512×512):
Sequential: 1.97s
Parallel (7 domains): 0.32s, speedup=6.13x *)
ข้อสังเกต: การ parallelize outer loop อย่างเดียวได้ speedup ≈ 6x บน 7 worker domain เนื่องจากมี memory bandwidth และ cache contention — ตรงกับ Amdahl's law (ดู 13.7)
%%{init: {'theme':'base','themeVariables':{
'primaryColor':'#3c3836',
'primaryTextColor':'#ebdbb2',
'primaryBorderColor':'#fabd2f',
'lineColor':'#a89984',
'secondaryColor':'#504945'
}}}%%
flowchart TB
subgraph Era1["ยุคก่อน Multicore (2009-2020)"]
L2009["2009: Lwt 1.0
Monadic concurrency"]
A2013["2013: Async (Jane Street)
Alternative monadic"]
L2009 --> A2013
end
subgraph Era2["ยุคเตรียมการ (2014-2022)"]
RP["2014: Multicore OCaml branch
เริ่ม research"]
ES["2019: Effect handlers prototype"]
PR["2021: Merge proposals accepted"]
RP --> ES --> PR
end
subgraph Era3["OCaml 5 Era (2022-ปัจจุบัน)"]
O50["2022-12: OCaml 5.0
Multicore GC + Effects"]
E01["2023: Eio 0.x + Domainslib"]
O51["2023: OCaml 5.1
DLA, minor heap share"]
E1["2024: Eio 1.0"]
O52["2024: OCaml 5.2
GC improvements"]
Sat["2024: Saturn (prod lock-free)"]
O53["2025: OCaml 5.3
Effect system evolution"]
O54["2025: OCaml 5.4"]
O50 --> E01 --> O51 --> E1 --> O52 --> Sat --> O53 --> O54
end
Era1 -.-> Era2
Era2 -.-> Era3
style Era1 fill:#504945,color:#ebdbb2
style Era2 fill:#458588,color:#ebdbb2
style Era3 fill:#98971a,color:#282828
เมื่ออ่านครบทั้ง 7 หัวข้อ (12-18) คุณควรเข้าใจ
แนวทางออกแบบ systems:
(lang dune 3.14)
(executables
(names hello_eio fiber_demo tcp_echo_server parallel_sum)
(libraries eio eio_main domainslib saturn unix threads.posix))
## OCaml 5.2 หรือใหม่กว่า
opam switch create 5.2.1
opam install dune eio eio_main domainslib saturn bechamel bechamel-notty
sample_urls.txt สำหรับทดลอง I/Ohttps://example.com/a
https://example.com/b
https://example.com/c
https://example.com/d
https://example.com/e
https://example.com/f
https://example.com/g
https://example.com/h
#!/bin/bash
## save as: test_server.sh
## ทดสอบ tcp_echo_server ด้วย 100 concurrent client
dune exec ./bin/tcp_echo_server.exe &
SERVER_PID=$!
sleep 1
for i in $(seq 1 100); do
(echo "msg $i from client $i" | timeout 2 nc 127.0.0.1 8080) &
done
wait
kill $SERVER_PID