0%

CSharp 异步处理的几种形式

引子

异步处理的几种方式

使用 委托/回调函数

示例: 实现一个异步HTTP加载的过程

HTTP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
using System.Threading;

namespace AsyncDelegateTest
{
/// <summary>
/// HTTP 回应
/// </summary>
public class HTTPResponse
{
/// <summary>
/// 回应错误码
/// </summary>
public int errorCode;
/// <summary>
/// 回应消息串
/// </summary>
public string response;
}
/// <summary>
/// 示例程序
/// </summary>
public class HTTPRequest
{
/// <summary>
/// 请求成功 委托定义
/// </summary>
/// <param name="originalRequest"></param>
/// <param name="response"></param>
public delegate void OnRequestFinishedDelegate(HTTPRequest originalRequest, HTTPResponse response);
/// <summary>
/// 下载进度委托示例
/// </summary>
/// <param name="originalRequest"></param>
/// <param name="response"></param>
public delegate void OnDownloadProgressDelegate(HTTPRequest originalRequest, long downloaded, long downloadLength);
/// <summary>
/// 上传进度委托示例
/// </summary>
/// <param name="originalRequest"></param>
/// <param name="uploaded"></param>
/// <param name="uploadLength"></param>
public delegate void OnUploadProgressDelegate(HTTPRequest originalRequest, long uploaded, long uploadLength);

/// <summary>
/// 异步回调方法
/// </summary>
public OnRequestFinishedDelegate onRequestFinished;
public OnDownloadProgressDelegate onDownloadProgress;
public OnUploadProgressDelegate onUploadProgress;

public string httptype;
public string url;
}

public static class HTTP
{
/// <summary>
/// 测试代码
/// </summary>
/// <param name="httpRequest"></param>
public static void Send(this HTTPRequest httpRequest)
{
ThreadStart reqestSimulate = () =>
{
int length = 10;
if (httpRequest.onUploadProgress != null)
{
for (int i = 0; i < length; i++)
{
Thread.Sleep(50);
httpRequest.onUploadProgress.Invoke(httpRequest, i, length);
}
}
if (httpRequest.onDownloadProgress != null)
{
for (int i = 0; i < length; i++)
{
Thread.Sleep(50);
httpRequest.onDownloadProgress.Invoke(httpRequest, i, length);
}
}
if (httpRequest.onRequestFinished != null)
{
HTTPResponse response = new HTTPResponse();
response.response = httpRequest.url + ":responsed!";
response.errorCode = 200;
httpRequest.onRequestFinished.Invoke(httpRequest, response);
}
};

Thread thread = new Thread(reqestSimulate);
thread.Start();
}

}

}

DelegateTest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
using System;

namespace AsyncDelegateTest
{
class Program
{
static void Main(string[] args)
{
AsyncDelegateTest();
}
/// <summary>
/// 异步委托函数测试
/// </summary>
static void AsyncDelegateTest()
{
HTTPRequest request = new HTTPRequest();
request.url = "http://www.baidu.com";
request.httptype = "get";
request.onDownloadProgress = (r, a, l) =>
{
Console.WriteLine(string.Format("onDownloadProgress:{0}/{1}", a, l));
};
request.onUploadProgress = (r, a, l) =>
{
Console.WriteLine(string.Format("onUploadProgress:{0}/{1}", a, l));
};
request.onRequestFinished = (request, respone) =>
{
Console.WriteLine(string.Format("onRequestFinished:{0}", respone.response));
};
request.Send();
Console.WriteLine("request.Send!");

Console.ReadKey();
}
}
}

定义三个 委托类型,进度改变的时候调用回调函数,通知进度更新

使用Task

任务和线程的区别:

1. 任务是架构在线程之上的,也就是说任务最终还是要抛给线程去执行。

2. 任务跟线程不是一对一的关系,比如开10个任务并不是说会开10个线程,这一点任务有点类似线程池,但是任务相比线程池有很小的开销和精确的控制。

