RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 1425710
Accepted
Yuriy Tigiev
Yuriy Tigiev
Asked:2022-09-01 17:21:06 +0000 UTC2022-09-01 17:21:06 +0000 UTC 2022-09-01 17:21:06 +0000 UTC

带锁的异步 - 功能

  • 772

不太清楚结构 async with lock:对于python。

我需要这样做,以便一次只执行一个函数来处理数据库。也就是说,对数据库有独占访问权。有几个功能可以与基础一起工作。

asyncioscheduler.add_job()按计划运行 N 个函数doWork1, doWork2, doWork3等,这些函数又调用函数与数据库 ( query1, query2, query3) 一起工作。问题是,async with lock 可以帮助限制同时执行一个用于处理数据库的函数吗?如果是这样,怎么做?这个想法是为这个任务使用一个队列,但它是可能的并且lock可以帮助简化实现。

https://docs-python.ru/standart-library/modul-asyncio-python/primitivy-sinhronizatsii-zadach-asyncio/


下面是正在解决的问题的工作伪代码。要求每单位时间只有一个分配的块工作async with lock:,无论正在执行哪个功能strategyBuy或strategySell这些功能的实例有多少正在运行。所有其他块必须“排队”并按顺序处理。一个问题是否可以 async with lock:帮助特定任务的给定实现?在示例的当前结果中,您可以看到条目Bought2.出现了多次,这表明 for 的代码段async with lock:是 strategyBuy并行执行的,这是不可接受的。理论上Bought2.,只有在数据库中没有记录或没有带有side: 'buy'and的记录时才会显示parent_id is null。在第一次运行时,这些条件都是 True(仅适用于第一个条目)。

import random
import asyncio
import aiosqlite as aiosql
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import uuid
import numpy as np

myState = 0


async def dbConnect():

    sqlConn = await aiosql.connect('test.db')

    await sqlConn.execute("""
                        CREATE TABLE IF NOT EXISTS tbl(
                            id TEXT NOT NULL,
                            side TEXT NOT NULL,
                            price NUMERIC NOT NULL,
                            parent_id TEXT NULL,
                            buy_ids TEXT NULL
                        )""")

    return sqlConn


async def check():
    return (random.uniform(0, 1) < 0.05)


async def fetchPrice():
    return random.uniform(0, 1)


async def sell():
    pass


async def getOpenBuyOrders(sqlConn):

    cursor = await sqlConn.execute(
        """SELECT id, price, parent_id
            FROM tbl
            WHERE side='buy'
                AND parent_id IS NULL""")

    return await cursor.fetchall()


async def saveBuyOrder(sqlConn, price):
    buyId = uuid.uuid1()
    await sqlConn.execute(f"INSERT INTO tbl(id, price, side) VALUES (?, ?, 'buy')", (buyId.hex, price))
    await sqlConn.commit()


async def saveSellOrder(sqlConn, price, buyIDs):
    sellId = uuid.uuid1()
    await sqlConn.execute(f"INSERT INTO tbl (id, price, buy_ids, side) VALUES (?, ?, ?, 'sell')", (sellId.hex, price, str(buyIDs)))

    buyIDString = ','.join(['?']*len(buyIDs))
    await sqlConn.execute(f"UPDATE tbl SET parent_id = ? WHERE  id in (?)", (sellId.hex, buyIDString))

    await sqlConn.commit()


async def strategyBuy(sqlConn, i):

    lock = asyncio.Lock()

    if(await check()):

        currentPrice = await fetchPrice()

        async with lock:
            global myState
            myState = myState + 1
            openBuyOrders = await getOpenBuyOrders(sqlConn)
            if(openBuyOrders):
                avgPrice = np.mean([openBuyOrder[1]
                                    for openBuyOrder in openBuyOrders])

                if(avgPrice > (currentPrice * 1.05)):
                    await saveBuyOrder(sqlConn, currentPrice)
                    print(f"Bought1. Task: {i}, myState:{myState}")

            else:
                await saveBuyOrder(sqlConn, currentPrice)
                print(f"Bought2. Task: {i}, myState:{myState}")

    return None


