Skip to content

MemoryIO#

authx._internal._memory.MemoryIO #

MemoryIO()

Initialize an instance of MemoryIO.

Creates a dictionary to store the session data.

Source code in authx/_internal/_memory.py
def __init__(self) -> None:
    """Initialize an instance of MemoryIO.

    Creates a dictionary to store the session data.
    """
    self.raw_memory_store = {}

raw_memory_store instance-attribute #

raw_memory_store = {}

MemoryIO is a class that implements the IO interface for the session store.

It is used to store session data in memory.

has_session_id async #

has_session_id(session_id)
PARAMETER DESCRIPTION
session_id

TYPE: str

Source code in authx/_internal/_memory.py
async def has_session_id(self, session_id: str) -> bool:
    return session_id in self.raw_memory_store

has_no_session_id async #

has_no_session_id(session_id)
PARAMETER DESCRIPTION
session_id

TYPE: str

Source code in authx/_internal/_memory.py
async def has_no_session_id(self, session_id: str) -> bool:
    return session_id not in self.raw_memory_store

create_store async #

create_store(session_id)
PARAMETER DESCRIPTION
session_id

TYPE: str

Source code in authx/_internal/_memory.py
async def create_store(self, session_id: str) -> dict[str, Any]:
    self.raw_memory_store[session_id] = {
        "created_at": int(time.time()),
        "store": {},
    }
    await self.save_store(session_id)
    return self.raw_memory_store.get(session_id, {}).get("store", {})

get_store async #

get_store(session_id)
PARAMETER DESCRIPTION
session_id

TYPE: str

Source code in authx/_internal/_memory.py
async def get_store(self, session_id: str) -> Optional[dict[str, Any]]:
    if self.raw_memory_store.get(session_id):
        return self.raw_memory_store.get(session_id, {}).get("store")
    else:
        return None

save_store async #

save_store(session_id)
PARAMETER DESCRIPTION
session_id

TYPE: str

Source code in authx/_internal/_memory.py
async def save_store(self, session_id: str) -> None:
    await self.get_store(session_id)

gc async #

gc()
Source code in authx/_internal/_memory.py
async def gc(self) -> None:
    if len(self.raw_memory_store) >= 100:
        await self.cleanup_old_sessions()

cleanup_old_sessions async #

cleanup_old_sessions()
Source code in authx/_internal/_memory.py
async def cleanup_old_sessions(self) -> None:
    current_time = int(time.time())
    sessions_to_delete = [
        session_id
        for session_id, session_info in self.raw_memory_store.items()
        if current_time - session_info["created_at"] > 3600 * 12
    ]
    for session_id in sessions_to_delete:
        del self.raw_memory_store[session_id]