3. Task的优势

ThreadPool相比Thread来说具备了很多优势,但是ThreadPool却又存在一些使用上的不方便。比如:

  • ThreadPool不支持线程的取消、完成、失败通知等交互性操作;
  • ThreadPool不支持线程执行的先后次序;

以往,如果开发者要实现上述功能,需要完成很多额外的工作,现在,微软提供了一个功能更强大的概念:Task。Task在线程池的基础上进行了优化,并提供了更多的API。在Framework 4.0中,如果我们要编写多线程程序,Task显然已经优于传统的方式。

创建无返回值的Task

1. new

1
2
3
4
5
6
7
8
9
10
static void TaskMethod(string taskname){
//Thread.Sleep(500);
//Task TODO
}

static void Main(string[] args){
Task task1 = new Task(() => TaskMethod("Task 1")); //TaskMethod 具体任务操作
task1.Start();
Task.WaitAll(task1);//等待任务结束
}

2. Task.Factory.StartNew

构造函数创建的task,必须手动Start,而通过工厂创建的Task直接就启动了。

1
Task.Factory.StartNew(() => TaskMethod("Task 3"));    //直接异步的方法

或者

1
2
var t3=Task.Factory.StartNew(() => TaskMethod("Task 3"));
Task.WaitAll(t3);//等待任务结束

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTaskTest
{
/// <summary>
/// 测试不带泛型参数的Task
/// </summary>
public class TaskTest_NonGeneric
{
static void TaskMethod(string taskname)
{
Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
taskname, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
Thread.Sleep(500);
}

public static void Test()
{
Task task1 = new Task(() => TaskMethod("T1")); //TaskMethod 具体任务操作
task1.Start();
Task.WaitAll(task1);//等待任务结束

Task.Factory.StartNew(() => TaskMethod("T2")); //直接异步的方法

var t3 = Task.Factory.StartNew(() => TaskMethod("T3"));
}
}
}

创建带返回值的Task

1. New

1
2
3
Task<int> task = CreateTask("Task 1");
task.Start();
int result = task.Result;

2. 让Task 任务在主线程运行

1
2
3
4
Task<int> task = CreateTask("Task 2");
//该任务会运行在主线程中
task.RunSynchronously();
int result = task.Result;

3. 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTaskTest
{
public class TaskTest
{
static int TaskMethod(string taskname)
{
Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
taskname, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
Thread.Sleep(500);
return 100;
}
static Task<int> CreateTask(string name)
{
return new Task<int>(() => TaskMethod(name));
}

public static void Test()
{
TaskMethod("Main Thread Task");

Task<int> task = CreateTask("T1");
task.Start();
int result = task.Result;
Console.WriteLine("T1 Result is: {0}", result);

task = CreateTask("T2");
//该任务会运行在主线程中
task.RunSynchronously();
result = task.Result;
Console.WriteLine("T2 Result is: {0}", result);

}
}
}

任务调度器 TaskScheduler

可以自定义任务调度器,将某些任务放入一个线程中,例如一串渲染任务,放入渲染线程中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;


