第八章 并发编程
Table of Contents
- 第八章 并发编程
- 8.1 并发原语
- 创建进程
- 向进程发送消息
- 接收消息
 
- 8.2 一个简单的例子
- 8.3 客户/服务器介绍
- 第一步
- 第二步
- 第三步
 
- 8.4 创建一个进程需要花费多少时间
- 8.5 带超时的receive
- 8.5.1 只有超时的receive
- 8.5.2 超时时间为0的receive
- 8.5.3 使用一个无限等待超时进行接收
- 8.5.4 实现一个计时器
 
- 8.6 选择性接收
- 8.7 注册进程
- 注册进程
- 取消注册
- 判断是否已注册
- 查看注册列表
 
- 8.8 如何编写一个并发程序
- 8.9 尾递归技术
- 8.10 使用MFA启动进程
- 8.11 习题
- 测试注册函数
- 测试发送消息
 
 
- 8.1 并发原语
第八章 并发编程
Erlang中的进程并非属于操作系统, 它是属于程序语言本身的。
Erlang中的进程的特点:
1、  创建和销毁进程非常迅速
2、  在两个进程间收发消息非常迅速
3、  进程在所有操作系统上行为相同
4、  可以创建大量进程
5、  进程之间不共享任何数据, 彼此间完全独立
6、  进程间交互的唯一方式是消息传递
8.1 并发原语
创建进程
Pid = spawn(Fun).
向进程发送消息
Pid ! Message
Pid1 ! Pid2 ! ... M
接收消息
receive
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
    ...
end.
8.2 一个简单的例子
-module(area_server0).
-export([loop/0]).
# 在loop函数中根据接收的消息执行不同的动作, 并继续等待接收消息
loop() ->
    receive
        {rectangle, Width, Ht} ->
            io:format("Area of rectangle is ~p~n", [Width * Ht]),
            loop();
        {circle, R} ->
            io:format("Area of circle is ~p~n", [3.14159 * R * R]),
            loop();
        Other ->
            io:format("I don't know what the area of a ~p is ~n", [Other]),
            loop()
    end.
向指定进程(通过Pid指定)传递消息
1> c(area_server0).
{ok,area_server0}
2> Pid = spawn(fun area_server0:loop/0).
<0.37.0>
3> Pid ! {rectangle, 6, 10}.
Area of rectangle is 60
{rectangle,6,10}
4> Pid ! {circle, 23}.
Area of circle is 1661.90111
{circle,23}
5> Pid ! {triangle, 2, 4, 5}.
I don't know what the area of a {triangle,2,4,5} is 
{triangle,2,4,5}
8.3 客户/服务器介绍
第一步
-module(area_server1).
-export([loop/0, rpc/2]).
%% 远程过程调用函数
%% 封装发送请求和等待回应
rpc(Pid, Request) ->
    %% 发送时带上进程号以区分发送者
    %% 使用self()函数获取进程自己的Pid
    %% 发送后使用receive原语等待回应 
    Pid ! {self(), Request},
    receive
        Response ->
            Response
    end.
%% 在loop函数中根据接收的消息和Pid, 完成计算后将结果返回发送者 
loop() ->
    receive
        {From, {rectangle, Width, Ht}} ->
            From ! Width * Ht,
            loop();
        {From, {circle, R}} ->
            From ! 3.14159 * R * R,
            loop();
        {From, Other} ->
            From ! {error, Other},
            loop()
    end.
调用结果:
1> c(area_server1).
{ok,area_server1}
2> Pid = spawn(fun area_server1:loop/0).
<0.48.0>
3> area_server1:rpc(Pid, {rectangle, 6, 8}).
48
4> area_server1:rpc(Pid, {circle, 6}).
113.09723999999999
5> area_server1:rpc(Pid, socks).
{error,socks}
第二步
-module(area_server2).
-export([loop/0, rpc/2]).
%% 远程过程调用函数
%% 相比于第一步, 多了区分服务器进程的Pid 
rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            Response
    end.
%% 为了便于客户端区分, 将结果返回时带上自己的进程号
loop() ->
    receive
        {From, {rectangle, Width, Ht}} ->
            From ! {self(), Width * Ht},
            loop();
        {From, {circle, R}} ->
            From ! {self(), 3.14159 * R * R},
            loop();
        {From, Other} ->
            From ! {self(), {error, Other}},
            loop()
    end.