async def strategySell(sqlConn, i):

    lock = asyncio.Lock()

    if(await check()):

        currentPrice = await fetchPrice()

        async with lock:
            global myState
            myState = myState + 1
            openBuyOrders = await getOpenBuyOrders(sqlConn)

            if(openBuyOrders):
                avgPrice = np.mean([openBuyOrder[1]
                                    for openBuyOrder in openBuyOrders])

                if((avgPrice * 1.05) < currentPrice):

                    buyIDs = [openBuyOrder[0]
                              for openBuyOrder in openBuyOrders]
                    await saveSellOrder(sqlConn, currentPrice, buyIDs)
                    print(f"Sold. Task: {i}, myState:{myState}")

    return None


async def main():

    sqlConn = await dbConnect()

    scheduler = AsyncIOScheduler()

    for i in range(100):
        scheduler.add_job(strategyBuy, 'interval', seconds=10, args=[sqlConn, i],
                          start_date='2000-01-01 00:00:00', timezone='UTC')

        scheduler.add_job(strategySell, 'interval', seconds=10, args=[sqlConn, i],
                          start_date='2000-01-01 00:00:00', timezone='UTC')

    scheduler.start()

if __name__ == "__main__":

    loop = asyncio.new_event_loop()
    task = loop.create_task(main())
    loop.run_forever()

结果

Bought2. Task: 55, myState:15
Bought2. Task: 13, myState:15
Bought2. Task: 80, myState:15
Bought2. Task: 57, myState:15
Bought2. Task: 24, myState:15
Bought2. Task: 44, myState:15
Bought2. Task: 18, myState:15
Bought2. Task: 7, myState:15
Bought2. Task: 49, myState:15
Bought1. Task: 86, myState:19
Bought1. Task: 97, myState:19
python python-3.x
  • 3 3 个回答
  • 52 Views