namespace AsyncTaskTest
{
/// <summary>
/// 自定义任务调度器
/// 将任务 用同一个线程进行调度
/// </summary>
class MyTaskScheduler : TaskScheduler
{
public static new TaskScheduler Current { get; } = new MyTaskScheduler();
public static new TaskScheduler Default { get; } = Current;

/// <summary>
/// 线程安全集合
/// </summary>
private readonly BlockingCollection<Task> m_queue = new BlockingCollection<Task>();

MyTaskScheduler()
{
Thread thread = new Thread(Run);
thread.IsBackground = true;//设为为后台线程,当主线程结束时线程自动结束
thread.Start();
}

private void Run()
{
Console.WriteLine($"MyTaskScheduler, ThreadID: {Thread.CurrentThread.ManagedThreadId}");
Task t;
while (m_queue.TryTake(out t, Timeout.Infinite))
{
TryExecuteTask(t);//在当前线程执行Task
}
}

protected override IEnumerable<Task> GetScheduledTasks()
{
return m_queue;
}


/// <summary>
/// t.Start(MyTaskScheduler.Current)时,将Task加入到队列中
/// </summary>
/// <param name="task"></param>
protected override void QueueTask(Task task)
{
m_queue.Add(task);
}

//当执行该函数时,程序正在尝试以同步的方式执行Task代码
protected override bool TryExecuteTaskInline(Task task, bool taskWaspreviouslyQueued)
{
return false;
}
}

/// <summary>
/// 任务调度器测试
/// </summary>
class TaskSchedulerTest
{
static void TaskMethod(string taskname)
{
Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
taskname, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
Thread.Sleep(500);
}
static Task CreateTask(string name)
{
return new Task(() => TaskMethod(name));
}

public static void Test()
{
Console.WriteLine($"Main, ThreadID: {Thread.CurrentThread.ManagedThreadId}");
for (int i = 0; i < 10; i++)
{
var t = CreateTask("T" + (i+1));
t.Start(MyTaskScheduler.Current);
}
}
}
}

任务组合 ContinueWith

将任务和任务之间 进行串联,并联执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTaskTest
{
/// <summary>
/// 任务组合
/// </summary>
class ContinueWithTest
{
private static readonly Random rand = new Random();
static void TaskMethod(string taskname)
{
Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
taskname, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
int sleepTime = rand.Next(100, 500);
Thread.Sleep(sleepTime);
Console.WriteLine("Task {0} Completed!",taskname);
}

static Task CreateTask(string name)
{
return new Task(() => TaskMethod(name));
}

public static void Test()
{
Task task1 = CreateTask("Task1");
Task task2 = CreateTask("Task2");

task1.Start();

Console.WriteLine("主线程执行其他处理");
task1.ContinueWith(t => {
task2.Start();
});

}

public static void Test1()
{
Task task1 = CreateTask("Task1");
Task task2 = CreateTask("Task2");
Task task3 = CreateTask("Task3");
Task task4 = CreateTask("Task4");

// t1->[t2,t3]->t4

task1.Start();

Console.WriteLine("主线程执行其他处理");
task1.ContinueWith(t => {
task2.Start();
task3.Start();
});

Task t23 = Task.WhenAll(task2, task3);
t23.ContinueWith(t =>
{
task4.Start();
});

}
}
}

使用Promise

引用库 RSG-CSharpPromise

示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public IPromise<string> Download(string url)
{
var promise = new Promise<string>(); // Create promise.
using (var client = new WebClient())
{
client.DownloadStringCompleted += // Monitor event for download completed.
(s, ev) =>
{
if (ev.Error != null)
{
promise.Reject(ev.Error); // Error during download, reject the promise.
}
else
{
promise.Resolve(ev.Result); // Downloaded completed successfully, resolve the promise.
}
};

client.DownloadStringAsync(new Uri(url), null); // Initiate async op.
}

return promise; // Return the promise so the caller can await resolution (or error).
}


void Test(){
Download("http://www.google.com")
.Then(html =>
Console.WriteLine(html)
);
}

更多不介绍了

使用async/await

async/await的基础用法

async/await 结构可分成三部分:

  1. 调用方法:该方法调用异步方法,然后在异步方法执行其任务的时候继续执行;
  2. 异步方法:该方法异步执行工作,然后立刻返回到调用方法;
  3. await 表达式:用于异步方法内部,指出需要异步执行的任务。一个异步方法可以包含多个 await 表达式(不存在 await 表达式的话 IDE 会发出警告)。