调用结果:
1> c(area_server2).
{ok,area_server2}
2> Pid = spawn(fun area_server2:loop/0).
<0.59.0>
3> area_server2:rpc(Pid, {circle, 5}).
78.53975
第三步
-module(area_server3).
-export([start/0, area/2]).
%% 封装进程创建的过程 
start() ->spawn(fun loop/0).
%% 隐藏远程过程调用
area(Pid, What) ->rpc(Pid, What).
rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            Response
    end.
loop() ->
    receive
        {From, {rectangle, Width, Ht}} ->
            From ! {self(), Width * Ht},
            loop();
        {From, {circle, R}} ->
            From ! {self(), 3.14159 * R * R},
            loop();
        {From, Other} ->
            From ! {self(), {error, Other}},
            loop()
    end.
调用结果:
1> c(area_server3).
{ok,area_server3}
2> Pid = area_server3:start().
<0.68.0>
3> area_server3:area(Pid, {rectangle, 10, 8}).
80
4> area_server3:area(Pid, {circle, 4}).
50.26544
8.4 创建一个进程需要花费多少时间
-module(processes).
-export([max/1]).
%% 创建N个进程并销毁, 查看其运行时间
max(N) ->
    %% 获取系统支持的最大进程数 可以在启动shell时通过"+P"进行设置
    Max = erlang:system_info(process_limit),
    io:format("Maximum allowed processes:~p~n", [Max]),
    %% 用于统计代码执行所耗的CPU时间和真实时间
    %% 即将要计时的代码段放在statistics(runtime), code..., statistics(runtime)之间
    statistics(runtime),
    statistics(wall_clock),
    %% 创建N个进程
    L = for(1, N, fun() ->spawn(fun() ->wait() end) end),
    {_, Time1} = statistics(runtime),
    {_, Time2} = statistics(wall_clock),
    %% 销毁进程
    lists:foreach(fun(Pid) ->Pid ! die end, L),
    U1 = Time1 * 1000 / N,
    U2 = Time2 * 1000 / N,
    io:format("Process spawn time=~p (~p) microseconds~n",[U1, U2]).
wait() ->
    receive 
        die ->void
    end.
for(N, N, F) ->[F()];
for(I, N, F) ->[F() | for(I+1, N, F)].
在我的Intel i7 2.9G处理器、8G内存的Mac Pro上运行效果如下:
matrix@MBP:8 $ erl +P 500000
Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:4:4] [async-threads:0] [hipe] [kernel-poll:false]
Eshell V5.9.1  (abort with ^G)
1> processes:max(20000).
Maximum allowed processes:500000
Process spawn time=3.0 (2.95) microseconds
ok
2> processes:max(50000).
Maximum allowed processes:500000
Process spawn time=2.8 (2.86) microseconds
ok
3> processes:max(100000).
Maximum allowed processes:500000
Process spawn time=2.8 (2.92) microseconds
ok
4> processes:max(200000).
Maximum allowed processes:500000
Process spawn time=2.6 (2.825) microseconds
ok
5> processes:max(400000).
Maximum allowed processes:500000
Process spawn time=2.6 (2.81) microseconds
ok
8.5 带超时的receive
为receive语句添加超时处理, 格式如下:
receice
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
    ...
after Time ->
    Expressions
end.
8.5.1 只有超时的receive
只有超时的receive其实就是实现的sleep功能
sleep(T) ->
    receive
    after T ->
        true
    end.
8.5.2 超时时间为0的receive
设置超时时间为0, 避免进程永久暂停
flush_buffer() ->
    receive
        %% 这里使用下划线变量(未绑定)来匹配任意消息
        _Any ->
            %%  _Any匹配任意消息后继续调用flush_buffer()将最终清空所有消息
            flush_buffer()
    %% 清空所有消息后如果没有设置超时时间为0将导致flush_buffer()函数的永久暂停
    after 0 ->
        true
    end.
设置超时时间为0, 实现优先接收
priority_receive() ->
    %% 如果没有消息匹配alarm, 程序将会走到超时设置的代码
    receive
        {alarm, X} ->{alarm, X}
    %% 在超时设置里使用Any将会匹配到第一条消息
    after 0 ->
        receive
            Any ->Any
        end
    end.
8.5.3 使用一个无限等待超时进行接收
如果将after后跟的时间值设置为infinity, 将会导致系统永远不会触发超时。
8.5.4 实现一个计时器
-module(stimer).
-export([start/2, cancel/1]).
%% 启动函数, 指定间隔时间和要执行的函数
start(Time, Fun) ->spawn(fun() ->timer(Time, Fun) end).
%% 结束函数, 向指定进程发送结束命令 
cancel(Pid) ->Pid ! cancel.
%% 计时器函数
%% 如果在超时时间之前接收到结束消息则销毁进程
%% 否则将执行指定的函数 
timer(Time, Fun) ->
    receive
        cancel ->void
    after Time ->
            Fun()
    end.
