Azure辅助角色多线程队列处理

yxwkf 阅读:11 2024-10-25 08:56:14 评论:0

我有一个带有经典配置 WebRole + Worker 角色的 azure 云服务。 Worker 从队列中获取消息,对其进行处理,而不是一次删除一次。

我的代码是这样的:

public override void Run() 
    { 
        Trace.TraceInformation("Worker is running"); 
            try 
            { 
                this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
            } 
            finally 
            { 
                this.runCompleteEvent.Set(); 
            } 
    } 
 
public override bool OnStart() 
        { 
            ServicePointManager.DefaultConnectionLimit = 500; 
            bool result = base.OnStart(); 
            Trace.TraceInformation("WorkerAnalytics has been started"); 
            return result; 
        } 
 
 
 
private async Task RunAsync(CancellationToken cancellationToken) 
        { 
            var queue = ....//omitted info for brevity 
            CloudQueueMessage retrievedMessage = null; 
 
            while (!cancellationToken.IsCancellationRequested) 
            { 
                 try 
                    { 
                        retrievedMessage = await queue.GetMessageAsync(); 
                        if (retrievedMessage != null) 
                        { 
                            await ProcessMessage(retrievedMessage); 
                        } 
                        else 
                        { 
                            System.Threading.Thread.Sleep(500); 
                        } 
                    } 
                    catch (Exception e) 
                    { 
                        System.Threading.Thread.Sleep(500); 
                    } 
                } 
            } 
 
        } 

现在这工作完美,但 CPU 非常低,为 3%,一次只处理一个元素(每个元素大约 1 秒),但队列每秒大约有 1000 个新元素,这还不够。

如何使用机器拥有的所有 CPU 能力一次处理更多队列消息,并且不会使代码变得过于复杂?

还有 ServicePointManager.DefaultConnectionLimit 的用途是什么?

我花了几个小时寻找有效的辅助角色多线程解决方案,但现在都是 WebJobs 或使事情变得复杂的旧框架。

谢谢

请您参考如下方法:

您可以尝试运行 RunAsync() 的多个任务。

var tasks = new List<Task>(); 
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token)); 
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token)); 
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token)); 
Task.WaitAll(tasks.ToArray()); 


标签:多线程
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号