3 个回答

  • Voted
  1. vadim vaduxa
    2022-09-01T17:52:01Z2022-09-01T17:52:01Z

    以及检查类似示例的问题是什么?可以看出,函数本身是并行执行的,除了带有锁块

    import asyncio
    
    async def t1():
        print('t1 start')
        # await lock.acquire()
        async with lock:
            print('t1 lock')
            await asyncio.sleep(2)
            print('t1 sleep')
            # lock.release()
            print('t1 release')
    
    async def t2():
        print('t2 start')
        async with lock:
            # await lock.acquire()
            print('t2 lock')
            await asyncio.sleep(2)
            print('t2 sleep')
            # lock.release()
            print('t2 release')
    
    lock = asyncio.Lock()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([loop.create_task(t1()), loop.create_task(t2()),]))
    loop.close()
    

    出去

    t1 start
    t1 lock
    t2 start
    t1 sleep
    t1 release
    t2 lock
    t2 sleep
    t2 release
    
    • 2
  2. Best Answer
    vadim vaduxa
    2022-09-03T17:16:33Z2022-09-03T17:16:33Z

    好吧,显然lock = asyncio.Lock()你需要把它放在块里if __name__ == "__main__":,一切都会好起来的

    • 2
  3. Yuriy Tigiev
    2022-09-04T01:47:11Z2022-09-04T01:47:11Z

    谢谢 vadim vaduxa 我正在发布 脚本的工作版本

    import random
    import asyncio
    import aiosqlite as aiosql
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    import uuid
    import numpy as np
    from datetime import datetime
    
    
    async def dbConnect():
    
        sqlConn = await aiosql.connect('test.db')
    
        await sqlConn.execute("""
                            CREATE TABLE IF NOT EXISTS tbl(
                                id TEXT NOT NULL,
                                side TEXT NOT NULL,
                                price NUMERIC NOT NULL,
                                parent_id TEXT NULL,
                                buy_ids TEXT NULL
                            )""")
    
        return sqlConn
    
    
    async def check():
        return (random.uniform(0, 1) < 0.05)
    
    
    async def fetchPrice():
        return random.uniform(0, 1)
    
    
    async def sell():
        pass
    
    
    async def getOpenBuyOrders(sqlConn):
    
        cursor = await sqlConn.execute(
            """SELECT id, price, parent_id
                FROM tbl
                WHERE side='buy'
                    AND parent_id IS NULL""")
    
        return await cursor.fetchall()
    
    
    async def saveBuyOrder(sqlConn, price):
        buyId = uuid.uuid1()
        await sqlConn.execute(f"INSERT INTO tbl(id, price, side) VALUES (?, ?, 'buy')", (buyId.hex, price))
        await sqlConn.commit()
    
    
    async def saveSellOrder(sqlConn, price, buyIDs):
        sellId = uuid.uuid1()
        await sqlConn.execute(f"INSERT INTO tbl (id, price, buy_ids, side) VALUES (?, ?, ?, 'sell')", (sellId.hex, price, str(buyIDs)))
    
        buyIDString = "'"+("','".join(buyIDs*len(buyIDs)))+"'"
        await sqlConn.execute(f"UPDATE tbl SET parent_id = '{sellId.hex}' WHERE  id in ({buyIDString})")
    
        await sqlConn.commit()
    
    
    async def strategyBuy(sqlConn, i):
    
        if(await check()):
    
            currentPrice = await fetchPrice()
    
            if not lock.locked():
                async with lock:
                    global myState
                    openBuyOrders = await getOpenBuyOrders(sqlConn)
                    if(openBuyOrders):
                        avgPrice = np.mean([openBuyOrder[1]
                                            for openBuyOrder in openBuyOrders])
    
                        if(avgPrice > (currentPrice * 1.05)):
                            await saveBuyOrder(sqlConn, currentPrice)
                            myState = myState + 1
                            print(
                                f"{datetime.now()} Bought1. Task: {i}, myState:{myState}")
    
                    else:
                        await saveBuyOrder(sqlConn, currentPrice)
    
                        myState = myState + 1
                        print(f"{datetime.now()} Bought2. Task: {i}, myState:{myState}")
    
        return None
    
    
    async def strategySell(sqlConn, i):
    
        if(await check()):
    
            currentPrice = await fetchPrice()
    
            if not lock.locked():
                async with lock:
    
                    openBuyOrders = await getOpenBuyOrders(sqlConn)
    
                    if(openBuyOrders):
                        avgPrice = np.mean([openBuyOrder[1]
                                            for openBuyOrder in openBuyOrders])
    
                        if((avgPrice * 1.05) < currentPrice):
    
                            buyIDs = [openBuyOrder[0]
                                      for openBuyOrder in openBuyOrders]
                            await saveSellOrder(sqlConn, currentPrice, buyIDs)
                            global myState
                            myState = myState + 1
                            print(
                                f"{datetime.now()} Sold. Task: {i}, myState:{myState}")
    
        return None
    
    
    async def main():
    
        sqlConn = await dbConnect()
    
        scheduler = AsyncIOScheduler()
    
        for i in range(1000):
            scheduler.add_job(strategyBuy, 'interval', seconds=20, args=[sqlConn, i],
                              start_date='2000-01-01 00:00:00', timezone='UTC')
    
            scheduler.add_job(strategySell, 'interval', seconds=20, args=[sqlConn, i],
                              start_date='2000-01-01 00:00:00', timezone='UTC')
    
        scheduler.start()
    
    if __name__ == "__main__":
        lock = asyncio.Lock()
        myState = 0
        loop = asyncio.new_event_loop()
        task = loop.create_task(main())
        loop.run_forever()
    
    • 0

相关问题

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    我看不懂措辞

    • 1 个回答
  • Marko Smith

    请求的模块“del”不提供名为“default”的导出

    • 3 个回答
  • Marko Smith

    "!+tab" 在 HTML 的 vs 代码中不起作用

    • 5 个回答
  • Marko Smith

    我正在尝试解决“猜词”的问题。Python

    • 2 个回答
  • Marko Smith

    可以使用哪些命令将当前指针移动到指定的提交而不更改工作目录中的文件?

    • 1 个回答
  • Marko Smith

    Python解析野莓

    • 1 个回答
  • Marko Smith

    问题:“警告:检查最新版本的 pip 时出错。”

    • 2 个回答
  • Marko Smith

    帮助编写一个用值填充变量的循环。解决这个问题

    • 2 个回答
  • Marko Smith

    尽管依赖数组为空,但在渲染上调用了 2 次 useEffect

    • 2 个回答
  • Marko Smith

    数据不通过 Telegram.WebApp.sendData 发送

    • 1 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5