8.6 选择性接收
send原语将消息发送到一个进程的邮箱
receive原语将邮箱中的消息进行处理并删除
%% 进入receive语句后即启动计时器(如果有after语句)
receice
    %% 依次从邮箱中取出消息对Pattern进行模式匹配
    %% 匹配成功后将执行模式后面的表达式并删除消息
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
    ...
%% 如果没有找到可以匹配的消息则进程挂起 
after Time ->
    Expressions
end.
8.7 注册进程
考虑到安全性和便捷性, Erlang提供了进程注册的方式替换PID的方式实现进程间的通信。
注册进程
register(AnAtom, Pid), 将Pid注册一个名为AnAtom的原子
取消注册
unregister(AnAtom), 移除AnAtom相对于的进程信息
判断是否已注册
whereis(AnAtom) -> Pid | undefined, 如果已注册则返回Pid, 否则返回undefined
查看注册列表
registered() -> [AnAtom::atom()], 返回一个系统中所有已注册的名称列表
8.8 如何编写一个并发程序
作者提供的并发程序的编程模版
-module(ctemplate).
-compile(export_all).
%% 在启动函数中创建线程并调用loop函数
start() ->
    spawn(fun() ->loop([]) end).
%% 在远程过程调用中向指定的进程发送消息, 为区别不同的发送者调用self()带上自己的进程号
rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->Response
    end.
%% 在loop函数中处理不同的消息 
loop(X) ->
    receive
        Any ->
            io:format("Received:~p~n", [Any]),
            loop(X)
    end.
8.9 尾递归技术
使用求阶乘来说明尾递归更容易理解一些:
-module(fact).
-compile(export_all).
factorial(N) ->
    factorial(1, N).
%% 求阶乘的函数
%% Res用于记录当前结果, N用于记录乘数的变化
%% 如计算factorial(5), 展开过程为
%% (1, 5) -> (1*5, 4) -> (5*4, 3) -> (20*3, 2) -> (60*2, 1) -> (120, 0) -> 120
%% 始终只需要两个变量来记录状态, 相比于普通的递归, 极为节省栈空间 
factorial(Res, N) ->
    case N =:= 0 of
        true  ->Res;
        false ->factorial(Res*N, N-1)
    end.
8.10 使用MFA启动进程
MFA方式即通过指定模块、函数、参数的方式来启动进程
spawn(Mod, FuncName, Args)
相比于普通的方式, 其可以使程序在出于运行状态时仍然可以使用新版本代码进行升级。
8.11 习题
测试注册函数
%% 在main函数中连续调用两次start函数, 模拟并行
main(AnAtom, Fun) ->
    start(AnAtom, Fun),
    start(AnAtom, Fun).
%% 根据Fun创建进程, 注册之前首先调用whereis函数查看是否已注册 
start(AnAtom, Fun) ->
    Pid = spawn(Fun),
    Temp = whereis(AnAtom),
    case undefined =:= Temp of
        true  ->register(AnAtom, Pid);
        false ->io:format("~p had registered and Pid is ~p ~n", [AnAtom, Temp])
    end.
调用结果:
1> test1:main(test1, fun() -> io:format("just for test register") end). 
test1 had registered and Pid is <0.36.0> 
just for test register
just for test register
测试发送消息
-module(sendm).
-compile(export_all).
%% 创建N个进程并销毁, 查看其运行时间
sendmessage(N, M) ->
    %% 用于统计代码执行所耗的CPU时间和真实时间
    statistics(runtime),
    statistics(wall_clock),
    %% 创建N个进程
    L = for(1, N, fun() ->spawn(fun() ->wait() end) end),
    %% 给每个进程发送M条消息
    for(1, M, fun() ->lists:foreach(fun(Pid) ->Pid ! test end, L) end),
    %% 销毁每个进程
    lists:foreach(fun(Pid) ->Pid ! die end, L),
    {_, Time1} = statistics(runtime),
    {_, Time2} = statistics(wall_clock),
    U1 = Time1 * 1000,
    U2 = Time2 * 1000,
    io:format("Process send message time=~p (~p) microseconds~n",[U1, U2]).
wait() ->
    receive 
        test ->wait();
        die  ->void
    end.
for(N, N, F) ->[F()];
for(I, N, F) ->[F() | for(I+1, N, F)].
调用结果:
1> c(sendm).                      
{ok,sendm}
2> sendm:sendmessage(1000, 10000).
Process send message time=32760000 (56635000) microseconds
ok