Module control.sweeper

Periodically deletes stuff permanently that is marked as deleted.

This module takes care that mongodb records and file system folders that are marked for deletion, are physically removed after 31 days.

It also visits the temp directory and removes all subdirectories starting with tmp that are at least one one day old.

The sweeper is a function that is scheduled to run at a configured interval. Each worker of the running website has the sweeper scheduled.

But before each sweeper job executes, it checks the time of last execution. If that is less than a half interval ago, the sweeper job will return without doing anything.

In this way there will be always sweeper jobs scheduled, and if there are multiple workers, they will not do superfluous work. Note that when workers are killed and started, it remains guaranteed that sweeping will be done.

Expand source code Browse git
"""Periodically deletes stuff permanently that is marked as deleted.

This module takes care that mongodb records and file system folders
that are marked for deletion, are physically removed after 31 days.

It also visits the temp directory and removes all subdirectories starting
with `tmp` that are at least one one day old.

The sweeper is a function that is scheduled to run at a configured interval.
Each worker of the running website has the sweeper scheduled.

But before each sweeper job executes, it checks the time of last execution.
If that is less than a half interval ago, the sweeper job will return without
doing anything.

In this way there will be always sweeper jobs scheduled, and if there are multiple
workers, they will not do superfluous work.
Note that when workers are killed and started, it remains guaranteed that sweeping
will be done.
"""

import os
from apscheduler.schedulers.background import BackgroundScheduler

from .files import dirContents, fileExists, dirRemove
from .generic import lessAgo, mTime, isonow
from .mongo import MDELDT
from .files import FDEL
from .flask import runInfo

ON = True
"""Whether to invoke the sweeper or not.

Sometimes, for debugging or testing, it is handy to not start the sweeping process.
"""

DRY = False
"""Whether to perform the wipes on records and directories, or suppress the execution.

If True, all wipes will be announced, but not performed.
"""

DRYREP = "(dry)" if DRY else ""

SEC = 1 / 24 / 3600
"""A second as fraction of a day.

Some operations uses days as unit. This is the second with respect to the unit day.
"""

DELAY_UNDEL = None
"""The grace period for restoring deleted items.

Items that are marked as deleted less than this ago, can still be restored.
"""

DELAY_DEL = None
"""The grace period for permanently deleting deleted items.

Items that are marked as deleted less than this ago, will be permanently deleted
by the next sweeping action.
"""

DELAY_TMP = None
"""The grace period for deleting temp directories.

Sometimes temporary directories are not wiped properly after they have been used.
Those directories will be wiped after this period.
"""

INTERVAL = None
"""The interval between invocations of the sweeper function.

When workers schedule the sweeper job, they use this as the interval.
"""

if DRY:
    DELAY_UNDEL = 3600 * SEC
    DELAY_DEL = 55 * SEC
    DELAY_TMP = 35 * SEC
    INTERVAL = dict(seconds=10)
else:
    DELAY_UNDEL = 30
    DELAY_DEL = 31
    DELAY_TMP = 1
    INTERVAL = dict(days=1)

SWEEP_LEE = (
    0.4 * INTERVAL["days"]
    if "days" in INTERVAL
    else SEC * INTERVAL["seconds"] if "seconds" in INTERVAL else 0.5
)
"""The threshold for suppressing a sweep action.

If the latest sweep action occurred less than this ago,
the current sweep action will be suppressed.
"""


