From 64bd20e0bf5140a4f4270854515668b71497056c Mon Sep 17 00:00:00 2001 From: Alexey Skobkin Date: Sat, 19 Oct 2019 03:01:07 +0300 Subject: [PATCH] Optimizing data processing by splitting old database data into chunks. --- magnetico-migrator.go | 41 +++++++++++++------------ old/sqlite.go | 70 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 19 deletions(-) diff --git a/magnetico-migrator.go b/magnetico-migrator.go index e37ebf8..3e164a5 100644 --- a/magnetico-migrator.go +++ b/magnetico-migrator.go @@ -17,6 +17,7 @@ import ( var ( goDatabaseUrl = kingpin.Flag("go-database", "magneticod Go version database URL.").Short('g').String() pyDatabaseUrl = kingpin.Flag("py-database", "Python version database URL.").Short('p').Required().String() + chunkSize = kingpin.Flag("chunk-size", "How many torrents to retrieve from old DB at once").Short('c').Default("50").Uint64() progress pb.ProgressBar ) @@ -63,35 +64,37 @@ func main() { } func mergeDatabases(pyDb sql.DB, goDb persistence.Database) error { - // Selecting torrents - tRows, err := pyDb.Query("SELECT id, info_hash, name, total_size, discovered_on FROM torrents ORDER BY id ASC;") + maxId, err := old.GetLastTorrentId(pyDb) if err != nil { - zap.L().Fatal("Error when querying torrents from old database.") + zap.L().Fatal("Error while getting last torrent ID from old database") } - defer tRows.Close() - for tRows.Next() { - var torrent persistence.TorrentMetadata + var lastId uint64 = 0 - progress.Increment() - - err := tRows.Scan(&torrent.ID, &torrent.InfoHash, &torrent.Name, &torrent.Size, &torrent.DiscoveredOn) + for lastId < maxId { + chunk, err := old.GetTorrentsChunk(pyDb, lastId, *chunkSize) if err != nil { - zap.L().Error("Error scanning torrent row.", zap.Error(err)) - - continue + zap.L().Error("Error while getting torrents chunk", zap.Uint64("last-id", lastId)) } - files, err := old.GetFilesForTorrent(pyDb, torrent) - if err != nil { - zap.L().Error("Error getting files for 
torrent.", zap.String("hash", hex.EncodeToString(torrent.InfoHash)), zap.Error(err)) + if len(chunk) == 0 { + if (maxId - lastId) <= *chunkSize { + zap.L().Warn("Probably finished processing all data (earlier). Finishing.", zap.Uint64("last", lastId), zap.Uint64("max", maxId)) - continue + return nil + } else { + zap.L().Error("Prematurely received empty chunk, aborting", zap.Uint64("last", lastId), zap.Uint64("max", maxId)) + } } - err = importTorrent(goDb, torrent, files) - if err != nil { - zap.L().Error("Error when importing torrent to Go database.", zap.String("hash", hex.EncodeToString(torrent.InfoHash)), zap.Error(err)) + for _, piece := range chunk { + lastId = piece.Torrent.ID + progress.Increment() + + err = importTorrent(goDb, piece.Torrent, piece.Files) + if err != nil { + zap.L().Error("Error when importing torrent to Go database.", zap.String("hash", hex.EncodeToString(piece.Torrent.InfoHash)), zap.Error(err)) + } } } diff --git a/old/sqlite.go b/old/sqlite.go index fa9cf8d..eecb420 100644 --- a/old/sqlite.go +++ b/old/sqlite.go @@ -2,11 +2,18 @@ package old import ( "database/sql" + "encoding/hex" "fmt" "github.com/skobkin/magnetico/pkg/persistence" + "go.uber.org/zap" "net/url" ) +type TorrentData struct { + Torrent persistence.TorrentMetadata + Files []persistence.File +} + func OpenSqliteDb(url_ url.URL) (*sql.DB, error) { url_.Scheme = "" return sql.Open("sqlite3", url_.String()) @@ -36,6 +43,69 @@ func GetNumberOfTorrents(db sql.DB) (uint, error) { } } +func GetLastTorrentId(db sql.DB) (uint64, error) { + rows, err := db.Query("SELECT MAX(id) FROM torrents;") + if err != nil { + return 0, err + } + defer rows.Close() + + if rows.Next() != true { + return 0, fmt.Errorf("No rows returned from `SELECT MAX(id)`") + } + + var n *uint64 + if err = rows.Scan(&n); err != nil { + return 0, err + } + + // If the database is empty (i.e. 0 entries in 'torrents') then the query will return nil. 
+ if n == nil { + return 0, nil + } else { + return *n, nil + } +} + +func GetTorrentsChunk(db sql.DB, fromId uint64, size uint64) ([]TorrentData, error) { + chunk := make([]TorrentData, 0) + + // Selecting torrents + tRows, err := db.Query(` + SELECT id, info_hash, name, total_size, discovered_on + FROM torrents + WHERE id > ? + ORDER BY id ASC + LIMIT ?; + `, fromId, size) + if err != nil { + zap.L().Fatal("Error when querying torrents from old database.") + } + defer tRows.Close() + + for tRows.Next() { + var torrent persistence.TorrentMetadata + + err := tRows.Scan(&torrent.ID, &torrent.InfoHash, &torrent.Name, &torrent.Size, &torrent.DiscoveredOn) + if err != nil { + zap.L().Error("Error scanning torrent row.", zap.Error(err)) + + return chunk, err + } + + files, err := GetFilesForTorrent(db, torrent) + if err != nil { + zap.L().Error("Error getting files for torrent.", zap.String("hash", hex.EncodeToString(torrent.InfoHash)), zap.Error(err)) + + return chunk, err + } + + chunk = append(chunk, TorrentData{Torrent: torrent, Files: files}) + } + + return chunk, nil +} + func GetFilesForTorrent(pyDb sql.DB, torrent persistence.TorrentMetadata) ([]persistence.File, error) { files := make([]persistence.File, 0)