async/await的优点

    1. 方便级联调用:即调用依次发生的场景;
    1. 同步代码编写方式: Promise使用then函数进行链式调用,一直点点点,是一种从左向右的横向写法;async/await从上到下,顺序执行,就像写同步代码一样,更符合代码编写习惯;
    1. 多个参数传递: Promise的then函数只能传递一个参数,虽然可以通过包装成对象来传递多个参数,但是会导致传递冗余信息,频繁的解析又重新组合参数,比较麻烦;async/await没有这个限制,可以当做普通的局部变量来处理,用let或者const定义的块级变量想怎么用就怎么用,想定义几个就定义几个,完全没有限制,也没有冗余工作;
    1. 同步代码和异步代码可以一起编写: 使用Promise的时候最好将同步代码和异步代码放在不同的then节点中,这样结构更加清晰;async/await整个书写习惯都是同步的,不需要纠结同步和异步的区别,当然,异步过程需要包装成一个Promise对象放在await关键字后面;
    1. 基于协程: Promise是根据函数式编程的范式,对异步过程进行了一层封装,async/await基于协程的机制,是真正的“保存上下文,控制权切换……控制权恢复,取回上下文”这种机制,是对异步过程更精确的一种描述;
    1. async/await 是对Promise的优化: async/await 是基于Promise的,是进一步的一种优化,不过在写代码时,Promise本身的API出现得很少,很接近同步代码的写法;

代码演示 两种写法的区别

T1->[T2,T3]->T4

Promise写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
using RSG;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
/// <summary>
/// Promise
/// </summary>
class AsyncPromiseTest
{
private static readonly Random rand = new Random();
static void TaskMethod(string taskname, Promise p)
{
Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
taskname, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
Thread.Sleep(rand.Next(100, 500));
Console.WriteLine("Task {0} Completed", taskname);
p.Resolve();

}

static Task CreateTask(string name, Promise p)
{
return new Task(() => TaskMethod(name, p));
}

/// <summary>
/// void 返回值的 async
/// t1->(t2,t3)->t4
/// </summary>
static IPromise CalcAsync()
{
Promise p1 = new Promise();
Promise p2 = new Promise();
Promise p3 = new Promise();
Promise p4 = new Promise();

Task task1 = CreateTask("T1", p1);
Task task2 = CreateTask("T2", p2);
Task task3 = CreateTask("T3", p3);
Task task4 = CreateTask("T4", p4);

p1.Then(() => {
task2.Start();
task3.Start();
});

Promise.All(p2, p3).Then(() =>
{
task4.Start();
});

task1.Start();

return p4.Then(() =>
{

});
}

public static void Test()
{
var ipromise = CalcAsync(); // void 类型不能 接受返回值
ipromise.Then(() =>
{
Console.WriteLine("Calc Completed.");
});
}
}
}

async/await 写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
public class AsyncTest_NonGeneric
{
private static readonly Random rand = new Random();
static void TaskMethod(string taskname)
{
Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
taskname, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
Thread.Sleep(rand.Next(100,500));
Console.WriteLine("Task {0} Completed", taskname);
}

static Task CreateTask(string name)
{
return new Task(() => TaskMethod(name));
}

/// <summary>
/// void 返回值的 async
/// t1->(t2,t3)->t4
/// </summary>
static async void CalcAsync()
{
Task task1 = CreateTask("T1");
task1.Start();
await task1;
Console.WriteLine("T1 Completed !" + "CalcAsync Thread ID is :" + Thread.CurrentThread.ManagedThreadId);

Task task2 = CreateTask("T2");
task2.Start();
Task task3 = CreateTask("T3");
task3.Start();

await task2;
Console.WriteLine("T2 Completed !" + "CalcAsync Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
await task3;
Console.WriteLine("T3 Completed !" + "CalcAsync Thread ID is :" + Thread.CurrentThread.ManagedThreadId);

Task task4 = CreateTask("T4");
task4.Start();
await task4;

Console.WriteLine("CalcAsync Completed !" + "CalcAsync Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
}

public static void Test()
{
Console.WriteLine("Main Thread CalcAsync Before");
CalcAsync(); // void 类型不能 接受返回值
Console.WriteLine("Main Thread CalcAsync After");
}
}
}

代码

点击下载

欢迎关注我的其它发布渠道