class Sweeper:
    def __init__(self, Settings, Messages, Mongo):
        self.Settings = Settings
        self.Mongo = Mongo
        self.Messages = Messages
        Messages.debugAdd(self)

        scheduler = BackgroundScheduler()
        self.scheduler = scheduler

    def maySchedule(self):
        """Whether a process is allowed to schedule the sweeper.

        Scheduling is suppressed if `ON` is False.

        Also, when Flask runs in debug mode, there are two processes working.
        The second process is the one that gets restarted when errors occur or
        code is updated. It is this process that may schedule sweepers, not the
        first process.
        """
        if not ON:
            return False

        Messages = self.Messages
        Settings = self.Settings
        debugMode = Settings.debugMode

        runMain = runInfo()
        startIt = debugMode and runMain or not debugMode
        head = f"SWEEPER{DRYREP} by worker {os.getpid()}: "

        if startIt:
            now = isonow()
            Messages.info(logmsg=f"{head}scheduled at {now}")
        else:
            Messages.info(logmsg=f"{head}deferred to debug instance")
        return startIt

    def start(self):
        """Schedules the sweeper job.
        """
        if self.maySchedule():
            scheduler = self.scheduler
            sweeper = self.clean()
            scheduler.add_job(sweeper, "interval", **INTERVAL)
            scheduler.start()

    def clean(self):
        """Provides the sweeper function.

        This method is not the sweeper function itself, but it *returns*
        the sweeper function, which has some variables from the rest of the
        program bound in.

        The sweeper function has three separate parts:

        *   *sweepMongo* (for the database records)
        *   *sweepDirectories* (for the project/edition directories)
        *   *sweepTemp* (for the temporary directories)
        """
        Messages = self.Messages
        Mongo = self.Mongo
        Messages = self.Messages
        Settings = self.Settings
        siteCrit = Settings.siteCrit

        def mayExecute():
            if not ON:
                return False

            site = Mongo.getRecord("site", siteCrit)
            sstm = site.sweeperStartTm or None
            head = f"SWEEPER{DRYREP} by worker {os.getpid()}: "

            now = isonow()

            if sstm is None or not lessAgo(SWEEP_LEE, sstm, iso=True):
                Mongo.updateRecord("site", siteCrit, dict(sweeperStartTm=now))
                result = True
            else:
                Messages.info(
                    logmsg=f"{head}skipped sweeping at {now} "
                    f"because too close to last sweep at {sstm}"
                )
                result = False

            return result

        def sweeper():
            if mayExecute():
                head = f"SWEEPER{DRYREP}: "
                self.sweepMongo()
                self.sweepDirectories()
                self.sweepTemp()
                now = isonow()
                Messages.info(logmsg=f"{head}sweep completed at {now}")

        return sweeper

    def sweepMongo(self):
        """Permanently deletes records marked as deleted in all tables.
        """
        Mongo = self.Mongo
        Messages = self.Messages

        tables = """
            edition
            editionUser
            keyword
            project
            projectUser
            site
            user
        """.strip().split()

        head = f"SWEEPER{DRYREP}-MONGO: "

        for table in tables:
            recordIds = [
                r._id
                for r in Mongo.getList(table, {}, deleted=True)
                if not lessAgo(DELAY_DEL, r.get(MDELDT, None))
            ]

            n = len(recordIds)

            if n:
                plural = "" if n == 1 else "s"
                Messages.info(logmsg=f"{head}{n:>3} {table} record{plural} to be wiped")

                if DRY:
                    for recordId in recordIds:
                        Messages.info(logmsg=f"{head}delete {table} record {recordId}")
                else:
                    Mongo.hardDeleteRecords(
                        table, dict(_id={"$in": recordIds}), "sweeper"
                    )

    def sweepDirectories(self):
        """Wipes all project/edition directories that are marked as deleted.

        Such directories are marked as deleted if they contain a file named
        `__deleted__.txt`.

        Note that it should not occur that projects are marked as deleted while
        they contain editions that are not deleted. But in case this should happen,
        the deletion of the project directory is prevented.
        """
        Messages = self.Messages
        Settings = self.Settings
        workingDir = Settings.workingDir
        projectsDir = f"{workingDir}/project"

        nP = 0
        nE = 0

        head = f"SWEEPER{DRYREP}-FOLDERS"

        for project in dirContents(projectsDir)[1]:
            headProj = f"{head} project/{project}"
            projectDir = f"{projectsDir}/{project}"
            editionsDir = f"{projectDir}/edition"

            for edition in dirContents(editionsDir)[1]:
                headEd = f"{headProj}/edition/{edition}"
                editionDir = f"{editionsDir}/{edition}"
                eDelFile = f"{editionDir}/{FDEL}"

                if fileExists(eDelFile) and not lessAgo(
                    DELAY_DEL, mTime(eDelFile), iso=False
                ):
                    nE += 1

                    if DRY:
                        Messages.info(logmsg=headEd)
                    else:
                        try:
                            dirRemove(editionDir)
                            Messages.info(logmsg=f"{headEd}: wiped")
                        except Exception as e:
                            Messages.error(
                                logmsg=f"{headEd}: failed to wipe because of {e}"
                            )

            pDelFile = f"{projectDir}/{FDEL}"

            if fileExists(pDelFile) and not lessAgo(
                DELAY_DEL, mTime(pDelFile), iso=False
            ):
                if len(dirContents(editionsDir)[1]):
                    Messages.error(
                        logmsg=f"{headProj}: will not wipe because it is not empty"
                    )
                else:
                    nP += 1

                    if DRY:
                        Messages.info(logmsg=headProj)
                    else:
                        try:
                            dirRemove(projectDir)
                            Messages.info(logmsg=f"{headProj}: wiped")
                        except Exception as e:
                            Messages.error(
                                logmsg=f"{headProj}: failed to wipe because of {e}"
                            )

        if nP > 0:
            plural = "" if nP == 1 else "s"
            Messages.info(logmsg=f"{head}: deleted {nP:>3} project{plural}")

        if nE > 0:
            plural = "" if nE == 1 else "s"
            Messages.info(logmsg=f"{head}: deleted {nE:>3} edition{plural}")

    def sweepTemp(self):
        """Wipes all temporary directories of a certain age, typically 1 day.

        These directories all resides at the toplevel of the temp dir, and their
        names start with `tmp`.
        """
        Messages = self.Messages
        Settings = self.Settings
        tempDir = Settings.tempDir

        head = f"SWEEPER{DRYREP}-TMP: "

        nT = 0

        for tmp in dirContents(tempDir)[1]:
            tmpd = f"{tempDir}/{tmp}"

            if tmp.startswith("tmp") and not lessAgo(DELAY_TMP, mTime(tmpd), iso=False):
                nT += 1

                if DRY:
                    Messages.info(logmsg=f"{head}tempdir {tmp}")
                else:
                    try:
                        dirRemove(tmpd)
                    except Exception as e:
                        Messages.error(
                            logmsg=f"{head}Failed to remove {tmpd} " f"because of {e}"
                        )

        if nT > 0:
            plural = "" if nT == 1 else "s"
            Messages.info(logmsg=f"{head}deleted {nT:>3} tempdir{plural}")

Global variables

var DELAY_DEL

The grace period for permanently deleting deleted items.

Items that are marked as deleted less than this ago, will be permanently deleted by the next sweeping action.

var DELAY_TMP

The grace period for deleting temp directories.

Sometimes temporary directories are not wiped properly after they have been used. Those directories will be wiped after this period.

var DELAY_UNDEL

The grace period for restoring deleted items.

Items that are marked as deleted less than this ago, can still be restored.

var DRY

Whether to perform the wipes on records and directories, or suppress the execution.

If True, all wipes will be announced, but not performed.

var INTERVAL

The interval between invocations of the sweeper function.

When workers schedule the sweeper job, they use this as the interval.

var ON

Whether to invoke the sweeper or not.

Sometimes, for debugging or testing, it is handy to not start the sweeping process.

var SEC

A second as fraction of a day.

Some operations uses days as unit. This is the second with respect to the unit day.

var SWEEP_LEE

The threshold for suppressing a sweep action.

If the latest sweep action occurred less than this ago, the current sweep action will be suppressed.

Classes

class Sweeper (Settings, Messages, Mongo)
Expand source code Browse git
class Sweeper:
    def __init__(self, Settings, Messages, Mongo):
        self.Settings = Settings
        self.Mongo = Mongo
        self.Messages = Messages
        Messages.debugAdd(self)

        scheduler = BackgroundScheduler()
        self.scheduler = scheduler

    def maySchedule(self):
        """Whether a process is allowed to schedule the sweeper.

        Scheduling is suppressed if `ON` is False.

        Also, when Flask runs in debug mode, there are two processes working.
        The second process is the one that gets restarted when errors occur or
        code is updated. It is this process that may schedule sweepers, not the
        first process.
        """
        if not ON:
            return False

        Messages = self.Messages
        Settings = self.Settings
        debugMode = Settings.debugMode

        runMain = runInfo()
        startIt = debugMode and runMain or not debugMode
        head = f"SWEEPER{DRYREP} by worker {os.getpid()}: "

        if startIt:
            now = isonow()
            Messages.info(logmsg=f"{head}scheduled at {now}")
        else:
            Messages.info(logmsg=f"{head}deferred to debug instance")
        return startIt

    def start(self):
        """Schedules the sweeper job.
        """
        if self.maySchedule():
            scheduler = self.scheduler
            sweeper = self.clean()
            scheduler.add_job(sweeper, "interval", **INTERVAL)
            scheduler.start()

    def clean(self):
        """Provides the sweeper function.

        This method is not the sweeper function itself, but it *returns*
        the sweeper function, which has some variables from the rest of the
        program bound in.

        The sweeper function has three separate parts:

        *   *sweepMongo* (for the database records)
        *   *sweepDirectories* (for the project/edition directories)
        *   *sweepTemp* (for the temporary directories)
        """
        Messages = self.Messages
        Mongo = self.Mongo
        Messages = self.Messages
        Settings = self.Settings
        siteCrit = Settings.siteCrit

        def mayExecute():
            if not ON:
                return False

            site = Mongo.getRecord("site", siteCrit)
            sstm = site.sweeperStartTm or None
            head = f"SWEEPER{DRYREP} by worker {os.getpid()}: "

            now = isonow()

            if sstm is None or not lessAgo(SWEEP_LEE, sstm, iso=True):
                Mongo.updateRecord("site", siteCrit, dict(sweeperStartTm=now))
                result = True
            else:
                Messages.info(
                    logmsg=f"{head}skipped sweeping at {now} "
                    f"because too close to last sweep at {sstm}"
                )
                result = False

            return result

        def sweeper():
            if mayExecute():
                head = f"SWEEPER{DRYREP}: "
                self.sweepMongo()
                self.sweepDirectories()
                self.sweepTemp()
                now = isonow()
                Messages.info(logmsg=f"{head}sweep completed at {now}")

        return sweeper

    def sweepMongo(self):
        """Permanently deletes records marked as deleted in all tables.
        """
        Mongo = self.Mongo
        Messages = self.Messages

        tables = """
            edition
            editionUser
            keyword
            project
            projectUser
            site
            user
        """.strip().split()

        head = f"SWEEPER{DRYREP}-MONGO: "

        for table in tables:
            recordIds = [
                r._id
                for r in Mongo.getList(table, {}, deleted=True)
                if not lessAgo(DELAY_DEL, r.get(MDELDT, None))
            ]

            n = len(recordIds)

            if n:
                plural = "" if n == 1 else "s"
                Messages.info(logmsg=f"{head}{n:>3} {table} record{plural} to be wiped")

                if DRY:
                    for recordId in recordIds:
                        Messages.info(logmsg=f"{head}delete {table} record {recordId}")
                else:
                    Mongo.hardDeleteRecords(
                        table, dict(_id={"$in": recordIds}), "sweeper"
                    )

    def sweepDirectories(self):
        """Wipes all project/edition directories that are marked as deleted.

        Such directories are marked as deleted if they contain a file named
        `__deleted__.txt`.

        Note that it should not occur that projects are marked as deleted while
        they contain editions that are not deleted. But in case this should happen,
        the deletion of the project directory is prevented.
        """
        Messages = self.Messages
        Settings = self.Settings
        workingDir = Settings.workingDir
        projectsDir = f"{workingDir}/project"

        nP = 0
        nE = 0

        head = f"SWEEPER{DRYREP}-FOLDERS"

        for project in dirContents(projectsDir)[1]:
            headProj = f"{head} project/{project}"
            projectDir = f"{projectsDir}/{project}"
            editionsDir = f"{projectDir}/edition"

            for edition in dirContents(editionsDir)[1]:
                headEd = f"{headProj}/edition/{edition}"
                editionDir = f"{editionsDir}/{edition}"
                eDelFile = f"{editionDir}/{FDEL}"

                if fileExists(eDelFile) and not lessAgo(
                    DELAY_DEL, mTime(eDelFile), iso=False
                ):
                    nE += 1

                    if DRY:
                        Messages.info(logmsg=headEd)
                    else:
                        try:
                            dirRemove(editionDir)
                            Messages.info(logmsg=f"{headEd}: wiped")
                        except Exception as e:
                            Messages.error(
                                logmsg=f"{headEd}: failed to wipe because of {e}"
                            )

            pDelFile = f"{projectDir}/{FDEL}"

            if fileExists(pDelFile) and not lessAgo(
                DELAY_DEL, mTime(pDelFile), iso=False
            ):
                if len(dirContents(editionsDir)[1]):
                    Messages.error(
                        logmsg=f"{headProj}: will not wipe because it is not empty"
                    )
                else:
                    nP += 1

                    if DRY:
                        Messages.info(logmsg=headProj)
                    else:
                        try:
                            dirRemove(projectDir)
                            Messages.info(logmsg=f"{headProj}: wiped")
                        except Exception as e:
                            Messages.error(
                                logmsg=f"{headProj}: failed to wipe because of {e}"
                            )

        if nP > 0:
            plural = "" if nP == 1 else "s"
            Messages.info(logmsg=f"{head}: deleted {nP:>3} project{plural}")

        if nE > 0:
            plural = "" if nE == 1 else "s"
            Messages.info(logmsg=f"{head}: deleted {nE:>3} edition{plural}")

    def sweepTemp(self):
        """Wipes all temporary directories of a certain age, typically 1 day.

        These directories all resides at the toplevel of the temp dir, and their
        names start with `tmp`.
        """
        Messages = self.Messages
        Settings = self.Settings
        tempDir = Settings.tempDir

        head = f"SWEEPER{DRYREP}-TMP: "

        nT = 0

        for tmp in dirContents(tempDir)[1]:
            tmpd = f"{tempDir}/{tmp}"

            if tmp.startswith("tmp") and not lessAgo(DELAY_TMP, mTime(tmpd), iso=False):
                nT += 1

                if DRY:
                    Messages.info(logmsg=f"{head}tempdir {tmp}")
                else:
                    try:
                        dirRemove(tmpd)
                    except Exception as e:
                        Messages.error(
                            logmsg=f"{head}Failed to remove {tmpd} " f"because of {e}"
                        )

        if nT > 0:
            plural = "" if nT == 1 else "s"
            Messages.info(logmsg=f"{head}deleted {nT:>3} tempdir{plural}")

Methods

def clean(self)

Provides the sweeper function.

This method is not the sweeper function itself, but it returns the sweeper function, which has some variables from the rest of the program bound in.

The sweeper function has three separate parts:

  • sweepMongo (for the database records)
  • sweepDirectories (for the project/edition directories)
  • sweepTemp (for the temporary directories)
def maySchedule(self)

Whether a process is allowed to schedule the sweeper.

Scheduling is suppressed if ON is False.

Also, when Flask runs in debug mode, there are two processes working. The second process is the one that gets restarted when errors occur or code is updated. It is this process that may schedule sweepers, not the first process.

def start(self)

Schedules the sweeper job.

def sweepDirectories(self)

Wipes all project/edition directories that are marked as deleted.

Such directories are marked as deleted if they contain a file named __deleted__.txt.

Note that it should not occur that projects are marked as deleted while they contain editions that are not deleted. But in case this should happen, the deletion of the project directory is prevented.

def sweepMongo(self)

Permanently deletes records marked as deleted in all tables.

def sweepTemp(self)

Wipes all temporary directories of a certain age, typically 1 day.

These directories all resides at the toplevel of the temp dir, and their names start with tmp.